Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(entity-service): no-op batches #12047

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class Constants {
// App sources
public static final String UI_SOURCE = "ui";
public static final String SYSTEM_UPDATE_SOURCE = "systemUpdate";
public static final String METADATA_TESTS_SOURCE = "metadataTests";

/** Entities */
public static final String CORP_USER_ENTITY_NAME = "corpuser";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

if (inputBatch.containsDuplicateAspects()) {
log.warn(String.format("Batch contains duplicates: %s", inputBatch));
MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc();
}

return aspectDao
Expand Down Expand Up @@ -928,13 +929,15 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

// No changes, return
if (changeMCPs.isEmpty()) {
MetricUtils.counter(EntityServiceImpl.class, "batch_empty").inc();
return Collections.<UpdateAspectResult>emptyList();
}

// do final pre-commit checks with previous aspect value
ValidationExceptionCollection exceptions =
AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get());
if (!exceptions.isEmpty()) {
MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc();
throw new ValidationException(collectMetrics(exceptions).toString());
}

Expand Down Expand Up @@ -972,10 +975,13 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
*/
if (overwrite || databaseAspect == null) {
result =
ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect)
.toBuilder()
.request(writeItem)
.build();
Optional.ofNullable(
ingestAspectToLocalDB(
txContext, writeItem, databaseSystemAspect))
.map(
optResult ->
optResult.toBuilder().request(writeItem).build())
.orElse(null);

} else {
RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
Expand All @@ -996,49 +1002,56 @@ This condition is specifically for an older conditional write ingestAspectIfNotP

return result;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {
txContext.commitAndContinue();
}
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
log.info("Ingestion of aspects batch to database took {} ms", took);
}
if (!upsertResults.isEmpty()) {
// commit upserts prior to retention or kafka send, if supported by impl
if (txContext != null) {
txContext.commitAndContinue();
}
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
log.info("Ingestion of aspects batch to database took {} ms", took);
}

// Retention optimization and tx
if (retentionService != null) {
List<RetentionService.RetentionContext> retentionBatch =
upsertResults.stream()
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
.get(result.getUrn().toString())
.containsKey(result.getRequest().getAspectName()))
.filter(
result -> {
RecordTemplate oldAspect = result.getOldValue();
RecordTemplate newAspect = result.getNewValue();
// Apply retention policies if there was an update to existing aspect
// value
return oldAspect != newAspect
&& oldAspect != null
&& retentionService != null;
})
.map(
result ->
RetentionService.RetentionContext.builder()
.urn(result.getUrn())
.aspectName(result.getRequest().getAspectName())
.maxVersion(Optional.of(result.getMaxVersion()))
.build())
.collect(Collectors.toList());
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
// Retention optimization and tx
if (retentionService != null) {
List<RetentionService.RetentionContext> retentionBatch =
upsertResults.stream()
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
.get(result.getUrn().toString())
.containsKey(result.getRequest().getAspectName()))
.filter(
result -> {
RecordTemplate oldAspect = result.getOldValue();
RecordTemplate newAspect = result.getNewValue();
// Apply retention policies if there was an update to existing
// aspect
// value
return oldAspect != newAspect
&& oldAspect != null
&& retentionService != null;
})
.map(
result ->
RetentionService.RetentionContext.builder()
.urn(result.getUrn())
.aspectName(result.getRequest().getAspectName())
.maxVersion(Optional.of(result.getMaxVersion()))
.build())
.collect(Collectors.toList());
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
} else {
log.warn("Retention service is missing!");
}
} else {
log.warn("Retention service is missing!");
MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc();
log.warn("Empty transaction detected. {}", inputBatch);
}

return upsertResults;
Expand Down Expand Up @@ -2506,7 +2519,7 @@ private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(
* @param databaseAspect The aspect as it exists in the database.
* @return result object
*/
@Nonnull
@Nullable
private UpdateAspectResult ingestAspectToLocalDB(
@Nullable TransactionContext txContext,
@Nonnull final ChangeMCP writeItem,
Expand All @@ -2520,6 +2533,9 @@ private UpdateAspectResult ingestAspectToLocalDB(
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);

// 2. Compare the latest existing and new.
final RecordTemplate databaseValue =
databaseAspect == null ? null : databaseAspect.getRecordTemplate();

final EntityAspect.EntitySystemAspect previousBatchAspect =
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
final RecordTemplate previousValue =
Expand All @@ -2528,7 +2544,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
// 3. If there is no difference between existing and new, we just update
// the lastObserved in system metadata. RunId should stay as the original runId
if (previousValue != null
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
&& DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {

SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
Expand Down Expand Up @@ -2564,45 +2580,49 @@ private UpdateAspectResult ingestAspectToLocalDB(
}

// 4. Save the newValue as the latest version
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
txContext,
writeItem.getUrn().toString(),
writeItem.getAspectName(),
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
previousBatchAspect == null
? null
: previousBatchAspect.getEntityAspect().getCreatedFor(),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
newValueStr,
writeItem.getAuditStamp().getActor().toString(),
writeItem.getAuditStamp().hasImpersonator()
? writeItem.getAuditStamp().getImpersonator().toString()
: null,
new Timestamp(writeItem.getAuditStamp().getTime()),
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
writeItem.getNextAspectVersion());

// metrics
aspectDao.incrementWriteMetrics(
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(writeItem.getRecordTemplate())
.oldSystemMetadata(
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
.newSystemMetadata(writeItem.getSystemMetadata())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(versionOfOld)
.build();
if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
log.debug(
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
long versionOfOld =
aspectDao.saveLatestAspect(
txContext,
writeItem.getUrn().toString(),
writeItem.getAspectName(),
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
previousBatchAspect == null
? null
: previousBatchAspect.getEntityAspect().getCreatedFor(),
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
newValueStr,
writeItem.getAuditStamp().getActor().toString(),
writeItem.getAuditStamp().hasImpersonator()
? writeItem.getAuditStamp().getImpersonator().toString()
: null,
new Timestamp(writeItem.getAuditStamp().getTime()),
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
writeItem.getNextAspectVersion());

// metrics
aspectDao.incrementWriteMetrics(
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);

return UpdateAspectResult.builder()
.urn(writeItem.getUrn())
.oldValue(previousValue)
.newValue(writeItem.getRecordTemplate())
.oldSystemMetadata(
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
.newSystemMetadata(writeItem.getSystemMetadata())
.operation(MetadataAuditOperation.UPDATE)
.auditStamp(writeItem.getAuditStamp())
.maxVersion(versionOfOld)
.build();
}

return null;
}

private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) {
Expand Down
Loading
Loading