Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dual-write): add new ingestSkipPreUpdates resource endpoints #430

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return addSkipPreIngestionUpdates(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
}

/**
Expand All @@ -862,7 +862,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT addSkipPreIngestionUpdates(@Nonnull URN urn, @Nonnull ASPECT newValue,
public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
final IngestionParams nonNullIngestionParams =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,15 @@ private Task<BackfillResult> backfillWithNewValue(@ActionParam(PARAM_URNS) @Nonn
});
}

/**
* Internal ingest method for snapshots. First execute any pre-ingestion updates. Then, route any aspects which have a registered routing
* GMS client to the respective GMS for ingestion. Finally, continue to save the aspect locally as well (i.e. dual write)
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
@Override
protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
Expand All @@ -308,46 +317,44 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting(urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternal, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
log.info("Skip ingestion in ingestInternal for urn: {}, aspectFQCN: {}", urn, aspectFQCN);
return;
}
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add
// which skips pre-update lambdas.
getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error(
String.format("Couldn't ingest routing aspect %s for %s", aspect.getClass().getSimpleName(), urn),
exception);
}
} else {
getLocalDAO().add(urn, aspect, auditStamp, trackingContext, ingestionParams);
}
}
});
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, false));
return null;
});
}

/**
* Raw internal ingest method for snapshots which skips any pre-, intra-, or post-processing. Route any aspects which
* have a registered routing GMS client to the respective GMS for ingestion. Finally, continue to save the aspect
* locally as well (i.e. dual write)
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestInternal(@Nonnull SNAPSHOT snapshot,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
// TODO: META-18950: add trackingContext to BaseAspectRoutingResource. currently the param is unused.
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, true));
return null;
});
}

/**
* Internal ingest method for assets. First execute any pre-ingestion updates. Then, route any aspects which have a registered routing
* GMS client to the respective GMS for ingestion. Finally, continue to save the aspect locally as well (i.e. dual write)
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
@Override
protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
Expand All @@ -359,43 +366,90 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
IngestionTrackingContext trackingContext =
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting(urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
log.info("Skip ingestion in ingestInternalAsset for urn: {}, aspectFQCN: {}", urn, aspectFQCN);
return;
}
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass())
.ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add
// which skips pre-update lambdas.
getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception);
ModelUtils.getAspectsFromAsset(asset).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, false));
return null;
});
}

/**
* Raw internal ingest method for assets which skips any pre-, intra-, or post-processing. Route any aspects which
* have a registered routing GMS client to the respective GMS for ingestion. Finally, continue to save the aspect
* locally as well (i.e. dual write)
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestInternalAsset(@Nonnull ASSET asset,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore,
@Nullable IngestionParams ingestionParams) {
// TODO: META-18950: add trackingContext to BaseAspectRoutingResource. currently the param is unused.
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromAsset(asset);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
IngestionTrackingContext trackingContext =
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, true));
return null;
});
}

/**
* Helper function to ingest an aspect either via routing or locally (or both). There is a flag that can be toggled
* to indicate whether to execute pre-, intra-, or post-ingestion updates if they exist.
* @param aspectsToIgnore set of aspect classes to ignore, if any
* @param urn urn associated with the aspect to ingest
* @param aspect aspect to ingest
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @param auditStamp audit information of the update
* @param skipExtraProcessing flag to indicate whether to execute pre-, intra-, or post-ingestion updates
*/
private void ingestAspect(Set<Class<? extends RecordTemplate>> aspectsToIgnore, Urn urn, RecordTemplate aspect,
IngestionTrackingContext trackingContext, IngestionParams ingestionParams, AuditStamp auditStamp,
boolean skipExtraProcessing) {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting((URN) urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
log.info("Skip ingestion in ingestInternalAsset for urn: {}, aspectFQCN: {}", urn, aspectFQCN);
return;
}
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass())
.ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
getLocalDAO().add(urn, aspect, auditStamp, trackingContext, ingestionParams);
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// here, always call a simple version of BaseLocalDAO::add which skips pre-update lambdas regardless of
// the value of param skipExtraProcessing since any pre-update lambdas would have already been executed
// in the code above.
getLocalDAO().rawAdd((URN) urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception);
}
});
return null;
});
} else {
if (skipExtraProcessing) {
// call a simple version of BaseLocalDAO::add which skips pre-update lambdas.
getLocalDAO().rawAdd((URN) urn, aspect, auditStamp, trackingContext, ingestionParams);
} else {
getLocalDAO().add((URN) urn, aspect, auditStamp, trackingContext, ingestionParams);
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,23 @@ public Task<Void> ingestWithTracking(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPS
}

/**
* An action method for automated ingestion pipeline.
* Deprecated to use {@link #rawIngestAsset(RecordTemplate, IngestionParams)} instead.
* Same as {@link #ingestWithTracking(RecordTemplate, IngestionTrackingContext, IngestionParams)} but skips any pre-ingestion updates.
* @param snapshot Snapshot of the metadata change to be ingested
* @param trackingContext {@link IngestionTrackingContext} to 1) track DAO-level metrics and 2) to pass on to MAE emission
* @return ingest task
*/
@Deprecated
@Action(name = ACTION_RAW_INGEST)
@Nonnull
public Task<Void> rawIngest(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPSHOT snapshot,
@ActionParam(PARAM_TRACKING_CONTEXT) @Nonnull IngestionTrackingContext trackingContext,
@Optional @ActionParam(PARAM_INGESTION_PARAMS) IngestionParams ingestionParams) {
return rawIngestInternal(snapshot, Collections.emptySet(), trackingContext, ingestionParams);
}

/**
* An action method for automated ingestion pipeline, also called high-level write.
* @param asset Asset of the metadata change to be ingested
* @return ingest task
*/
Expand All @@ -342,6 +358,26 @@ public Task<Void> ingestAsset(@ActionParam(PARAM_ASSET) @Nonnull ASSET asset,
return ingestInternalAsset(asset, Collections.emptySet(), ingestionParams);
}

/**
* An action method for automated ingestion pipeline which skips any pre-ingestion updates, also called low-level write.
* @param asset Asset of the metadata change to be ingested
* @return ingest task
*/
@Action(name = ACTION_RAW_INGEST_ASSET)
@Nonnull
public Task<Void> rawIngestAsset(@ActionParam(PARAM_ASSET) @Nonnull ASSET asset,
@Optional @ActionParam(PARAM_INGESTION_PARAMS) IngestionParams ingestionParams) {
return rawIngestAssetInternal(asset, Collections.emptySet(), ingestionParams);
}

/**
* Internal ingest method for snapshots. First execute any pre-ingestion updates. Then, save the aspect locally.
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext,
Expand All @@ -358,6 +394,37 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
});
}

/**
* Raw internal ingest method for snapshots which skips any pre-, intra-, or post-processing. Save the aspect locally.
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestInternal(@Nonnull SNAPSHOT snapshot,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).stream().forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
getLocalDAO().rawAdd(urn, aspect, auditStamp, trackingContext, ingestionParams);
}
});
return null;
});
}

/**
* Internal ingest method for assets. First execute any pre-ingestion updates. Then, save the aspect locally.
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore,
Expand All @@ -376,6 +443,31 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
});
}

/**
* Raw internal ingest method for assets which skips any pre-, intra-, or post-processing. Save the aspect locally.
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestAssetInternal(@Nonnull ASSET asset,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore,
@Nullable IngestionParams ingestionParams) {
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromAsset(asset);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
IngestionTrackingContext ingestionTrackingContext =
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).stream().forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
getLocalDAO().rawAdd(urn, aspect, auditStamp, ingestionTrackingContext, ingestionParams);
}
});
return null;
});
}

/**
* Deprecated to use {@link #getAsset(String, String[])} instead.
* An action method for getting a snapshot of aspects for an entity.
Expand Down
Loading
Loading