Skip to content

Commit

Permalink
Fixed RestliPreUpdateAspectRegistry ingestion bug (#422)
Browse files Browse the repository at this point in the history
* Fixed registry bug

* Rename ingestion to update

* Fixed name of conversion method

* Typo

* typo2

* Addressed comments

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Sep 12, 2024
1 parent b6b2c88 commit 7c07e1e
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,18 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
/**
* Set pre ingestion aspect registry.
*/
public void setRestliPreIngestionAspectRegistry(
public void setRestliPreUpdateAspectRegistry(
@Nullable RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
_restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry;
}

/**
* Get pre ingestion aspect registry.
*/
public RestliPreUpdateAspectRegistry getRestliPreUpdateAspectRegistry() {
return _restliPreUpdateAspectRegistry;
}


/**
* Enables or disables atomic updates of multiple aspects.
Expand Down Expand Up @@ -1649,7 +1656,7 @@ protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPEC
_restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(newValue);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(newValue));
RecordTemplate convertedAspect = client.convertAspectFromMessage(updatedAspect);
RecordTemplate convertedAspect = client.convertAspectToRecordTemplate(updatedAspect);
return (ASPECT) convertedAspect;
}
return newValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ public interface RestliCompliantPreUpdateRoutingClient<ASPECT extends Message> e
* @param messageAspect the Protobuf message aspect to be converted
* @return the converted {@link RecordTemplate} aspect
*/
RecordTemplate convertAspectFromMessage(ASPECT messageAspect);
RecordTemplate convertAspectToRecordTemplate(ASPECT messageAspect);
}
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setRestliPreIngestionAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
_dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo);
assertEquals(result, bar);
}
Expand All @@ -669,7 +669,7 @@ public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException {
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setAlwaysEmitAuditEvent(true);
_dummyLocalDAO.setRestliPreIngestionAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
_dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
expectGetLatest(urn, AspectFoo.class,
Arrays.asList(makeAspectEntry(null, null), makeAspectEntry(foo, _dummyAuditStamp)));

Expand All @@ -687,7 +687,7 @@ public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxExcepti
AspectBar foo = new AspectBar().setValue("foo");

// Inject RestliPreIngestionAspectRegistry with no registered aspect
_dummyLocalDAO.setRestliPreIngestionAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
_dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());

// Call the add method
AspectBar result = _dummyLocalDAO.preUpdateRouting(urn, foo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
}

@Override
public RecordTemplate convertAspectFromMessage(Message messageAspect) {
public RecordTemplate convertAspectToRecordTemplate(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ public BaseAspectRoutingResource(@Nullable Class<SNAPSHOT> snapshotClass,
*/
public abstract AspectRoutingGmsClientManager getAspectRoutingGmsClientManager();

/** Set the restliPreUpdateAspectRegistry. */
public void setRestliPreUpdateAspectRegistry(RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
_restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry;
}

/**
* Retrieves the value for an entity that is made up of latest versions of specified aspects.
*/
Expand Down Expand Up @@ -315,18 +310,19 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect, registry);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
Expand Down Expand Up @@ -359,18 +355,19 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect, registry);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass())
.ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
Expand Down Expand Up @@ -622,11 +619,10 @@ private List<? extends RecordTemplate> getValueFromRoutingGms(@Nonnull URN urn,
* @param aspect the new aspect value
* @return the updated aspect
*/
private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect) {
RestliCompliantPreUpdateRoutingClient client =
_restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(aspect);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(aspect));
return client.convertAspectFromMessage(updatedAspect);
private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect, RestliPreUpdateAspectRegistry registry) {
RestliCompliantPreUpdateRoutingClient client = registry.getPreUpdateRoutingClient(aspect);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(aspect));
return client.convertAspectToRecordTemplate(updatedAspect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ public void testIngestWithRoutingAspect() {
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo));
verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes));
verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry();
verifyNoMoreInteractions(_mockLocalDAO);
}

Expand All @@ -324,6 +325,7 @@ public void testIngestWithTrackingWithRoutingAspect() {
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(trackingContext), eq(null));
verify(_mockAspectFooGmsClient, times(1)).ingestWithTracking(eq(urn), eq(foo), eq(trackingContext), eq(null));
verify(_mockAspectAttributeGmsClient, times(1)).ingestWithTracking(eq(urn), eq(attributes), eq(trackingContext), eq(null));
verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry();
verifyNoMoreInteractions(_mockLocalDAO);
}

Expand All @@ -335,7 +337,6 @@ public void testIngestWithoutRoutingAspect() {
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

runAndWait(_resource.ingest(snapshot));

verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
verifyZeroInteractions(_mockAspectFooGmsClient);
verifyNoMoreInteractions(_mockLocalDAO);
Expand All @@ -352,7 +353,7 @@ public void testIngestWithOnlyRoutingAspect() {

runAndWait(_resource.ingest(snapshot));

verifyZeroInteractions(_mockLocalDAO);
verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry();
// verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo));
verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes));
Expand Down Expand Up @@ -557,9 +558,13 @@ public void testPreUpdateRoutingWithRegisteredAspect() {

List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());

SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl();
when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry);

runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(bar));
verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry();
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
verifyNoMoreInteractions(_mockLocalDAO);
}
Expand All @@ -570,7 +575,10 @@ public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() {
AspectBar bar = new AspectBar().setValue("bar");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());

SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl();
when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry);

runAndWait(_resource.ingest(snapshot));
verify(_mockAspectBarGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
Expand All @@ -594,10 +602,14 @@ public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException,
AspectFoo foo = new AspectFoo().setValue("foo");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl();
when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry);

runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry();
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
verifyNoMoreInteractions(_mockLocalDAO);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
}

@Override
public RecordTemplate convertAspectFromMessage(Message messageAspect) {
public RecordTemplate convertAspectToRecordTemplate(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
Expand Down

0 comments on commit 7c07e1e

Please sign in to comment.