Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Elasticsearch 7 support #2092

Merged
merged 5 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ project.ext.externalDependency = [
'commonsLang': 'commons-lang:commons-lang:2.6',
'ebean': 'io.ebean:ebean:11.33.3',
'ebeanAgent': 'io.ebean:ebean-agent:11.27.1',
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8',
'elasticSearchTransport': 'org.elasticsearch.client:transport:5.6.8',
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.9.3',
jplaisted marked this conversation as resolved.
Show resolved Hide resolved
'elasticSearchTransport': 'org.elasticsearch.client:transport:7.9.3',
'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1',
'gmaCoreModels': "com.linkedin.datahub-gma:core-models-data-template:$gmaVersion",
'gmaDaoApi': "com.linkedin.datahub-gma:dao-api:$gmaVersion",
'gmaDaoApiDataTemplate': "com.linkedin.datahub-gma:dao-api-data-template:$gmaVersion",
'gmaEbeanDao': "com.linkedin.datahub-gma:ebean-dao:$gmaVersion",
'gmaElasticsearchDao': "com.linkedin.datahub-gma:elasticsearch-dao:$gmaVersion",
'gmaElasticsearchIntegTest': "com.linkedin.datahub-gma:elasticsearch-dao-integ-testing-docker:$gmaVersion",
'gmaElasticsearchDao': "com.linkedin.datahub-gma:elasticsearch-dao-7:$gmaVersion",
'gmaElasticsearchIntegTest': "com.linkedin.datahub-gma:elasticsearch-dao-integ-testing-docker-7:$gmaVersion",
'gmaNeo4jDao': "com.linkedin.datahub-gma:neo4j-dao:$gmaVersion",
'gmaRestliResources': "com.linkedin.datahub-gma:restli-resources:$gmaVersion",
'gmaRestliResourcesDataTemplate': "com.linkedin.datahub-gma:restli-resources-data-template:$gmaVersion",
Expand Down
15 changes: 15 additions & 0 deletions docs/advanced/es-7-upgrade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Elasticsearch upgrade from 5.6.8 to 7.9.3: Summary of changes

### Search index mapping & settings
- Removal of mapping types (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html))
- Specify the maximum allowed difference between `min_gram` and `max_gram` for NGramTokenizer and NGramTokenFilter by adding property `max_ngram_diff` in index settings, particularly if the difference is greater than 1 (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html))


### Search query
The following parameters are/were `optional` and hence automatically populated in the search query. Some tests that expect a certain search query to be sent to ES will change with the ES upgrade.
- `disable_coord` parameter of the `bool` and `common_terms` queries has been removed (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/breaking-changes-6.0.html))
- `auto_generate_synonyms_phrase_query` parameter in `match` query is added with a default value of `true` (as mentioned [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/query-dsl-match-query.html))

### Java High Level Rest Client
- In 7.9.3, Java High Level Rest Client instance needs a REST low-level client builder to be built. In 5.6.8, the same instance needs REST low-level client
- Document APIs such as the Index API, Delete API, etc no longer takes the doc `type` as an input
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,30 @@ public class RestHighLevelClientFactory {
@Nonnull
protected RestHighLevelClient createInstance() {
try {
RestClient restClient = loadRestHttpClient(
elasticSearchHost,
elasticSearchPort,
elasticSearchThreadCount,
elasticSearchConectionRequestTimeout
RestClientBuilder restClientBuilder = loadRestHttpClient(
elasticSearchHost,
elasticSearchPort,
elasticSearchThreadCount,
elasticSearchConectionRequestTimeout
);

return new RestHighLevelClient(restClient);
return new RestHighLevelClient(restClientBuilder);
} catch (Exception e) {
throw new RuntimeException("Error: RestClient is not properly initialized. " + e.toString());
}
}

@Nonnull
private static RestClient loadRestHttpClient(@Nonnull String host, int port, int threadCount,
int connectionRequestTimeout) throws Exception {
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, int threadCount,
int connectionRequestTimeout) throws Exception {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
.setIoThreadCount(threadCount).build()));
.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
.setIoThreadCount(threadCount).build()));

builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.
setConnectionRequestTimeout(connectionRequestTimeout));
setConnectionRequestTimeout(connectionRequestTimeout));

return builder.build();
return builder;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"index": {
"max_ngram_diff": 47,
jplaisted marked this conversation as resolved.
Show resolved Hide resolved
"analysis": {
"filter": {
"autocomplete_filter": {
Expand Down
1 change: 1 addition & 0 deletions gms/impl/src/main/resources/index/dataset/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"index": {
"max_ngram_diff": 47,
"analysis": {
"filter": {
"autocomplete_filter": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
},
"post_filter" : {
"bool" : {
"disable_coord" : false,
jplaisted marked this conversation as resolved.
Show resolved Hide resolved
"adjust_pure_negative" : true,
"boost" : 1.0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
},
"post_filter" : {
"bool" : {
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
Expand Down
3 changes: 1 addition & 2 deletions gms/impl/src/test/resources/corpUserESSearchRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@
"fuzzy_transpositions" : true,
"lenient" : false,
"zero_terms_query" : "NONE",
"auto_generate_synonyms_phrase_query": true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question: what is the default value for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Optional, Boolean) If true, match phrase queries are automatically created for multi-term synonyms. Defaults to true. See Synonyms for an example.

I don't know why this was added, I copied it from Jyoti's PR. I can remove it and see what happens?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is in test data. Yes, I'm guessing the ES java library makes some defaults explicit.

"boost" : 1.0
}
}
}
],
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
}
],
"disable_coord" : false,
"adjust_pure_negative" : true,
"boost" : 1.0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
@Component
@EnableKafka
public class MetadataAuditEventsProcessor {
// Doc Type should be updated when ElasticSearch Version is upgraded.
private static final String DOC_TYPE = "doc";

private ElasticsearchConnector elasticSearchConnector;
private SnapshotProcessor snapshotProcessor;
Expand Down Expand Up @@ -122,7 +120,6 @@ private void updateElasticsearch(final Snapshot snapshot) {
continue;
}
elasticEvent.setIndex(indexBuilderForDoc.getDocumentType().getSimpleName().toLowerCase());
elasticEvent.setType(DOC_TYPE);
try {
String urn = indexBuilderForDoc.getDocumentType().getMethod("getUrn").invoke(doc).toString();
elasticEvent.setId(URLEncoder.encode(urn.toLowerCase(), "UTF-8"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -27,8 +28,8 @@
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.tophits.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;


@Slf4j
Expand All @@ -48,12 +49,6 @@ public static String getWrappedQuery(@Nonnull String query) throws IOException {
return jsonNode.toString();
}

@Nonnull
public static String getUnmodifiedQuery(@Nonnull String query) throws IOException {
JsonNode jsonNode = _objectMapper.readTree(query);
return jsonNode.toString();
}

@Nonnull
public static SearchResponse getSearchResponseFromJSON(@Nonnull String response) throws Exception {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
Expand All @@ -64,15 +59,11 @@ public static SearchResponse getSearchResponseFromJSON(@Nonnull String response)
private static SearchResponse getSearchResponsefromNamedContents(@Nonnull List<NamedXContentRegistry.Entry> contents,
@Nonnull String response) throws Exception {
NamedXContentRegistry registry = new NamedXContentRegistry(contents);
XContentParser parser = JsonXContent.jsonXContent.createParser(registry, response);
XContentParser parser =
JsonXContent.jsonXContent.createParser(registry, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, response);
return SearchResponse.fromXContent(parser);
}

@Nonnull
public static SearchResponse getSearchResponseFromJSONWithAgg(@Nonnull String response) throws Exception {
return getSearchResponsefromNamedContents(getDefaultNamedXContents(), response);
}

@Nonnull
private static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ private TestUtils() {

@Nonnull
public static String loadJsonFromResource(@Nonnull String resourceName) throws IOException {
return IOUtils.toString(ClassLoader.getSystemResourceAsStream(resourceName), Charset.defaultCharset());
final String jsonStr =
IOUtils.toString(ClassLoader.getSystemResourceAsStream(resourceName), Charset.defaultCharset());
return jsonStr.replaceAll("\\s+", "");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,19 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import javax.annotation.Nonnull;
import java.util.List;

@Slf4j
public class ElasticsearchConnector {

private RestClient _restClient;
private RestClientBuilder _restClient;
private RestHighLevelClient _client;

private Integer _esPort;
Expand Down Expand Up @@ -69,7 +68,7 @@ public void beforeBulk(long executionId, BulkRequest request) {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
log.info("Successfully feeded bulk request. Number of events: " + response.getItems().length + " Took time ms: "
+ response.getTookInMillis());
+ response.getIngestTookInMillis());
}

@Override
Expand All @@ -78,13 +77,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
};

ThreadPool threadPool = new ThreadPool(Settings.builder().put(Settings.EMPTY).build());
_bulkProcessor =
new BulkProcessor.Builder(_client::bulkAsync, listener, threadPool).setBulkActions(_bulkRequestsLimit)
.setFlushInterval(TimeValue.timeValueSeconds(_bulkFlushPeriod))
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(DEFAULT_RETRY_INTERVAL),
DEFAULT_NUMBER_OF_RETRIES))
.build();
_bulkProcessor = BulkProcessor.builder(
(request, bulkListener) -> _client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
.setBulkActions(_bulkRequestsLimit)
.setFlushInterval(TimeValue.timeValueSeconds(_bulkFlushPeriod))
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(DEFAULT_RETRY_INTERVAL),
DEFAULT_NUMBER_OF_RETRIES))
.build();
}

public void feedElasticEvent(@Nonnull ElasticEvent event) {
Expand All @@ -99,23 +98,24 @@ public void feedElasticEvent(@Nonnull ElasticEvent event) {

@Nonnull
private static IndexRequest createIndexRequest(@Nonnull ElasticEvent event) {
return new IndexRequest(event.getIndex(), event.getType(), event.getId()).source(event.buildJson());
return new IndexRequest(event.getIndex()).id(event.getId()).source(event.buildJson());
}

@Nonnull
private static DeleteRequest createDeleteRequest(@Nonnull ElasticEvent event) {
return new DeleteRequest(event.getIndex(), event.getType(), event.getId());
return new DeleteRequest(event.getIndex()).id(event.getId());
}

@Nonnull
private static UpdateRequest createUpsertRequest(@Nonnull ElasticEvent event) {
IndexRequest indexRequest = new IndexRequest(event.getIndex(), event.getType(), event.getId()).source(event.buildJson());
return new UpdateRequest(event.getIndex(), event.getType(),
event.getId()).doc(event.buildJson()).detectNoop(false).upsert(indexRequest).retryOnConflict(3);
final IndexRequest indexRequest = new IndexRequest(event.getIndex()).id(event.getId()).source(event.buildJson());
return new UpdateRequest(event.getIndex(), event.getId()).doc(event.buildJson())
.detectNoop(false)
.upsert(indexRequest);
}

@Nonnull
private static RestClient loadRestHttpClient(String[] hosts, Integer port, int threadCount) {
private static RestClientBuilder loadRestHttpClient(String[] hosts, Integer port, int threadCount) {

HttpHost[] httpHosts = new HttpHost[hosts.length];
for (int h = 0; h < hosts.length; h++) {
Expand All @@ -129,6 +129,6 @@ private static RestClient loadRestHttpClient(String[] hosts, Integer port, int t
// TODO: Configure timeouts
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(0));

return builder.build();
return builder;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.utils.elasticsearch;

import com.linkedin.data.template.RecordTemplate;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand All @@ -26,11 +27,12 @@ public XContentBuilder buildJson() {
try {
String jsonString = RecordUtils.toJsonString(this._doc);
builder = XContentFactory.jsonBuilder().prettyPrint();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, jsonString);
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, jsonString);
builder.copyCurrentStructure(parser);
} catch (IOException e) {
e.printStackTrace();
}
return builder;
}
}
}