Skip to content

Commit 2d155cc

Browse files
feat(mcl-upgrade): implement resume & urn pagination (datahub-project#11889)
1 parent d978857 commit 2d155cc

File tree

4 files changed

+248
-46
lines changed

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java

Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package com.linkedin.datahub.upgrade.system;
22

3-
import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;
4-
53
import com.linkedin.common.urn.Urn;
64
import com.linkedin.datahub.upgrade.UpgradeContext;
75
import com.linkedin.datahub.upgrade.UpgradeStep;
86
import com.linkedin.datahub.upgrade.UpgradeStepResult;
97
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
108
import com.linkedin.events.metadata.ChangeType;
9+
import com.linkedin.metadata.aspect.SystemAspect;
1110
import com.linkedin.metadata.boot.BootstrapStep;
1211
import com.linkedin.metadata.entity.AspectDao;
1312
import com.linkedin.metadata.entity.EntityService;
@@ -16,10 +15,13 @@
1615
import com.linkedin.metadata.entity.ebean.PartitionedStream;
1716
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
1817
import com.linkedin.metadata.utils.AuditStampUtils;
18+
import com.linkedin.upgrade.DataHubUpgradeResult;
1919
import com.linkedin.upgrade.DataHubUpgradeState;
2020
import com.linkedin.util.Pair;
2121
import io.datahubproject.metadata.context.OperationContext;
2222
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Optional;
2325
import java.util.concurrent.ExecutionException;
2426
import java.util.concurrent.Future;
2527
import java.util.function.Function;
@@ -33,6 +35,8 @@
3335
*/
3436
@Slf4j
3537
public abstract class AbstractMCLStep implements UpgradeStep {
38+
public static final String LAST_URN_KEY = "lastUrn";
39+
3640
private final OperationContext opContext;
3741
private final EntityService<?> entityService;
3842
private final AspectDao aspectDao;
@@ -70,10 +74,30 @@ protected Urn getUpgradeIdUrn() {
7074
@Override
7175
public Function<UpgradeContext, UpgradeStepResult> executable() {
7276
return (context) -> {
77+
// Resume state
78+
Optional<DataHubUpgradeResult> prevResult =
79+
context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
80+
String resumeUrn =
81+
prevResult
82+
.filter(
83+
result ->
84+
DataHubUpgradeState.IN_PROGRESS.equals(result.getState())
85+
&& result.getResult() != null
86+
&& result.getResult().containsKey(LAST_URN_KEY))
87+
.map(result -> result.getResult().get(LAST_URN_KEY))
88+
.orElse(null);
89+
if (resumeUrn != null) {
90+
log.info("{}: Resuming from URN: {}", getUpgradeIdUrn(), resumeUrn);
91+
}
7392

7493
// re-using for configuring the sql scan
7594
RestoreIndicesArgs args =
76-
new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit);
95+
new RestoreIndicesArgs()
96+
.aspectName(getAspectName())
97+
.batchSize(batchSize)
98+
.lastUrn(resumeUrn)
99+
.urnBasedPagination(resumeUrn != null)
100+
.limit(limit);
77101

78102
if (getUrnLike() != null) {
79103
args = args.urnLike(getUrnLike());
@@ -86,40 +110,62 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
86110
batch -> {
87111
log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);
88112

89-
List<Pair<Future<?>, Boolean>> futures;
90-
113+
List<Pair<Future<?>, SystemAspect>> futures;
91114
futures =
92115
EntityUtils.toSystemAspectFromEbeanAspects(
93116
opContext.getRetrieverContext().get(),
94117
batch.collect(Collectors.toList()))
95118
.stream()
96119
.map(
97-
systemAspect ->
98-
entityService.alwaysProduceMCLAsync(
99-
opContext,
100-
systemAspect.getUrn(),
101-
systemAspect.getUrn().getEntityType(),
102-
getAspectName(),
103-
systemAspect.getAspectSpec(),
104-
null,
105-
systemAspect.getRecordTemplate(),
106-
null,
107-
systemAspect
108-
.getSystemMetadata()
109-
.setRunId(id())
110-
.setLastObserved(System.currentTimeMillis()),
111-
AuditStampUtils.createDefaultAuditStamp(),
112-
ChangeType.UPSERT))
113-
.collect(Collectors.toList());
114-
115-
futures.forEach(
116-
f -> {
117-
try {
118-
f.getFirst().get();
119-
} catch (InterruptedException | ExecutionException e) {
120-
throw new RuntimeException(e);
121-
}
122-
});
120+
systemAspect -> {
121+
Pair<Future<?>, Boolean> future =
122+
entityService.alwaysProduceMCLAsync(
123+
opContext,
124+
systemAspect.getUrn(),
125+
systemAspect.getUrn().getEntityType(),
126+
getAspectName(),
127+
systemAspect.getAspectSpec(),
128+
null,
129+
systemAspect.getRecordTemplate(),
130+
null,
131+
systemAspect
132+
.getSystemMetadata()
133+
.setRunId(id())
134+
.setLastObserved(System.currentTimeMillis()),
135+
AuditStampUtils.createDefaultAuditStamp(),
136+
ChangeType.UPSERT);
137+
return Pair.<Future<?>, SystemAspect>of(
138+
future.getFirst(), systemAspect);
139+
})
140+
.toList();
141+
142+
SystemAspect lastAspect =
143+
futures.stream()
144+
.map(
145+
f -> {
146+
try {
147+
f.getFirst().get();
148+
return f.getSecond();
149+
} catch (InterruptedException | ExecutionException e) {
150+
throw new RuntimeException(e);
151+
}
152+
})
153+
.reduce((a, b) -> b)
154+
.orElse(null);
155+
156+
// record progress
157+
if (lastAspect != null) {
158+
log.info(
159+
"{}: Saving state. Last urn:{}", getUpgradeIdUrn(), lastAspect.getUrn());
160+
context
161+
.upgrade()
162+
.setUpgradeResult(
163+
opContext,
164+
getUpgradeIdUrn(),
165+
entityService,
166+
DataHubUpgradeState.IN_PROGRESS,
167+
Map.of(LAST_URN_KEY, lastAspect.getUrn().toString()));
168+
}
123169

124170
if (batchDelayMs > 0) {
125171
log.info("Sleeping for {} ms", batchDelayMs);
@@ -142,12 +188,23 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
142188
@Override
143189
/** Returns whether the upgrade should be skipped. */
144190
public boolean skip(UpgradeContext context) {
145-
boolean previouslyRun =
146-
entityService.exists(
147-
opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
148-
if (previouslyRun) {
149-
log.info("{} was already run. Skipping.", id());
191+
Optional<DataHubUpgradeResult> prevResult =
192+
context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
193+
194+
boolean previousRunFinal =
195+
prevResult
196+
.filter(
197+
result ->
198+
DataHubUpgradeState.SUCCEEDED.equals(result.getState())
199+
|| DataHubUpgradeState.ABORTED.equals(result.getState()))
200+
.isPresent();
201+
202+
if (previousRunFinal) {
203+
log.info(
204+
"{} was already run. State: {} Skipping.",
205+
id(),
206+
prevResult.map(DataHubUpgradeResult::getState));
150207
}
151-
return previouslyRun;
208+
return previousRunFinal;
152209
}
153210
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.datahub.upgrade.system.schemafield;
22

3+
import static com.linkedin.datahub.upgrade.system.AbstractMCLStep.LAST_URN_KEY;
34
import static com.linkedin.metadata.Constants.APP_SOURCE;
45
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
56
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
@@ -61,7 +62,6 @@
6162
*/
6263
@Slf4j
6364
public class GenerateSchemaFieldsFromSchemaMetadataStep implements UpgradeStep {
64-
private static final String LAST_URN_KEY = "lastUrn";
6565
private static final List<String> REQUIRED_ASPECTS =
6666
List.of(SCHEMA_METADATA_ASPECT_NAME, STATUS_ASPECT_NAME);
6767

0 commit comments

Comments
 (0)