forked from zulip/zulip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
check-mirroring
executable file
·354 lines (317 loc) · 12.6 KB
/
check-mirroring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#!/usr/bin/env python
import sys
import time
import optparse
import os
import random
import logging
import subprocess
import hashlib
# Command-line interface.  The parsed values are read by the rest of the
# script through the module-level ``options`` object.
parser = optparse.OptionParser()

# NOTE(review): options.verbose is not read anywhere in the visible source.
parser.add_option('--verbose', action='store_true', dest='verbose', default=False)

# Passed as ``site`` to the Zulip client.
parser.add_option('--site', action='store', dest='site', default="https://api.zulip.com")

# Selects the longer, per-shard list of test streams.
parser.add_option('--sharded', action='store_true', dest='sharded', default=False)

# Directory whose api/ and python-zephyr/ subdirectories are put on sys.path.
parser.add_option('--root-path', action='store', dest='root_path', default="/home/zulip")

options, args = parser.parse_args()
# The 'api' directory needs to go first, so that 'import zulip' won't pick up
# some other directory named 'zulip'.
#
# The interpreter version is taken from sys.version_info instead of slicing
# sys.version: the slice sys.version[0:3] yields "3.1" on Python 3.10 and
# later, whereas this form gives "major.minor" on every interpreter (and
# still "2.7" on Python 2.7).
python_version = "%d.%d" % (sys.version_info[0], sys.version_info[1])
pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%s/" % (os.uname()[4], python_version)
sys.path[:0] = [os.path.join(options.root_path, "api/"),
                os.path.join(options.root_path, "python-zephyr"),
                os.path.join(options.root_path, pyzephyr_lib_path),
                options.root_path]
sys.path.append(".")

# Must come after the sys.path manipulation above.
import zulip

# NOTE(review): zulip_user is not defined anywhere in the visible source;
# confirm it is assigned before this point in the deployed script.
zulip_client = zulip.Client(
    email=zulip_user,
    api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    verbose=True,
    client="ZulipMonitoring/0.1",
    site=options.site)
# Configure logging
#
# The same "<timestamp>: <message>" layout is used for the root logger
# (via basicConfig) and for the file handler attached to this script's own
# logger, which records everything from DEBUG upwards.
log_format = "%(asctime)s: %(message)s"
log_file = "/var/log/zulip/check-mirroring-log"

logging.basicConfig(format=log_format)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)
# Initialize list of streams to test
#
# test_streams is a list of (stream, shard_name) pairs.  The special stream
# name "message" is handled separately by the send/subscribe code below and
# is exempt from the shard check.
if options.sharded:
    # NOTE: Streams in this list must be in zulip_user's Zulip
    # subscriptions, or we won't receive messages via Zulip.

    # The sharded stream list has a bunch of pairs
    # (stream, shard_name), where sha1sum(stream).startswith(shard_name)
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2", "2"),
        ("tabbott-nagios-test-5", "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7", "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-1", "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9", "f"),
    ]
    # Sanity-check the table above: every stream must hash into the shard
    # it is listed under.
    for (stream, test) in test_streams:
        if stream == "message":
            continue
        # hashlib requires bytes on Python 3; for these ASCII names the
        # encode step does not change the digest on Python 2.
        digest = hashlib.sha1(stream.encode("utf-8")).hexdigest()
        assert digest.startswith(test), \
            "stream %s hashes to %s, not shard %s" % (stream, digest, test)
else:
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
    ]
def print_status_and_exit(status):
    """Print *status* on stdout and terminate the process with it.

    The output of this script is used by Nagios.  Various outputs,
    e.g. true success and punting due to a SERVNAK, result in a
    non-alert case, so to give us something unambiguous to check in
    Nagios, print the exit status.

    Raises SystemExit (via sys.exit) and therefore never returns.
    """
    # Call form rather than the Python-2-only print statement; with a
    # single argument the output is identical on Python 2 and 3.
    print(status)
    sys.exit(status)
def send_zulip(message):
    """Send *message* through the Zulip client; exit with status 1 on failure.

    *message* is handed unchanged to ``zulip_client.send_message``.  Any
    reply whose "result" field is not "success" is logged together with the
    request, after which the script terminates via print_status_and_exit.
    """
    reply = zulip_client.send_message(message)
    if reply["result"] == "success":
        return
    logger.error("Error sending zulip, args were:")
    logger.error(message)
    logger.error(reply)
    print_status_and_exit(1)
