Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dual schema): use aspect table as SOT for optimistic locking in DUAL_SCHEMA mode #483

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,6 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
// 2. write value of latest version (version = 0) as a new version
// 3. update the latest version (version = 0) with the new value. If the value of latest version has been
// changed during this process, then rollback by throwing OptimisticLockException

largestVersion = getNextVersion(urn, aspectClass);
// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
Expand All @@ -618,7 +617,6 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
new Timestamp(oldAuditStamp.getTime()), trackingContext, isTestMode);
} else {
// When for fresh ingestion or with changeLog disabled

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
if ("AzkabanFlowInfo".equals(aspectClass.getSimpleName())) {
Expand Down Expand Up @@ -680,7 +678,7 @@ public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLo
@Nonnull Class<ASPECT> aspectClass, boolean isTestMode) {

EbeanMetadataAspect result;
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) {
final String aspectName = ModelUtils.getAspectName(aspectClass);
final PrimaryKey key = new PrimaryKey(urn.toString(), aspectName, LATEST_VERSION);
if (_findMethodology == FindMethodology.DIRECT_SQL) {
Expand All @@ -696,7 +694,7 @@ public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLo
result = _server.find(EbeanMetadataAspect.class, key);
}
} else {
// for new schema or dual-schema, get latest data from new schema. (Resolving the read de-coupling issue)
// for new schema, get latest data from the new schema entity table. (Resolving the read de-coupling issue)
final List<EbeanMetadataAspect> results =
_localAccess.batchGetUnion(Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0,
true, isTestMode);
Expand Down Expand Up @@ -801,9 +799,9 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
// ensure atomicity by running old schema update + new schema update in a transaction

final SqlUpdate oldSchemaSqlUpdate;
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// In NEW_SCHEMA or DUAL_SCHEMA, since entity table is the SOT and the getLatest (oldTimestamp) is from the entity
// table, therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking)
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
// In NEW_SCHEMA, the entity table is the SOT and getLatest (oldTimestamp) reads from the entity
// table. Therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking)
// aspect table will apply regular update over (urn, aspect, version) primary key combination.
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
Expand All @@ -814,10 +812,17 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
trackingContext, isTestMode);
}, 1);
} else {
// In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table
// therefore, we will apply compare-and-set with oldTimestamp on aspect table (assemblyOldSchemaSqlUpdate)
// In OLD_SCHEMA and DUAL_SCHEMA mode, the aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table.
// Therefore, we will apply compare-and-set with oldTimestamp on aspect table (assemblyOldSchemaSqlUpdate)
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, oldTimestamp);
numOfUpdatedRows = _server.execute(oldSchemaSqlUpdate);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
// Additionally, in DUAL_SCHEMA mode: apply a regular update (no optimistic locking) to the entity table
if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
_localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this conflict with the comment (no optimistic locking)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is correct, we pass null oldTimestamp to the addWithOptimisticLocking function on the entity tables, which will just do a regular update because of null oldTimestamp

trackingContext, isTestMode);
}
return _server.execute(oldSchemaSqlUpdate);
}, 1);
}
// If there is no single updated row, emit OptimisticLockException
if (numOfUpdatedRows != 1) {
Expand All @@ -830,8 +835,6 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullable RecordTemplate value,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version,
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {


final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version);
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2905,7 +2905,6 @@ public void testOptimisticLockException() {
aspect.setCreatedBy("fooActor");

if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {

// add aspect to the db
_server.insert(aspect);

Expand All @@ -2918,32 +2917,64 @@ public void testOptimisticLockException() {
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 100),
0, new Timestamp(_now - 100), null, false);

} else if (_enableChangeLog) {
// either NEW or DUAL schema, whereas entity table is the SOT and aspect table is the log table
} else if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// in DUAL SCHEMA, the aspect table is the SOT even though it also writes to the entity table
// Given:
// 1. in DUAL SCHEMA mode
// 2. (foo:1, lastmodified(_now + 1)) in entity table (discrepancy)
// 3. (foo:1, lastmodified(_now), version=0) in aspect table

dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false);

// make inconsistent timestamp only on the entity table
dao.setSchemaConfig(SchemaConfig.NEW_SCHEMA_ONLY);
dao.setChangeLogEnabled(false);
dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 1), 0, null, false);
dao.setChangeLogEnabled(true);
dao.setSchemaConfig(_schemaConfig);

// When: update with old timestamp matches the lastmodifiedon time in entity table
try {
fooAspect.setValue("bar");
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 1000L), 0,
new Timestamp(_now), null, false);
} catch (OptimisticLockException e) {
fail("Expect the update pass since the old timestamp matches the lastmodifiedon in aspect table");
}
// Expect: update succeed and the values are updated
BaseLocalDAO.AspectEntry<AspectFoo> result = dao.getLatest(fooUrn, AspectFoo.class, false);
assertEquals(result.getAspect().getValue(), "bar");
assertEquals(result.getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 1000L)); // need to set by at least 1

// When: update with old timestamp does not match the lastmodifiedon in the aspect table
// Expect: OptimisticLockException.
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0,
new Timestamp(_now + 100), null, false);
} else if (_enableChangeLog) {
// either NEW SCHEMA, the entity table is the SOT and the aspect table is the log table
// Given:
// 1. in NEW, DUAL schema mode
// 2. (foo:1, lastmodified(_now + 1), version=0) in aspect table (discrepancy)
// 3. (foo:1, lastmodified(_now)) in entity table
// 1. in NEW SCHEMA mode
// 2. (foo:1, lastmodifiedon(_now + 1), version=0) in aspect table (discrepancy)
// 3. (foo:1, lastmodifiedon(_now)) in entity table

dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false);
// make inconsistent timestamp on aspect table
aspect.setCreatedOn(new Timestamp(_now + 1));
_server.update(aspect);

// When: update with old timestamp matches the lastmodified time in entity table
// When: update with old timestamp matches the lastmodifiedon time in entity table
try {
fooAspect.setValue("bar");
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 200), 0,
new Timestamp(_now), null, false);
} catch (OptimisticLockException e) {
fail("Expect the update pass since the old timestamp matches the lastmodified in entity table");
fail("Expect the update pass since the old timestamp matches the lastmodifiedon in entity table");
}
// Expect: update succeed and the values are updated
assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getAspect().getValue(), "bar");
assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 200L));

// When: update with old timestamp does not match the lastmodified in the entity table
// When: update with old timestamp does not match the lastmodifiedon in the entity table
// Expect: OptimisticLockException.
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0,
new Timestamp(_now + 100), null, false);
Expand Down
Loading