2020from cassandra .cluster import ExecutionProfile , EXEC_PROFILE_DEFAULT
2121from cassandra .concurrent import execute_concurrent , execute_concurrent_with_args , ExecutionResult
2222from cassandra .policies import HostDistance
23- from cassandra .query import tuple_factory , SimpleStatement
23+ from cassandra .query import dict_factory , tuple_factory , SimpleStatement
2424
2525from tests .integration import use_singledc , PROTOCOL_VERSION , TestCluster
2626
@@ -35,13 +35,16 @@ def setup_module():
3535 use_singledc ()
3636
3737
38+ EXEC_PROFILE_DICT = "dict"
39+
3840class ClusterTests (unittest .TestCase ):
3941
4042 @classmethod
4143 def setUpClass (cls ):
4244 cls .cluster = TestCluster (
4345 execution_profiles = {
44- EXEC_PROFILE_DEFAULT : ExecutionProfile (row_factory = tuple_factory )
46+ EXEC_PROFILE_DEFAULT : ExecutionProfile (row_factory = tuple_factory ),
47+ EXEC_PROFILE_DICT : ExecutionProfile (row_factory = dict_factory )
4548 }
4649 )
4750 if PROTOCOL_VERSION < 3 :
@@ -52,11 +55,11 @@ def setUpClass(cls):
5255 def tearDownClass (cls ):
5356 cls .cluster .shutdown ()
5457
55- def execute_concurrent_helper (self , session , query , results_generator = False ):
58+ def execute_concurrent_helper (self , session , query , ** kwargs ):
5659 count = 0
5760 while count < 100 :
5861 try :
59- return execute_concurrent (session , query , results_generator = False )
62+ return execute_concurrent (session , query , results_generator = False , ** kwargs )
6063 except (ReadTimeout , WriteTimeout , OperationTimedOut , ReadFailure , WriteFailure ):
6164 ex_type , ex , tb = sys .exc_info ()
6265 log .warning ("{0}: {1} Backtrace: {2}" .format (ex_type .__name__ , ex , traceback .extract_tb (tb )))
@@ -65,19 +68,19 @@ def execute_concurrent_helper(self, session, query, results_generator=False):
6568
6669 raise RuntimeError ("Failed to execute query after 100 attempts: {0}" .format (query ))
6770
68- def execute_concurrent_args_helper (self , session , query , params , results_generator = False ):
71+ def execute_concurrent_args_helper (self , session , query , params , results_generator = False , ** kwargs ):
6972 count = 0
7073 while count < 100 :
7174 try :
72- return execute_concurrent_with_args (session , query , params , results_generator = results_generator )
75+ return execute_concurrent_with_args (session , query , params , results_generator = results_generator , ** kwargs )
7376 except (ReadTimeout , WriteTimeout , OperationTimedOut , ReadFailure , WriteFailure ):
7477 ex_type , ex , tb = sys .exc_info ()
7578 log .warning ("{0}: {1} Backtrace: {2}" .format (ex_type .__name__ , ex , traceback .extract_tb (tb )))
7679 del tb
7780
7881 raise RuntimeError ("Failed to execute query after 100 attempts: {0}" .format (query ))
7982
80- def test_execute_concurrent (self ):
83+ def execute_concurrent_base (self , test_fn , validate_fn , zip_args = True ):
8184 for num_statements in (0 , 1 , 2 , 7 , 10 , 99 , 100 , 101 , 199 , 200 , 201 ):
8285 # write
8386 statement = SimpleStatement (
@@ -86,7 +89,9 @@ def test_execute_concurrent(self):
8689 statements = cycle ((statement , ))
8790 parameters = [(i , i ) for i in range (num_statements )]
8891
89- results = self .execute_concurrent_helper (self .session , list (zip (statements , parameters )))
92+ results = \
93+ test_fn (self .session , list (zip (statements , parameters ))) if zip_args else \
94+ test_fn (self .session , statement , parameters )
9095 self .assertEqual (num_statements , len (results ))
9196 for success , result in results :
9297 self .assertTrue (success )
@@ -99,32 +104,37 @@ def test_execute_concurrent(self):
99104 statements = cycle ((statement , ))
100105 parameters = [(i , ) for i in range (num_statements )]
101106
102- results = self .execute_concurrent_helper (self .session , list (zip (statements , parameters )))
107+ results = \
108+ test_fn (self .session , list (zip (statements , parameters ))) if zip_args else \
109+ test_fn (self .session , statement , parameters )
110+ validate_fn (num_statements , results )
111+
112+ def execute_concurrent_valiate_tuple (self , num_statements , results ):
103113 self .assertEqual (num_statements , len (results ))
104114 self .assertEqual ([(True , [(i ,)]) for i in range (num_statements )], results )
105115
106- def test_execute_concurrent_with_args (self ):
107- for num_statements in (0 , 1 , 2 , 7 , 10 , 99 , 100 , 101 , 199 , 200 , 201 ):
108- statement = SimpleStatement (
109- "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)" ,
110- consistency_level = ConsistencyLevel .QUORUM )
111- parameters = [(i , i ) for i in range (num_statements )]
112-
113- results = self .execute_concurrent_args_helper (self .session , statement , parameters )
116+ def execute_concurrent_valiate_dict (self , num_statements , results ):
114117 self .assertEqual (num_statements , len (results ))
115- for success , result in results :
116- self .assertTrue (success )
117- self .assertFalse (result )
118+ self .assertEqual ([(True , [{"v" :i }]) for i in range (num_statements )], results )
118119
119- # read
120- statement = SimpleStatement (
121- "SELECT v FROM test3rf.test WHERE k=%s" ,
122- consistency_level = ConsistencyLevel .QUORUM )
123- parameters = [(i , ) for i in range (num_statements )]
120+ def test_execute_concurrent (self ):
121+ self .execute_concurrent_base (self .execute_concurrent_helper , \
122+ self .execute_concurrent_valiate_tuple )
124123
125- results = self .execute_concurrent_args_helper (self .session , statement , parameters )
126- self .assertEqual (num_statements , len (results ))
127- self .assertEqual ([(True , [(i ,)]) for i in range (num_statements )], results )
124+ def test_execute_concurrent_with_args (self ):
125+ self .execute_concurrent_base (self .execute_concurrent_args_helper , \
126+ self .execute_concurrent_valiate_tuple , \
127+ zip_args = False )
128+
129+ def test_execute_concurrent_with_execution_profile (self ):
130+ def run_fn (* args , ** kwargs ):
131+ return self .execute_concurrent_helper (* args , execution_profile = EXEC_PROFILE_DICT , ** kwargs )
132+ self .execute_concurrent_base (run_fn , self .execute_concurrent_valiate_dict )
133+
134+ def test_execute_concurrent_with_args_and_execution_profile (self ):
135+ def run_fn (* args , ** kwargs ):
136+ return self .execute_concurrent_args_helper (* args , execution_profile = EXEC_PROFILE_DICT , ** kwargs )
137+ self .execute_concurrent_base (run_fn , self .execute_concurrent_valiate_dict , zip_args = False )
128138
129139 def test_execute_concurrent_with_args_generator (self ):
130140 """
0 commit comments