Skip to content

Commit 0f14135

Browse files
committed
Merge pull request apache#566 from datastax/benchmarks-read
Benchmarks read
2 parents 5254f73 + 86a793f commit 0f14135

5 files changed

Lines changed: 81 additions & 35 deletions

File tree

benchmarks/base.py

Lines changed: 70 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from threading import Thread
2020
import time
2121
from optparse import OptionParser
22+
import uuid
2223

2324
from greplin import scales
2425

@@ -59,70 +60,96 @@
5960
KEYSPACE = "testkeyspace" + str(int(time.time()))
6061
TABLE = "testtable"
6162

63+
COLUMN_VALUES = {
64+
'int': 42,
65+
'text': "'42'",
66+
'float': 42.0,
67+
'uuid': uuid.uuid4(),
68+
'timestamp': "'2016-02-03 04:05+0000'"
69+
}
6270

63-
def setup(hosts):
71+
72+
def setup(options):
6473
log.info("Using 'cassandra' package from %s", cassandra.__path__)
6574

66-
cluster = Cluster(hosts, protocol_version=1)
67-
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
75+
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
6876
try:
6977
session = cluster.connect()
7078

7179
log.debug("Creating keyspace...")
72-
session.execute("""
73-
CREATE KEYSPACE %s
74-
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
75-
""" % KEYSPACE)
80+
try:
81+
session.execute("""
82+
CREATE KEYSPACE %s
83+
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
84+
""" % options.keyspace)
85+
86+
log.debug("Setting keyspace...")
87+
except cassandra.AlreadyExists:
88+
log.debug("Keyspace already exists")
7689

77-
log.debug("Setting keyspace...")
78-
session.set_keyspace(KEYSPACE)
90+
session.set_keyspace(options.keyspace)
7991

