Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding PreUpdateRouting to BaseAspectRoutingResource #413

Merged
merged 13 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion restli-resources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {
compile project(':dao-api')
compile spec.product.pegasus.restliServer
compile spec.product.pegasus.restliClient

implementation 'com.google.protobuf:protobuf-java:3.21.1'
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombokAnnotationProcessor
testCompile project(':core-models-utils')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.linkedin.metadata.restli;

import com.google.protobuf.Message;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.AspectKey;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.internal.IngestionParams;
Expand Down Expand Up @@ -70,6 +73,8 @@ public abstract class BaseAspectRoutingResource<
private final Class<INTERNAL_SNAPSHOT> _internalSnapshotClass;
private final Class<INTERNAL_ASPECT_UNION> _internalAspectUnionClass;
private final Class<ASSET> _assetClass;
private RestliPreUpdateAspectRegistry _restliPreUpdateAspectRegistry = null;
private static final List<String> SKIP_INGESTION_FOR_ASPECTS = Collections.singletonList("com.linkedin.dataset.DatasetAccountableOwnership");

public BaseAspectRoutingResource(@Nullable Class<SNAPSHOT> snapshotClass,
@Nullable Class<ASPECT_UNION> aspectUnionClass, @Nonnull Class<URN> urnClass, @Nonnull Class<VALUE> valueClass,
Expand Down Expand Up @@ -105,6 +110,11 @@ public BaseAspectRoutingResource(@Nullable Class<SNAPSHOT> snapshotClass,
*/
public abstract AspectRoutingGmsClientManager getAspectRoutingGmsClientManager();

/** Set the restliPreUpdateAspectRegistry. */
public void setRestliPreUpdateAspectRegistry(RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
_restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry;
}

/**
* Retrieves the value for an entity that is made up of latest versions of specified aspects.
*/
Expand Down Expand Up @@ -321,6 +331,16 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
if (trackingContext != null) {
Expand Down Expand Up @@ -355,6 +375,16 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
aspect.getClass())) {
aspect = preUpdateRouting(urn, aspect);
}
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
return;
}
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
if (trackingContext != null) {
Expand Down Expand Up @@ -601,4 +631,18 @@ private List<? extends RecordTemplate> getValueFromRoutingGms(@Nonnull URN urn,
}
}).filter(Objects::nonNull).collect(Collectors.toList());
}

/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* @param urn the urn of the asset
* @param aspect the new aspect value
* @return the updated aspect
*/
private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect) {
RestliCompliantPreUpdateRoutingClient client =
_restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(aspect);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(aspect));
return client.convertAspectFromMessage(updatedAspect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.restli.ingestion.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.parseq.BaseEngineTest;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
Expand All @@ -30,6 +31,8 @@
import com.linkedin.testing.InternalEntitySnapshot;
import com.linkedin.testing.urn.BazUrn;
import com.linkedin.testing.urn.FooUrn;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -53,6 +56,7 @@ public class BaseAspectRoutingResourceTest extends BaseEngineTest {
private BaseBrowseDAO _mockBrowseDAO;
private BaseLocalDAO _mockLocalDAO;
private BaseAspectRoutingGmsClient _mockAspectFooGmsClient;
private BaseAspectRoutingGmsClient _mockAspectBarGmsClient;
private BaseAspectRoutingGmsClient _mockAspectBazGmsClient;
private BaseAspectRoutingGmsClient _mockAspectAttributeGmsClient;

Expand Down Expand Up @@ -157,6 +161,7 @@ public ResourceContext getContext() {
@BeforeMethod
public void setup() {
_mockAspectFooGmsClient = mock(BaseAspectRoutingGmsClient.class);
_mockAspectBarGmsClient = mock(BaseAspectRoutingGmsClient.class);
_mockAspectAttributeGmsClient = mock(BaseAspectRoutingGmsClient.class);
_mockAspectBazGmsClient = mock(BaseAspectRoutingGmsClient.class);
when(_mockAspectFooGmsClient.getEntityType()).thenReturn(FooUrn.ENTITY_TYPE);
Expand Down Expand Up @@ -543,4 +548,56 @@ public void testBackfillWithNewValue() {
assertEquals(backfillResult.getEntities().size(), 2);
verifyZeroInteractions(_mockAspectFooGmsClient);
}

@Test
public void testPreUpdateRoutingWithRegisteredAspect() {
FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");

List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(bar));
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
verifyNoMoreInteractions(_mockLocalDAO);
}

@Test
public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() {
FooUrn urn = makeFooUrn(1);
AspectBar bar = new AspectBar().setValue("bar");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectBarGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null));
verifyNoMoreInteractions(_mockLocalDAO);
}

@Test
public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException, IllegalAccessException {
// Access the SKIP_INGESTION_FOR_ASPECTS field
Field skipIngestionField = BaseAspectRoutingResource.class.getDeclaredField("SKIP_INGESTION_FOR_ASPECTS");
skipIngestionField.setAccessible(true);
// Remove the final modifier from the field
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(skipIngestionField, skipIngestionField.getModifiers() & ~Modifier.FINAL);
// Modify the field to contain AspectFoo
List<String> newSkipIngestionList = Arrays.asList("com.linkedin.testing.AspectFoo");
skipIngestionField.set(null, newSkipIngestionList);

FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
_resource.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
runAndWait(_resource.ingest(snapshot));
verify(_mockAspectFooGmsClient, times(0)).ingest(any(), any());
verify(_mockLocalDAO, times(0)).add(any(), any(), any(), any(), any());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.linkedin.metadata.restli.ingestion;

import com.google.common.collect.ImmutableMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.testing.AspectFoo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


public class SamplePreUpdateAspectRegistryImpl implements RestliPreUpdateAspectRegistry {
private final ImmutableMap<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient> registry;

public SamplePreUpdateAspectRegistryImpl() {
registry = new ImmutableMap.Builder<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient>()
.put(AspectFoo.class, new SamplePreUpdateRoutingClient())
.build();
}
@Nullable
@Override
public <ASPECT extends RecordTemplate> RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull ASPECT aspect) {
return registry.get(aspect.getClass());
}

@Override
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull Class<ASPECT> aspectClass) {
return registry.containsKey(aspectClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.linkedin.metadata.restli.ingestion;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.testing.AspectFoo;


public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient {
@Override
public Message routingLambda(Message urn, Message aspect) {
// For testing, change the aspect value to "bar"
return Any.pack(StringValue.of("bar"));
}

@Override
public Message convertUrnToMessage(Urn urn) {
// Directly wrap the URN string into a Protobuf message for testing
return Any.pack(StringValue.of(urn.toString()));
}

@Override
public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
// For testing, convert AspectFoo to a TestMessageProtos.AspectMessage
// Assuming the aspect has a `value` field and its string representation can be used for now
String aspectString = pegasusAspect.toString(); // Extracting the aspect as a string (e.g., {value=foo})

// Wrap the aspect string into a simple Protobuf message for testing
return Any.pack(StringValue.of(aspectString));
}

@Override
public RecordTemplate convertAspectFromMessage(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
}
}
Loading