Skip to content

Commit 5381488

Browse files
author
Greg Banks
committed
fix callback chaining benchmarking example
if there was an error in any query, callbacks_full_pipeline.py would hang because "self.event" (or "finished_event" in the docs) would never get set. this also fixes some minor pep8 issues.
1 parent d6a755b commit 5381488

2 files changed

Lines changed: 27 additions & 29 deletions

File tree

benchmarks/callback_full_pipeline.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
from itertools import count
21
import logging
2+
3+
from itertools import count
34
from threading import Event
45

56
from base import benchmark, BenchmarkThread
67

8+
79
log = logging.getLogger(__name__)
810

9-
initial = object()
11+
12+
sentinal = object()
13+
1014

1115
class Runner(BenchmarkThread):
1216

@@ -16,26 +20,22 @@ def __init__(self, *args, **kwargs):
1620
self.num_finished = count()
1721
self.event = Event()
1822

19-
def handle_error(self, exc):
20-
log.error("Error on insert: %r", exc)
21-
22-
def insert_next(self, previous_result):
23-
current_num = self.num_started.next()
24-
25-
if previous_result is not initial:
26-
num = next(self.num_finished)
27-
if num >= self.num_queries:
23+
def insert_next(self, previous_result=sentinal):
24+
if previous_result is not sentinal:
25+
if isinstance(previous_result, BaseException):
26+
log.error("Error on insert: %r", previous_result)
27+
if self.num_finished.next() >= self.num_queries:
2828
self.event.set()
2929

30-
if current_num <= self.num_queries:
30+
if self.num_started.next() <= self.num_queries:
3131
future = self.session.execute_async(self.query, self.values)
32-
future.add_callbacks(self.insert_next, self.handle_error)
32+
future.add_callbacks(self.insert_next, self.insert_next)
3333

3434
def run(self):
3535
self.start_profile()
3636

37-
for i in range(120):
38-
self.insert_next(initial)
37+
for _ in xrange(min(120, self.num_queries)):
38+
self.insert_next()
3939

4040
self.event.wait()
4141

docs/performance.rst

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -182,28 +182,26 @@ flight at any time:
182182
from itertools import count
183183
from threading import Event
184184
185+
sentinal = object()
186+
num_queries = 100000
185187
num_started = count()
186188
num_finished = count()
187-
initial = object()
188189
finished_event = Event()
189190
190-
def handle_error(exc):
191-
log.error("Error on insert: %r", exc)
192-
193-
def insert_next(previous_result):
194-
current_num = num_started.next()
195-
196-
if previous_result is not initial:
197-
num = next(num_finished)
198-
if num >= 100000:
191+
def insert_next(previous_result=sentinal):
192+
if previous_result is not sentinal:
193+
if isinstance(previous_result, BaseException):
194+
log.error("Error on insert: %r", previous_result)
195+
if num_finished.next() >= num_queries:
199196
finished_event.set()
200197
201-
if current_num <= 100000:
198+
if num_started.next() <= num_queries:
202199
future = session.execute_async(query)
203-
future.add_callbacks(insert_next, handle_error)
200+
# NOTE: this callback also handles errors
201+
future.add_callbacks(insert_next, insert_next)
204202
205-
for i in range(120):
206-
insert_next(initial)
203+
for i in range(min(120, num_queries)):
204+
insert_next()
207205
208206
finished_event.wait()
209207

0 commit comments

Comments
 (0)