1919from threading import Thread
2020import time
2121from optparse import OptionParser
22+ import uuid
2223
2324from greplin import scales
2425
5960KEYSPACE = "testkeyspace" + str (int (time .time ()))
6061TABLE = "testtable"
6162
63+ COLUMN_VALUES = {
64+ 'int' : 42 ,
65+ 'text' : "'42'" ,
66+ 'float' : 42.0 ,
67+ 'uuid' : uuid .uuid4 (),
68+ 'timestamp' : "'2016-02-03 04:05+0000'"
69+ }
6270
63- def setup (hosts ):
71+
72+ def setup (options ):
6473 log .info ("Using 'cassandra' package from %s" , cassandra .__path__ )
6574
66- cluster = Cluster (hosts , protocol_version = 1 )
67- cluster .set_core_connections_per_host (HostDistance .LOCAL , 1 )
75+ cluster = Cluster (options .hosts , protocol_version = options .protocol_version )
6876 try :
6977 session = cluster .connect ()
7078
7179 log .debug ("Creating keyspace..." )
72- session .execute ("""
73- CREATE KEYSPACE %s
74- WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
75- """ % KEYSPACE )
80+ try :
81+ session .execute ("""
82+ CREATE KEYSPACE %s
83+ WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
84+ """ % options .keyspace )
85+
86+ log .debug ("Setting keyspace..." )
87+ except cassandra .AlreadyExists :
88+ log .debug ("Keyspace already exists" )
7689
77- log .debug ("Setting keyspace..." )
78- session .set_keyspace (KEYSPACE )
90+ session .set_keyspace (options .keyspace )
7991
8092 log .debug ("Creating table..." )
81- session . execute ( """
82- CREATE TABLE %s (
93+ create_table_query = """
94+ CREATE TABLE {} (
8395 thekey text,
84- col1 text,
85- col2 text,
86- PRIMARY KEY (thekey, col1)
87- )
88- """ % TABLE )
96+ """
97+ for i in range (options .num_columns ):
98+ create_table_query += "col{} {},\n " .format (i , options .column_type )
99+ create_table_query += "PRIMARY KEY (thekey))"
100+
101+ try :
102+ session .execute (create_table_query .format (TABLE ))
103+ except cassandra .AlreadyExists :
104+ log .debug ("Table already exists." )
105+
89106 finally :
90107 cluster .shutdown ()
91108
92109
93- def teardown (hosts ):
94- cluster = Cluster (hosts , protocol_version = 1 )
95- cluster .set_core_connections_per_host (HostDistance .LOCAL , 1 )
110+ def teardown (options ):
111+ cluster = Cluster (options .hosts , protocol_version = options .protocol_version )
96112 session = cluster .connect ()
97- session .execute ("DROP KEYSPACE " + KEYSPACE )
113+ if not options .keep_data :
114+ session .execute ("DROP KEYSPACE " + options .keyspace )
98115 cluster .shutdown ()
99116
100117
101118def benchmark (thread_class ):
102119 options , args = parse_options ()
103120 for conn_class in options .supported_reactors :
104- setup (options . hosts )
121+ setup (options )
105122 log .info ("==== %s ====" % (conn_class .__name__ ,))
106123
107124 kwargs = {'metrics_enabled' : options .enable_metrics ,
108125 'connection_class' : conn_class }
109126 if options .protocol_version :
110127 kwargs ['protocol_version' ] = options .protocol_version
111128 cluster = Cluster (options .hosts , ** kwargs )
112- session = cluster .connect (KEYSPACE )
129+ session = cluster .connect (options . keyspace )
113130
114131 log .debug ("Sleeping for two seconds..." )
115132 time .sleep (2.0 )
116133
117- query = session .prepare ("""
118- INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?)
119- """ .format (table = TABLE ))
120- values = ('key' , 'a' , 'b' )
121134
135+ # Generate the query
136+ if options .read :
137+ query = "SELECT * FROM {} WHERE thekey = '{{key}}'" .format (TABLE )
138+ else :
139+ query = "INSERT INTO {} (thekey" .format (TABLE )
140+ for i in range (options .num_columns ):
141+ query += ", col{}" .format (i )
142+
143+ query += ") VALUES ('{key}'"
144+ for i in range (options .num_columns ):
145+ query += ", {}" .format (COLUMN_VALUES [options .column_type ])
146+ query += ")"
147+
148+ values = None # we don't use that anymore. Keeping it in case we go back to prepared statements.
122149 per_thread = options .num_ops // options .threads
123150 threads = []
124151
125- log .debug ("Beginning inserts ..." )
152+ log .debug ("Beginning {} ..." . format ( 'reads' if options . read else 'inserts' ) )
126153 start = time .time ()
127154 try :
128155 for i in range (options .threads ):
@@ -142,7 +169,7 @@ def benchmark(thread_class):
142169 end = time .time ()
143170 finally :
144171 cluster .shutdown ()
145- teardown (options . hosts )
172+ teardown (options )
146173
147174 total = end - start
148175 log .info ("Total time: %0.2fs" % total )
@@ -190,8 +217,19 @@ def parse_options():
190217 help = 'logging level: debug, info, warning, or error' )
191218 parser .add_option ('-p' , '--profile' , action = 'store_true' , dest = 'profile' ,
192219 help = 'Profile the run' )
193- parser .add_option ('--protocol-version' , type = 'int' , dest = 'protocol_version' ,
220+ parser .add_option ('--protocol-version' , type = 'int' , dest = 'protocol_version' , default = 4 ,
194221 help = 'Native protocol version to use' )
222+ parser .add_option ('-c' , '--num-columns' , type = 'int' , dest = 'num_columns' , default = 2 ,
223+ help = 'Specify the number of columns for the schema' )
224+ parser .add_option ('-k' , '--keyspace' , type = 'str' , dest = 'keyspace' , default = KEYSPACE ,
225+ help = 'Specify the keyspace name for the schema' )
226+ parser .add_option ('--keep-data' , action = 'store_true' , dest = 'keep_data' , default = False ,
227+ help = 'Keep the data after the benchmark' )
228+ parser .add_option ('--column-type' , type = 'str' , dest = 'column_type' , default = 'text' ,
229+ help = 'Specify the column type for the schema (supported: int, text, float, uuid, timestamp)' )
230+ parser .add_option ('--read' , action = 'store_true' , dest = 'read' , default = False ,
231+ help = 'Read mode' )
232+
195233
196234 options , args = parser .parse_args ()
197235
@@ -235,6 +273,9 @@ def start_profile(self):
235273 if self .profiler :
236274 self .profiler .enable ()
237275
276+ def run_query (self , key , ** kwargs ):
277+ return self .session .execute_async (self .query .format (key = key ), ** kwargs )
278+
238279 def finish_profile (self ):
239280 if self .profiler :
240281 self .profiler .disable ()
0 commit comments