2525
2626from cassandra import OperationTimedOut
2727from cassandra .cluster import (EXEC_PROFILE_DEFAULT , Cluster , ExecutionProfile ,
28- _Scheduler )
28+ _Scheduler , NoHostAvailable )
2929from cassandra .policies import HostStateListener , RoundRobinPolicy
3030from tests .integration import (CASSANDRA_VERSION , PROTOCOL_VERSION ,
3131 requiressimulacron )
3232from tests .integration .util import assert_quiescent_pool_state
33+ from tests .integration .simulacron import SimulacronBase
3334from tests .integration .simulacron .utils import (NO_THEN , PrimeOptions ,
3435 prime_query , prime_request ,
3536 start_and_prime_cluster_defaults ,
3637 start_and_prime_singledc ,
37- stop_simulacron )
38+ clear_queries )
3839
3940
class TrackDownListener(HostStateListener):
    """Listener that records every host the cluster reports as down."""

    def __init__(self):
        # Per-instance list so parallel tests never share recorded hosts.
        self.hosts_marked_down = list()

    def on_down(self, host):
        """Remember *host* as having been marked down."""
        self.hosts_marked_down.append(host)
@@ -50,8 +52,21 @@ def submit(self, fn, *args, **kwargs):
5052 self .called_functions .append (fn .__name__ )
5153 return super (ThreadTracker , self ).submit (fn , * args , ** kwargs )
5254
55+
class OrderedRoundRobinPolicy(RoundRobinPolicy):
    """Round-robin variant that always yields hosts in address order."""

    def make_query_plan(self, working_keyspace=None, query=None):
        # Keep the inherited position counter moving, as the base class does.
        self._position += 1

        # One address-sorted pass, repeated ten times, matches ten
        # consecutive extends of the same sorted host list.
        ordered = sorted(self._live_hosts, key=lambda host: host.address)
        return ordered * 10