def send_zephyr(zwrite_args, content):
    """Run the command *zwrite_args*, feeding it *content* on stdin.

    Returns True if and only if we "Detected server failure" sending the
    zephyr, i.e. the command exited non-zero and that phrase appears in its
    stdout; the caller is expected to retry.  Returns False when the command
    exits with status 0.  Any other failure is logged and ends the script
    with status 1 via print_status_and_exit.
    """
    p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate(input=content.encode("utf-8"))
    if p.returncode == 0:
        return False
    # The pipes carry bytes, so match against a bytes literal.  On Python 2
    # b"..." is an ordinary str, so behaviour there is unchanged; on
    # Python 3 a text literal here would raise TypeError.
    if b"Detected server failure while receiving acknowledgement for" in stdout:
        logger.warning("Got server failure error sending zephyr; retrying")
        logger.warning(stderr)
        return True
    logger.error("Error sending zephyr:")
    logger.info(stdout)
    logger.error(stderr)
    print_status_and_exit(1)
    return False
# Subscribe to Zulip
#
# Registers an event queue for message events and remembers its id and the
# last event id so the messages can be fetched later with get_events.
try:
    res = zulip_client.register(event_types=["message"])
    if 'error' in res.get('result'):
        # Use the script's own logger (not the root `logging` functions) so
        # these diagnostics also reach the log file, like every other error
        # path in this file.
        logger.error("Error subscribing to Zulips!")
        logger.error(res['msg'])
        # SystemExit is not an Exception subclass, so it is not swallowed
        # by the handler below.
        print_status_and_exit(1)
    queue_id, last_event_id = (res['queue_id'], res['last_event_id'])
except Exception:
    logger.exception("Unexpected error subscribing to Zulips")
    print_status_and_exit(1)
# Subscribe to Zephyrs
import zephyr

# One subscription triple per test stream: the "message" pseudo-stream maps
# to ("message", "personal", mit_user), every other stream to a wildcard
# subscription on that class.
# NOTE(review): mit_user is not defined anywhere in the visible source;
# confirm it is assigned before this point in the deployed script.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times; an attempt only counts as successful when every
# requested subscription shows up in the list reported back by zephyr.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()

        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except IOError as e:
        # Look for the marker in the exception's message.  Testing
        # membership on the exception object itself never performs a
        # substring match (and raises TypeError on Python 3).
        if "SERVNAK received" in str(e):
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)
# Prepare keys
#
# Both dicts map a random key string to the (stream, shard) pair that the
# key is sent on: zhkeys for keys sent via Zephyr, hzkeys for keys sent via
# Zulip.
zhkeys = dict()
hzkeys = dict()


def gen_key(key_dict):
    """Return a random 32-bit number, as a decimal string, not in *key_dict*.

    *key_dict* is only consulted for membership; it is not modified.
    """
    while True:
        candidate = str(random.getrandbits(32))
        # Retry in the unlikely event that we get the same bits twice.
        if candidate not in key_dict:
            return candidate
def gen_keys(key_dict):
    """Add one fresh random key to *key_dict* for every entry of test_streams.

    Each new key maps to that entry's (stream, shard) pair.  Keys are drawn
    one at a time so that each is checked against those already inserted.
    """
    for stream_name, shard in test_streams:
        fresh_key = gen_key(key_dict)
        key_dict[fresh_key] = (stream_name, shard)


gen_keys(zhkeys)
gen_keys(hzkeys)

# Zephyr notices collected so far; appended to by receive_zephyrs().
notices = []
# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    """Drain the pending zephyr notices into the module-level ``notices`` list.

    Polls ``zephyr.receive(block=False)`` until it returns None.  Notices
    whose opcode is not the empty string are discarded.  An exception while
    receiving is logged and ends the drain for this call; it is not
    re-raised.
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            # Use the script's logger so the traceback also reaches the
            # log file, consistent with the rest of the file.
            logger.exception("Exception receiving zephyrs:")
            notice = None
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)
logger.info("Starting sending messages!")

# Send zephyrs
#
# Each key in zhkeys is sent as the body of one zephyr: a personal to
# mit_user for the "message" pseudo-stream, otherwise to instance "test" of
# the class named after the stream.
# NOTE(review): mit_user is not defined anywhere in the visible source.
zsig = "Timothy Good Abbott"
# Iterate over a snapshot of the items: the retry path below pops and
# inserts entries in zhkeys, which is only safe during iteration when
# items() returns a copy (true on Python 2, not on Python 3).
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        # Log through the script's logger so these also reach the log file.
        if server_failure_again:
            logger.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." %
                         (key, new_key))
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure." %
                           (key, new_key))
    # Drain after every send so the receive queue stays short.
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")
# Send Zulips
#
# Each key in hzkeys is sent as the content of one Zulip message: a private
# message to zulip_user for the "message" pseudo-stream, otherwise a stream
# message with subject "test".
# NOTE(review): zulip_user is not defined anywhere in the visible source.
for key, (stream, test) in hzkeys.items():
    if stream == "message":
        outgoing = {
            "type": "private",
            "to": zulip_user,
            "content": str(key),
        }
    else:
        outgoing = {
            "type": "stream",
            "to": stream,
            "subject": "test",
            "content": str(key),
        }
    send_zulip(outgoing)
    # Drain after every send so the zephyr receive queue stays short.
    receive_zephyrs()

