Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(version): forUpdate needed for versioning #11328

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,30 @@ Map<EntityAspectIdentifier, EntityAspect> batchGet(
List<EntityAspect> getAspectsInRange(
@Nonnull Urn urn, Set<String> aspectNames, long startTimeMillis, long endTimeMillis);

/**
* @param urn urn to fetch
* @param aspectName aspect to fetch
* @param forUpdate set to true if the result is used for versioning <a
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
* @return
*/
@Nullable
default EntityAspect getLatestAspect(
@Nonnull final String urn, @Nonnull final String aspectName) {
return getLatestAspects(Map.of(urn, Set.of(aspectName)))
@Nonnull final String urn, @Nonnull final String aspectName, boolean forUpdate) {
return getLatestAspects(Map.of(urn, Set.of(aspectName)), forUpdate)
.getOrDefault(urn, Map.of())
.getOrDefault(aspectName, null);
}

/**
* @param urnAspects urn/aspects to fetch
* @param forUpdate set to true if the result is used for versioning <a
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
* @return the data
*/
@Nonnull
Map<String, Map<String, EntityAspect>> getLatestAspects(Map<String, Set<String>> urnAspects);
Map<String, Map<String, EntityAspect>> getLatestAspects(
Map<String, Set<String>> urnAspects, boolean forUpdate);

void saveAspect(
@Nullable Transaction tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
final Map<String, Map<String, SystemAspect>> latestAspects =
EntityUtils.toSystemAspects(
opContext.getRetrieverContext().get(),
aspectDao.getLatestAspects(urnAspects));
aspectDao.getLatestAspects(urnAspects, true));
// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
Expand All @@ -866,7 +866,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
Map<String, Map<String, SystemAspect>> newLatestAspects =
EntityUtils.toSystemAspects(
opContext.getRetrieverContext().get(),
aspectDao.getLatestAspects(updatedItems.getFirst()));
aspectDao.getLatestAspects(updatedItems.getFirst(), true));
// merge
updatedLatestAspects = AspectsBatch.merge(latestAspects, newLatestAspects);

Expand Down Expand Up @@ -2064,7 +2064,7 @@ public RollbackRunResult deleteUrn(@Nonnull OperationContext opContext, Urn urn)

EntityAspect latestKey = null;
try {
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName);
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName, false);
} catch (EntityNotFoundException e) {
log.warn("Entity to delete does not exist. {}", urn.toString());
}
Expand Down Expand Up @@ -2217,7 +2217,7 @@ private RollbackResult deleteAspectWithoutMCL(
(EntityAspect.EntitySystemAspect)
EntityUtils.toSystemAspect(
opContext.getRetrieverContext().get(),
aspectDao.getLatestAspect(urn, aspectName))
aspectDao.getLatestAspect(urn, aspectName, false))
.orElse(null);

// 1.1 If no latest exists, skip this aspect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ private boolean validateConnection() {
}

@Override
public EntityAspect getLatestAspect(@Nonnull String urn, @Nonnull String aspectName) {
public EntityAspect getLatestAspect(
@Nonnull String urn, @Nonnull String aspectName, boolean forUpdate) {
validateConnection();
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
}

@Override
public Map<String, Map<String, EntityAspect>> getLatestAspects(
Map<String, Set<String>> urnAspects) {
Map<String, Set<String>> urnAspects, boolean forUpdate) {
return urnAspects.entrySet().stream()
.map(
entry ->
Expand All @@ -97,7 +98,8 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
entry.getValue().stream()
.map(
aspectName -> {
EntityAspect aspect = getLatestAspect(entry.getKey(), aspectName);
EntityAspect aspect =
getLatestAspect(entry.getKey(), aspectName, forUpdate);
return aspect != null ? Map.entry(aspectName, aspect) : null;
})
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void saveEbeanAspect(

@Override
public Map<String, Map<String, EntityAspect>> getLatestAspects(
@Nonnull Map<String, Set<String>> urnAspects) {
@Nonnull Map<String, Set<String>> urnAspects, boolean forUpdate) {
validateConnection();

List<EbeanAspectV2.PrimaryKey> keys =
Expand All @@ -256,7 +256,12 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
entry.getKey(), aspect, ASPECT_LATEST_VERSION)))
.collect(Collectors.toList());

List<EbeanAspectV2> results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
final List<EbeanAspectV2> results;
if (forUpdate) {
results = _server.find(EbeanAspectV2.class).where().idIn(keys).forUpdate().findList();
} else {
results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
}

return toUrnAspectMap(results);
}
Expand Down Expand Up @@ -814,7 +819,8 @@ public Map<String, Map<String, Long>> getNextVersions(
return result;
}

List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().findIds();
// forUpdate is required to avoid duplicate key violations
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().forUpdate().findIds();

for (EbeanAspectV2.PrimaryKey key : dbResults) {
if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) {
Expand Down
Loading