Skip to content

Commit

Permalink
Add sync doc to modified index to actively wait for earlier changes
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Sep 8, 2021
1 parent c450d9c commit 86d9c61
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,38 @@
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.testng.TestException;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class ElasticSearchTestUtils {

private ElasticSearchTestUtils() {
}

public static void syncAfterWrite(RestHighLevelClient searchClient) throws Exception {
// flush changes in ES to disk
public static void syncAfterWrite(RestHighLevelClient client) throws Exception {
syncAfterWrite(client, "test-sync-flag");
}

public static void syncAfterWrite(RestHighLevelClient searchClient, String indexName) throws Exception {
// we pick a random flag so that this can be used concurrently
String syncFlag = UUID.randomUUID().toString();

// add the flag and wait for it to appear, preferably to the indexed modified outside
addSyncFlag(searchClient, syncFlag, indexName);
waitForSyncFlag(searchClient, syncFlag, indexName, true);

// flush changes for all indices in ES to disk
FlushResponse fResponse = searchClient.indices().flush(new FlushRequest(), RequestOptions.DEFAULT);
if (fResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards");
Expand All @@ -24,6 +46,38 @@ public static void syncAfterWrite(RestHighLevelClient searchClient) throws Excep
if (rResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards");
}

// remove the flag again and wait for it to disappear
removeSyncFlag(searchClient, syncFlag, indexName);
waitForSyncFlag(searchClient, syncFlag, indexName, false);
}

private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException {
String document = "{ }";
final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document, XContentType.JSON)
.detectNoop(false)
.upsert(indexRequest);
searchClient.update(updateRequest, RequestOptions.DEFAULT);
}

private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) throws IOException {
final DeleteRequest deleteRequest = new DeleteRequest(indexName).id(docId);
searchClient.delete(deleteRequest, RequestOptions.DEFAULT);
}

private static void waitForSyncFlag(RestHighLevelClient searchClient, String docId, String indexName, boolean toExist)
throws IOException, InterruptedException {
GetRequest request = new GetRequest(indexName).id(docId);
long timeout = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS);
while (System.currentTimeMillis() < timeout) {
GetResponse response = searchClient.get(request, RequestOptions.DEFAULT);
if (response.isExists() == toExist) {
return;
}
TimeUnit.MILLISECONDS.sleep(50);
}
throw new TestException("Waiting for sync timed out");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import javax.annotation.Nonnull;

import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME;

public class ElasticSearchGraphServiceTest extends GraphServiceTestBase {

private ElasticsearchContainer _elasticsearchContainer;
private RestHighLevelClient _searchClient;
private IndexConvention _indexConvention;
private final IndexConvention _indexConvention = new IndexConventionImpl(null);
private final String _indexName = _indexConvention.getIndexName(INDEX_NAME);
private ElasticSearchGraphService _client;

private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.9.3";
Expand All @@ -37,7 +40,6 @@ public void wipe() throws Exception {

@BeforeTest
public void setup() throws Exception {
_indexConvention = new IndexConventionImpl(null);
_elasticsearchContainer = new ElasticsearchContainer(IMAGE_NAME);
_elasticsearchContainer.start();
_searchClient = buildRestClient();
Expand Down Expand Up @@ -87,6 +89,7 @@ public void tearDown() {

@Override
protected void syncAfterWrite() throws Exception {
ElasticSearchTestUtils.syncAfterWrite(_searchClient);
ElasticSearchTestUtils.syncAfterWrite(_searchClient, _indexName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
Expand Down Expand Up @@ -108,7 +107,6 @@ public void testElasticSearchService() throws Exception {
document.set("browsePaths", JsonNodeFactory.instance.textNode("/a/b/c"));
_elasticSearchService.upsertDocument(ENTITY_NAME, document.toString(), urn.toString());
syncAfterWrite(_searchClient);
TimeUnit.SECONDS.sleep(2); // for some reason we still need to wait here though we 'syncAfterWrite'

searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10);
assertEquals(searchResult.getNumEntities().intValue(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
import org.testng.annotations.Test;

import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite;
import static com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService.INDEX_NAME;
import static org.testng.Assert.*;

public class ElasticSearchSystemMetadataServiceTest {

private ElasticsearchContainer _elasticsearchContainer;
private RestHighLevelClient _searchClient;
private IndexConvention _indexConvention;
private final IndexConvention _indexConvention = new IndexConventionImpl(null);
private final String _indexName = _indexConvention.getIndexName(INDEX_NAME);
private ElasticSearchSystemMetadataService _client;

private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.9.3";
private static final int HTTP_PORT = 9200;

@BeforeTest
public void setup() {
_indexConvention = new IndexConventionImpl(null);
_elasticsearchContainer = new ElasticsearchContainer(IMAGE_NAME);
_elasticsearchContainer.start();
_searchClient = buildRestClient();
Expand All @@ -44,7 +45,7 @@ public void setup() {
@BeforeMethod
public void wipe() throws Exception {
_client.clear();
syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);
}

@Nonnull
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testListRuns() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);

List<IngestionRunSummary> runs = _client.listRuns(0, 20);

Expand Down Expand Up @@ -117,7 +118,7 @@ public void testOverwriteRuns() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);

List<IngestionRunSummary> runs = _client.listRuns(0, 20);

Expand Down Expand Up @@ -146,7 +147,7 @@ public void testFindByRunId() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456");

Expand Down Expand Up @@ -174,11 +175,11 @@ public void testDelete() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);

_client.deleteUrn("urn:li:chart:1");

syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456");

Expand All @@ -190,7 +191,7 @@ public void testDelete() throws Exception {
public void testInsertNullData() throws Exception {
_client.insert(null, "urn:li:chart:1", "chartKey");

syncAfterWrite(_searchClient);
syncAfterWrite(_searchClient, _indexName);

List<IngestionRunSummary> runs = _client.listRuns(0, 20);

Expand Down

0 comments on commit 86d9c61

Please sign in to comment.