Skip to content

Commit 574db48

Browse files
committed
Fix potential race conditions in p2p testing framework
Previously, each NodeConnCB had its own lock to synchronize data structures used by the testing thread and the networking thread, and NodeConn provided a separate additional lock for synchronizing access to each send buffer. This commit replaces those locks with a single global lock (mininode_lock) that we use to synchronize access to all data structures shared by the two threads. Updates comptool and maxblocksinflight to use the new synchronization semantics, eliminating previous race conditions within comptool, and re-enables invalidblockrequest.py in travis.
1 parent 5487975 commit 574db48

File tree

4 files changed

+75
-63
lines changed

4 files changed

+75
-63
lines changed

qa/pull-tester/rpc-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ testScripts=(
3131
'merkle_blocks.py'
3232
# 'forknotify.py'
3333
'maxblocksinflight.py'
34-
# 'invalidblockrequest.py'
34+
'invalidblockrequest.py'
3535
);
3636
if [ "x${ENABLE_BITCOIND}${ENABLE_UTILS}${ENABLE_WALLET}" = "x111" ]; then
3737
for (( i = 0; i < ${#testScripts[@]}; i++ ))

qa/rpc-tests/comptool.py

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
# on_getheaders: provide headers via BlockStore
2626
# on_getdata: provide blocks via BlockStore
2727

28+
global mininode_lock
29+
2830
class TestNode(NodeConnCB):
2931

3032
def __init__(self, block_store, tx_store):
@@ -148,10 +150,11 @@ def wait_for_verack(self):
148150
max_tries = 10 / sleep_time # Wait at most 10 seconds
149151
while max_tries > 0:
150152
done = True
151-
for c in self.connections:
152-
if c.cb.verack_received is False:
153-
done = False
154-
break
153+
with mininode_lock:
154+
for c in self.connections:
155+
if c.cb.verack_received is False:
156+
done = False
157+
break
155158
if done:
156159
break
157160
time.sleep(sleep_time)
@@ -161,10 +164,11 @@ def wait_for_pings(self, counter):
161164
while received_pongs is not True:
162165
time.sleep(0.05)
163166
received_pongs = True
164-
for c in self.connections:
165-
if c.cb.received_ping_response(counter) is not True:
166-
received_pongs = False
167-
break
167+
with mininode_lock:
168+
for c in self.connections:
169+
if c.cb.received_ping_response(counter) is not True:
170+
received_pongs = False
171+
break
168172

169173
# sync_blocks: Wait for all connections to request the blockhash given
170174
# then send get_headers to find out the tip of each node, and synchronize
@@ -173,8 +177,9 @@ def sync_blocks(self, blockhash, num_blocks):
173177
# Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
174178
max_tries = 20*num_blocks
175179
while max_tries > 0:
176-
results = [ blockhash in c.cb.block_request_map and
177-
c.cb.block_request_map[blockhash] for c in self.connections ]
180+
with mininode_lock:
181+
results = [ blockhash in c.cb.block_request_map and
182+
c.cb.block_request_map[blockhash] for c in self.connections ]
178183
if False not in results:
179184
break
180185
time.sleep(0.05)
@@ -199,8 +204,9 @@ def sync_transaction(self, txhash, num_events):
199204
# Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
200205
max_tries = 20*num_events
201206
while max_tries > 0:
202-
results = [ txhash in c.cb.tx_request_map and
203-
c.cb.tx_request_map[txhash] for c in self.connections ]
207+
with mininode_lock:
208+
results = [ txhash in c.cb.tx_request_map and
209+
c.cb.tx_request_map[txhash] for c in self.connections ]
204210
if False not in results:
205211
break
206212
time.sleep(0.05)
@@ -221,19 +227,21 @@ def sync_transaction(self, txhash, num_events):
221227
self.ping_counter += 1
222228

223229
# Sort inv responses from each node
224-
[ c.cb.lastInv.sort() for c in self.connections ]
230+
with mininode_lock:
231+
[ c.cb.lastInv.sort() for c in self.connections ]
225232

226233
# Verify that the tip of each connection all agree with each other, and
227234
# with the expected outcome (if given)
228235
def check_results(self, blockhash, outcome):
229-
for c in self.connections:
230-
if outcome is None:
231-
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
236+
with mininode_lock:
237+
for c in self.connections:
238+
if outcome is None:
239+
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
240+
return False
241+
elif ((c.cb.bestblockhash == blockhash) != outcome):
242+
# print c.cb.bestblockhash, blockhash, outcome
232243
return False
233-
elif ((c.cb.bestblockhash == blockhash) != outcome):
234-
# print c.cb.bestblockhash, blockhash, outcome
235-
return False
236-
return True
244+
return True
237245

238246
# Either check that the mempools all agree with each other, or that
239247
# txhash's presence in the mempool matches the outcome specified.
@@ -242,16 +250,17 @@ def check_results(self, blockhash, outcome):
242250
# perhaps it would be useful to add the ability to check explicitly that
243251
# a particular tx's existence in the mempool is the same across all nodes.
244252
def check_mempool(self, txhash, outcome):
245-
for c in self.connections:
246-
if outcome is None:
247-
# Make sure the mempools agree with each other
248-
if c.cb.lastInv != self.connections[0].cb.lastInv:
249-
# print c.rpc.getrawmempool()
253+
with mininode_lock:
254+
for c in self.connections:
255+
if outcome is None:
256+
# Make sure the mempools agree with each other
257+
if c.cb.lastInv != self.connections[0].cb.lastInv:
258+
# print c.rpc.getrawmempool()
259+
return False
260+
elif ((txhash in c.cb.lastInv) != outcome):
261+
# print c.rpc.getrawmempool(), c.cb.lastInv
250262
return False
251-
elif ((txhash in c.cb.lastInv) != outcome):
252-
# print c.rpc.getrawmempool(), c.cb.lastInv
253-
return False
254-
return True
263+
return True
255264

256265
def run(self):
257266
# Wait until verack is received
@@ -272,9 +281,10 @@ def run(self):
272281
block = b_or_t
273282
block_outcome = outcome
274283
# Add to shared block_store, set as current block
275-
self.block_store.add_block(block)
276-
for c in self.connections:
277-
c.cb.block_request_map[block.sha256] = False
284+
with mininode_lock:
285+
self.block_store.add_block(block)
286+
for c in self.connections:
287+
c.cb.block_request_map[block.sha256] = False
278288
# Either send inv's to each node and sync, or add
279289
# to invqueue for later inv'ing.
280290
if (test_instance.sync_every_block):
@@ -288,10 +298,11 @@ def run(self):
288298
assert(isinstance(b_or_t, CTransaction))
289299
tx = b_or_t
290300
tx_outcome = outcome
291-
# Add to shared tx store
292-
self.tx_store.add_transaction(tx)
293-
for c in self.connections:
294-
c.cb.tx_request_map[tx.sha256] = False
301+
# Add to shared tx store and clear map entry
302+
with mininode_lock:
303+
self.tx_store.add_transaction(tx)
304+
for c in self.connections:
305+
c.cb.tx_request_map[tx.sha256] = False
295306
# Again, either inv to all nodes or save for later
296307
if (test_instance.sync_every_tx):
297308
[ c.cb.send_inv(tx) for c in self.connections ]

qa/rpc-tests/maxblocksinflight.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ def run(self):
6161
time.sleep(2)
6262

6363
total_requests = 0
64-
for key in self.blockReqCounts:
65-
total_requests += self.blockReqCounts[key]
66-
if self.blockReqCounts[key] > 1:
67-
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
64+
with mininode_lock:
65+
for key in self.blockReqCounts:
66+
total_requests += self.blockReqCounts[key]
67+
if self.blockReqCounts[key] > 1:
68+
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
6869
if total_requests > MAX_REQUESTS:
6970
raise AssertionError("Error, too many blocks (%d) requested" % total_requests)
7071
print "Round %d: success (total requests: %d)" % (count, total_requests)

qa/rpc-tests/mininode.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import random
2727
import cStringIO
2828
import hashlib
29-
from threading import Lock
29+
from threading import RLock
3030
from threading import Thread
3131
import logging
3232
import copy
@@ -42,6 +42,14 @@
4242
# using select)
4343
mininode_socket_map = dict()
4444

45+
# One lock for synchronizing all data access between the networking thread (see
46+
# NetworkThread below) and the thread running the test logic. For simplicity,
47+
# NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
48+
# and whenever adding anything to the send buffer (in send_message()). This
49+
# lock should be acquired in the thread running the test logic to synchronize
50+
# access to any data shared with the NodeConnCB or NodeConn.
51+
mininode_lock = RLock()
52+
4553
# Serialization/deserialization tools
4654
def sha256(s):
4755
return hashlib.new('sha256', s).digest()
@@ -980,10 +988,6 @@ def __repr__(self):
980988
# Reimplement the on_* functions to provide handling for events
981989
class NodeConnCB(object):
982990
def __init__(self):
983-
# Acquire on all callbacks -- overkill for now since asyncore is
984-
# single-threaded, but may be useful for synchronizing access to
985-
# member variables in derived classes.
986-
self.cbLock = Lock()
987991
self.verack_received = False
988992

989993
# Derived classes should call this function once to set the message map
@@ -1009,7 +1013,7 @@ def create_callback_map(self):
10091013
}
10101014

10111015
def deliver(self, conn, message):
1012-
with self.cbLock:
1016+
with mininode_lock:
10131017
try:
10141018
self.cbmap[message.command](conn, message)
10151019
except:
@@ -1094,7 +1098,6 @@ def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"):
10941098
self.state = "connecting"
10951099
self.network = net
10961100
self.cb = callback
1097-
self.sendbufLock = Lock() # for protecting the sendbuffer
10981101
self.disconnect = False
10991102

11001103
# stuff version msg into sendbuf
@@ -1145,20 +1148,18 @@ def readable(self):
11451148
return True
11461149

11471150
def writable(self):
1148-
self.sendbufLock.acquire()
1149-
length = len(self.sendbuf)
1150-
self.sendbufLock.release()
1151+
with mininode_lock:
1152+
length = len(self.sendbuf)
11511153
return (length > 0)
11521154

11531155
def handle_write(self):
1154-
self.sendbufLock.acquire()
1155-
try:
1156-
sent = self.send(self.sendbuf)
1157-
except:
1158-
self.handle_close()
1159-
return
1160-
self.sendbuf = self.sendbuf[sent:]
1161-
self.sendbufLock.release()
1156+
with mininode_lock:
1157+
try:
1158+
sent = self.send(self.sendbuf)
1159+
except:
1160+
self.handle_close()
1161+
return
1162+
self.sendbuf = self.sendbuf[sent:]
11621163

11631164
def got_data(self):
11641165
while True:
@@ -1202,7 +1203,6 @@ def got_data(self):
12021203
def send_message(self, message, pushbuf=False):
12031204
if self.state != "connected" and not pushbuf:
12041205
return
1205-
self.sendbufLock.acquire()
12061206
self.show_debug_msg("Send %s" % repr(message))
12071207
command = message.command
12081208
data = message.serialize()
@@ -1215,9 +1215,9 @@ def send_message(self, message, pushbuf=False):
12151215
h = sha256(th)
12161216
tmsg += h[:4]
12171217
tmsg += data
1218-
self.sendbuf += tmsg
1219-
self.last_sent = time.time()
1220-
self.sendbufLock.release()
1218+
with mininode_lock:
1219+
self.sendbuf += tmsg
1220+
self.last_sent = time.time()
12211221

12221222
def got_message(self, message):
12231223
if message.command == "version":

0 commit comments

Comments
 (0)