1616except ImportError :
1717 import unittest # noqa
1818
19- from cassandra import OperationTimedOut
19+ from cassandra import OperationTimedOut , WriteTimeout
2020from cassandra .cluster import Cluster , ExecutionProfile
2121from cassandra .query import SimpleStatement
22- from cassandra .policies import ConstantSpeculativeExecutionPolicy , RoundRobinPolicy
22+ from cassandra .policies import ConstantSpeculativeExecutionPolicy , RoundRobinPolicy , RetryPolicy , WriteType
2323
2424from tests .integration import PROTOCOL_VERSION , greaterthancass21 , requiressimulacron , SIMULACRON_JAR
2525from tests .integration .simulacron .utils import start_and_prime_singledc , prime_query , \
@@ -139,8 +139,6 @@ def test_speculative_execution(self):
139139 result = self .session .execute (prepared_statement , ("0" ,), execution_profile = 'spec_ep_brr' )
140140 self .assertLess (1 , len (result .response_future .attempted_hosts ))
141141
142-
143-
144142 def test_speculative_and_timeout (self ):
145143 """
146144 Test to ensure the timeout is honored when using speculative execution
@@ -166,3 +164,69 @@ def test_speculative_and_timeout(self):
166164
167165 # This is because 14 / 4 + 1 = 4
168166 self .assertEqual (len (response_future .attempted_hosts ), 4 )
167+
168+
169+ class CustomRetryPolicy (RetryPolicy ):
170+ def on_write_timeout (self , query , consistency , write_type ,
171+ required_responses , received_responses , retry_num ):
172+ if retry_num != 0 :
173+ return self .RETHROW , None
174+ elif write_type == WriteType .SIMPLE :
175+ return self .RETHROW , None
176+ elif write_type == WriteType .CDC :
177+ return self .IGNORE , None
178+
179+
180+ @requiressimulacron
181+ class RetryPolicyTets (unittest .TestCase ):
182+ @classmethod
183+ def setUpClass (cls ):
184+ if SIMULACRON_JAR is None :
185+ return
186+ start_and_prime_singledc ()
187+
188+ cls .cluster = Cluster (protocol_version = PROTOCOL_VERSION , compression = False ,
189+ default_retry_policy = CustomRetryPolicy ())
190+ cls .session = cls .cluster .connect (wait_for_all_pools = True )
191+
192+ @classmethod
193+ def tearDownClass (cls ):
194+ if SIMULACRON_JAR is None :
195+ return
196+ cls .cluster .shutdown ()
197+ stop_simulacron ()
198+
199+ def tearDown (self ):
200+ clear_queries ()
201+
202+ def test_retry_policy_ignores_and_rethrows (self ):
203+ """
204+ Test to verify :class:`~cassandra.protocol.WriteTimeoutErrorMessage` is decoded correctly and that
205+ :attr:`.~cassandra.policies.RetryPolicy.RETHROW` and
206+ :attr:`.~cassandra.policies.RetryPolicy.IGNORE` are respected
207+ to localhost
208+
209+ @since 3.12
210+ @jira_ticket PYTHON-812
211+ @expected_result the retry policy functions as expected
212+
213+ @test_category connection
214+ """
215+ query_to_prime_simple = "SELECT * from simulacron_keyspace.simple"
216+ query_to_prime_cdc = "SELECT * from simulacron_keyspace.cdc"
217+ then = {
218+ "result" : "write_timeout" ,
219+ "delay_in_ms" : 0 ,
220+ "consistency_level" : "LOCAL_QUORUM" ,
221+ "received" : 1 ,
222+ "block_for" : 2 ,
223+ "write_type" : "SIMPLE"
224+ }
225+ prime_query (query_to_prime_simple , then = then )
226+ then ["write_type" ] = "CDC"
227+ prime_query (query_to_prime_cdc , then = then )
228+
229+ with self .assertRaises (WriteTimeout ):
230+ self .session .execute (query_to_prime_simple )
231+ #CDC should be ignored
232+ self .session .execute (query_to_prime_cdc )
0 commit comments