Skip to content

Commit

Permalink
feat(bootstrap): bootstrap template mcps (#11518)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Oct 4, 2024
1 parent 73d8a46 commit 04349cb
Show file tree
Hide file tree
Showing 58 changed files with 1,969 additions and 1,585 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ project.ext.externalDependency = [
'annotationApi': 'javax.annotation:javax.annotation-api:1.3.2',
'jakartaAnnotationApi': 'jakarta.annotation:jakarta.annotation-api:3.0.0',
'classGraph': 'io.github.classgraph:classgraph:4.8.172',
'mustache': 'com.github.spullara.mustache.java:compiler:0.9.14'
]

allprojects {
Expand Down
2 changes: 2 additions & 0 deletions datahub-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':metadata-dao-impl:kafka-producer')
implementation externalDependency.charle

implementation externalDependency.mustache
implementation externalDependency.javaxInject
implementation(externalDependency.hadoopClient) {
exclude group: 'net.minidev', module: 'json-smart'
Expand Down Expand Up @@ -83,6 +84,7 @@ dependencies {
testImplementation externalDependency.springBootTest
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
testImplementation 'uk.org.webcompere:system-stubs-testng:2.1.7'
testRuntimeOnly externalDependency.logbackClassic

constraints {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public interface UpgradeManager {

/** Register an {@link Upgrade} with the manaager. */
void register(Upgrade upgrade);
UpgradeManager register(Upgrade upgrade);

/** Kick off an {@link Upgrade} by identifier. */
UpgradeResult execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BootstrapMCPConfig {

@Nonnull
@Value("${systemUpdate.bootstrap.mcpConfig}")
private String bootstrapMCPConfig;

@Bean(name = "bootstrapMCPNonBlocking")
public BootstrapMCP bootstrapMCPNonBlocking(
final OperationContext opContext, EntityService<?> entityService) throws IOException {
return new BootstrapMCP(opContext, bootstrapMCPConfig, entityService, false);
}

@Bean(name = "bootstrapMCPBlocking")
public BootstrapMCP bootstrapMCPBlocking(
final OperationContext opContext, EntityService<?> entityService) throws IOException {
return new BootstrapMCP(opContext, bootstrapMCPConfig, entityService, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
Expand All @@ -31,6 +32,7 @@
import io.datahubproject.metadata.services.RestrictedService;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -54,21 +56,31 @@ public class SystemUpdateConfig {
public SystemUpdate systemUpdate(
final List<BlockingSystemUpgrade> blockingSystemUpgrades,
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
final DataHubStartupStep dataHubStartupStep) {
return new SystemUpdate(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
final DataHubStartupStep dataHubStartupStep,
@Qualifier("bootstrapMCPBlocking") @NonNull final BootstrapMCP bootstrapMCPBlocking,
@Qualifier("bootstrapMCPNonBlocking") @NonNull final BootstrapMCP bootstrapMCPNonBlocking) {
return new SystemUpdate(
blockingSystemUpgrades,
nonBlockingSystemUpgrades,
dataHubStartupStep,
bootstrapMCPBlocking,
bootstrapMCPNonBlocking);
}

@Bean(name = "systemUpdateBlocking")
public SystemUpdateBlocking systemUpdateBlocking(
final List<BlockingSystemUpgrade> blockingSystemUpgrades,
final DataHubStartupStep dataHubStartupStep) {
return new SystemUpdateBlocking(blockingSystemUpgrades, List.of(), dataHubStartupStep);
final DataHubStartupStep dataHubStartupStep,
@Qualifier("bootstrapMCPBlocking") @NonNull final BootstrapMCP bootstrapMCPBlocking) {
return new SystemUpdateBlocking(
blockingSystemUpgrades, dataHubStartupStep, bootstrapMCPBlocking);
}

@Bean(name = "systemUpdateNonBlocking")
public SystemUpdateNonBlocking systemUpdateNonBlocking(
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades) {
return new SystemUpdateNonBlocking(List.of(), nonBlockingSystemUpgrades, null);
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Qualifier("bootstrapMCPNonBlocking") @NonNull final BootstrapMCP bootstrapMCPNonBlocking) {
return new SystemUpdateNonBlocking(nonBlockingSystemUpgrades, bootstrapMCPNonBlocking);
}

@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class DefaultUpgradeContext implements UpgradeContext {
private final List<String> args;
private final Map<String, Optional<String>> parsedArgs;

DefaultUpgradeContext(
public DefaultUpgradeContext(
@Nonnull OperationContext opContext,
Upgrade upgrade,
UpgradeReport report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public class DefaultUpgradeManager implements UpgradeManager {
private final Map<String, Upgrade> _upgrades = new HashMap<>();

@Override
public void register(@Nonnull Upgrade upgrade) {
public UpgradeManager register(@Nonnull Upgrade upgrade) {
_upgrades.put(upgrade.id(), upgrade);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -22,7 +23,9 @@ public class SystemUpdate implements Upgrade {
public SystemUpdate(
@NonNull final List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable final DataHubStartupStep dataHubStartupStep) {
@Nullable final DataHubStartupStep dataHubStartupStep,
@Nullable final BootstrapMCP bootstrapMCPBlocking,
@Nullable final BootstrapMCP bootstrapMCPNonBlocking) {

steps = new LinkedList<>();
cleanupSteps = new LinkedList<>();
Expand All @@ -32,11 +35,23 @@ public SystemUpdate(
cleanupSteps.addAll(
blockingSystemUpgrades.stream().flatMap(up -> up.cleanupSteps().stream()).toList());

// bootstrap blocking only
if (bootstrapMCPBlocking != null) {
steps.addAll(bootstrapMCPBlocking.steps());
cleanupSteps.addAll(bootstrapMCPBlocking.cleanupSteps());
}

// emit system update message if blocking upgrade(s) present
if (dataHubStartupStep != null && !blockingSystemUpgrades.isEmpty()) {
steps.add(dataHubStartupStep);
}

// bootstrap non-blocking only
if (bootstrapMCPNonBlocking != null) {
steps.addAll(bootstrapMCPNonBlocking.steps());
cleanupSteps.addAll(bootstrapMCPNonBlocking.cleanupSteps());
}

// add non-blocking upgrades last
steps.addAll(nonBlockingSystemUpgrades.stream().flatMap(up -> up.steps().stream()).toList());
cleanupSteps.addAll(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.linkedin.datahub.upgrade.system;

import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import java.util.List;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

public class SystemUpdateBlocking extends SystemUpdate {

public SystemUpdateBlocking(
@NonNull List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable DataHubStartupStep dataHubStartupStep) {
super(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
@NonNull DataHubStartupStep dataHubStartupStep,
@NonNull final BootstrapMCP bootstrapMCPBlocking) {
super(blockingSystemUpgrades, List.of(), dataHubStartupStep, bootstrapMCPBlocking, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.linkedin.datahub.upgrade.system;

import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import java.util.List;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

public class SystemUpdateNonBlocking extends SystemUpdate {

public SystemUpdateNonBlocking(
@NonNull List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable DataHubStartupStep dataHubStartupStep) {
super(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
final BootstrapMCP bootstrapMCPNonBlocking) {
super(List.of(), nonBlockingSystemUpgrades, null, null, bootstrapMCPNonBlocking);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;

public class BootstrapMCP implements Upgrade {
private final List<UpgradeStep> _steps;

public BootstrapMCP(
OperationContext opContext,
@Nullable String bootstrapMCPConfig,
EntityService<?> entityService,
boolean isBlocking)
throws IOException {
if (bootstrapMCPConfig != null && !bootstrapMCPConfig.isEmpty()) {
_steps =
BootstrapMCPUtil.generateSteps(opContext, isBlocking, bootstrapMCPConfig, entityService);
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return getClass().getSimpleName();
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;

import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.model.BootstrapMCPConfigFile;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* This bootstrap step is responsible for upgrading DataHub policy documents with new searchable
* fields in ES
*/
@Slf4j
public class BootstrapMCPStep implements UpgradeStep {
private final String upgradeId;
private final Urn upgradeIdUrn;

private final OperationContext opContext;
private final EntityService<?> entityService;
@Getter private final BootstrapMCPConfigFile.MCPTemplate mcpTemplate;

public BootstrapMCPStep(
OperationContext opContext,
EntityService<?> entityService,
BootstrapMCPConfigFile.MCPTemplate mcpTemplate) {
this.opContext = opContext;
this.entityService = entityService;
this.mcpTemplate = mcpTemplate;
this.upgradeId =
String.join("-", List.of("bootstrap", mcpTemplate.getName(), mcpTemplate.getVersion()));
this.upgradeIdUrn = BootstrapStep.getUpgradeUrn(this.upgradeId);
}

@Override
public String id() {
return upgradeId;
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(opContext, mcpTemplate);
log.info("Ingesting {} MCPs", batch.getItems().size());
entityService.ingestProposal(opContext, batch, mcpTemplate.isAsync());
} catch (IOException e) {
log.error("Error bootstrapping MCPs", e);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
}

BootstrapStep.setUpgradeResult(context.opContext(), upgradeIdUrn, entityService);

return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
};
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return mcpTemplate.isOptional();
}

/** Returns whether the upgrade should be skipped. */
@Override
public boolean skip(UpgradeContext context) {
if (!mcpTemplate.isForce()) {
boolean previouslyRun =
entityService.exists(
context.opContext(), upgradeIdUrn, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
} else {
log.info("{} forced run.", id());
return false;
}
}
}
Loading

0 comments on commit 04349cb

Please sign in to comment.