fix(ebean): fix auto-closeable ebean dao streams (#10506)
david-leifker authored May 15, 2024
1 parent 7c4faf4 commit a847748
Showing 14 changed files with 216 additions and 155 deletions.
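
This commit replaces an ad-hoc `Stream<Stream<EbeanAspectV2>>` batching helper with a new auto-closeable `PartitionedStream` wrapper and puts every resource-backed DAO stream inside try-with-resources. The `PartitionedStream` class itself is among the 14 changed files but is not rendered in this capture; the following is a minimal sketch inferred from its call sites below (a builder with a `delegateStream` field, `partition(int)`, try-with-resources closure) and from the removed Guava-based `partition` helper near the bottom of this page. Anything beyond those inferred members is an assumption, not the committed source:

import com.google.common.collect.Iterators;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.Builder;

// Sketch only: reconstructed from usage in this diff, not the committed file.
@Builder
public class PartitionedStream<T> implements AutoCloseable {
  private final Stream<T> delegateStream;

  /** Lazily group the delegate's elements into consecutive batches of {@code size}. */
  public Stream<Stream<T>> partition(int size) {
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(
            Iterators.transform(
                Iterators.partition(delegateStream.iterator(), size), List::stream),
            0),
        false);
  }

  /** Closing the wrapper closes the underlying (database-backed) stream. */
  @Override
  public void close() {
    delegateStream.close();
  }
}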
--- changed file (path not shown) ---
@@ -49,7 +49,8 @@ public KafkaJob(UpgradeContext context, RestoreIndicesArgs args) {
   @Override
   public RestoreIndicesResult call() {
     return _entityService
-        .streamRestoreIndices(context.opContext(), args, context.report()::addLine)
+        .restoreIndices(context.opContext(), args, context.report()::addLine)
+        .stream()
         .findFirst()
         .get();
   }
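
Note on the call site above: `restoreIndices` now returns a materialized `List<RestoreIndicesResult>`, so the `.stream()` the caller opens is a plain in-memory stream that needs no explicit close; the database-backed streams are opened and closed inside `EntityServiceImpl` (see that hunk further down).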
--- changed file (path not shown) ---
@@ -12,6 +12,8 @@
 import com.linkedin.metadata.entity.AspectDao;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.entity.EntityUtils;
+import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
+import com.linkedin.metadata.entity.ebean.PartitionedStream;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
 import com.linkedin.metadata.utils.AuditStampUtils;
 import com.linkedin.util.Pair;
@@ -76,54 +78,58 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
         args = args.urnLike(getUrnLike());
       }

-      aspectDao
-          .streamAspectBatches(args)
-          .forEach(
-              batch -> {
-                log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);
-
-                List<Pair<Future<?>, Boolean>> futures =
-                    EntityUtils.toSystemAspectFromEbeanAspects(
-                            opContext.getRetrieverContext().get(),
-                            batch.collect(Collectors.toList()))
-                        .stream()
-                        .map(
-                            systemAspect ->
-                                entityService.alwaysProduceMCLAsync(
-                                    opContext,
-                                    systemAspect.getUrn(),
-                                    systemAspect.getUrn().getEntityType(),
-                                    getAspectName(),
-                                    systemAspect.getAspectSpec(),
-                                    null,
-                                    systemAspect.getRecordTemplate(),
-                                    null,
-                                    systemAspect
-                                        .getSystemMetadata()
-                                        .setRunId(id())
-                                        .setLastObserved(System.currentTimeMillis()),
-                                    AuditStampUtils.createDefaultAuditStamp(),
-                                    ChangeType.UPSERT))
-                        .collect(Collectors.toList());
-
-                futures.forEach(
-                    f -> {
-                      try {
-                        f.getFirst().get();
-                      } catch (InterruptedException | ExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                    });
-
-                if (batchDelayMs > 0) {
-                  log.info("Sleeping for {} ms", batchDelayMs);
-                  try {
-                    Thread.sleep(batchDelayMs);
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-              });
+      try (PartitionedStream<EbeanAspectV2> stream = aspectDao.streamAspectBatches(args)) {
+        stream
+            .partition(args.batchSize)
+            .forEach(
+                batch -> {
+                  log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);
+
+                  List<Pair<Future<?>, Boolean>> futures;
+
+                  futures =
+                      EntityUtils.toSystemAspectFromEbeanAspects(
+                              opContext.getRetrieverContext().get(),
+                              batch.collect(Collectors.toList()))
+                          .stream()
+                          .map(
+                              systemAspect ->
+                                  entityService.alwaysProduceMCLAsync(
+                                      opContext,
+                                      systemAspect.getUrn(),
+                                      systemAspect.getUrn().getEntityType(),
+                                      getAspectName(),
+                                      systemAspect.getAspectSpec(),
+                                      null,
+                                      systemAspect.getRecordTemplate(),
+                                      null,
+                                      systemAspect
+                                          .getSystemMetadata()
+                                          .setRunId(id())
+                                          .setLastObserved(System.currentTimeMillis()),
+                                      AuditStampUtils.createDefaultAuditStamp(),
+                                      ChangeType.UPSERT))
+                          .collect(Collectors.toList());
+
+                  futures.forEach(
+                      f -> {
+                        try {
+                          f.getFirst().get();
+                        } catch (InterruptedException | ExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                      });
+
+                  if (batchDelayMs > 0) {
+                    log.info("Sleeping for {} ms", batchDelayMs);
+                    try {
+                      Thread.sleep(batchDelayMs);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                });
+      }

       BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
       context.report().addLine("State updated: " + getUpgradeIdUrn());
--- changed file (path not shown) ---
@@ -17,6 +17,7 @@
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
 import com.linkedin.metadata.entity.AspectDao;
+import com.linkedin.metadata.entity.EntityAspect;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
 import com.linkedin.metadata.shared.ElasticSearchIndexed;
@@ -28,6 +29,7 @@
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.opensearch.OpenSearchStatusException;
@@ -156,28 +158,34 @@ private boolean blockWrites(String indexName) throws InterruptedException, IOException {

   private static Set<StructuredPropertyDefinition> getActiveStructuredPropertiesDefinitions(
       AspectDao aspectDao) {
-    Set<String> removedStructuredPropertyUrns =
-        aspectDao
-            .streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STATUS_ASPECT_NAME)
-            .map(
-                entityAspect ->
-                    Pair.of(
-                        entityAspect.getUrn(),
-                        RecordUtils.toRecordTemplate(Status.class, entityAspect.getMetadata())))
-            .filter(status -> status.getSecond().isRemoved())
-            .map(Pair::getFirst)
-            .collect(Collectors.toSet());
-
-    return aspectDao
-        .streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)
-        .map(
-            entityAspect ->
-                Pair.of(
-                    entityAspect.getUrn(),
-                    RecordUtils.toRecordTemplate(
-                        StructuredPropertyDefinition.class, entityAspect.getMetadata())))
-        .filter(definition -> !removedStructuredPropertyUrns.contains(definition.getKey()))
-        .map(Pair::getSecond)
-        .collect(Collectors.toSet());
+    Set<String> removedStructuredPropertyUrns;
+    try (Stream<EntityAspect> stream =
+        aspectDao.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STATUS_ASPECT_NAME)) {
+      removedStructuredPropertyUrns =
+          stream
+              .map(
+                  entityAspect ->
+                      Pair.of(
+                          entityAspect.getUrn(),
+                          RecordUtils.toRecordTemplate(Status.class, entityAspect.getMetadata())))
+              .filter(status -> status.getSecond().isRemoved())
+              .map(Pair::getFirst)
+              .collect(Collectors.toSet());
+    }
+
+    try (Stream<EntityAspect> stream =
+        aspectDao.streamAspects(
+            STRUCTURED_PROPERTY_ENTITY_NAME, STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)) {
+      return stream
+          .map(
+              entityAspect ->
+                  Pair.of(
+                      entityAspect.getUrn(),
+                      RecordUtils.toRecordTemplate(
+                          StructuredPropertyDefinition.class, entityAspect.getMetadata())))
+          .filter(definition -> !removedStructuredPropertyUrns.contains(definition.getKey()))
+          .map(Pair::getSecond)
+          .collect(Collectors.toSet());
+    }
   }
 }
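
The shape above — open the DAO stream in try-with-resources, run a terminal operation, return the result — now repeats at every `streamAspects` call site. A hypothetical convenience helper (not part of this commit; the name `withClosing` is invented here) could factor that out:

import java.util.function.Function;
import java.util.stream.Stream;

final class StreamUtil {
  private StreamUtil() {}

  /** Apply a terminal operation to a resource-backed stream, guaranteeing close(). */
  static <T, R> R withClosing(Stream<T> stream, Function<Stream<T>, R> terminal) {
    try (Stream<T> s = stream) {
      return terminal.apply(s);
    }
  }
}

With it, the first block above would reduce to a single expression: `withClosing(aspectDao.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STATUS_ASPECT_NAME), s -> s.map(...).filter(...).collect(Collectors.toSet()))`.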
--- changed file (path not shown) ---
@@ -3,6 +3,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
+import com.linkedin.metadata.entity.ebean.PartitionedStream;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
 import com.linkedin.metadata.utils.metrics.MetricUtils;
 import io.ebean.Transaction;
@@ -105,7 +106,7 @@ ListResult<String> listUrns(
   Integer countAspect(@Nonnull final String aspectName, @Nullable String urnLike);

   @Nonnull
-  Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args);
+  PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args);

   @Nonnull
   Stream<EntityAspect> streamAspects(String entityName, String aspectName);
--- changed file (path not shown) ---
@@ -48,6 +48,8 @@
 import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
 import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
 import com.linkedin.metadata.config.PreProcessHooks;
+import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
+import com.linkedin.metadata.entity.ebean.PartitionedStream;
 import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
 import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
 import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl;
@@ -1248,7 +1250,7 @@ public Integer getCountAspect(

   @Nonnull
   @Override
-  public Stream<RestoreIndicesResult> streamRestoreIndices(
+  public List<RestoreIndicesResult> restoreIndices(
       @Nonnull OperationContext opContext,
       @Nonnull RestoreIndicesArgs args,
       @Nonnull Consumer<String> logger) {
@@ -1257,32 +1259,35 @@ public Stream<RestoreIndicesResult> streamRestoreIndices(
     logger.accept(
         String.format(
             "Reading rows %s through %s (0 == infinite) in batches of %s from the aspects table started.",
-            args.start, args.limit, args.batchSize));
+            args.start, args.start + args.limit, args.batchSize));

     long startTime = System.currentTimeMillis();
-    return aspectDao
-        .streamAspectBatches(args)
-        .map(
-            batchStream -> {
-              long timeSqlQueryMs = System.currentTimeMillis() - startTime;
-
-              List<SystemAspect> systemAspects =
-                  EntityUtils.toSystemAspectFromEbeanAspects(
-                      opContext.getRetrieverContext().get(),
-                      batchStream.collect(Collectors.toList()));
-
-              RestoreIndicesResult result = restoreIndices(opContext, systemAspects, logger);
-              result.timeSqlQueryMs = timeSqlQueryMs;
-
-              logger.accept("Batch completed.");
-              try {
-                TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
-              } catch (InterruptedException e) {
-                throw new RuntimeException(
-                    "Thread interrupted while sleeping after successful batch migration.");
-              }
-              return result;
-            });
+    try (PartitionedStream<EbeanAspectV2> stream = aspectDao.streamAspectBatches(args)) {
+      return stream
+          .partition(args.batchSize)
+          .map(
+              batch -> {
+                long timeSqlQueryMs = System.currentTimeMillis() - startTime;
+
+                List<SystemAspect> systemAspects =
+                    EntityUtils.toSystemAspectFromEbeanAspects(
+                        opContext.getRetrieverContext().get(), batch.collect(Collectors.toList()));
+
+                RestoreIndicesResult result = restoreIndices(opContext, systemAspects, logger);
+                result.timeSqlQueryMs = timeSqlQueryMs;
+
+                logger.accept("Batch completed.");
+                try {
+                  TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(
+                      "Thread interrupted while sleeping after successful batch migration.");
+                }
+                return result;
+              })
+          .collect(Collectors.toList());
+    }
   }

   @Nonnull
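
Design note on the hunk above: the old `streamRestoreIndices` returned a lazy `Stream`, which would have required the underlying database stream to stay open after the method returned, with no owner left to close it. Collecting to a `List` inside the try-with-resources block trades laziness for a well-defined resource lifetime: the Ebean stream is fully consumed and closed before the method exits. The corrected log message (`args.start + args.limit` as the upper bound) is part of the same hunk.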
--- changed file (path not shown) ---
@@ -30,6 +30,7 @@
 import com.linkedin.metadata.entity.EntityAspectIdentifier;
 import com.linkedin.metadata.entity.ListResult;
 import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
+import com.linkedin.metadata.entity.ebean.PartitionedStream;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
 import com.linkedin.metadata.query.ExtraInfo;
 import com.linkedin.metadata.query.ExtraInfoArray;
@@ -491,7 +492,7 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike) {
   }

   @Nonnull
-  public Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args) {
+  public PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args) {
     // Not implemented
     return null;
   }
--- changed file (path not shown) ---
@@ -8,7 +8,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterators;
 import com.linkedin.common.AuditStamp;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.metadata.aspect.RetrieverContext;
@@ -49,7 +48,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +59,6 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.persistence.PersistenceException;
@@ -497,9 +494,15 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike) {
     return exp.findCount();
   }

+  /**
+   * Warning: the inner streams must be closed.
+   *
+   * @param args
+   * @return
+   */
   @Nonnull
   @Override
-  public Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args) {
+  public PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args) {
     ExpressionList<EbeanAspectV2> exp =
         _server
             .find(EbeanAspectV2.class)
@@ -548,25 +551,24 @@ public Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args) {
       exp = exp.setMaxRows(args.limit);
     }

-    return partition(
-        exp.orderBy()
-            .asc(EbeanAspectV2.URN_COLUMN)
-            .orderBy()
-            .asc(EbeanAspectV2.ASPECT_COLUMN)
-            .setFirstRow(start)
-            .findStream(),
-        args.batchSize);
-  }
-
-  private static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
-    final Iterator<T> it = source.iterator();
-    final Iterator<Stream<T>> partIt =
-        Iterators.transform(Iterators.partition(it, size), List::stream);
-    final Iterable<Stream<T>> iterable = () -> partIt;
-
-    return StreamSupport.stream(iterable.spliterator(), false);
+    return PartitionedStream.<EbeanAspectV2>builder()
+        .delegateStream(
+            exp.orderBy()
+                .asc(EbeanAspectV2.URN_COLUMN)
+                .orderBy()
+                .asc(EbeanAspectV2.ASPECT_COLUMN)
+                .setFirstRow(start)
+                .findStream())
+        .build();
   }

+  /**
+   * Warning: the stream must be closed.
+   *
+   * @param entityName
+   * @param aspectName
+   * @return
+   */
   @Override
   @Nonnull
   public Stream<EntityAspect> streamAspects(String entityName, String aspectName) {
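
A quick self-contained check (not from this commit) of the property this file now relies on: closing the wrapper must propagate to the delegate returned by `findStream()`. This assumes the `PartitionedStream` sketch near the top of this page:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class PartitionedStreamCloseCheck {
  public static void main(String[] args) {
    AtomicBoolean closed = new AtomicBoolean(false);
    // Stand-in for Ebean's findStream(): a stream that records when it is closed.
    Stream<Integer> delegate = Stream.of(1, 2, 3, 4, 5).onClose(() -> closed.set(true));

    try (PartitionedStream<Integer> stream =
        PartitionedStream.<Integer>builder().delegateStream(delegate).build()) {
      stream.partition(2).forEach(batch -> System.out.println("batch size: " + batch.count()));
    }

    System.out.println("delegate closed: " + closed.get()); // expected: true
  }
}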
[The remaining 7 changed files in this commit were still loading and are not captured here.]