Skip to content

Commit

Permalink
Included Entity type in the routing map (#455)
Browse files Browse the repository at this point in the history
* Fixed routing map

* Addressed comments

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Oct 21, 2024
1 parent d4a2261 commit 842691e
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1724,9 +1724,10 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
*/
protected <ASPECT extends RecordTemplate> AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue,
Optional<ASPECT> oldAspectValue, IngestionParams ingestionParams) {

if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered(
newAspectValue.getClass())) {
AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass());
newAspectValue.getClass(), urn.getEntityType())) {
AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass(), urn.getEntityType());
AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue, ingestionParams);
ASPECT updatedAspect = (ASPECT) aspectCallbackResponse.getUpdatedAspect();
log.info("Aspect callback routing completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import lombok.Data;


/**
* A key class used in the AspectCallbackRegistry map to uniquely identify aspect callback routing clients.
* The key is a combination of the aspect class and the entity type.
*/
@Data
public class AspectCallbackMapKey {
private final Class<? extends RecordTemplate> aspectClass;
private final String entityType;

/**
* Indicates whether some other object is "equal to" this one.
* Two AspectCallbackMapKey objects are considered equal if their aspectClass and entityType are equal.
*
* @param o the reference object with which to compare
* @return true if this object is the same as the obj argument; false otherwise
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AspectCallbackMapKey that = (AspectCallbackMapKey) o;
return aspectClass.equals(that.aspectClass) && entityType.equals(that.entityType);
}

/**
* Returns a hash code value for the object.
* The hash code is computed based on the aspectClass and entityType.
*
* @return a hash code value for this object
*/
@Override
public int hashCode() {
int result = aspectClass.hashCode();
result = 31 * result + entityType.hashCode();
return result;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
@Slf4j
public class AspectCallbackRegistry {

private final Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap;
private final Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap;

/**
* Constructor to register aspect callback routing clients for aspects.
* @param aspectCallbackMap map containing aspect classes and their corresponding cleints
*/
public AspectCallbackRegistry(@Nonnull Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap) {
public AspectCallbackRegistry(@Nonnull Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap) {
this.aspectCallbackMap = new HashMap<>(aspectCallbackMap);
log.info("Registered aspect callback clients for aspects: {}", aspectCallbackMap.keySet());
}
Expand All @@ -30,15 +30,15 @@ public AspectCallbackRegistry(@Nonnull Map<Class<? extends RecordTemplate>, Aspe
* @return AspectCallbackRoutingClient for the given aspect class, or null if not found
*/
public <ASPECT extends RecordTemplate> AspectCallbackRoutingClient getAspectCallbackRoutingClient(
@Nonnull Class<ASPECT> aspectClass) {
return aspectCallbackMap.get(aspectClass);
@Nonnull Class<ASPECT> aspectClass, @Nonnull String entityType) {
return aspectCallbackMap.get(new AspectCallbackMapKey(aspectClass, entityType));
}

/**
* Check if Aspect Callback Routing Client is registered for an aspect.
*/
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return aspectCallbackMap.containsKey(aspectClass);
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull Class<ASPECT> aspectClass, @Nonnull String entityType) {
return aspectCallbackMap.containsKey(new AspectCallbackMapKey(aspectClass, entityType));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.AspectCallbackMapKey;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient;
import com.linkedin.metadata.dao.ingestion.SampleAspectCallbackRoutingClient;
import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl;
Expand Down Expand Up @@ -662,8 +663,9 @@ public void testAspectCallbackHelperFromFooToBar() throws URISyntaxException {
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");

Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient());
Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
AspectCallbackMapKey aspectCallbackMapKey = new AspectCallbackMapKey(AspectFoo.class, urn.getEntityType());
aspectCallbackMap.put(aspectCallbackMapKey, new SampleAspectCallbackRoutingClient());

AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap);
_dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry);
Expand All @@ -678,8 +680,8 @@ public void testMAEEmissionForAspectCallbackHelper() throws URISyntaxException {
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setAlwaysEmitAuditEvent(true);
Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient());
Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(new AspectCallbackMapKey(AspectFoo.class, urn.getEntityType()), new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap);
_dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry);
expectGetLatest(urn, AspectFoo.class,
Expand All @@ -699,8 +701,8 @@ public void testAspectCallbackHelperWithUnregisteredAspect() throws URISyntaxExc
AspectBar foo = new AspectBar().setValue("foo");

// Inject RestliPreIngestionAspectRegistry with no registered aspect
Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient());
Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(new AspectCallbackMapKey(AspectFoo.class, urn.getEntityType()), new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap);
_dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import com.linkedin.testing.AspectBar;
import com.linkedin.testing.AspectFoo;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;
import static org.testng.Assert.*;


public class AspectCallbackMapKeyTest {

@Test
public void testConstructorAndGetters() {
// Use a real class instead of mocking
Class<? extends RecordTemplate> mockAspectClass = RecordTemplate.class;
String entityType = "testEntity";

// Create an instance of AspectCallbackMapKey
AspectCallbackMapKey key = new AspectCallbackMapKey(mockAspectClass, entityType);

// Verify that the getters return the correct values
assertEquals(mockAspectClass, key.getAspectClass());
assertEquals(entityType, key.getEntityType());
}

@Test
public void testEquals() {
// Create mock instances of RecordTemplate class
Class<? extends RecordTemplate> mockAspectClass1 = AspectFoo.class;
Class<? extends RecordTemplate> mockAspectClass2 = AspectBar.class;
String entityType1 = "testEntity1";
String entityType2 = "testEntity2";

// Create instances of AspectCallbackMapKey
AspectCallbackMapKey key1 = new AspectCallbackMapKey(mockAspectClass1, entityType1);
AspectCallbackMapKey key2 = new AspectCallbackMapKey(mockAspectClass1, entityType1);
AspectCallbackMapKey key3 = new AspectCallbackMapKey(mockAspectClass2, entityType1);
AspectCallbackMapKey key4 = new AspectCallbackMapKey(mockAspectClass1, entityType2);

// Verify equality
assertEquals(key1, key2); // Same class and entity type
assertNotEquals(key1, key3); // Different class, same entity type
assertNotEquals(key1, key4); // Same class, different entity type
assertNotEquals(key1, null); // Not equal to null
assertNotEquals(key1, new Object()); // Not equal to a different type
}

@Test
public void testHashCode() {
// Create mock instances of RecordTemplate class
Class<? extends RecordTemplate> mockAspectClass1 = AspectFoo.class;
Class<? extends RecordTemplate> mockAspectClass2 = AspectBar.class;
String entityType1 = "testEntity1";
String entityType2 = "testEntity2";

// Create instances of AspectCallbackMapKey
AspectCallbackMapKey key1 = new AspectCallbackMapKey(mockAspectClass1, entityType1);
AspectCallbackMapKey key2 = new AspectCallbackMapKey(mockAspectClass1, entityType1);
AspectCallbackMapKey key3 = new AspectCallbackMapKey(mockAspectClass2, entityType1);
AspectCallbackMapKey key4 = new AspectCallbackMapKey(mockAspectClass1, entityType2);

assertEquals(key1.hashCode(), key2.hashCode());
assertNotEquals(key1.hashCode(), key3.hashCode());
assertNotEquals(key1.hashCode(), key4.hashCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.linkedin.metadata.dao.ingestion;

import java.util.HashMap;
import java.util.Map;
import com.linkedin.testing.AspectBar;
import com.linkedin.testing.AspectFoo;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.*;


public class AspectCallbackRegistryTest {
private AspectCallbackRegistry registry;
private AspectCallbackRoutingClient client1;
private AspectCallbackRoutingClient client2;

@BeforeMethod
public void setUp() {
client1 = new SampleAspectCallbackRoutingClient();
client2 = new SampleAspectCallbackRoutingClient();

Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(new AspectCallbackMapKey(AspectFoo.class, "entityType1"), client1);
aspectCallbackMap.put(new AspectCallbackMapKey(AspectBar.class, "entityType2"), client2);

registry = new AspectCallbackRegistry(aspectCallbackMap);
}

@Test
public void testConstructor() {
assertNotNull(registry);
}

@Test
public void testGetAspectCallbackRoutingClient() {
assertEquals(registry.getAspectCallbackRoutingClient(AspectFoo.class, "entityType1"), client1);
assertEquals(registry.getAspectCallbackRoutingClient(AspectBar.class, "entityType2"), client2);
assertNull(registry.getAspectCallbackRoutingClient(AspectFoo.class, "entityType2"));
}

@Test
public void testIsRegistered() {
assertTrue(registry.isRegistered(AspectFoo.class, "entityType1"));
assertTrue(registry.isRegistered(AspectBar.class, "entityType2"));
assertFalse(registry.isRegistered(AspectFoo.class, "entityType2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ private void ingestAspect(Set<Class<? extends RecordTemplate>> aspectsToIgnore,
try {
// get the updated aspect if there is a preupdate routing lambda registered
AspectCallbackRegistry registry = getLocalDAO().getAspectCallbackRegistry();
if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass())) {
if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass(), urn.getEntityType())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = aspectCallbackHelper((URN) urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect);
Expand Down Expand Up @@ -691,7 +691,7 @@ private List<? extends RecordTemplate> getValueFromRoutingGms(@Nonnull URN urn,
* @return the updated aspect
*/
private RecordTemplate aspectCallbackHelper(URN urn, RecordTemplate aspect, AspectCallbackRegistry registry) {
AspectCallbackRoutingClient preUpdateClient = registry.getAspectCallbackRoutingClient(aspect.getClass());
AspectCallbackRoutingClient preUpdateClient = registry.getAspectCallbackRoutingClient(aspect.getClass(), urn.getEntityType());
AspectCallbackResponse aspectCallbackResponse = preUpdateClient.routeAspectCallback(urn, aspect, null);
return aspectCallbackResponse.getUpdatedAspect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.metadata.dao.BaseBrowseDAO;
import com.linkedin.metadata.dao.BaseLocalDAO;
import com.linkedin.metadata.dao.BaseSearchDAO;
import com.linkedin.metadata.dao.ingestion.AspectCallbackMapKey;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient;
import com.linkedin.metadata.dao.utils.ModelUtils;
Expand Down Expand Up @@ -562,9 +563,9 @@ public void testAspectCallbackHelperWithRegisteredAspect() {
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> preUpdateMap = new HashMap<>();
preUpdateMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(preUpdateMap);
Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(new AspectCallbackMapKey(AspectFoo.class, urn.getEntityType()), new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap);

when(_mockLocalDAO.getAspectCallbackRegistry()).thenReturn(aspectCallbackRegistry);

Expand Down Expand Up @@ -607,9 +608,9 @@ public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredInUpdate() {
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> preUpdateMap = new HashMap<>();
preUpdateMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(preUpdateMap);
Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(new AspectCallbackMapKey(AspectFoo.class, urn.getEntityType()), new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap);

when(_mockLocalDAO.getAspectCallbackRegistry()).thenReturn(aspectCallbackRegistry);

Expand Down Expand Up @@ -658,9 +659,9 @@ public void testAspectCallbackHelperWithSkipIngestion() throws NoSuchFieldExcept
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> preUpdateMap = new HashMap<>();
preUpdateMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry registry = new AspectCallbackRegistry(preUpdateMap);
Map<AspectCallbackMapKey, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>();
aspectCallbackMap.put(new AspectCallbackMapKey(AspectFoo.class, urn.getEntityType()), new SampleAspectCallbackRoutingClient());
AspectCallbackRegistry registry = new AspectCallbackRegistry(aspectCallbackMap);

when(_mockLocalDAO.getAspectCallbackRegistry()).thenReturn(registry);

Expand All @@ -683,6 +684,7 @@ public void testPreUpdateRoutingWithSkipIngestionNoInLambda() throws NoSuchField
skipIngestionField.set(null, newSkipIngestionList);

FooUrn urn = makeFooUrn(1);

AspectFoo foo = new AspectFoo().setValue("foo");
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
Expand Down

0 comments on commit 842691e

Please sign in to comment.