Skip to content

Commit

Permalink
Adding PreUpdateRouting to BaseAspectRoutingResource (#413)
Browse files Browse the repository at this point in the history
* Adding PreUpdateRouting to BaseAspect

* Added a comment

* Update BaseAspectRoutingResource.java

* Added unit test

* Fixed return type

* Added protobuf

* Fixed java doc

* Fixed compilation error

* Addressed comments

* Checkstyle error

* Forgot to change at other place

* Using fqcn

* Updated unit tests

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Sep 5, 2024
1 parent 27ff500 commit 794996a
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 1 deletion.
2 changes: 1 addition & 1 deletion restli-resources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {
compile project(':dao-api')
compile spec.product.pegasus.restliServer
compile spec.product.pegasus.restliClient

implementation 'com.google.protobuf:protobuf-java:3.21.1'
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombokAnnotationProcessor
testCompile project(':core-models-utils')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.linkedin.metadata.restli;

import com.google.protobuf.Message;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.AspectKey;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.internal.IngestionParams;
Expand Down Expand Up @@ -69,6 +72,8 @@ public abstract class BaseAspectRoutingResource<
private final Class<INTERNAL_SNAPSHOT> _internalSnapshotClass;
private final Class<INTERNAL_ASPECT_UNION> _internalAspectUnionClass;
private final Class<ASSET> _assetClass;
private RestliPreUpdateAspectRegistry _restliPreUpdateAspectRegistry = null;
private static final List<String> SKIP_INGESTION_FOR_ASPECTS = Collections.singletonList("com.linkedin.dataset.DatasetAccountableOwnership");

public BaseAspectRoutingResource(@Nullable Class<SNAPSHOT> snapshotClass,
@Nullable Class<ASPECT_UNION> aspectUnionClass, @Nonnull Class<URN> urnClass, @Nonnull Class<VALUE> valueClass,
Expand All @@ -89,6 +94,11 @@ public BaseAspectRoutingResource(@Nullable Class<SNAPSHOT> snapshotClass,
*/
public abstract AspectRoutingGmsClientManager getAspectRoutingGmsClientManager();

/** Set the restliPreUpdateAspectRegistry. */
public void setRestliPreUpdateAspectRegistry(RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
_restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry;
}

/**
* Retrieves the value for an entity that is made up of latest versions of specified aspects.
*/
Expand Down Expand Up @@ -305,6 +315,16 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
if (trackingContext != null) {
Expand Down Expand Up @@ -339,6 +359,16 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
if (trackingContext != null) {
Expand Down Expand Up @@ -585,4 +615,18 @@ private List<? extends RecordTemplate> getValueFromRoutingGms(@Nonnull URN urn,
}
}).filter(Objects::nonNull).collect(Collectors.toList());
}

/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* @param urn the urn of the asset
* @param aspect the new aspect value
* @return the updated aspect
*/
private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect) {
RestliCompliantPreUpdateRoutingClient client =
_restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(aspect);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(aspect));
return client.convertAspectFromMessage(updatedAspect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.restli.ingestion.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.parseq.BaseEngineTest;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
Expand All @@ -30,6 +31,8 @@
import com.linkedin.testing.InternalEntitySnapshot;
import com.linkedin.testing.urn.BazUrn;
import com.linkedin.testing.urn.FooUrn;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -53,6 +56,7 @@ public class BaseAspectRoutingResourceTest extends BaseEngineTest {
private BaseBrowseDAO _mockBrowseDAO;
private BaseLocalDAO _mockLocalDAO;
private BaseAspectRoutingGmsClient _mockAspectFooGmsClient;
private BaseAspectRoutingGmsClient _mockAspectBarGmsClient;
private BaseAspectRoutingGmsClient _mockAspectBazGmsClient;
private BaseAspectRoutingGmsClient _mockAspectAttributeGmsClient;

Expand Down Expand Up @@ -157,6 +161,7 @@ public ResourceContext getContext() {
@BeforeMethod
public void setup() {
_mockAspectFooGmsClient = mock(BaseAspectRoutingGmsClient.class);
_mockAspectBarGmsClient = mock(BaseAspectRoutingGmsClient.class);
_mockAspectAttributeGmsClient = mock(BaseAspectRoutingGmsClient.class);
_mockAspectBazGmsClient = mock(BaseAspectRoutingGmsClient.class);
when(_mockAspectFooGmsClient.getEntityType()).thenReturn(FooUrn.ENTITY_TYPE);
Expand Down Expand Up @@ -543,4 +548,56 @@ public void testBackfillWithNewValue() {
assertEquals(backfillResult.getEntities().size(), 2);
verifyZeroInteractions(_mockAspectFooGmsClient);
}

@Test
public void testPreUpdateRoutingWithRegisteredAspect() {
FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");

List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(bar));
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
verifyNoMoreInteractions(_mockLocalDAO);
}

@Test
public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() {
FooUrn urn = makeFooUrn(1);
AspectBar bar = new AspectBar().setValue("bar");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectBarGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
verifyNoMoreInteractions(_mockLocalDAO);
}

@Test
public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException, IllegalAccessException {
// Access the SKIP_INGESTION_FOR_ASPECTS field
Field skipIngestionField = BaseAspectRoutingResource.class.getDeclaredField("SKIP_INGESTION_FOR_ASPECTS");
skipIngestionField.setAccessible(true);
// Remove the final modifier from the field
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(skipIngestionField, skipIngestionField.getModifiers() & ~Modifier.FINAL);
// Modify the field to contain AspectFoo
List<String> newSkipIngestionList = Arrays.asList("com.linkedin.testing.AspectFoo");
skipIngestionField.set(null, newSkipIngestionList);

FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.linkedin.metadata.restli.ingestion;

import com.google.common.collect.ImmutableMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.testing.AspectFoo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


public class SamplePreUpdateAspectRegistryImpl implements RestliPreUpdateAspectRegistry {
private final ImmutableMap<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient> registry;

public SamplePreUpdateAspectRegistryImpl() {
registry = new ImmutableMap.Builder<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient>()
.put(AspectFoo.class, new SamplePreUpdateRoutingClient())
.build();
}
@Nullable
@Override
public <ASPECT extends RecordTemplate> RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull ASPECT aspect) {
return registry.get(aspect.getClass());
}

@Override
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull Class<ASPECT> aspectClass) {
return registry.containsKey(aspectClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.linkedin.metadata.restli.ingestion;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.testing.AspectFoo;


public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient {
@Override
public Message routingLambda(Message urn, Message aspect) {
// For testing, change the aspect value to "bar"
return Any.pack(StringValue.of("bar"));
}

@Override
public Message convertUrnToMessage(Urn urn) {
// Directly wrap the URN string into a Protobuf message for testing
return Any.pack(StringValue.of(urn.toString()));
}

@Override
public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
// For testing, convert AspectFoo to a TestMessageProtos.AspectMessage
// Assuming the aspect has a `value` field and its string representation can be used for now
String aspectString = pegasusAspect.toString(); // Extracting the aspect as a string (e.g., {value=foo})

// Wrap the aspect string into a simple Protobuf message for testing
return Any.pack(StringValue.of(aspectString));
}

@Override
public RecordTemplate convertAspectFromMessage(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
}
}

0 comments on commit 794996a

Please sign in to comment.