Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
_entityService.produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null,
latestSystemMetadata,
new AuditStamp().setActor(UrnUtils.getUrn(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()),
ChangeType.UPSERT);
ChangeType.RESTATE);

totalRowsMigrated++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void invoke(@Nonnull MetadataChangeLog event) {
}
Urn urn = EntityKeyUtils.getUrnFromLog(event, entitySpec.getKeyAspectSpec());

if (event.getChangeType() == ChangeType.UPSERT) {
if (event.getChangeType() == ChangeType.UPSERT || event.getChangeType() == ChangeType.RESTATE) {

if (!event.hasAspectName() || !event.hasAspect()) {
log.error("Aspect or aspect name is missing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
Constants.TAG_KEY_ASPECT_NAME,
Constants.STATUS_ASPECT_NAME
);
/**
* The list of change types that are supported for generating semantic change events.
*/
private static final Set<String> SUPPORTED_OPERATIONS = ImmutableSet.of(
"CREATE",
"UPSERT",
"DELETE"
);
private final AspectDifferRegistry _aspectDifferRegistry;
private final EntityClient _entityClient;
private final Authentication _systemAuthentication;
Expand Down Expand Up @@ -167,7 +175,7 @@ private <T extends RecordTemplate> List<ChangeEvent> generateChangeEvents(
}

private boolean isEligibleForProcessing(final MetadataChangeLog log) {
return SUPPORTED_ASPECT_NAMES.contains(log.getAspectName());
return SUPPORTED_OPERATIONS.contains(log.getChangeType().toString()) && SUPPORTED_ASPECT_NAMES.contains(log.getAspectName());
}

private void emitPlatformEvent(@Nonnull final PlatformEvent event, @Nonnull final String partitioningKey) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ enum ChangeType {
* patch the changes instead of full replace
*/
PATCH

/**
* Restate an aspect, eg. in a index refresh.
*/
RESTATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -1679,11 +1679,12 @@
"name" : "ChangeType",
"namespace" : "com.linkedin.events.metadata",
"doc" : "Descriptor for a change action",
"symbols" : [ "UPSERT", "CREATE", "UPDATE", "DELETE", "PATCH" ],
"symbols" : [ "UPSERT", "CREATE", "UPDATE", "DELETE", "PATCH", "RESTATE" ],
"symbolDocs" : {
"CREATE" : "NOT SUPPORTED YET\ninsert if not exists. otherwise fail",
"DELETE" : "NOT SUPPORTED YET\ndelete action",
"PATCH" : "NOT SUPPORTED YET\npatch the changes instead of full replace",
"RESTATE" : "Restate an aspect, eg. in a index refresh.",
"UPDATE" : "NOT SUPPORTED YET\nupdate if exists. otherwise fail",
"UPSERT" : "insert if not exists. otherwise update"
}
Expand Down