67+
5368@requiressimulacron
54- class ConnectionTest ( unittest . TestCase ):
69+ class ConnectionTests ( SimulacronBase ):
5570
5671 def test_heart_beat_timeout (self ):
5772 """
@@ -64,24 +79,23 @@ def test_heart_beat_timeout(self):
6479 @test_category metadata
6580 """
6681 number_of_dcs = 3
67- nodes_per_dc = 100
82+ nodes_per_dc = 20
6883
6984 query_to_prime = "INSERT INTO test3rf.test (k, v) VALUES (0, 1);"
7085
7186 idle_heartbeat_timeout = 5
7287 idle_heartbeat_interval = 1
7388
7489 start_and_prime_cluster_defaults (number_of_dcs , nodes_per_dc , CASSANDRA_VERSION )
75- self .addCleanup (stop_simulacron )
7690
7791 listener = TrackDownListener ()
78- executor = ThreadTracker (max_workers = 16 )
92+ executor = ThreadTracker (max_workers = 8 )
7993
8094 # We need to disable compression since it's not supported in simulacron
8195 cluster = Cluster (compression = False ,
8296 idle_heartbeat_interval = idle_heartbeat_interval ,
8397 idle_heartbeat_timeout = idle_heartbeat_timeout ,
84- executor_threads = 16 ,
98+ executor_threads = 8 ,
8599 execution_profiles = {
86100 EXEC_PROFILE_DEFAULT : ExecutionProfile (load_balancing_policy = RoundRobinPolicy ())})
87101 self .addCleanup (cluster .shutdown )
@@ -112,7 +126,7 @@ def test_heart_beat_timeout(self):
112126
113127 # We allow from some extra time for all the hosts to be to on_down
114128 # The callbacks should start happening after idle_heartbeat_timeout + idle_heartbeat_interval
115- time .sleep ((idle_heartbeat_timeout + idle_heartbeat_interval ) * 2 )
129+ time .sleep ((idle_heartbeat_timeout + idle_heartbeat_interval ) * 2.5 )
116130
117131 for host in cluster .metadata .all_hosts ():
118132 self .assertIn (host , listener .hosts_marked_down )
@@ -133,7 +147,6 @@ def test_callbacks_and_pool_when_oto(self):
133147 @test_category metadata
134148 """
135149 start_and_prime_singledc ()
136- self .addCleanup (stop_simulacron )
137150
138151 cluster = Cluster (protocol_version = PROTOCOL_VERSION , compression = False )
139152 session = cluster .connect ()
@@ -155,3 +168,163 @@ def test_callbacks_and_pool_when_oto(self):
155168 # PYTHON-630 -- only the errback should be called
156169 errback .assert_called_once ()
157170 callback .assert_not_called ()
171+
172+ def test_close_when_query (self ):
173+ """
174+ Test to ensure the driver behaves correctly if the connection is closed
175+ just when querying
176+ @since 3.12
177+ @expected_result NoHostAvailable is risen
178+
179+ @test_category connection
180+ """
181+ start_and_prime_singledc ()
182+
183+ cluster = Cluster (protocol_version = PROTOCOL_VERSION , compression = False )
184+ session = cluster .connect ()
185+ self .addCleanup (cluster .shutdown )
186+
187+ query_to_prime = "SELECT * from testkesypace.testtable"
188+
189+ for close_type in ("disconnect" , "shutdown_read" , "shutdown_write" ):
190+ then = {
191+ "result" : "close_connection" ,
192+ "delay_in_ms" : 0 ,
193+ "close_type" : close_type ,
194+ "scope" : "connection"
195+ }
196+
197+ prime_query (query_to_prime , then = then )
198+ self .assertRaises (NoHostAvailable , session .execute , query_to_prime )
199+
    def test_retry_after_defunct(self):
        """
        We test cluster._retry is called if the connection is defunct
        in the middle of a query.

        Finally we verify the driver recovers correctly in the event
        of a network partition.

        @since 3.12
        @expected_result the driver is able to query even if a host is marked
        as down in the middle of the query; it will go to the next one if the
        timeout hasn't expired

        @test_category connection
        """
        number_of_dcs = 3
        nodes_per_dc = 2

        query_to_prime = "INSERT INTO test3rf.test (k, v) VALUES (0, 1);"

        idle_heartbeat_timeout = 1
        idle_heartbeat_interval = 5

        simulacron_cluster = start_and_prime_cluster_defaults(number_of_dcs, nodes_per_dc, CASSANDRA_VERSION)

        # Only the highest-numbered DC is primed to answer the query; the
        # remaining DC ids are cut off below to simulate a partition.
        dc_ids = sorted(simulacron_cluster.data_center_ids)
        last_host = dc_ids.pop()
        prime_query(query_to_prime,
                    cluster_name="{}/{}".format(simulacron_cluster.cluster_name, last_host))

        # Deterministic host order so we know which hosts fail first.
        roundrobin_lbp = OrderedRoundRobinPolicy()
        cluster = Cluster(compression=False,
                          idle_heartbeat_interval=idle_heartbeat_interval,
                          idle_heartbeat_timeout=idle_heartbeat_timeout,
                          execution_profiles={
                              EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=roundrobin_lbp)})

        session = cluster.connect(wait_for_all_pools=True)
        self.addCleanup(cluster.shutdown)

        # This simulates we only have access to one DC: the others never
        # respond to queries or option requests.
        for dc_id in dc_ids:
            datacenter_path = "{}/{}".format(simulacron_cluster.cluster_name, dc_id)
            prime_query(query_to_prime, then=NO_THEN, cluster_name=datacenter_path)
            prime_request(PrimeOptions(then=NO_THEN, cluster_name=datacenter_path))

        # Only the last datacenter will respond, therefore the first host won't.
        # We want to make sure the returned hosts are 127.0.0.1, 127.0.0.2, ... 127.0.0.8
        roundrobin_lbp._position = 0

        # After 3 + 1 seconds the connection should be marked down and another
        # host retried; the timeout must be generous enough to cover that.
        response_future = session.execute_async(query_to_prime, timeout=4 * idle_heartbeat_interval
                                                + idle_heartbeat_timeout)
        response_future.result()
        # More than one host attempted proves the retry actually happened.
        self.assertGreater(len(response_future.attempted_hosts), 1)

        # No error should be raised here since the hosts have been marked
        # as down and there's still 1 DC available
        for _ in range(10):
            session.execute(query_to_prime)

        # Might take some time to close the previous connections and reconnect
        time.sleep(10)
        assert_quiescent_pool_state(self, cluster)
        clear_queries()

        time.sleep(10)
        assert_quiescent_pool_state(self, cluster)
268+
269+ def test_idle_connection_is_not_closed (self ):
270+ """
271+ Test to ensure that the connections aren't closed if they are idle
272+ @since 3.12
273+ @jira_ticket PYTHON-573
274+ @expected_result the connections aren't closed nor the hosts are
275+ set to down if the connection is idle
276+
277+ @test_category connection
278+ """
279+ start_and_prime_singledc ()
280+
281+ idle_heartbeat_timeout = 1
282+ idle_heartbeat_interval = 1
283+
284+ listener = TrackDownListener ()
285+ cluster = Cluster (compression = False ,
286+ idle_heartbeat_interval = idle_heartbeat_interval ,
287+ idle_heartbeat_timeout = idle_heartbeat_timeout )
288+ session = cluster .connect (wait_for_all_pools = True )
289+ cluster .register_listener (listener )
290+
291+ self .addCleanup (cluster .shutdown )
292+
293+ time .sleep (20 )
294+
295+ self .assertEqual (listener .hosts_marked_down , [])
296+
297+ def test_host_is_not_set_to_down_after_query_oto (self ):
298+ """
299+ Test to ensure that the connections aren't closed if there's an
300+ OperationTimedOut in a normal query. This should only happen from the
301+ heart beat thread (in the case of a OperationTimedOut) with the default
302+ configuration
303+ @since 3.12
304+ @expected_result the connections aren't closed nor the hosts are
305+ set to down
306+
307+ @test_category connection
308+ """
309+ start_and_prime_singledc ()
310+
311+ query_to_prime = "SELECT * FROM madeup_keyspace.madeup_table"
312+
313+ prime_query (query_to_prime , then = NO_THEN )
314+
315+ listener = TrackDownListener ()
316+ cluster = Cluster (compression = False )
317+ session = cluster .connect (wait_for_all_pools = True )
318+ cluster .register_listener (listener )
319+
320+ futures = []
321+ for _ in range (10 ):
322+ future = session .execute_async (query_to_prime )
323+ futures .append (future )
324+
325+ for f in futures :
326+ f ._event .wait ()
327+ self .assertIsInstance (f ._final_exception , OperationTimedOut )
328+
329+ self .assertEqual (listener .hosts_marked_down , [])
330+ assert_quiescent_pool_state (self , cluster )
0 commit comments