Skip to content

Commit 6e42fd7

Browse files
committed
Add multithreading support to benchmarks
1 parent d6e36ab commit 6e42fd7

11 files changed

Lines changed: 234 additions & 134 deletions

benchmarks/base.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111

1212
from cassandra.cluster import Cluster
1313
from cassandra.io.asyncorereactor import AsyncoreConnection
14+
from cassandra.policies import HostDistance
1415
from cassandra.query import SimpleStatement
1516

1617
log = logging.getLogger()
17-
log.setLevel('INFO')
1818
handler = logging.StreamHandler()
1919
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
2020
log.addHandler(handler)
@@ -31,9 +31,10 @@
3131
KEYSPACE = "testkeyspace"
3232
TABLE = "testtable"
3333

34-
def setup():
34+
def setup(hosts):
3535

36-
cluster = Cluster(['127.0.0.1'])
36+
cluster = Cluster(hosts)
37+
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
3738
session = cluster.connect()
3839

3940
rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
@@ -60,16 +61,17 @@ def setup():
6061
)
6162
""" % TABLE)
6263

63-
def teardown():
64-
cluster = Cluster(['127.0.0.1'])
64+
def teardown(hosts):
65+
cluster = Cluster(hosts)
66+
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
6567
session = cluster.connect()
6668
session.execute("DROP KEYSPACE " + KEYSPACE)
6769

6870

6971
def benchmark(run_fn):
7072
options, args = parse_options()
7173
for conn_class in options.supported_reactors:
72-
setup()
74+
setup(options.hosts)
7375
log.info("==== %s ====" % (conn_class.__name__,))
7476

7577
cluster = Cluster(options.hosts, metrics_enabled=options.enable_metrics)
@@ -88,10 +90,10 @@ def benchmark(run_fn):
8890
log.debug("Beginning inserts...")
8991
start = time.time()
9092
try:
91-
run_fn(session, query, values, options.num_ops)
93+
run_fn(session, query, values, options.num_ops, options.threads)
9294
end = time.time()
9395
finally:
94-
teardown()
96+
teardown(options.hosts)
9597

9698
total = end - start
9799
log.info("Total time: %0.2fs" % total)
@@ -133,10 +135,14 @@ def parse_options():
133135
help='only benchmark with libev connections')
134136
parser.add_option('-m', '--metrics', action='store_true', dest='enable_metrics',
135137
help='enable and print metrics for operations')
138+
parser.add_option('-l', '--log-level', default='info',
139+
help='logging level: debug, info, warning, or error')
136140
options, args = parser.parse_args()
137141

138142
options.hosts = options.hosts.split(',')
139143

144+
log.setLevel(options.log_level.upper())
145+
140146
if options.libev_only:
141147
if not have_libev:
142148
log.error("libev is not available")
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from base import benchmark
2+
3+
import logging
4+
from itertools import count
5+
from threading import Event, Thread
6+
7+
log = logging.getLogger(__name__)
8+
9+
initial = object()
10+
11+
class Runner(Thread):
12+
13+
def __init__(self, session, query, values, num_queries, *args, **kwargs):
14+
self.session = session
15+
self.query = query
16+
self.values = values
17+
self.num_queries = num_queries
18+
self.num_started = count()
19+
self.num_finished = count()
20+
self.event = Event()
21+
Thread.__init__(self)
22+
23+
def handle_error(self, exc):
24+
log.error("Error on insert: %r", exc)
25+
26+
def insert_next(self, previous_result):
27+
current_num = self.num_started.next()
28+
29+
if previous_result is not initial:
30+
num = next(self.num_finished)
31+
if num >= self.num_queries:
32+
self.event.set()
33+
34+
if current_num <= self.num_queries:
35+
future = self.session.execute_async(self.query, self.values)
36+
future.add_callbacks(self.insert_next, self.handle_error)
37+
38+
def run(self):
39+
for i in range(120):
40+
self.insert_next(initial)
41+
42+
self.event.wait()
43+
44+
def execute(session, query, values, num_queries, num_threads):
45+
46+
per_thread = num_queries / num_threads
47+
threads = []
48+
for i in range(num_threads):
49+
thread = Runner(session, query, values, per_thread)
50+
thread.daemon = True
51+
threads.append(thread)
52+
53+
for thread in threads:
54+
thread.start()
55+
56+
for thread in threads:
57+
while thread.is_alive():
58+
thread.join(timeout=0.5)
59+
60+
61+
if __name__ == "__main__":
62+
benchmark(execute)

benchmarks/future_batches.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import logging
2+
import Queue
3+
from threading import Thread
4+
5+
from base import benchmark
6+
7+
log = logging.getLogger(__name__)
8+
9+
def execute(session, query, values, num_queries, num_threads):
10+
11+
per_thread = num_queries / num_threads
12+
13+
def run():
14+
futures = Queue.Queue(maxsize=121)
15+
16+
for i in range(per_thread):
17+
if i > 0 and i % 120 == 0:
18+
# clear the existing queue
19+
while True:
20+
try:
21+
futures.get_nowait().result()
22+
except Queue.Empty:
23+
break
24+
25+
future = session.execute_async(query, values)
26+
futures.put_nowait(future)
27+
28+
while True:
29+
try:
30+
futures.get_nowait().result()
31+
except Queue.Empty:
32+
break
33+
34+
threads = []
35+
for i in range(num_threads):
36+
thread = Thread(target=run)
37+
thread.daemon = True
38+
threads.append(thread)
39+
40+
for thread in threads:
41+
thread.start()
42+
43+
for thread in threads:
44+
while thread.is_alive():
45+
thread.join(timeout=0.5)
46+
47+
48+
if __name__ == "__main__":
49+
benchmark(execute)

benchmarks/future_full_pipeline.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import logging
2+
import Queue
3+
from threading import Thread
4+
5+
from base import benchmark
6+
7+
log = logging.getLogger(__name__)
8+
9+
def execute(session, query, values, num_queries, num_threads):
10+
11+
per_thread = num_queries / num_threads
12+
13+
def run():
14+
futures = Queue.Queue(maxsize=121)
15+
16+
for i in range(per_thread):
17+
if i >= 120:
18+
old_future = futures.get_nowait()
19+
old_future.result()
20+
21+
future = session.execute_async(query, values)
22+
futures.put_nowait(future)
23+
24+
while True:
25+
try:
26+
futures.get_nowait().result()
27+
except Queue.Empty:
28+
break
29+
30+
threads = []
31+
for i in range(num_threads):
32+
thread = Thread(target=run)
33+
thread.daemon = True
34+
threads.append(thread)
35+
36+
for thread in threads:
37+
thread.start()
38+
39+
for thread in threads:
40+
while thread.is_alive():
41+
thread.join(timeout=0.5)
42+
43+
44+
if __name__ == "__main__":
45+
benchmark(execute)

benchmarks/future_full_throttle.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import logging
2+
from threading import Thread
3+
4+
from base import benchmark
5+
6+
log = logging.getLogger(__name__)
7+
8+
def execute(session, query, values, num_queries, num_threads):
9+
10+
per_thread = num_queries / num_threads
11+
12+
def run():
13+
futures = []
14+
15+
for i in range(per_thread):
16+
future = session.execute_async(query, values)
17+
futures.append(future)
18+
19+
for future in futures:
20+
future.result()
21+
22+
threads = []
23+
for i in range(num_threads):
24+
thread = Thread(target=run)
25+
thread.daemon = True
26+
threads.append(thread)
27+
28+
for thread in threads:
29+
thread.start()
30+
31+
for thread in threads:
32+
while thread.is_alive():
33+
thread.join(timeout=0.5)
34+
35+
36+
if __name__ == "__main__":
37+
benchmark(execute)

benchmarks/single_thread_callback_full_pipeline.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

benchmarks/single_thread_future_batches.py

Lines changed: 0 additions & 32 deletions
This file was deleted.

benchmarks/single_thread_future_full_pipeline.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

benchmarks/single_thread_future_full_throttle.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)