Skip to content

Commit

Permalink
fix(graph-edge): fix graph edge delete exception (datahub-project#12025)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Dec 5, 2024
1 parent 3c388a5 commit 3f3f777
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ jobs:
path: |
~/.cache/uv
key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }}
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
public class FieldExtractor {
Expand All @@ -30,15 +30,34 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
@Nullable RecordTemplate record, List<T> fieldSpecs) {
return extractFields(record, fieldSpecs, false);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
}

/**
 * Extracts the value of each field spec from the record using a caller-supplied maximum value
 * length.
 *
 * <p>Field extraction is not treated as required here: the call is forwarded to the four-argument
 * overload with {@code requiredFieldExtract} set to {@code false}, so a {@code null} record does
 * not trigger the "extraction is required" failure.
 *
 * @param record the record to read from; may be {@code null}
 * @param fieldSpecs the field specs whose values should be extracted
 * @param maxValueLength value-length limit passed through to the four-argument overload
 * @return a map from each field spec to the list of values extracted for it
 */
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
    @Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
  final boolean requiredFieldExtract = false;
  return extractFields(record, fieldSpecs, maxValueLength, requiredFieldExtract);
}

public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
@Nullable RecordTemplate record,
List<T> fieldSpecs,
int maxValueLength,
boolean requiredFieldExtract) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
if (requiredFieldExtract && record == null) {
throw new IllegalArgumentException(
"Field extraction is required and the RecordTemplate is null");
}
Optional<Object> value =
Optional.ofNullable(record)
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
if (!value.isPresent()) {
extractedFields.put(fieldSpec, Collections.emptyList());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ private void handleDeleteChangeEvent(
urn.getEntityType(), event.getAspectName()));
}

RecordTemplate aspect = event.getRecordTemplate();
final RecordTemplate aspect =
event.getPreviousRecordTemplate() != null
? event.getPreviousRecordTemplate()
: event.getRecordTemplate();
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());

if (!aspectSpec.isTimeseries()) {
Expand Down Expand Up @@ -280,45 +283,37 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
@Nonnull final RecordTemplate aspect,
@Nonnull final MetadataChangeLog event,
final boolean isNewAspectVersion) {
final List<Edge> edgesToAdd = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
final List<Edge> edges = new ArrayList<>();
final HashMap<Urn, Set<String>> urnToRelationshipTypes = new HashMap<>();

// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and
// inputFields
// since @Relationship only links between the parent entity urn and something else.
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn,
upstreamLineage.getFineGrainedLineages(),
edgesToAdd,
urnToRelationshipTypesBeingAdded);
urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
final InputFields inputFields = new InputFields(aspect.data());
updateInputFieldEdgesAndRelationships(
urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes);
} else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) {
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data());
updateFineGrainedEdgesAndRelationships(
urn,
dataJobInputOutput.getFineGrainedLineages(),
edgesToAdd,
urnToRelationshipTypesBeingAdded);
urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes);
}

Map<RelationshipFieldSpec, List<Object>> extractedFields =
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true);

for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
Set<String> relationshipTypes =
urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>());
Set<String> relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>());
relationshipTypes.add(entry.getKey().getRelationshipName());
urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes);
urnToRelationshipTypes.put(urn, relationshipTypes);
final List<Edge> newEdges =
GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion);
edgesToAdd.addAll(newEdges);
edges.addAll(newEdges);
}
return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded);
return Pair.of(edges, urnToRelationshipTypes);
}

