Skip to content

Commit

Permalink
feat: Add Elasticsearch 7 support (#2092)
Browse files Browse the repository at this point in the history
* upgrade to ES7

* add doc summarizing the changes

* Call out the optional parameters in search query

Some parameters in search query are optional and automatically populated. With the ES upgrade these parameters could be removed/added.

* revert unrelated changes

* update required max_ngram_diff field for ES7 (caught by integration tests)

Co-authored-by: Jyoti Wadhwani <[email protected]>
  • Loading branch information
John Plaisted and jywadhwani committed Mar 19, 2021
1 parent 711e023 commit 0a48d5c
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 73 deletions.
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ project.ext.externalDependency = [
'commonsLang': 'commons-lang:commons-lang:2.6',
'ebean': 'io.ebean:ebean:11.33.3',
'ebeanAgent': 'io.ebean:ebean-agent:11.27.1',
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8',
'elasticSearchTransport': 'org.elasticsearch.client:transport:5.6.8',
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.9.3',
'elasticSearchTransport': 'org.elasticsearch.client:transport:7.9.3',
'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1',
'gmaCoreModels': "com.linkedin.datahub-gma:core-models-data-template:$gmaVersion",
'gmaDaoApi': "com.linkedin.datahub-gma:dao-api:$gmaVersion",
'gmaDaoApiDataTemplate': "com.linkedin.datahub-gma:dao-api-data-template:$gmaVersion",
'gmaEbeanDao': "com.linkedin.datahub-gma:ebean-dao:$gmaVersion",
'gmaElasticsearchDao': "com.linkedin.datahub-gma:elasticsearch-dao:$gmaVersion",
'gmaElasticsearchIntegTest': "com.linkedin.datahub-gma:elasticsearch-dao-integ-testing-docker:$gmaVersion",
'gmaElasticsearchDao': "com.linkedin.datahub-gma:elasticsearch-dao-7:$gmaVersion",
'gmaElasticsearchIntegTest': "com.linkedin.datahub-gma:elasticsearch-dao-integ-testing-docker-7:$gmaVersion",
'gmaNeo4jDao': "com.linkedin.datahub-gma:neo4j-dao:$gmaVersion",
'gmaRestliResources': "com.linkedin.datahub-gma:restli-resources:$gmaVersion",
'gmaRestliResourcesDataTemplate': "com.linkedin.datahub-gma:restli-resources-data-template:$gmaVersion",
Expand Down
15 changes: 15 additions & 0 deletions docs/advanced/es-7-upgrade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Elasticsearch upgrade from 5.6.8 to 7.9.3: Summary of changes

### Search index mapping & settings
- Removal of mapping types (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html))
- Specify the maximum allowed difference between `min_gram` and `max_gram` for NGramTokenizer and NGramTokenFilter by adding property `max_ngram_diff` in index settings, particularly if the difference is greater than 1 (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html))


### Search query
The following parameters are/were `optional` and hence automatically populated in the search query. Some tests that expect a certain search query to be sent to ES will change with the ES upgrade.
- `disable_coord` parameter of the `bool` and `common_terms` queries has been removed (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/breaking-changes-6.0.html))
- `auto_generate_synonyms_phrase_query` parameter in `match` query is added with a default value of `true` (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/query-dsl-match-query.html))

### Java High Level Rest Client
- In 7.9.3, Java High Level Rest Client instance needs a REST low-level client builder to be built. In 5.6.8, the same instance needs REST low-level client
- Document APIs such as the Index API, Delete API, etc no longer takes the doc `type` as an input
Original file line number Diff line number Diff line change
Expand Up @@ -44,47 +44,41 @@ public class RestHighLevelClientFactory {
@Bean(name = "elasticSearchRestHighLevelClient")
@Nonnull
protected RestHighLevelClient createInstance() {
try {
RestClient restClient;
if (useSSL) {
restClient = loadRestHttpsClient(host, port, threadCount, connectionRequestTimeout, sslContext);
} else {
restClient = loadRestHttpClient(host, port, threadCount, connectionRequestTimeout);
}

return new RestHighLevelClient(restClient);
} catch (Exception e) {
throw new RuntimeException("Error: RestClient is not properly initialized. " + e.toString());
RestClientBuilder restClientBuilder;

if (useSSL) {
restClientBuilder = loadRestHttpsClient(host, port, threadCount, connectionRequestTimeout, sslContext);
} else {
restClientBuilder = loadRestHttpClient(host, port, threadCount, connectionRequestTimeout);
}

return new RestHighLevelClient(restClientBuilder);
}

@Nonnull
private static RestClient loadRestHttpClient(@Nonnull String host, int port, int threadCount,
int connectionRequestTimeout) {
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, int threadCount,
int connectionRequestTimeout) {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
.setIoThreadCount(threadCount).build()));
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig.custom().setIoThreadCount(threadCount).build()));

builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.
setConnectionRequestTimeout(connectionRequestTimeout));
setConnectionRequestTimeout(connectionRequestTimeout));

return builder.build();
return builder;
}

@Nonnull
private static RestClient loadRestHttpsClient(@Nonnull String host, int port, int threadCount,
int connectionRequestTimeout, @Nonnull SSLContext sslContext) {

private static RestClientBuilder loadRestHttpsClient(@Nonnull String host, int port, int threadCount,
int connectionRequestTimeout, @Nonnull SSLContext sslContext) {
final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "https"))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setSSLContext(sslContext)
.setSSLHostnameVerifier(new NoopHostnameVerifier())
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setSSLContext(sslContext)
.setSSLHostnameVerifier(new NoopHostnameVerifier())
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));

builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.
setConnectionRequestTimeout(connectionRequestTimeout));
setConnectionRequestTimeout(connectionRequestTimeout));

return builder.build();
return builder;
}
}

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"index": {
"max_ngram_diff": 47,
"analysis": {
"filter": {
"autocomplete_filter": {
Expand Down
1 change: 1 addition & 0 deletions gms/impl/src/main/resources/index/dataset/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"index": {
"max_ngram_diff": 47,
"analysis": {
"filter": {
"autocomplete_filter": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
},
"post_filter" : {
"bool" : {
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
},
"post_filter" : {
"bool" : {
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
Expand Down
3 changes: 1 addition & 2 deletions gms/impl/src/test/resources/corpUserESSearchRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@
"fuzzy_transpositions" : true,
"lenient" : false,
"zero_terms_query" : "NONE",
"auto_generate_synonyms_phrase_query": true,
"boost" : 1.0
}
}
}
],
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
@Component
@EnableKafka
public class MetadataAuditEventsProcessor {
// Doc Type should be updated when ElasticSearch Version is upgraded.
private static final String DOC_TYPE = "doc";

private ElasticsearchConnector elasticSearchConnector;
private SnapshotProcessor snapshotProcessor;
Expand Down Expand Up @@ -122,7 +120,6 @@ private void updateElasticsearch(final Snapshot snapshot) {
continue;
}
elasticEvent.setIndex(indexBuilderForDoc.getDocumentType().getSimpleName().toLowerCase());
elasticEvent.setType(DOC_TYPE);
try {
String urn = indexBuilderForDoc.getDocumentType().getMethod("getUrn").invoke(doc).toString();
elasticEvent.setId(URLEncoder.encode(urn.toLowerCase(), "UTF-8"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.kafka.elasticsearch;

import com.linkedin.events.metadata.ChangeType;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand All @@ -9,12 +10,9 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nonnull;

@Slf4j
public class ElasticsearchConnector {
Expand All @@ -39,7 +37,7 @@ public void beforeBulk(long executionId, BulkRequest request) {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
log.info("Successfully feeded bulk request. Number of events: " + response.getItems().length + " Took time ms: "
+ response.getTookInMillis());
+ response.getIngestTookInMillis());
}

@Override
Expand All @@ -48,13 +46,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
};

ThreadPool threadPool = new ThreadPool(Settings.builder().put(Settings.EMPTY).build());
_bulkProcessor =
new BulkProcessor.Builder(elasticSearchRestClient::bulkAsync, listener, threadPool).setBulkActions(bulkRequestsLimit)
.setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod))
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(DEFAULT_RETRY_INTERVAL),
DEFAULT_NUMBER_OF_RETRIES))
.build();
_bulkProcessor = BulkProcessor.builder(
(request, bulkListener) -> elasticSearchRestClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
.setBulkActions(bulkRequestsLimit)
.setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod))
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(DEFAULT_RETRY_INTERVAL),
DEFAULT_NUMBER_OF_RETRIES))
.build();
}

