Skip to content

Commit

Permalink
feat(local relationship): support local relationship tables in old sc…
Browse files Browse the repository at this point in the history
…hema (#391)

* feat(local relationship): support local relationship tables in old schema

* fix unit tests

* minor changes; address first round of comments

* rebase

* delete an unused var

* address comments - add some better javadocs

* add javadocs to findEntities

* fix unit tests

---------

Co-authored-by: Justin Donn <[email protected]>
  • Loading branch information
jsdonn and Justin Donn authored Aug 15, 2024
1 parent a884353 commit be56ee0
Show file tree
Hide file tree
Showing 18 changed files with 595 additions and 388 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,8 @@ protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN
*
* @param urn the URN for the entity the aspect (which the local relationship is derived from) is attached to
* @param aspectClass class of the aspect to backfill
* @return A list of local relationship updates executed.
*/
public abstract <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
public abstract <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationships(
@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull FooUrn u
}

@Override
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationships(
@Nonnull FooUrn urn, @Nonnull Class<ASPECT> aspectClass) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.aspect.AuditedAspect;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.utils.EBeanDAOUtils;
Expand All @@ -33,7 +31,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -64,8 +61,6 @@ public class EbeanLocalAccess<URN extends Urn> implements IEbeanLocalAccess<URN>
private final Class<URN> _urnClass;
private final String _entityType;
private UrnPathExtractor<URN> _urnPathExtractor;
private final EbeanLocalRelationshipWriterDAO _localRelationshipWriterDAO;
private LocalRelationshipBuilderRegistry _localRelationshipBuilderRegistry;
private final SchemaEvolutionManager _schemaEvolutionManager;
private final boolean _nonDollarVirtualColumnsEnabled;

Expand All @@ -84,7 +79,6 @@ public EbeanLocalAccess(EbeanServer server, ServerConfig serverConfig, @Nonnull
_urnClass = urnClass;
_urnPathExtractor = urnPathExtractor;
_entityType = ModelUtils.getEntityTypeFromUrnClass(_urnClass);
_localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(_server);
_schemaEvolutionManager = createSchemaEvolutionManager(serverConfig);
_nonDollarVirtualColumnsEnabled = nonDollarVirtualColumnsEnabled;
}
Expand Down Expand Up @@ -138,25 +132,9 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(

// newValue is null if aspect is to be soft-deleted.
if (newValue == null) {
/*
TODO:
Local relationship is derived from an aspect. If an aspect metadata is deleted, then the local relationships derived from it
should also be invalidated. But how this invalidation process should work is still unclear. We can re-visited this part
once we see clear use case. For now, to prevent inconsistency between entity table and local relationship table, we do not allow
an aspect to be deleted if there's local relationship being derived from it.
*/
if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
throw new UnsupportedOperationException(
String.format("Aspect %s cannot be soft-deleted because it has a local relationship builder registered.",
aspectClass.getCanonicalName()));
}

return sqlUpdate.setParameter("metadata", DELETED_VALUE).execute();
}

// Add local relationships if builder is provided.
addRelationships(urn, newValue, aspectClass);

AuditedAspect auditedAspect = new AuditedAspect()
.setAspect(RecordUtils.toJsonString(newValue))
.setCanonicalName(aspectClass.getCanonicalName())
Expand All @@ -172,18 +150,6 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
return sqlUpdate.setParameter("metadata", metadata).execute();
}

@Override
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> addRelationships(@Nonnull URN urn,
@Nonnull ASPECT aspect, @Nonnull Class<ASPECT> aspectClass) {
if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
List<LocalRelationshipUpdates> localRelationshipUpdates =
_localRelationshipBuilderRegistry.getLocalRelationshipBuilder(aspect).buildRelationships(urn, aspect);
_localRelationshipWriterDAO.processLocalRelationshipUpdates(urn, localRelationshipUpdates);
return localRelationshipUpdates;
}
return new ArrayList<>();
}

/**
* Construct and execute a SQL statement as follows.
* SELECT urn, aspect1, lastmodifiedon, lastmodifiedby FROM metadata_entity_foo WHERE urn = 'urn:1' AND JSON_EXTRACT(aspect1, '$.gma_deleted') IS NULL
Expand Down Expand Up @@ -523,13 +489,6 @@ private String toJsonString(@Nonnull URN urn) {
return JSONObject.toJSONString(pathValueMap);
}

/**
* Set local relationship builder registry.
*/
public void setLocalRelationshipBuilderRegistry(@Nullable LocalRelationshipBuilderRegistry localRelationshipBuilderRegistry) {
_localRelationshipBuilderRegistry = localRelationshipBuilderRegistry;
}

