Skip to content

Commit c6eea1e

Browse files
fix(version): forUpdate needed for versioning (#11328)
1 parent cf49f80 commit c6eea1e

File tree

4 files changed

+35
-13
lines changed

4 files changed

+35
-13
lines changed

metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,30 @@ Map<EntityAspectIdentifier, EntityAspect> batchGet(
5151
List<EntityAspect> getAspectsInRange(
5252
@Nonnull Urn urn, Set<String> aspectNames, long startTimeMillis, long endTimeMillis);
5353

54+
/**
55+
* @param urn urn to fetch
56+
* @param aspectName aspect to fetch
57+
* @param forUpdate set to true if the result is used for versioning <a
58+
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
59+
* @return
60+
*/
5461
@Nullable
5562
default EntityAspect getLatestAspect(
56-
@Nonnull final String urn, @Nonnull final String aspectName) {
57-
return getLatestAspects(Map.of(urn, Set.of(aspectName)))
63+
@Nonnull final String urn, @Nonnull final String aspectName, boolean forUpdate) {
64+
return getLatestAspects(Map.of(urn, Set.of(aspectName)), forUpdate)
5865
.getOrDefault(urn, Map.of())
5966
.getOrDefault(aspectName, null);
6067
}
6168

69+
/**
70+
* @param urnAspects urn/aspects to fetch
71+
* @param forUpdate set to true if the result is used for versioning <a
72+
* href="https://ebean.io/docs/query/option#forUpdate">link</a>
73+
* @return the data
74+
*/
6275
@Nonnull
63-
Map<String, Map<String, EntityAspect>> getLatestAspects(Map<String, Set<String>> urnAspects);
76+
Map<String, Map<String, EntityAspect>> getLatestAspects(
77+
Map<String, Set<String>> urnAspects, boolean forUpdate);
6478

6579
void saveAspect(
6680
@Nullable Transaction tx,

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
849849
final Map<String, Map<String, SystemAspect>> latestAspects =
850850
EntityUtils.toSystemAspects(
851851
opContext.getRetrieverContext().get(),
852-
aspectDao.getLatestAspects(urnAspects));
852+
aspectDao.getLatestAspects(urnAspects, true));
853853
// read #2 (potentially)
854854
final Map<String, Map<String, Long>> nextVersions =
855855
EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
@@ -866,7 +866,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
866866
Map<String, Map<String, SystemAspect>> newLatestAspects =
867867
EntityUtils.toSystemAspects(
868868
opContext.getRetrieverContext().get(),
869-
aspectDao.getLatestAspects(updatedItems.getFirst()));
869+
aspectDao.getLatestAspects(updatedItems.getFirst(), true));
870870
// merge
871871
updatedLatestAspects = AspectsBatch.merge(latestAspects, newLatestAspects);
872872

@@ -2064,7 +2064,7 @@ public RollbackRunResult deleteUrn(@Nonnull OperationContext opContext, Urn urn)
20642064

20652065
EntityAspect latestKey = null;
20662066
try {
2067-
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName);
2067+
latestKey = aspectDao.getLatestAspect(urn.toString(), keyAspectName, false);
20682068
} catch (EntityNotFoundException e) {
20692069
log.warn("Entity to delete does not exist. {}", urn.toString());
20702070
}
@@ -2217,7 +2217,7 @@ private RollbackResult deleteAspectWithoutMCL(
22172217
(EntityAspect.EntitySystemAspect)
22182218
EntityUtils.toSystemAspect(
22192219
opContext.getRetrieverContext().get(),
2220-
aspectDao.getLatestAspect(urn, aspectName))
2220+
aspectDao.getLatestAspect(urn, aspectName, false))
22212221
.orElse(null);
22222222

22232223
// 1.1 If no latest exists, skip this aspect

metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,15 @@ private boolean validateConnection() {
8181
}
8282

8383
@Override
84-
public EntityAspect getLatestAspect(@Nonnull String urn, @Nonnull String aspectName) {
84+
public EntityAspect getLatestAspect(
85+
@Nonnull String urn, @Nonnull String aspectName, boolean forUpdate) {
8586
validateConnection();
8687
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
8788
}
8889

8990
@Override
9091
public Map<String, Map<String, EntityAspect>> getLatestAspects(
91-
Map<String, Set<String>> urnAspects) {
92+
Map<String, Set<String>> urnAspects, boolean forUpdate) {
9293
return urnAspects.entrySet().stream()
9394
.map(
9495
entry ->
@@ -97,7 +98,8 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
9798
entry.getValue().stream()
9899
.map(
99100
aspectName -> {
100-
EntityAspect aspect = getLatestAspect(entry.getKey(), aspectName);
101+
EntityAspect aspect =
102+
getLatestAspect(entry.getKey(), aspectName, forUpdate);
101103
return aspect != null ? Map.entry(aspectName, aspect) : null;
102104
})
103105
.filter(Objects::nonNull)

metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ private void saveEbeanAspect(
242242

243243
@Override
244244
public Map<String, Map<String, EntityAspect>> getLatestAspects(
245-
@Nonnull Map<String, Set<String>> urnAspects) {
245+
@Nonnull Map<String, Set<String>> urnAspects, boolean forUpdate) {
246246
validateConnection();
247247

248248
List<EbeanAspectV2.PrimaryKey> keys =
@@ -256,7 +256,12 @@ public Map<String, Map<String, EntityAspect>> getLatestAspects(
256256
entry.getKey(), aspect, ASPECT_LATEST_VERSION)))
257257
.collect(Collectors.toList());
258258

259-
List<EbeanAspectV2> results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
259+
final List<EbeanAspectV2> results;
260+
if (forUpdate) {
261+
results = _server.find(EbeanAspectV2.class).where().idIn(keys).forUpdate().findList();
262+
} else {
263+
results = _server.find(EbeanAspectV2.class).where().idIn(keys).findList();
264+
}
260265

261266
return toUrnAspectMap(results);
262267
}
@@ -814,7 +819,8 @@ public Map<String, Map<String, Long>> getNextVersions(
814819
return result;
815820
}
816821

817-
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().findIds();
822+
// forUpdate is required to avoid duplicate key violations
823+
List<EbeanAspectV2.PrimaryKey> dbResults = exp.endOr().forUpdate().findIds();
818824

819825
for (EbeanAspectV2.PrimaryKey key : dbResults) {
820826
if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) {

0 commit comments

Comments
 (0)