Skip to content

Commit

Permalink
feat(graph): graph index soft-delete support (datahub-project#11453)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Sep 23, 2024
1 parent 315ff8f commit a481ea4
Show file tree
Hide file tree
Showing 66 changed files with 2,149 additions and 954 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityServiceAspectRetriever;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.SystemGraphRetriever;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.SearchServiceSearchRetriever;
Expand Down Expand Up @@ -145,7 +146,7 @@ protected OperationContext javaSystemOperationContext(
@Nonnull final EntityRegistry entityRegistry,
@Nonnull final EntityService<?> entityService,
@Nonnull final RestrictedService restrictedService,
@Nonnull final GraphRetriever graphRetriever,
@Nonnull final GraphService graphService,
@Nonnull final SearchService searchService,
@Qualifier("baseElasticSearchComponents")
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) {
Expand All @@ -159,6 +160,9 @@ protected OperationContext javaSystemOperationContext(
SearchServiceSearchRetriever searchServiceSearchRetriever =
SearchServiceSearchRetriever.builder().searchService(searchService).build();

SystemGraphRetriever systemGraphRetriever =
SystemGraphRetriever.builder().graphService(graphService).build();

OperationContext systemOperationContext =
OperationContext.asSystem(
operationContextConfig,
Expand All @@ -168,11 +172,12 @@ protected OperationContext javaSystemOperationContext(
components.getIndexConvention(),
RetrieverContext.builder()
.aspectRetriever(entityServiceAspectRetriever)
.graphRetriever(graphRetriever)
.graphRetriever(systemGraphRetriever)
.searchRetriever(searchServiceSearchRetriever)
.build());

entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);
systemGraphRetriever.setSystemOperationContext(systemOperationContext);
searchServiceSearchRetriever.setSystemOperationContext(systemOperationContext);

return systemOperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.linkedin.datahub.upgrade.config;
package com.linkedin.datahub.upgrade.config.graph;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.linkedin.datahub.upgrade.config.graph;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.graph.edgestatus.ReindexEdgeStatus;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class ReindexEdgeStatusConfig {

@Bean
public NonBlockingSystemUpgrade reindexEdgeStatus(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${elasticsearch.search.graph.graphStatusEnabled}") final boolean featureEnabled,
@Value("${systemUpdate.edgeStatus.enabled}") final boolean enabled,
@Value("${systemUpdate.edgeStatus.batchSize}") final Integer batchSize,
@Value("${systemUpdate.edgeStatus.delayMs}") final Integer delayMs,
@Value("${systemUpdate.edgeStatus.limit}") final Integer limit) {
return new ReindexEdgeStatus(
opContext, entityService, aspectDao, featureEnabled && enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.linkedin.datahub.upgrade.system.graph.edgestatus;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
* A job that reindexes all status aspects as part of the graph edges containing status information.
* This is required to make sure previously written status information is present in the graph
* index.
*/
@Slf4j
public class ReindexEdgeStatus implements NonBlockingSystemUpgrade {

private final List<UpgradeStep> _steps;

public ReindexEdgeStatus(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
if (enabled) {
_steps =
ImmutableList.of(
new ReindexReindexEdgeStatusStep(
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return this.getClass().getName();
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade.system.graph.edgestatus;

import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class ReindexReindexEdgeStatusStep extends AbstractMCLStep {

public ReindexReindexEdgeStatusStep(
OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
}

@Override
public String id() {
return "edge-status-reindex-v1";
}

@Nonnull
@Override
protected String getAspectName() {
return STATUS_ASPECT_NAME;
}

@Nullable
@Override
protected String getUrnLike() {
return null;
}

@Override
/**
* Returns whether the upgrade should be skipped. Uses previous run history or the environment
* variable to determine whether to skip.
*/
public boolean skip(UpgradeContext context) {
boolean envFlagRecommendsSkip = Boolean.parseBoolean(System.getenv("SKIP_REINDEX_EDGE_STATUS"));
if (envFlagRecommendsSkip) {
log.info("Environment variable SKIP_REINDEX_EDGE_STATUS is set to true. Skipping.");
}
return (super.skip(context) || envFlagRecommendsSkip);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.vianodes;
package com.linkedin.datahub.upgrade.system.graph.vianodes;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.vianodes;
package com.linkedin.datahub.upgrade.system.graph.vianodes;

import static com.linkedin.metadata.Constants.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -36,6 +37,10 @@ public class Edge {
@EqualsAndHashCode.Include private Urn lifecycleOwner;
// An entity through which the edge between source and destination is created
@EqualsAndHashCode.Include private Urn via;
@EqualsAndHashCode.Exclude @Nullable private Boolean sourceStatus;
@EqualsAndHashCode.Exclude @Nullable private Boolean destinationStatus;
@EqualsAndHashCode.Exclude @Nullable private Boolean viaStatus;
@EqualsAndHashCode.Exclude @Nullable private Boolean lifecycleOwnerStatus;

// For backwards compatibility
public Edge(
Expand All @@ -57,6 +62,38 @@ public Edge(
updatedActor,
properties,
null,
null,
null,
null,
null,
null);
}

public Edge(
Urn source,
Urn destination,
String relationshipType,
Long createdOn,
Urn createdActor,
Long updatedOn,
Urn updatedActor,
Map<String, Object> properties,
Urn lifecycleOwner,
Urn via) {
this(
source,
destination,
relationshipType,
createdOn,
createdActor,
updatedOn,
updatedActor,
properties,
lifecycleOwner,
via,
null,
null,
null,
null);
}

Expand Down Expand Up @@ -91,6 +128,10 @@ public String toDocId(@Nonnull String idHashAlgo) {
public static final String EDGE_FIELD_LIFECYCLE_OWNER = "lifecycleOwner";
public static final String EDGE_SOURCE_URN_FIELD = "source.urn";
public static final String EDGE_DESTINATION_URN_FIELD = "destination.urn";
public static final String EDGE_SOURCE_STATUS = "source.removed";
public static final String EDGE_DESTINATION_STATUS = "destination.removed";
public static final String EDGE_FIELD_VIA_STATUS = "viaRemoved";
public static final String EDGE_FIELD_LIFECYCLE_OWNER_STATUS = "lifecycleOwnerRemoved";

public static final List<Pair<String, SortOrder>> KEY_SORTS =
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.linkedin.metadata.aspect.models.graph;

public enum EdgeUrnType {
SOURCE,
DESTINATION,
VIA,
LIFECYCLE_OWNER
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.EqualsAndHashCode;

/** Class representing an authenticated actor accessing DataHub. */
@EqualsAndHashCode
public class Authentication {

private final Actor authenticatedActor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.utils.QueryUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -19,10 +20,13 @@
@Slf4j
public class JavaGraphClient implements GraphClient {

GraphService _graphService;
private final OperationContext systemOpContext;
private final GraphService graphService;

public JavaGraphClient(@Nonnull GraphService graphService) {
this._graphService = graphService;
public JavaGraphClient(
@Nonnull OperationContext systemOpContext, @Nonnull GraphService graphService) {
this.systemOpContext = systemOpContext;
this.graphService = graphService;
}

/**
Expand All @@ -43,7 +47,8 @@ public EntityRelationships getRelatedEntities(
count = count == null ? DEFAULT_PAGE_SIZE : count;

RelatedEntitiesResult relatedEntitiesResult =
_graphService.findRelatedEntities(
graphService.findRelatedEntities(
systemOpContext,
null,
QueryUtils.newFilter("urn", rawUrn),
null,
Expand Down Expand Up @@ -91,7 +96,8 @@ public EntityLineageResult getLineageEntities(
@Nullable Integer count,
int maxHops,
String actor) {
return _graphService.getLineage(
return graphService.getLineage(
systemOpContext,
UrnUtils.getUrn(rawUrn),
direction,
start != null ? start : 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,7 @@ public EntityLineageResult getLineage(
if (separateSiblings) {
return ValidationUtils.validateEntityLineageResult(
opContext,
_graphService.getLineage(
entityUrn,
direction,
offset,
count,
maxHops,
opContext.getSearchContext().getLineageFlags()),
_graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops),
_entityService);
}

Expand All @@ -81,13 +75,7 @@ public EntityLineageResult getLineage(
}

EntityLineageResult entityLineage =
_graphService.getLineage(
entityUrn,
direction,
offset,
count,
maxHops,
opContext.getSearchContext().getLineageFlags());
_graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops);

Siblings siblingAspectOfEntity =
(Siblings) _entityService.getLatestAspect(opContext, entityUrn, SIBLINGS_ASPECT_NAME);
Expand Down
Loading

0 comments on commit a481ea4

Please sign in to comment.