@Nonnull
private SchemaEvolutionManager createSchemaEvolutionManager(@Nonnull ServerConfig serverConfig) {
SchemaEvolutionManager.Config config = new SchemaEvolutionManager.Config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public class EbeanLocalDAO<ASPECT_UNION extends UnionTemplate, URN extends Urn>
private final static int DEFAULT_BATCH_SIZE = 50;
private int _queryKeysCount = DEFAULT_BATCH_SIZE;
private IEbeanLocalAccess<URN> _localAccess;
private EbeanLocalRelationshipWriterDAO _localRelationshipWriterDAO;
private LocalRelationshipBuilderRegistry _localRelationshipBuilderRegistry = null;

private RelationshipSource _relationshipSource = RelationshipSource.RELATIONSHIP_BUILDERS;
private SchemaConfig _schemaConfig = SchemaConfig.OLD_SCHEMA_ONLY;
private final EBeanDAOConfig _eBeanDAOConfig = new EBeanDAOConfig();

Expand All @@ -92,6 +96,15 @@ public enum SchemaConfig {
DUAL_SCHEMA // Write to both the old and new tables and perform a comparison between values when reading
}

/**
* Where to look for the relationship(s) for ingestion. Either extract the relationships from the aspect metadata or
* from the local relationship registry (legacy), which uses relationship builders.
*/
public enum RelationshipSource {
ASPECT_METADATA, // Look at the aspect model and extract any relationship from its fields
RELATIONSHIP_BUILDERS // Use the relationship registry, which relies on relationship builders
}

// Which approach to be used for record retrieval when inserting a new record
// See GCN-38382
private FindMethodology _findMethodology = FindMethodology.UNIQUE_ID;
Expand Down Expand Up @@ -120,6 +133,15 @@ public boolean isChangeLogEnabled() {
return _changeLogEnabled;
}

/**
* Set where the relationships should be derived from during ingestion. Either from aspect models or from relationship
* builders. The default is relationship builders. This should only be used during DAO instantiation i.e. in the DAO factory.
* @param relationshipSource {@link RelationshipSource ASPECT_METADATA or RELATIONSHIP_BUILDERS}
*/
public void setRelationshipSource(RelationshipSource relationshipSource) {
_relationshipSource = relationshipSource;
}

public void setOverwriteLatestVersionEnabled(boolean overwriteLatestVersionEnabled) {
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
if (isChangeLogEnabled()) {
Expand Down Expand Up @@ -382,13 +404,15 @@ private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Ba
super(aspectUnionClass, producer, urnClass, new EmptyPathExtractor<>());
_server = server;
_urnClass = urnClass;
_localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(_server);
}

private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseTrackingMetadataEventProducer producer,
@Nonnull EbeanServer server, @Nonnull Class<URN> urnClass, @Nonnull BaseTrackingManager trackingManager) {
super(aspectUnionClass, producer, trackingManager, urnClass, new EmptyPathExtractor<>());
_server = server;
_urnClass = urnClass;
_localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(_server);
}
private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull BaseMetadataEventProducer producer,
@Nonnull EbeanServer server, @Nonnull ServerConfig serverConfig, @Nonnull Class<URN> urnClass, @Nonnull SchemaConfig schemaConfig) {
Expand Down Expand Up @@ -453,6 +477,7 @@ private EbeanLocalDAO(@Nonnull Class<ASPECT_UNION> aspectUnionClass,
super(producer, storageConfig, urnClass, urnPathExtractor);
_server = server;
_urnClass = urnClass;
_localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(_server);
}

private EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonnull EbeanServer server,
Expand All @@ -461,6 +486,7 @@ private EbeanLocalDAO(@Nonnull BaseTrackingMetadataEventProducer producer, @Nonn
super(producer, storageConfig, trackingManager, urnClass, urnPathExtractor);
_server = server;
_urnClass = urnClass;
_localRelationshipWriterDAO = new EbeanLocalRelationshipWriterDAO(_server);
}

private EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull EbeanServer server,
Expand Down Expand Up @@ -570,6 +596,24 @@ protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTra
protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nullable ASPECT newValue,
@Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, @Nullable IngestionTrackingContext trackingContext) {
// First, check that if the aspect is going to be soft-deleted that it does not have any relationships derived from it.
// We currently don't support soft-deleting aspects from which local relationships are derived from.
if (newValue == null) {
if (_relationshipSource == RelationshipSource.RELATIONSHIP_BUILDERS
&& _localRelationshipBuilderRegistry != null
&& _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
throw new UnsupportedOperationException(
String.format("Aspect %s cannot be soft-deleted because it has a local relationship builder registered.",
aspectClass.getCanonicalName()));
}

if (_relationshipSource == RelationshipSource.ASPECT_METADATA) {
// TODO: not yet implemented
throw new UnsupportedOperationException("This method has not been implemented yet to support the "
+ "ASPECT_METADATA RelationshipSource type yet.");
}
}

// Save oldValue as the largest version + 1
long largestVersion = 0;
if ((isSoftDeleted || oldValue != null) && oldAuditStamp != null && _changeLogEnabled) {
Expand Down Expand Up @@ -608,6 +652,9 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
insert(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, trackingContext);
}

// Add any local relationships that are derived from the aspect.
addRelationshipsIfAny(urn, newValue, aspectClass);

return largestVersion;
}