/** Process snapshot and update graph index */
Expand Down Expand Up @@ -433,29 +428,36 @@ private void deleteGraphData(
@Nonnull final OperationContext opContext,
@Nonnull final Urn urn,
@Nonnull final AspectSpec aspectSpec,
@Nonnull final RecordTemplate aspect,
@Nullable final RecordTemplate aspect,
@Nonnull final Boolean isKeyAspect,
@Nonnull final MetadataChangeLog event) {
if (isKeyAspect) {
graphService.removeNode(opContext, urn);
return;
}

Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
if (aspect != null) {
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingRemoved =
edgeAndRelationTypes.getSecond();
if (!urnToRelationshipTypesBeingRemoved.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingRemoved.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
entry.getKey(),
new ArrayList<>(entry.getValue()),
createRelationshipFilter(
new Filter().setOr(new ConjunctiveCriterionArray()),
RelationshipDirection.OUTGOING));
}
}
} else {
log.warn(
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
aspectSpec.getName(),
urn);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package com.linkedin.metadata.service;

import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.container.Container;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
Expand All @@ -21,6 +28,14 @@
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.GenericRecordUtils;
Expand All @@ -29,6 +44,8 @@
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import javax.annotation.Nonnull;
import org.mockito.ArgumentCaptor;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.script.Script;
Expand Down Expand Up @@ -180,4 +197,109 @@ public void testStatusNoOpEvent() {

verifyNoInteractions(mockWriteDAO);
}

/** A DELETE event that carries no aspect payload must not produce any graph writes. */
@Test
public void testMissingAspectGraphDelete() {
  // Build a delete event for the container aspect with neither a current nor a previous value.
  final MetadataChangeLog deleteWithoutAspect =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(TEST_URN.getEntityType())
          .setEntityUrn(TEST_URN)
          .setAspectName(Constants.CONTAINER_ASPECT_NAME);

  test.handleChangeEvent(TEST_OP_CONTEXT, deleteWithoutAspect);

  // With the aspect missing, the write DAO must never be touched.
  verifyNoInteractions(mockWriteDAO);
}

/**
 * Deleting the key aspect of a container entity must issue one delete-by-query for its outgoing
 * edges, one for its incoming edges, and one for edges it owns as lifecycle owner.
 */
@Test
public void testNodeGraphDelete() {
  final Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
  // Filter with an empty OR clause, as expected in the unconstrained positions below.
  final Filter emptyOrFilter = new Filter().setOr(new ConjunctiveCriterionArray());

  // Delete the container entity by deleting its key aspect.
  final MetadataChangeLog keyAspectDelete =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(CONTAINER_ENTITY_NAME)
          .setEntityUrn(containerUrn)
          .setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME);
  test.handleChangeEvent(TEST_OP_CONTEXT, keyAspectDelete);

  // All outgoing edges, then all incoming edges, of this entity are deleted exactly once each.
  for (RelationshipDirection direction :
      List.of(RelationshipDirection.OUTGOING, RelationshipDirection.INCOMING)) {
    verify(mockWriteDAO, times(1))
        .deleteByQuery(
            eq(TEST_OP_CONTEXT),
            nullable(String.class),
            eq(createUrnFilter(containerUrn)),
            nullable(String.class),
            eq(emptyOrFilter),
            eq(List.of()),
            eq(new RelationshipFilter().setDirection(direction)));
  }

  // All edges where this entity is the lifecycle owner are deleted exactly once.
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(emptyOrFilter),
          nullable(String.class),
          eq(emptyOrFilter),
          eq(List.of()),
          eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)),
          eq(containerUrn.toString()));
}

/**
 * Deleting a container aspect whose previous value is supplied must remove only the outgoing
 * {@code IsPartOf} edges of the entity.
 */
@Test
public void testContainerDelete() {
  final Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
  final Container previousContainer = new Container().setContainer(containerUrn);
  final Filter emptyOrFilter = new Filter().setOr(new ConjunctiveCriterionArray());

  // Delete event for the container aspect, carrying the deleted value as the previous aspect.
  final MetadataChangeLog containerAspectDelete =
      new MetadataChangeLog()
          .setChangeType(ChangeType.DELETE)
          .setEntityType(TEST_URN.getEntityType())
          .setEntityUrn(TEST_URN)
          .setAspectName(Constants.CONTAINER_ASPECT_NAME)
          .setPreviousAspectValue(GenericRecordUtils.serializeAspect(previousContainer));
  test.handleChangeEvent(TEST_OP_CONTEXT, containerAspectDelete);

  // Exactly one delete-by-query: outgoing "IsPartOf" edges of TEST_URN.
  verify(mockWriteDAO, times(1))
      .deleteByQuery(
          eq(TEST_OP_CONTEXT),
          nullable(String.class),
          eq(createUrnFilter(TEST_URN)),
          nullable(String.class),
          eq(emptyOrFilter),
          eq(List.of("IsPartOf")),
          eq(createRelationshipFilter(emptyOrFilter, RelationshipDirection.OUTGOING)));
}

/**
 * Builds a filter with a single OR branch containing one criterion: field {@code "urn"} equal to
 * the string form of the given urn.
 *
 * @param urn the urn to match
 * @return the filter matching that urn
 */
private static Filter createUrnFilter(@Nonnull final Urn urn) {
  final Criterion urnEquals = buildCriterion("urn", Condition.EQUAL, urn.toString());

  final CriterionArray andCriteria = new CriterionArray();
  andCriteria.add(urnEquals);

  final ConjunctiveCriterion urnClause = new ConjunctiveCriterion().setAnd(andCriteria);
  return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(urnClause)));
}
}
Loading

0 comments on commit 3f3f777

Please sign in to comment.