(feat) Simple python script to carry over ES indices from 5 to 7. #2192

Merged · 1 commit · Mar 8, 2021
240 changes: 240 additions & 0 deletions contrib/elasticsearch/es7-upgrade/transfer.py
@@ -0,0 +1,240 @@
# Copies indices (settings, mappings, and optionally data) from a 5 cluster to a 7 cluster.
# Note that when copying data, the copy is performed through this machine, meaning all data is downloaded from 5
# and then uploaded to 7. This can be a very slow process if you have a lot of data, so it is recommended that
# you only do this for small indices.

# Requires Python 3+ and the elasticsearch Python client library to be installed (pip install elasticsearch).
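# Example invocation (the hosts and file paths here are hypothetical, for illustration only):
#   python transfer.py -s https://es5.example.com:9200 -d https://es7.example.com:9200 \
#       --cert-file client.crt --key-file client.key --ca-file ca.crt -i 'logstash-*'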

import argparse
import elasticsearch
import elasticsearch.helpers
import ssl
import time

def str_to_bool(value):
    # argparse passes option values through as strings, so convert them explicitly;
    # otherwise a value like "false" would still be truthy.
    return str(value).lower() in ('true', '1', 'yes')


parser = argparse.ArgumentParser(description="Transfers ES indices between clusters.")
parser.add_argument('-s', '--source', required=True, help='Source cluster URL and port.')
parser.add_argument('-d', '--dest', required=True, help='Destination cluster URL and port.')
parser.add_argument('--source-ssl', required=False, type=str_to_bool, default=True, help='Enables / disables source SSL (true/false).')
parser.add_argument('--dest-ssl', required=False, type=str_to_bool, default=True, help='Enables / disables destination SSL (true/false).')
parser.add_argument('--cert-file', required=False, default=None, help='Cert file to use with SSL.')
parser.add_argument('--key-file', required=False, default=None, help='Key file to use with SSL.')
parser.add_argument('--ca-file', required=False, default=None, help='Certificate authority file to use for SSL.')
parser.add_argument('--create-only', required=False, type=str_to_bool, default=False, help='If true, only create the index (with settings/mappings/aliases).')
parser.add_argument('-i', '--indices', required=False, default="*", help='Index name pattern (wildcards allowed) for indices to copy.')
parser.add_argument('--name-override', required=False, default=None, help='Destination index name override.')

args = parser.parse_args()


def create_ssl_context():
if args.cert_file is None:
        raise ValueError('--cert-file is required with SSL.')
if args.key_file is None:
        raise ValueError('--key-file is required with SSL.')
if args.ca_file is None:
        raise ValueError('--ca-file is required with SSL.')

context = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
cafile=args.ca_file
)
context.load_cert_chain(
certfile=args.cert_file,
keyfile=args.key_file
)

return context


def create_client(host, ssl_context):
return elasticsearch.Elasticsearch(
[host],
ssl_context=ssl_context
)


class EsClients:
def __init__(self, source_client, dest_client):
self.source_client = source_client
self.dest_client = dest_client


def get_index_settings(client, pattern):
indices = elasticsearch.client.IndicesClient(client).get(pattern)
return indices


def clean_settings(config):
# Settings set by the server that we can read, but not write.
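    # For example, a 5.x GET index response includes server-managed keys such as
    # "provided_name", "creation_date", "uuid", and "version" under settings.index;
    # sending these back when creating the index on the 7 cluster would be rejected.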
del config['settings']['index']['provided_name']
del config['settings']['index']['version']
del config['settings']['index']['creation_date']
del config['settings']['index']['uuid']
return config


def find_max_ngram_diff_helper(obj):
# Finds the greatest diff in ngram settings and returns the value. In Elasticsearch 7, an upper bound must be
# explicitly set.
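    # For example (hypothetical analyzer settings): an ngram tokenizer with min_gram 1 and
    # max_gram 4 has a diff of 3, which exceeds the ES 7 default limit of 1, so
    # index.max_ngram_diff must be raised to at least 3 before the index can be created.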
if not isinstance(obj, dict):
return -1

diff = -1

if 'min_gram' in obj and 'max_gram' in obj:
diff = int(obj['max_gram']) - int(obj['min_gram'])

for value in obj.values():
t = find_max_ngram_diff_helper(value)
diff = max(t, diff)

return diff


def find_max_ngram_diff(config):
settings = config['settings']
return find_max_ngram_diff_helper(settings)


def update_for_seven(config):
# Updates settings and mappings for Elasticsearch 7.

# Should only be one value in 5 - the doc type. Unwrap for 7; document types are deprecated.
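    # For example, a 5.x mapping of the form {"mappings": {"doc": {"properties": {...}}}}
    # becomes {"mappings": {"properties": {...}}} in 7 (the "doc" type name is illustrative).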
config['mappings'] = next(iter(config['mappings'].values()))

# Need to set max_ngram_diff if any ngram diffs are more than 1.
max_ngram = find_max_ngram_diff(config)
if max_ngram > 1:
config['settings']['index']['max_ngram_diff'] = max_ngram

    # _all is deprecated and is false by default, so it is not even explicitly needed.
if '_all' in config['mappings']:
enabled = config['mappings']['_all']['enabled']
if enabled:
            raise RuntimeError('_all is enabled')
del config['mappings']['_all']

return config


def create_index(client, name, config, name_override=None):
name_override = name if name_override is None else name_override
# Creates the given index on the client.
indices_client = elasticsearch.client.IndicesClient(client)
if indices_client.exists(name_override):
print('WARNING: Index %s already exists!' % name_override)
return
indices_client.create(name_override, body=config)


timing_samples = []

# Copied from the elasticsearch.helpers source so that we can transform documents while copying
def reindex(
client,
source_index,
target_index,
query=None,
target_client=None,
chunk_size=500,
scroll="5m",
scan_kwargs={},
bulk_kwargs={},
):
# Like the elasticsearch.helpers.reindex function, but with some custom logic. Namely, allows for source/dest
# indices to be on different clusters, prints status updates, and deletes the _type field.

target_client = client if target_client is None else target_client
docs = elasticsearch.helpers.scan(client, query=query, index=source_index, scroll=scroll, **scan_kwargs)

start = time.time()
count = 0
count_at_last_update = 0
last_print = start
update_interval = 5

def _change_doc_index(hits, index):
for h in hits:
h["_index"] = index
if "fields" in h:
h.update(h.pop("fields"))

# TODO: Need to remove "_type" otherwise it complains about keyword becoming text? Is this legitimate?
if "_type" in h:
del h["_type"]

nonlocal count
nonlocal last_print
nonlocal count_at_last_update
count = count + 1

# Use a window of samples to average over.
if (time.time() - last_print) > update_interval:
timing_samples.append((count - count_at_last_update) / (time.time() - last_print))
if len(timing_samples) > 10:
timing_samples.pop(0)
count_at_last_update = count
last_print = time.time()
print('Transferring %s docs/second. Total %s.' % (sum(timing_samples) / len(timing_samples), count))

yield h

kwargs = {"stats_only": True}
kwargs.update(bulk_kwargs)
return elasticsearch.helpers.bulk(
target_client,
_change_doc_index(docs, target_index),
chunk_size=chunk_size,
raise_on_error=False,
**kwargs
)


def copy_index_data(clients, index, name_override):
# Copies all documents from the source to the dest index.
name_override = index if name_override is None else name_override
print('Copying index %s' % index)
start = time.time()
res = reindex(
clients.source_client,
index,
name_override,
target_client=clients.dest_client
)
end = time.time()
print('Documents written %s. Errors %s.' % res)
print('Took %s seconds.' % (end - start))


def main():
    # Only build an SSL context when at least one side uses SSL, since create_ssl_context()
    # requires the cert, key, and CA files to be provided.
    ssl_context = create_ssl_context() if (args.source_ssl or args.dest_ssl) else None
source_ssl_context = ssl_context if args.source_ssl else None
dest_ssl_context = ssl_context if args.dest_ssl else None
clients = EsClients(create_client(args.source, source_ssl_context), create_client(args.dest, dest_ssl_context))
indices = get_index_settings(clients.source_client, args.indices)

    # Sort for repeatability, and to make it easy to restart part way if the script failed.
    sorted_indices = sorted(indices.items())

    for index, config in sorted_indices:
# Skip this "hidden" index that is listed for some reason.
if index == '.kibana':
continue

config = clean_settings(config)
config = update_for_seven(config)
print('Creating index %s' % (index if args.name_override is None else args.name_override))
create_index(clients.dest_client, index, config, args.name_override)

if args.create_only:
return

    for index, config in sorted_indices:
copy_index_data(clients, index, args.name_override)


if __name__ == '__main__':
    main()