Expand All @@ -629,19 +676,19 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn,
}, 1);
}

public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationships(
@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
throw new UnsupportedOperationException("Local relationship tables cannot be used in OLD_SCHEMA_ONLY mode, so they cannot be backfilled.");
}
AspectKey<URN, ASPECT> key = new AspectKey<>(aspectClass, urn, LATEST_VERSION);
return runInTransactionWithRetry(() -> {
List<EbeanMetadataAspect> results = _localAccess.batchGetUnion(Collections.singletonList(key), 1, 0, false);
List<EbeanMetadataAspect> results = batchGet(Collections.singleton(key), 1);
if (results.size() == 0) {
return new ArrayList<>();
}
Optional<ASPECT> aspect = toRecordTemplate(aspectClass, results.get(0));
return aspect.map(value -> _localAccess.addRelationships(urn, value, aspectClass)).orElse(new ArrayList<>());
if (aspect.isPresent()) {
return addRelationshipsIfAny(urn, aspect.get(), aspectClass);
}
return Collections.emptyList();
}, 1);
}

Expand Down Expand Up @@ -804,6 +851,7 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullable RecordTemplate value,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext) {


final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version);
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
Expand All @@ -817,6 +865,34 @@ protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullabl
}
}

/**
* If the aspect is associated with at least one relationship, upsert the relationship into the corresponding local
* relationship table. Associated means that the aspect has a registered relationship build or it includes a relationship field.
* @param urn Urn of the metadata update
* @param aspect Aspect of the metadata update
* @param aspectClass Aspect class of the metadata update
* @return List of LocalRelationshipUpdates that were executed
*/
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> addRelationshipsIfAny(@Nonnull URN urn, @Nullable ASPECT aspect,
@Nonnull Class<ASPECT> aspectClass) {
if (_relationshipSource == RelationshipSource.ASPECT_METADATA) {
// TODO: not yet implemented
throw new UnsupportedOperationException("This method has not been implemented yet to support the "
+ "ASPECT_METADATA RelationshipSource type yet.");
} else if (_relationshipSource == RelationshipSource.RELATIONSHIP_BUILDERS) {
if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
List<LocalRelationshipUpdates> localRelationshipUpdates =
_localRelationshipBuilderRegistry.getLocalRelationshipBuilder(aspect).buildRelationships(urn, aspect);
_localRelationshipWriterDAO.processLocalRelationshipUpdates(urn, localRelationshipUpdates);
return localRelationshipUpdates;
}
} else {
throw new UnsupportedOperationException("Please ensure that the RelationshipSource enum is properly set using "
+ "setRelationshipSource method.");
}
return Collections.emptyList();
}

@Nonnull
Map<Class<? extends RecordTemplate>, LocalDAOStorageConfig.AspectStorageConfig> getStrongConsistentIndexPaths() {
return Collections.unmodifiableMap(new HashMap<>(_storageConfig.getAspectStorageConfigMap()));
Expand Down Expand Up @@ -961,10 +1037,14 @@ public void setQueryKeysCount(int keysCount) {
}

/**
* Set a local relationship builder registry.
* Provide a local relationship builder registry. Local relationships will be built based on the builders during data ingestion.
* If set to null, local relationship ingestion will be turned off for this particular DAO instance. This is beneficial
* in situations where some relationships are still in the process of onboarding (i.e. tables have not been created yet).
* @param localRelationshipBuilderRegistry All local relationship builders should be registered in this registry.
* Can be set to null to turn off local relationship ingestion.
*/
public void setLocalRelationshipBuilderRegistry(@Nonnull LocalRelationshipBuilderRegistry localRelationshipBuilderRegistry) {
_localAccess.setLocalRelationshipBuilderRegistry(localRelationshipBuilderRegistry);
public void setLocalRelationshipBuilderRegistry(@Nullable LocalRelationshipBuilderRegistry localRelationshipBuilderRegistry) {
_localRelationshipBuilderRegistry = localRelationshipBuilderRegistry;
}

/**
Expand Down
Loading

0 comments on commit be56ee0

Please sign in to comment.