public void feedElasticEvent(@Nonnull ElasticEvent event) {
Expand All @@ -69,18 +67,20 @@ public void feedElasticEvent(@Nonnull ElasticEvent event) {

@Nonnull
private static IndexRequest createIndexRequest(@Nonnull ElasticEvent event) {
return new IndexRequest(event.getIndex(), event.getType(), event.getId()).source(event.buildJson());
return new IndexRequest(event.getIndex()).id(event.getId()).source(event.buildJson());
}

@Nonnull
private static DeleteRequest createDeleteRequest(@Nonnull ElasticEvent event) {
return new DeleteRequest(event.getIndex(), event.getType(), event.getId());
return new DeleteRequest(event.getIndex()).id(event.getId());
}

@Nonnull
private static UpdateRequest createUpsertRequest(@Nonnull ElasticEvent event) {
IndexRequest indexRequest = new IndexRequest(event.getIndex(), event.getType(), event.getId()).source(event.buildJson());
return new UpdateRequest(event.getIndex(), event.getType(),
event.getId()).doc(event.buildJson()).detectNoop(false).upsert(indexRequest).retryOnConflict(3);
final IndexRequest indexRequest = new IndexRequest(event.getIndex()).id(event.getId()).source(event.buildJson());
return new UpdateRequest(event.getIndex(), event.getId()).doc(event.buildJson())
.detectNoop(false)
.upsert(indexRequest);
}
}
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.kafka.elasticsearch;

import com.linkedin.data.template.RecordTemplate;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand All @@ -26,11 +27,12 @@ public XContentBuilder buildJson() {
try {
String jsonString = RecordUtils.toJsonString(this._doc);
builder = XContentFactory.jsonBuilder().prettyPrint();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, jsonString);
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, jsonString);
builder.copyCurrentStructure(parser);
} catch (IOException e) {
e.printStackTrace();
}
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -27,8 +28,8 @@
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.tophits.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;


@Slf4j
Expand All @@ -48,12 +49,6 @@ public static String getWrappedQuery(@Nonnull String query) throws IOException {
return jsonNode.toString();
}

@Nonnull
public static String getUnmodifiedQuery(@Nonnull String query) throws IOException {
JsonNode jsonNode = _objectMapper.readTree(query);
return jsonNode.toString();
}

@Nonnull
public static SearchResponse getSearchResponseFromJSON(@Nonnull String response) throws Exception {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
Expand All @@ -64,15 +59,11 @@ public static SearchResponse getSearchResponseFromJSON(@Nonnull String response)
private static SearchResponse getSearchResponsefromNamedContents(@Nonnull List<NamedXContentRegistry.Entry> contents,
@Nonnull String response) throws Exception {
NamedXContentRegistry registry = new NamedXContentRegistry(contents);
XContentParser parser = JsonXContent.jsonXContent.createParser(registry, response);
XContentParser parser =
JsonXContent.jsonXContent.createParser(registry, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, response);
return SearchResponse.fromXContent(parser);
}

@Nonnull
public static SearchResponse getSearchResponseFromJSONWithAgg(@Nonnull String response) throws Exception {
return getSearchResponsefromNamedContents(getDefaultNamedXContents(), response);
}

@Nonnull
private static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ private TestUtils() {

@Nonnull
public static String loadJsonFromResource(@Nonnull String resourceName) throws IOException {
return IOUtils.toString(ClassLoader.getSystemResourceAsStream(resourceName), Charset.defaultCharset());
final String jsonStr =
IOUtils.toString(ClassLoader.getSystemResourceAsStream(resourceName), Charset.defaultCharset());
return jsonStr.replaceAll("\\s+", "");
}
}

0 comments on commit 0a48d5c

Please sign in to comment.