Skip to content

Commit

Permalink
feat(upgrade): common base for mcl upgrades (datahub-project#10429)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored May 3, 2024
1 parent 5a686c5 commit b325819
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
Expand All @@ -15,13 +16,14 @@ public class ReindexDataJobViaNodesCLLConfig {

@Bean
public NonBlockingSystemUpgrade reindexDataJobViaNodesCLL(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize,
@Value("${systemUpdate.dataJobNodeCLL.delayMs}") final Integer delayMs,
@Value("${systemUpdate.dataJobNodeCLL.limit}") final Integer limit) {
return new ReindexDataJobViaNodesCLL(
entityService, aspectDao, enabled, batchSize, delayMs, limit);
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package com.linkedin.datahub.upgrade.system;

import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

/**
 * Generic upgrade step class for generating MCLs for a given aspect in order to update ES
 * documents.
 *
 * <p>When executed, the step scans the aspect rows selected by {@link #getAspectName()} (and,
 * optionally, {@link #getUrnLike()}) in batches. For every aspect in a batch it asynchronously
 * produces an MCL through {@code EntityService#alwaysProduceMCLAsync}, waits for all futures of
 * that batch, and optionally sleeps before moving on to the next batch. Afterwards it runs {@code
 * EntityService#streamRestoreIndices} with the same scan arguments and records the upgrade result
 * under {@link #getUpgradeIdUrn()}.
 */
@Slf4j
public abstract class AbstractMCLStep implements UpgradeStep {
  /** Batch size applied when the caller passes {@code null} for {@code batchSize}. */
  private static final int DEFAULT_BATCH_SIZE = 200;

  private final OperationContext opContext;
  private final EntityService<?> entityService;
  private final AspectDao aspectDao;

  private final int batchSize;
  private final int batchDelayMs;
  private final int limit;

  /**
   * Creates the step.
   *
   * @param opContext operation context used for registry lookups and entity service calls
   * @param entityService service used to produce MCLs, restore indices and store upgrade state
   * @param aspectDao DAO used to stream the aspect rows in batches
   * @param batchSize number of rows requested per batch; {@code null} falls back to {@value
   *     #DEFAULT_BATCH_SIZE}
   * @param batchDelayMs pause in milliseconds after each batch; values {@code <= 0} disable the
   *     pause. Must not be {@code null} (it is unboxed here)
   * @param limit limit handed to the scan arguments. Must not be {@code null} (it is unboxed here)
   */
  public AbstractMCLStep(
      OperationContext opContext,
      EntityService<?> entityService,
      AspectDao aspectDao,
      Integer batchSize,
      Integer batchDelayMs,
      Integer limit) {
    this.opContext = opContext;
    this.entityService = entityService;
    this.aspectDao = aspectDao;
    // Guard against auto-unboxing a null Integer; fall back to a sane default instead of an NPE.
    this.batchSize = batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
    this.batchDelayMs = batchDelayMs;
    this.limit = limit;
  }

  /** Name of the aspect for which MCLs are generated. */
  @Nonnull
  protected abstract String getAspectName();

  /** Urn under which the result of this upgrade is stored, derived from {@code id()}. */
  protected Urn getUpgradeIdUrn() {
    return BootstrapStep.getUpgradeUrn(id());
  }

  /** Optionally apply an urn-like sql filter, otherwise all urns */
  @Nullable
  protected abstract String getUrnLike();

  /**
   * Builds the function that performs the upgrade.
   *
   * @return a function that emits MCLs for all selected aspects, restores indices, records the
   *     upgrade result and reports {@code SUCCEEDED}; failures while waiting on a produced MCL or
   *     while sleeping surface as {@link RuntimeException}
   */
  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {
      // Resolve the subclass-provided values once so every use below sees the same value.
      final String aspectName = getAspectName();
      final String urnLike = getUrnLike();

      // re-using for configuring the sql scan
      RestoreIndicesArgs args =
          new RestoreIndicesArgs().aspectName(aspectName).batchSize(batchSize).limit(limit);

      if (urnLike != null) {
        args = args.urnLike(urnLike);
      }

      final AspectSpec aspectSpec = opContext.getEntityRegistry().getAspectSpecs().get(aspectName);

      aspectDao
          .streamAspectBatches(args)
          .forEach(
              batch -> {
                // NOTE(review): this logs the configured batch size, not the number of rows
                // actually contained in this batch.
                log.info("Processing batch({}) of size {}.", aspectName, batchSize);

                List<Pair<Future<?>, Boolean>> futures =
                    EntityUtils.toSystemAspectFromEbeanAspects(
                            opContext.getRetrieverContext().get(),
                            batch.collect(Collectors.toList()))
                        .stream()
                        .map(
                            systemAspect ->
                                entityService.alwaysProduceMCLAsync(
                                    opContext,
                                    systemAspect.getUrn(),
                                    systemAspect.getUrn().getEntityType(),
                                    aspectName,
                                    aspectSpec,
                                    null,
                                    systemAspect.getRecordTemplate(),
                                    null,
                                    systemAspect
                                        .getSystemMetadata()
                                        .setRunId(id())
                                        .setLastObserved(System.currentTimeMillis()),
                                    AuditStampUtils.createDefaultAuditStamp(),
                                    ChangeType.UPSERT))
                        .collect(Collectors.toList());

                awaitAll(futures);
                pauseBetweenBatches();
              });

      entityService
          .streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x))
          .forEach(
              result -> {
                context.report().addLine("Rows migrated: " + result.rowsMigrated);
                context.report().addLine("Rows ignored: " + result.ignored);
              });

      BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
      context.report().addLine("State updated: " + getUpgradeIdUrn());

      return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
    };
  }

  /**
   * Returns whether the upgrade should be skipped, i.e. whether the upgrade result aspect already
   * exists for {@link #getUpgradeIdUrn()}.
   */
  @Override
  public boolean skip(UpgradeContext context) {
    boolean previouslyRun =
        entityService.exists(
            opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
    if (previouslyRun) {
      log.info("{} was already run. Skipping.", id());
    }
    return previouslyRun;
  }

  /** Blocks until every future of the batch has completed, rethrowing failures unchecked. */
  private static void awaitAll(List<Pair<Future<?>, Boolean>> futures) {
    for (Pair<Future<?>, Boolean> produced : futures) {
      try {
        produced.getFirst().get();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Sleeps for the configured delay between batches; a non-positive delay is a no-op. */
  private void pauseBetweenBatches() {
    if (batchDelayMs <= 0) {
      return;
    }
    log.info("Sleeping for {} ms", batchDelayMs);
    try {
      Thread.sleep(batchDelayMs);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,6 +20,7 @@ public class ReindexDataJobViaNodesCLL implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;

public ReindexDataJobViaNodesCLL(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Expand All @@ -28,7 +31,7 @@ public ReindexDataJobViaNodesCLL(
_steps =
ImmutableList.of(
new ReindexDataJobViaNodesCLLStep(
entityService, aspectDao, batchSize, batchDelayMs, limit));
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,148 +2,43 @@

import static com.linkedin.metadata.Constants.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {

public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v3";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private final EntityService<?> entityService;
private final AspectDao aspectDao;
private final int batchSize;
private final int batchDelayMs;
private final int limit;
public class ReindexDataJobViaNodesCLLStep extends AbstractMCLStep {

public ReindexDataJobViaNodesCLLStep(
OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
this.entityService = entityService;
this.aspectDao = aspectDao;
this.batchSize = batchSize != null ? batchSize : 200;
this.batchDelayMs = batchDelayMs;
this.limit = limit;
super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {

// re-using for configuring the sql scan
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.aspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
.urnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
.batchSize(batchSize)
.limit(limit);

final AspectSpec aspectSpec =
context
.opContext()
.getEntityRegistry()
.getAspectSpecs()
.get(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME);

aspectDao
.streamAspectBatches(args)
.forEach(
batch -> {
log.info("Processing batch of size {}.", batchSize);

List<Pair<Future<?>, Boolean>> futures =
EntityUtils.toSystemAspectFromEbeanAspects(
context.opContext().getRetrieverContext().get(),
batch.collect(Collectors.toList()))
.stream()
.map(
systemAspect ->
entityService.alwaysProduceMCLAsync(
context.opContext(),
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
aspectSpec,
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(UPGRADE_ID)
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT))
.collect(Collectors.toList());

futures.forEach(
f -> {
try {
f.getFirst().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

entityService
.streamRestoreIndices(
context.opContext(), args, x -> context.report().addLine((String) x))
.forEach(
result -> {
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
});

BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
context.report().addLine("State updated: " + UPGRADE_ID_URN);

return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
public String id() {
return "via-node-cll-reindex-datajob-v3";
}

@Nonnull
@Override
public String id() {
return UPGRADE_ID;
protected String getAspectName() {
return DATA_JOB_INPUT_OUTPUT_ASPECT_NAME;
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Nullable
@Override
public boolean isOptional() {
return false;
protected String getUrnLike() {
return "urn:li:" + DATA_JOB_ENTITY_NAME + ":%";
}

@Override
Expand All @@ -152,17 +47,11 @@ public boolean isOptional() {
* variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT to determine whether to skip.
*/
public boolean skip(UpgradeContext context) {
boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
boolean envFlagRecommendsSkip =
Boolean.parseBoolean(System.getenv("SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT"));
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
if (envFlagRecommendsSkip) {
log.info("Environment variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT is set to true. Skipping.");
}
return (previouslyRun || envFlagRecommendsSkip);
return (super.skip(context) || envFlagRecommendsSkip);
}
}
Loading

0 comments on commit b325819

Please sign in to comment.