8092
log.debug("Creating table...")
81-
session.execute("""
82-
CREATE TABLE %s (
93+
create_table_query = """
94+
CREATE TABLE {} (
8395
thekey text,
84-
col1 text,
85-
col2 text,
86-
PRIMARY KEY (thekey, col1)
87-
)
88-
""" % TABLE)
96+
"""
97+
for i in range(options.num_columns):
98+
create_table_query += "col{} {},\n".format(i, options.column_type)
99+
create_table_query += "PRIMARY KEY (thekey))"
100+
101+
try:
102+
session.execute(create_table_query.format(TABLE))
103+
except cassandra.AlreadyExists:
104+
log.debug("Table already exists.")
105+
89106
finally:
90107
cluster.shutdown()
91108

92109

93-
def teardown(hosts):
94-
cluster = Cluster(hosts, protocol_version=1)
95-
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
110+
def teardown(options):
111+
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
96112
session = cluster.connect()
97-
session.execute("DROP KEYSPACE " + KEYSPACE)
113+
if not options.keep_data:
114+
session.execute("DROP KEYSPACE " + options.keyspace)
98115
cluster.shutdown()
99116

100117

101118
def benchmark(thread_class):
102119
options, args = parse_options()
103120
for conn_class in options.supported_reactors:
104-
setup(options.hosts)
121+
setup(options)
105122
log.info("==== %s ====" % (conn_class.__name__,))
106123

107124
kwargs = {'metrics_enabled': options.enable_metrics,
108125
'connection_class': conn_class}
109126
if options.protocol_version:
110127
kwargs['protocol_version'] = options.protocol_version
111128
cluster = Cluster(options.hosts, **kwargs)
112-
session = cluster.connect(KEYSPACE)
129+
session = cluster.connect(options.keyspace)
113130

114131
log.debug("Sleeping for two seconds...")
115132
time.sleep(2.0)
116133

117-
query = session.prepare("""
118-
INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?)
119-
""".format(table=TABLE))
120-
values = ('key', 'a', 'b')
121134

135+
# Generate the query
136+
if options.read:
137+
query = "SELECT * FROM {} WHERE thekey = '{{key}}'".format(TABLE)
138+
else:
139+
query = "INSERT INTO {} (thekey".format(TABLE)
140+
for i in range(options.num_columns):
141+
query += ", col{}".format(i)
142+
143+
query += ") VALUES ('{key}'"
144+
for i in range(options.num_columns):
145+
query += ", {}".format(COLUMN_VALUES[options.column_type])
146+
query += ")"
147+
148+
values = None # we don't use that anymore. Keeping it in case we go back to prepared statements.
122149
per_thread = options.num_ops // options.threads
123150
threads = []
124151

125-
log.debug("Beginning inserts...")
152+
log.debug("Beginning {}...".format('reads' if options.read else 'inserts'))
126153
start = time.time()
127154
try:
128155
for i in range(options.threads):
@@ -142,7 +169,7 @@ def benchmark(thread_class):
142169
end = time.time()
143170
finally:
144171
cluster.shutdown()
145-
teardown(options.hosts)
172+
teardown(options)
146173

147174
total = end - start
148175
log.info("Total time: %0.2fs" % total)
@@ -190,8 +217,19 @@ def parse_options():
190217
help='logging level: debug, info, warning, or error')
191218
parser.add_option('-p', '--profile', action='store_true', dest='profile',
192219
help='Profile the run')
193-
parser.add_option('--protocol-version', type='int', dest='protocol_version',
220+
parser.add_option('--protocol-version', type='int', dest='protocol_version', default=4,
194221
help='Native protocol version to use')
222+
parser.add_option('-c', '--num-columns', type='int', dest='num_columns', default=2,
223+
help='Specify the number of columns for the schema')
224+
parser.add_option('-k', '--keyspace', type='str', dest='keyspace', default=KEYSPACE,
225+
help='Specify the keyspace name for the schema')
226+
parser.add_option('--keep-data', action='store_true', dest='keep_data', default=False,
227+
help='Keep the data after the benchmark')
228+
parser.add_option('--column-type', type='str', dest='column_type', default='text',
229+
help='Specify the column type for the schema (supported: int, text, float, uuid, timestamp)')
230+
parser.add_option('--read', action='store_true', dest='read', default=False,
231+
help='Read mode')
232+
195233

196234
options, args = parser.parse_args()
197235

@@ -235,6 +273,9 @@ def start_profile(self):
235273
if self.profiler:
236274
self.profiler.enable()
237275

276+
def run_query(self, key, **kwargs):
277+
return self.session.execute_async(self.query.format(key=key), **kwargs)
278+
238279
def finish_profile(self):
239280
if self.profiler:
240281
self.profiler.disable()

benchmarks/callback_full_pipeline.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ def insert_next(self, previous_result=sentinel):
4141
if next(self.num_finished) >= self.num_queries:
4242
self.event.set()
4343

44-
if next(self.num_started) <= self.num_queries:
45-
future = self.session.execute_async(self.query, self.values, timeout=None)
44+
i = next(self.num_started)
45+
if i <= self.num_queries:
46+
key = "{}-{}".format(self.thread_num, i)
47+
future = self.run_query(key, timeout=None)
4648
future.add_callbacks(self.insert_next, self.insert_next)
4749

4850
def run(self):

benchmarks/future_batches.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ def run(self):
3535
except queue.Empty:
3636
break
3737

38-
future = self.session.execute_async(self.query, self.values)
38+
key = "{}-{}".format(self.thread_num, i)
39+
future = self.run_query(key)
3940
futures.put_nowait(future)
4041

4142
while True:

benchmarks/future_full_pipeline.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ def run(self):
3131
old_future = futures.get_nowait()
3232
old_future.result()
3333

34-
future = self.session.execute_async(self.query, self.values)
34+
key = "{}-{}".format(self.thread_num, i)
35+
future = self.run_query(key)
3536
futures.put_nowait(future)
3637

3738
while True:

benchmarks/future_full_throttle.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ def run(self):
2525

2626
self.start_profile()
2727

28-
for _ in range(self.num_queries):
29-
future = self.session.execute_async(self.query, self.values)
28+
for i in range(self.num_queries):
29+
key = "{}-{}".format(self.thread_num, i)
30+
future = self.run_query(key)
3031
futures.append(future)
3132

3233
for future in futures:

0 commit comments

Comments
 (0)