Skip to content

Commit

Permalink
implement dual write in BaseAspectRoutingResource (#420)
Browse files Browse the repository at this point in the history
* implement dual write in BaseAspectRoutingResource

* fix docs for a unit test

* address comments: refactor addSimple -> addSkipPreIngestionUpdates

* rebase and add info logs for pre update lambda

* update unit test
  • Loading branch information
jsdonn authored Sep 16, 2024
1 parent 0307e9b commit af0995a
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 18 deletions.
16 changes: 14 additions & 2 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -852,11 +852,23 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return addSkipPreIngestionUpdates(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
}

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)} but
* skips any pre-update lambdas. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT addSkipPreIngestionUpdates(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> updatedAspect, auditStamp, trackingContext, nonNullIngestionParams);
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

/**
* Extends {@link BaseBrowsableEntityResource} with aspect routing capability.
* For certain aspect of an entity, incoming request will be routed to different GMS.
* For certain aspects of an entity, incoming requests will be routed to a different GMS in addition to being
* written locally (i.e. dual write).
* See http://go/aspect-routing for more details
*/
@Slf4j
Expand Down Expand Up @@ -314,6 +315,7 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting(urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternal, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
Expand All @@ -329,6 +331,9 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
} else {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add
// which skips pre-update lambdas.
getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error(
String.format("Couldn't ingest routing aspect %s for %s", aspect.getClass().getSimpleName(), urn),
Expand Down Expand Up @@ -361,6 +366,7 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting(urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
Expand All @@ -377,6 +383,9 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
} else {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add
// which skips pre-update lambdas.
getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void testIngestWithRoutingAspect() {
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo));
verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes));
verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry();
verifyNoMoreInteractions(_mockLocalDAO);
verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any());
}

@Test
Expand All @@ -326,7 +326,7 @@ public void testIngestWithTrackingWithRoutingAspect() {
verify(_mockAspectFooGmsClient, times(1)).ingestWithTracking(eq(urn), eq(foo), eq(trackingContext), eq(null));
verify(_mockAspectAttributeGmsClient, times(1)).ingestWithTracking(eq(urn), eq(attributes), eq(trackingContext), eq(null));
verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry();
verifyNoMoreInteractions(_mockLocalDAO);
verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any());
}

@Test
Expand Down Expand Up @@ -354,6 +354,7 @@ public void testIngestWithOnlyRoutingAspect() {
runAndWait(_resource.ingest(snapshot));

verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry();
verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any());
// verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo));
verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes));
Expand Down Expand Up @@ -554,32 +555,76 @@ public void testBackfillWithNewValue() {
public void testPreUpdateRoutingWithRegisteredAspect() {
FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");

List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl();
when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry);

// given: ingest a snapshot containing a routed aspect which has a registered pre-update lambda.
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(bar));

verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry();
// expected: the pre-update lambda is executed first (aspect value is changed from foo to foobar) and then the aspect is dual-written.
AspectFoo foobar = new AspectFoo().setValue("foobar");
// dual write pt1: ensure the ingestion request is forwarded to the routed GMS.
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foobar));
// dual write pt2: ensure local write using addSkipPreIngestionUpdates() and not add().
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
verifyNoMoreInteractions(_mockLocalDAO);
verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foobar), any(), any(), any());
}

@Test
public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() {
FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");

List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

// given: ingest a snapshot containing a routed aspect which does not have a registered pre-update lambda.
runAndWait(_resource.ingest(snapshot));

// expected: the aspect value remains unchanged and the aspect is dual-written.
// dual write pt1: ensure the ingestion request is forwarded to the routed GMS.
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo));
// dual write pt2: ensure local write using addSkipPreIngestionUpdates() and not add().
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any());
}

@Test
public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredPreUpdate() {
FooUrn urn = makeFooUrn(1);
AspectBar bar = new AspectBar().setValue("bar");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl();
when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry);

// given: ingest a snapshot which contains a non-routed aspect which has a registered pre-update lambda.
runAndWait(_resource.ingest(snapshot));

// expected: the aspect is ingested locally only (not dual written). BaseLocalDAO::add will execute any pre-ingestion
// lambdas which will change the aspect value from bar -> foobar. But no pre-ingestion lambdas are run from within
// the BaseAspectRoutingResource class.
verify(_mockAspectBarGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
verifyNoMoreInteractions(_mockLocalDAO);
}

@Test
public void testPreUpdateRoutingWithNonRoutedAspectAndNonRegisteredPreUpdate() {
FooUrn urn = makeFooUrn(1);
AspectBar bar = new AspectBar().setValue("bar");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

// given: ingest a snapshot which contains a non-routed aspect which does not have any registered pre-update lambdas.
runAndWait(_resource.ingest(snapshot));

// expected: the aspect value remains unchanged and the aspect is ingested locally only (not dual written).
verify(_mockAspectBarGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
verifyNoMoreInteractions(_mockLocalDAO);
Expand Down Expand Up @@ -608,14 +653,13 @@ public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException,
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry();
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
// Should not add to local DAO
verifyNoMoreInteractions(_mockLocalDAO);
}

//Testing the case when aspect has no pre lambda but skipIngestion contains the aspect, so it should not skip ingestion
@Test
public void testPreUpdateRoutingWithSkipIngestionNoPreLambda() throws NoSuchFieldException, IllegalAccessException {

Field skipIngestionField = BaseAspectRoutingResource.class.getDeclaredField("SKIP_INGESTION_FOR_ASPECTS");
skipIngestionField.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
Expand All @@ -634,9 +678,8 @@ public void testPreUpdateRoutingWithSkipIngestionNoPreLambda() throws NoSuchFiel
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo));
// Should check for pre lambda
verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry();
// Should not add to localDAO
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
// Should continue to dual-write into local DAO
verify(_mockLocalDAO, times(1)).addSkipPreIngestionUpdates(eq(urn), eq(foo), any(), any(), any());
verifyNoMoreInteractions(_mockLocalDAO);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.testing.AspectBar;
import com.linkedin.testing.AspectFoo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -15,6 +16,7 @@ public class SamplePreUpdateAspectRegistryImpl implements RestliPreUpdateAspectR
public SamplePreUpdateAspectRegistryImpl() {
registry = new ImmutableMap.Builder<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient>()
.put(AspectFoo.class, new SamplePreUpdateRoutingClient())
.put(AspectBar.class, new SamplePreUpdateRoutingClient())
.build();
}
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient {
@Override
public Message routingLambda(Message urn, Message aspect) {
// For testing, change the aspect value to "bar"
return Any.pack(StringValue.of("bar"));
// For testing, change the aspect value to "foobar"
return Any.pack(StringValue.of("foobar"));
}

@Override
Expand All @@ -35,7 +35,7 @@ public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
@Override
public RecordTemplate convertAspectToRecordTemplate(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
// Create a new RecordTemplate (AspectFoo in this case) and set the value field to foobar
return new AspectFoo().setValue("foobar");
}
}

0 comments on commit af0995a

Please sign in to comment.