Skip to content

Commit 2894e2b

Browse files
author
John Plaisted
authored
(feat) Simple python script to carry over ES indices from 5 to 7. (#2192)
1 parent e3ad0ed commit 2894e2b

File tree

1 file changed

+240
-0
lines changed

1 file changed

+240
-0
lines changed
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
# Copies indices (settings, mappings, and optionally data) from a 5 cluster to a 7 cluster.
2+
# Note that when copying data, the copy is performed through this machine, meaning all data is downloaded from 5,
3+
# and then uploaded to 7. This can be a very slow process if you have a lot of data, and it is recommended that you only do
4+
# this for small indices as a result.
5+
6+
# Requires python 3+ and elasticsearch's python lib to be installed (pip install elasticsearch).
7+
8+
import argparse
9+
import elasticsearch
10+
import elasticsearch.helpers
11+
import ssl
12+
import time
13+
14+
def _str_to_bool(value):
    """Convert a command-line value such as 'true' or 'false' into a bool.

    Without an explicit converter argparse stores the raw string, and every non-empty string (including 'false')
    is truthy, so the boolean options below could not be switched off from the command line.

    Raises:
        argparse.ArgumentTypeError: if the value is not a recognised boolean spelling.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1', 'on'):
        return True
    # The empty string used to be the only falsy value that could be passed, so keep accepting it.
    if normalized in ('false', 'f', 'no', 'n', '0', 'off', ''):
        return False
    raise argparse.ArgumentTypeError('Expected a boolean value (true/false), got %r.' % value)


# Command-line interface. The parsed result is stored in the module-level `args`, which the functions below read.
parser = argparse.ArgumentParser(description="Transfers ES indexes between clusters.")
parser.add_argument('-s', '--source', required=True, help='Source cluster URL and port.')
parser.add_argument('-d', '--dest', required=True, help='Destination cluster URL and port.')
parser.add_argument('--source-ssl', required=False, default=True, type=_str_to_bool,
                    help='Enables / disables source SSL.')
parser.add_argument('--dest-ssl', required=False, default=True, type=_str_to_bool,
                    help='Enables / disables destination SSL.')
parser.add_argument('--cert-file', required=False, default=None, help='Cert file to use with SSL.')
parser.add_argument('--key-file', required=False, default=None, help='Key file to use with SSL.')
parser.add_argument('--ca-file', required=False, default=None, help='Certificate authority file to use for SSL.')
parser.add_argument('--create-only', required=False, default=False, type=_str_to_bool,
                    help='If true, only create the index (with settings/mappings/aliases).')
# The value is handed to the indices "get" API as an index name pattern; it is not evaluated as a regular expression.
parser.add_argument('-i', '--indices', required=False, default="*",
                    help='Index name or wildcard pattern for indexes to copy.')
parser.add_argument('--name-override', required=False, default=None, help='destination index name override')

args = parser.parse_args()
27+
28+
29+
def create_ssl_context():
    """Build an SSL context from the --ca-file, --cert-file and --key-file command-line options.

    The context verifies the server against the given certificate authority file and presents the given
    certificate / key pair.

    Returns:
        The configured ssl.SSLContext.

    Raises:
        ValueError: if --cert-file, --key-file or --ca-file was not supplied.
    """
    # Checked in this order so the first missing option is the one reported.
    required_options = (
        ('--cert-file', args.cert_file),
        ('--key-file', args.key_file),
        ('--ca-file', args.ca_file),
    )
    for option, value in required_options:
        if value is None:
            # The original raised the undefined name `Error`, which surfaced as a NameError instead.
            raise ValueError('%s is required with SSL.' % option)

    context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH,
        cafile=args.ca_file,
    )
    context.load_cert_chain(
        certfile=args.cert_file,
        keyfile=args.key_file,
    )

    return context
47+
48+
49+
def create_client(host, ssl_context):
    """Return an Elasticsearch client for a single host.

    Args:
        host: the cluster address, passed to the client as a one-element host list.
        ssl_context: forwarded unchanged as the client's ssl_context (callers in this file pass None to disable SSL).
    """
    hosts = [host]
    client = elasticsearch.Elasticsearch(hosts, ssl_context=ssl_context)
    return client
54+
55+
56+
class EsClients:
    """Simple holder pairing the source cluster client with the destination cluster client."""

    def __init__(self, source_client, dest_client):
        # Stored as-is; nothing is validated or connected here.
        self.source_client, self.dest_client = source_client, dest_client
60+
61+
62+
def get_index_settings(client, pattern):
    """Fetch the index definitions matching `pattern` from the cluster behind `client`.

    Returns whatever the indices "get" call returns; the caller in this file iterates it with .items() as
    (index name, config) pairs.
    """
    return elasticsearch.client.IndicesClient(client).get(pattern)
65+
66+
67+
def clean_settings(config):
    """Strip the server-assigned index settings from `config` in place and return it.

    These settings can be read from a cluster but cannot be written when creating an index.

    Raises:
        KeyError: if 'settings', 'index' or any of the removed settings is absent.
    """
    index_settings = config['settings']['index']
    for server_assigned in ('provided_name', 'version', 'creation_date', 'uuid'):
        del index_settings[server_assigned]
    return config
74+
75+
76+
def find_max_ngram_diff_helper(obj):
    """Return the largest (max_gram - min_gram) found in `obj` or any dict nested in its values.

    In Elasticsearch 7, an upper bound on this difference must be explicitly set, so the caller needs to know it.
    Only dict values are descended into; anything else (including lists) contributes -1, which is also the
    result when no min_gram / max_gram pair is found.
    """
    if not isinstance(obj, dict):
        return -1

    if 'min_gram' in obj and 'max_gram' in obj:
        own_diff = int(obj['max_gram']) - int(obj['min_gram'])
    else:
        own_diff = -1

    nested_diffs = [find_max_ngram_diff_helper(child) for child in obj.values()]
    return max([own_diff] + nested_diffs)
92+
93+
94+
def find_max_ngram_diff(config):
    """Return the largest ngram (max_gram - min_gram) difference under config['settings'], or -1 if none."""
    return find_max_ngram_diff_helper(config['settings'])
97+
98+
99+
def update_for_seven(config):
    """Rewrite an index config (settings and mappings) read from Elasticsearch 5 so that 7 accepts it.

    The config is modified in place and also returned.

    Raises:
        ValueError: if the mapping has _all enabled, which this script does not migrate.
    """
    # Should only be one value in 5 - the doc type. Unwrap for 7; document types are deprecated.
    # An index without any mapping has nothing to unwrap, so fall back to an empty mapping instead of letting
    # next() raise StopIteration.
    config['mappings'] = next(iter(config['mappings'].values()), {})

    # Need to set max_ngram_diff if any ngram diffs are more than 1.
    max_ngram = find_max_ngram_diff(config)
    if max_ngram > 1:
        config['settings']['index']['max_ngram_diff'] = max_ngram

    # _all is deprecated and also false by default; so not even explicitly needed...
    if '_all' in config['mappings']:
        if config['mappings']['_all']['enabled']:
            # The original raised the undefined name `Error`, which surfaced as a NameError instead.
            raise ValueError('_all is enabled')
        del config['mappings']['_all']

    return config
118+
119+
120+
def create_index(client, name, config, name_override=None):
    """Create an index on `client` from `config`, unless an index with the target name already exists.

    The target name is `name_override` when given, otherwise `name`. If the index exists a warning is printed
    and nothing is created.
    """
    target_name = name_override if name_override is not None else name
    indices_client = elasticsearch.client.IndicesClient(client)

    if not indices_client.exists(target_name):
        indices_client.create(target_name, body=config)
        return

    print('WARNING: Index %s already exists!' % target_name)
128+
129+
130+
# Recent throughput samples (docs/second) recorded by reindex() for its progress output. reindex() keeps at most
# the 10 newest entries. Being module-level, the samples carry over from one reindex() call to the next.
timing_samples = []
131+
132+
# Adapted from elasticsearch.helpers.reindex so that documents can be transformed while they are copied.
def reindex(
    client,
    source_index,
    target_index,
    query=None,
    target_client=None,
    chunk_size=500,
    scroll="5m",
    scan_kwargs=None,
    bulk_kwargs=None,
):
    """Copy every document matching `query` from `source_index` to `target_index`.

    Like the elasticsearch.helpers.reindex function, but with some custom logic. Namely, allows for source/dest
    indices to be on different clusters, prints status updates, and deletes the _type field.

    Args:
        client: client the documents are scanned from.
        source_index: index to read from.
        target_index: index name written into each document before it is bulk-indexed.
        query: optional query body passed to the scan helper.
        target_client: client to write to; defaults to `client`.
        chunk_size: number of documents per bulk request.
        scroll: scroll keep-alive passed to the scan helper.
        scan_kwargs: extra keyword arguments for elasticsearch.helpers.scan.
        bulk_kwargs: extra keyword arguments for elasticsearch.helpers.bulk; these override the defaults
            stats_only=True and raise_on_error=False.

    Returns:
        The result of elasticsearch.helpers.bulk. With the default stats_only=True the caller in this file
        formats it as a (documents written, errors) pair.
    """
    target_client = client if target_client is None else target_client
    # None defaults instead of shared mutable {} defaults.
    scan_kwargs = {} if scan_kwargs is None else scan_kwargs
    bulk_kwargs = {} if bulk_kwargs is None else bulk_kwargs
    docs = elasticsearch.helpers.scan(client, query=query, index=source_index, scroll=scroll, **scan_kwargs)

    count = 0
    count_at_last_update = 0
    last_print = time.time()
    update_interval = 5  # Seconds between progress messages.

    def _change_doc_index(hits, index):
        # Retargets each scanned hit at `index`, strips "_type", and reports progress as documents stream through.
        nonlocal count, last_print, count_at_last_update

        for h in hits:
            h["_index"] = index
            if "fields" in h:
                h.update(h.pop("fields"))

            # TODO: Need to remove "_type" otherwise it complains about keyword becoming text? Is this legitimate?
            if "_type" in h:
                del h["_type"]

            count = count + 1

            # Use a window of samples to average over.
            now = time.time()
            elapsed = now - last_print
            if elapsed > update_interval:
                timing_samples.append((count - count_at_last_update) / elapsed)
                if len(timing_samples) > 10:
                    timing_samples.pop(0)
                count_at_last_update = count
                last_print = now
                print('Transferring %s docs/second. Total %s.' % (sum(timing_samples) / len(timing_samples), count))

            yield h

    # raise_on_error lives in the dict (rather than being passed explicitly) so that a caller supplying it through
    # bulk_kwargs overrides the default instead of triggering a duplicate-keyword TypeError.
    kwargs = {"stats_only": True, "raise_on_error": False}
    kwargs.update(bulk_kwargs)
    return elasticsearch.helpers.bulk(
        target_client,
        _change_doc_index(docs, target_index),
        chunk_size=chunk_size,
        **kwargs
    )
191+
192+
193+
def copy_index_data(clients, index, name_override):
    """Copy all documents of `index` from the source client to the destination client and print a summary.

    Documents are written to `name_override` when it is given, otherwise to an index with the same name.
    """
    target_index = name_override if name_override is not None else index
    print('Copying index %s' % index)

    started_at = time.time()
    res = reindex(clients.source_client, index, target_index, target_client=clients.dest_client)
    elapsed = time.time() - started_at

    print('Documents written %s. Errors %s.' % res)
    print('Took %s seconds.' % elapsed)
207+
208+
209+
def main():
    """Create the matching source indices on the destination cluster and, unless --create-only, copy their data."""
    # Only build the SSL context (which requires the cert / key / CA options) when at least one side uses SSL.
    ssl_context = create_ssl_context() if (args.source_ssl or args.dest_ssl) else None
    source_ssl_context = ssl_context if args.source_ssl else None
    dest_ssl_context = ssl_context if args.dest_ssl else None
    clients = EsClients(create_client(args.source, source_ssl_context), create_client(args.dest, dest_ssl_context))
    indices = get_index_settings(clients.source_client, args.indices)

    # Sort for repeatability, and to make it easy to restart part way if the script failed.
    # Skip the "hidden" .kibana index that is listed for some reason. Filtering here keeps it out of both the
    # creation pass and the data copy pass; previously only the creation pass skipped it.
    sorted_indices = sorted(
        (item for item in indices.items() if item[0] != '.kibana'),
        key=lambda item: item[0],
    )

    for index, config in sorted_indices:
        config = clean_settings(config)
        config = update_for_seven(config)
        print('Creating index %s' % (index if args.name_override is None else args.name_override))
        create_index(clients.dest_client, index, config, args.name_override)

    if args.create_only:
        return

    for index, _ in sorted_indices:
        copy_index_data(clients, index, args.name_override)
238+
239+
240+
# Run the transfer only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)