fix(restoreIndices): batchSize vs limit (#10178)

david-leifker authored Apr 1, 2024
1 parent ef637cc · commit 9a0a53b
Showing 19 changed files with 275 additions and 98 deletions.
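At its core, this commit separates two RestoreIndicesArgs fields that the old code conflated: batchSize, the number of rows fetched and processed per batch, and limit, a cap on the total number of rows across the whole run. A minimal sketch of the corrected semantics, using illustrative values that are not from the commit:

```java
// Hedged illustration: batchSize bounds each fetch; limit bounds the whole run.
RestoreIndicesArgs args =
    new RestoreIndicesArgs()
        .batchSize(500) // rows per batch
        .limit(10_000); // total rows across all batches; 0 means unlimited
```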
@@ -48,7 +48,7 @@ public KafkaJob(UpgradeContext context, RestoreIndicesArgs args) {

   @Override
   public RestoreIndicesResult call() {
-    return _entityService.restoreIndices(args, context.report()::addLine);
+    return _entityService.streamRestoreIndices(args, context.report()::addLine).findFirst().get();
   }
 }

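Because streamRestoreIndices returns a lazy stream with one RestoreIndicesResult per batch (see the EntityServiceImpl hunk below), .findFirst().get() materializes exactly one batch, preserving this job's previous one-batch-per-call contract.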
@@ -85,7 +85,10 @@ private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResult>>
   private RestoreIndicesArgs getArgs(UpgradeContext context) {
     RestoreIndicesArgs result = new RestoreIndicesArgs();
     result.batchSize = getBatchSize(context.parsedArgs());
+    // this class assumes batch size == limit
+    result.limit = getBatchSize(context.parsedArgs());
     context.report().addLine(String.format("batchSize is %d", result.batchSize));
+    context.report().addLine(String.format("limit is %d", result.limit));
     result.numThreads = getThreadCount(context.parsedArgs());
     context.report().addLine(String.format("numThreads is %d", result.numThreads));
     result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
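This job fans batches out across worker threads, submitting each one as its own KafkaJob, so each call is expected to read exactly one batch; setting limit equal to batchSize (as the new comment states) makes the underlying query stop after a single batch.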
@@ -10,14 +10,13 @@
 import com.linkedin.metadata.boot.BootstrapStep;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
-import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
 import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;

 @Slf4j
 public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {

-  private static final String UPGRADE_ID = "via-node-cll-reindex-datajob";
+  public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v2";
   private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

   private final EntityService<?> entityService;
@@ -33,13 +32,17 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
     return (context) -> {
       RestoreIndicesArgs args =
           new RestoreIndicesArgs()
-              .setAspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
-              .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
-              .setBatchSize(batchSize);
-      RestoreIndicesResult result =
-          entityService.restoreIndices(args, x -> context.report().addLine((String) x));
-      context.report().addLine("Rows migrated: " + result.rowsMigrated);
-      context.report().addLine("Rows ignored: " + result.ignored);
+              .aspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
+              .urnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
+              .batchSize(batchSize);
+
+      entityService
+          .streamRestoreIndices(args, x -> context.report().addLine((String) x))
+          .forEach(
+              result -> {
+                context.report().addLine("Rows migrated: " + result.rowsMigrated);
+                context.report().addLine("Rows ignored: " + result.ignored);
+              });

       BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
       context.report().addLine("State updated: " + UPGRADE_ID_URN);
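Two behavioral notes: forEach drains every batch from the lazy stream (in contrast to KafkaJob's findFirst() above), and UPGRADE_ID is bumped to via-node-cll-reindex-datajob-v2, so deployments that recorded the v1 marker should re-run the corrected reindex once.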
@@ -0,0 +1,64 @@
package com.linkedin.datahub.upgrade;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.AssertJUnit.assertNotNull;

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import java.util.List;
import javax.inject.Named;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

@ActiveProfiles("test")
@SpringBootTest(
classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
properties = {
"BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED=true",
"kafka.schemaRegistry.type=INTERNAL",
"DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic",
"METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic"
},
args = {"-u", "SystemUpdateNonBlocking"})
public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {

@Autowired(required = false)
@Named("systemUpdateNonBlocking")
private SystemUpdateNonBlocking systemUpdateNonBlocking;

@Autowired
@Test
public void testSystemUpdateNonBlockingInit() {
assertNotNull(systemUpdateNonBlocking);
}

@Test
public void testReindexDataJobViaNodesCLLPaging() {
EntityService<?> mockService = mock(EntityService.class);
ReindexDataJobViaNodesCLL cllUpgrade = new ReindexDataJobViaNodesCLL(mockService, true, 10);
SystemUpdateNonBlocking upgrade =
new SystemUpdateNonBlocking(List.of(), List.of(cllUpgrade), null);
DefaultUpgradeManager manager = new DefaultUpgradeManager();
manager.register(upgrade);
manager.execute("SystemUpdateNonBlocking", List.of());
verify(mockService, times(1))
.streamRestoreIndices(
eq(
new RestoreIndicesArgs()
.batchSize(10)
.limit(0)
.aspectName("dataJobInputOutput")
.urnLike("urn:li:dataJob:%")),
any());
}
}
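The eq() matcher here implies RestoreIndicesArgs compares by value; note the expected limit(0), the new field's default (no overall cap), alongside batchSize(10).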
@@ -5,7 +5,6 @@
 import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
 import com.linkedin.metadata.utils.metrics.MetricUtils;
-import io.ebean.PagedList;
 import io.ebean.Transaction;
 import java.sql.Timestamp;
 import java.util.List;
@@ -106,7 +105,7 @@ ListResult<String> listUrns(
   Integer countAspect(@Nonnull final String aspectName, @Nullable String urnLike);

   @Nonnull
-  PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args);
+  Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args);

   @Nonnull
   Stream<EntityAspect> streamAspects(String entityName, String aspectName);
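The DAO contract changes shape: instead of one PagedList page, streamAspectBatches yields an outer Stream with one element per batch and an inner Stream of that batch's rows. A hedged sketch of how a caller might consume it (the class and method names in the snippet are assumptions, not from the commit):

```java
// Illustration only: consume the new batch-stream shape, materializing one
// batch at a time while the outer stream stays lazy.
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BatchConsumerSketch {
  static void processAll(Stream<Stream<EbeanAspectV2>> batches) {
    batches.forEach(
        batch -> {
          List<EbeanAspectV2> rows = batch.collect(Collectors.toList());
          System.out.println("processing batch of " + rows.size() + " rows");
        });
  }
}
```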
@@ -49,7 +49,6 @@
 import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
 import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
 import com.linkedin.metadata.config.PreProcessHooks;
-import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
 import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
 import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
 import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl;
@@ -76,7 +75,6 @@
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.r2.RemoteInvocationException;
 import com.linkedin.util.Pair;
-import io.ebean.PagedList;
 import io.ebean.Transaction;
 import io.opentelemetry.extension.annotations.WithSpan;
 import java.net.URISyntaxException;
@@ -1177,38 +1175,38 @@ public Integer getCountAspect(@Nonnull String aspectName, @Nullable String urnLike) {

   @Nonnull
   @Override
-  public RestoreIndicesResult restoreIndices(
+  public Stream<RestoreIndicesResult> streamRestoreIndices(
       @Nonnull RestoreIndicesArgs args, @Nonnull Consumer<String> logger) {

     logger.accept(String.format("Args are %s", args));
     logger.accept(
         String.format(
-            "Reading rows %s through %s from the aspects table started.",
-            args.start, args.start + args.batchSize));
-    long startTime = System.currentTimeMillis();
-    PagedList<EbeanAspectV2> rows = aspectDao.getPagedAspects(args);
-    long timeSqlQueryMs = System.currentTimeMillis() - startTime;
-
-    logger.accept(
-        String.format(
-            "Reading rows %s through %s from the aspects table completed.",
-            args.start, args.start + args.batchSize));
-
-    List<SystemAspect> systemAspects =
-        EntityUtils.toSystemAspectFromEbeanAspects(
-            rows != null ? rows.getList() : List.<EbeanAspectV2>of(), this);
-
-    RestoreIndicesResult result = restoreIndices(systemAspects, logger);
-
-    try {
-      TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(
-          "Thread interrupted while sleeping after successful batch migration.");
-    }
-
-    result.timeSqlQueryMs = timeSqlQueryMs;
-    return result;
+            "Reading rows %s through %s (0 == infinite) in batches of %s from the aspects table started.",
+            args.start, args.limit, args.batchSize));
+
+    long startTime = System.currentTimeMillis();
+    return aspectDao
+        .streamAspectBatches(args)
+        .map(
+            batchStream -> {
+              long timeSqlQueryMs = System.currentTimeMillis() - startTime;
+
+              List<SystemAspect> systemAspects =
+                  EntityUtils.toSystemAspectFromEbeanAspects(
+                      batchStream.collect(Collectors.toList()), this);
+
+              RestoreIndicesResult result = restoreIndices(systemAspects, logger);
+              result.timeSqlQueryMs = timeSqlQueryMs;
+
+              logger.accept("Batch completed.");
+              try {
+                TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(
+                    "Thread interrupted while sleeping after successful batch migration.");
+              }
+              return result;
+            });
   }

@Nonnull
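Note that the map() body, meaning the SQL fetch, the index restore, and the batch delay, executes only as the caller pulls elements from the returned stream. A tiny self-contained sketch of that behavior (illustrative values, nothing from the commit):

```java
import java.util.stream.Stream;

class LazyStreamDemo {
  public static void main(String[] args) {
    Stream<Integer> batches =
        Stream.of(1, 2, 3)
            .map(
                i -> {
                  System.out.println("processing batch " + i); // side effect per batch
                  return i;
                });
    batches.findFirst(); // prints "processing batch 1" only
  }
}
```

This is why KafkaJob's findFirst() processes exactly one batch while the reindex step's forEach processes them all.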
@@ -34,7 +34,6 @@
 import com.linkedin.metadata.query.ExtraInfo;
 import com.linkedin.metadata.query.ExtraInfoArray;
 import com.linkedin.metadata.query.ListResultMetadata;
-import io.ebean.PagedList;
 import io.ebean.Transaction;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
@@ -492,7 +491,7 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike) {
   }

   @Nonnull
-  public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
+  public Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args) {
     // Not implemented
     return null;
   }
@@ -8,6 +8,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterators;
 import com.linkedin.common.AuditStamp;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.metadata.aspect.AspectRetriever;
@@ -43,10 +44,12 @@
 import java.net.URISyntaxException;
 import java.sql.Timestamp;
 import java.time.Clock;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +61,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.persistence.PersistenceException;
@@ -495,7 +499,7 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike) {

   @Nonnull
   @Override
-  public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
+  public Stream<Stream<EbeanAspectV2>> streamAspectBatches(final RestoreIndicesArgs args) {
     ExpressionList<EbeanAspectV2> exp =
         _server
             .find(EbeanAspectV2.class)
@@ -511,6 +515,15 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
     if (args.urnLike != null) {
       exp = exp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
     }
+    if (args.gePitEpochMs > 0) {
+      exp =
+          exp.ge(
+                  EbeanAspectV2.CREATED_ON_COLUMN,
+                  Timestamp.from(Instant.ofEpochMilli(args.gePitEpochMs)))
+              .le(
+                  EbeanAspectV2.CREATED_ON_COLUMN,
+                  Timestamp.from(Instant.ofEpochMilli(args.lePitEpochMs)));
+    }

     int start = args.start;
     if (args.urnBasedPagination) {
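The new gePitEpochMs / lePitEpochMs pair constrains the restore to rows whose createdon timestamp falls inside a point-in-time window; as written, the upper bound is applied only when gePitEpochMs is set.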
@@ -531,13 +544,27 @@ public PagedList<EbeanAspectV2> getPagedAspects(final RestoreIndicesArgs args) {
       }
     }

-    return exp.orderBy()
-        .asc(EbeanAspectV2.URN_COLUMN)
-        .orderBy()
-        .asc(EbeanAspectV2.ASPECT_COLUMN)
-        .setFirstRow(start)
-        .setMaxRows(args.batchSize)
-        .findPagedList();
+    if (args.limit > 0) {
+      exp = exp.setMaxRows(args.limit);
+    }
+
+    return partition(
+        exp.orderBy()
+            .asc(EbeanAspectV2.URN_COLUMN)
+            .orderBy()
+            .asc(EbeanAspectV2.ASPECT_COLUMN)
+            .setFirstRow(start)
+            .findStream(),
+        args.batchSize);
   }

+  private static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
+    final Iterator<T> it = source.iterator();
+    final Iterator<Stream<T>> partIt =
+        Iterators.transform(Iterators.partition(it, size), List::stream);
+    final Iterable<Stream<T>> iterable = () -> partIt;
+
+    return StreamSupport.stream(iterable.spliterator(), false);
+  }

@Override
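The key fix is visible in the rewritten query: setMaxRows now carries limit (the overall cap) rather than batchSize, and the partition helper chunks Ebean's lazy row stream (findStream()) into batch-sized pieces with Guava's Iterators.partition. A self-contained sketch of the same chunking idea, with illustrative values not taken from the repository:

```java
import com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class PartitionDemo {
  public static void main(String[] args) {
    Stream<Integer> source = Stream.iterate(1, n -> n + 1).limit(7);
    // Chunk a lazy stream into lists of at most 3 elements each.
    Iterator<List<Integer>> chunks = Iterators.partition(source.iterator(), 3);
    chunks.forEachRemaining(System.out::println); // [1, 2, 3] [4, 5, 6] [7]
  }
}
```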
@@ -1578,13 +1578,13 @@ public void testRestoreIndices() throws Exception {
     clearInvocations(_mockProducer);

     RestoreIndicesArgs args = new RestoreIndicesArgs();
-    args.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
-    args.setBatchSize(1);
-    args.setStart(0);
-    args.setBatchDelayMs(1L);
-    args.setNumThreads(1);
-    args.setUrn(urnStr);
-    _entityServiceImpl.restoreIndices(args, obj -> {});
+    args.aspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
+    args.batchSize(1);
+    args.start(0);
+    args.batchDelayMs(1L);
+    args.numThreads(1);
+    args.urn(urnStr);
+    _entityServiceImpl.streamRestoreIndices(args, obj -> {}).collect(Collectors.toList());

     ArgumentCaptor<MetadataChangeLog> mclCaptor =
         ArgumentCaptor.forClass(MetadataChangeLog.class);
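The terminal collect(Collectors.toList()) is what forces the lazy stream to execute; without a terminal operation none of the batches would be processed and the captured MetadataChangeLog events would never be produced.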
@@ -7,6 +7,7 @@
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
 import io.datahubproject.metadata.jobs.common.health.kafka.KafkaHealthIndicator;
+import java.util.stream.Stream;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.web.client.TestRestTemplate;
@@ -30,7 +31,7 @@ public class MceConsumerApplicationTest extends AbstractTestNGSpringContextTests {
   public void testRestliServletConfig() {
     RestoreIndicesResult mockResult = new RestoreIndicesResult();
     mockResult.setRowsMigrated(100);
-    when(_mockEntityService.restoreIndices(any(), any())).thenReturn(mockResult);
+    when(_mockEntityService.streamRestoreIndices(any(), any())).thenReturn(Stream.of(mockResult));

     String response =
         this.restTemplate.postForObject(
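One caveat when stubbing a Stream return: a Java Stream can be consumed only once, so thenReturn(Stream.of(mockResult)) is safe only because the test triggers a single invocation. A hedged variant for repeated calls, not part of the commit, would supply a fresh stream per invocation:

```java
// Hypothetical alternative: build a new Stream on every call to avoid
// IllegalStateException("stream has already been operated upon or closed").
when(_mockEntityService.streamRestoreIndices(any(), any()))
    .thenAnswer(invocation -> Stream.of(mockResult));
```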