logger.info("Sent Zulip messages!")
# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips.  This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive zulips
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
if 'error' in res.get('result'):
    # This is the fetch step, not the registration step, so say so; and
    # log through the script's logger so the error reaches the log file.
    logger.error("Error receiving Zulips!")
    logger.error(res['msg'])
    print_status_and_exit(1)
# Keep only events that carry a message payload.  Presumably the server can
# also deliver non-message events (e.g. heartbeats) on this queue, which
# would otherwise raise KeyError here - verify against the Zulip API docs.
messages = [event['message'] for event in res['events'] if 'message' in event]
logger.info("Finished receiving Zulip messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")
# Every key this run sent, by either route.  A set union is used because
# adding the results of dict.keys() only works where keys() returns a list
# (Python 2).
all_keys = set(zhkeys) | set(hzkeys)


def process_keys(content_list):
    """Tally how often each of this run's keys occurs in *content_list*.

    Returns a 5-tuple ``(key_counts, z_missing, h_missing, duplicates,
    success)``:

    * key_counts - dict mapping every key in all_keys to its occurrence count
    * z_missing  - set of keys from zhkeys that were seen zero times
    * h_missing  - set of keys from hzkeys that were seen zero times
    * duplicates - True if any key was seen more than once
    * success    - True if every key was seen exactly once
    """
    key_counts = dict.fromkeys(all_keys, 0)
    for key in content_list:
        # Start by filtering out any keys that might have come from
        # concurrent check-mirroring processes
        if key in key_counts:
            key_counts[key] += 1
    z_missing = set(key for key in zhkeys if key_counts[key] == 0)
    h_missing = set(key for key in hzkeys if key_counts[key] == 0)
    duplicates = any(val > 1 for val in key_counts.values())
    success = all(val == 1 for val in key_counts.values())
    return key_counts, z_missing, h_missing, duplicates, success
# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = []
for message in messages:
    h_contents.append(message["content"])

z_contents = []
for notice in notices:
    # Take the second NUL-separated field of the notice body.
    # NOTE(review): presumably the first field is the zsig - confirm.
    z_contents.append(notice.message.split('\0')[1])

h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success = process_keys(h_contents)
z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success = process_keys(z_contents)
def _deregister_queue():
    """Best-effort removal of the Zulip event queue registered by this run.

    A failure here is logged but must not change the status the check
    reports, so exceptions are not propagated.
    """
    try:
        zulip_client.deregister(queue_id)
    except Exception:
        logger.exception("Error deregistering Zulip event queue")


# Report the outcome.  Status 0 is printed only when every key was received
# exactly once on both sides; anything else is explained in the log and
# reported as status 1.
if z_success and h_success:
    logger.info("Success!")
    # Release the event queue on the success path as well; previously it
    # was only deregistered when the check failed.
    _deregister_queue()
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Zulip!")

logger.error("Messages received the wrong number of times:")
for key in all_keys:
    if z_key_counts[key] == 1 and h_key_counts[key] == 1:
        continue
    if key in zhkeys:
        (stream, test) = zhkeys[key]
        logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % \
                       (key, z_key_counts[key], h_key_counts[key], test, stream))
    if key in hzkeys:
        (stream, test) = hzkeys[key]
        logger.warning("%10s: z got %s. h got %s. Sent via Zulip(%s): class %s" % \
                       (key, z_key_counts[key], h_key_counts[key], test, stream))
logger.error("")
logger.error("Summary of specific problems:")

if h_duplicates:
    logger.error("zulip: Received duplicate messages!")
    logger.error("zulip: This is probably a bug in our message loop detection.")
    logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")

if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
    logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
    # A mirroring problem is indicated when some Zulip-sent key is missing
    # on the Zephyr side but WAS received on the Zulip side, i.e. the set
    # difference is non-empty.  (Testing the two sets for equality selects
    # exactly the opposite case.)
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
        logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
    # Same reasoning as above, in the other direction.
    if h_missing_z - z_missing_z:
        logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
        logger.error("zulip: aka the global class mirroring script has issues.")

_deregister_queue()
print_status_and_exit(1)