Skip to content

Commit ec70a54

Browse files
committed
Added tests for ignoring and rethrowing in RetryPolicy
1 parent 59e06fb commit ec70a54

1 file changed

Lines changed: 68 additions & 4 deletions

File tree

tests/integration/simulacron/test_policies.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
except ImportError:
1717
import unittest # noqa
1818

19-
from cassandra import OperationTimedOut
19+
from cassandra import OperationTimedOut, WriteTimeout
2020
from cassandra.cluster import Cluster, ExecutionProfile
2121
from cassandra.query import SimpleStatement
22-
from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy
22+
from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy, RetryPolicy, WriteType
2323

2424
from tests.integration import PROTOCOL_VERSION, greaterthancass21, requiressimulacron, SIMULACRON_JAR
2525
from tests.integration.simulacron.utils import start_and_prime_singledc, prime_query, \
@@ -139,8 +139,6 @@ def test_speculative_execution(self):
139139
result = self.session.execute(prepared_statement, ("0",), execution_profile='spec_ep_brr')
140140
self.assertLess(1, len(result.response_future.attempted_hosts))
141141

142-
143-
144142
def test_speculative_and_timeout(self):
145143
"""
146144
Test to ensure the timeout is honored when using speculative execution
@@ -166,3 +164,69 @@ def test_speculative_and_timeout(self):
166164

167165
# This is because 14 / 4 + 1 = 4
168166
self.assertEqual(len(response_future.attempted_hosts), 4)
167+
168+
169+
class CustomRetryPolicy(RetryPolicy):
170+
def on_write_timeout(self, query, consistency, write_type,
171+
required_responses, received_responses, retry_num):
172+
if retry_num != 0:
173+
return self.RETHROW, None
174+
elif write_type == WriteType.SIMPLE:
175+
return self.RETHROW, None
176+
elif write_type == WriteType.CDC:
177+
return self.IGNORE, None
178+
179+
180+
@requiressimulacron
181+
class RetryPolicyTets(unittest.TestCase):
182+
@classmethod
183+
def setUpClass(cls):
184+
if SIMULACRON_JAR is None:
185+
return
186+
start_and_prime_singledc()
187+
188+
cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False,
189+
default_retry_policy=CustomRetryPolicy())
190+
cls.session = cls.cluster.connect(wait_for_all_pools=True)
191+
192+
@classmethod
193+
def tearDownClass(cls):
194+
if SIMULACRON_JAR is None:
195+
return
196+
cls.cluster.shutdown()
197+
stop_simulacron()
198+
199+
def tearDown(self):
200+
clear_queries()
201+
202+
def test_retry_policy_ignores_and_rethrows(self):
203+
"""
204+
Test to verify :class:`~cassandra.protocol.WriteTimeoutErrorMessage` is decoded correctly and that
205+
:attr:`.~cassandra.policies.RetryPolicy.RETHROW` and
206+
:attr:`.~cassandra.policies.RetryPolicy.IGNORE` are respected
207+
to localhost
208+
209+
@since 3.12
210+
@jira_ticket PYTHON-812
211+
@expected_result the retry policy functions as expected
212+
213+
@test_category connection
214+
"""
215+
query_to_prime_simple = "SELECT * from simulacron_keyspace.simple"
216+
query_to_prime_cdc = "SELECT * from simulacron_keyspace.cdc"
217+
then = {
218+
"result": "write_timeout",
219+
"delay_in_ms": 0,
220+
"consistency_level": "LOCAL_QUORUM",
221+
"received": 1,
222+
"block_for": 2,
223+
"write_type": "SIMPLE"
224+
}
225+
prime_query(query_to_prime_simple, then=then)
226+
then["write_type"] = "CDC"
227+
prime_query(query_to_prime_cdc, then=then)
228+
229+
with self.assertRaises(WriteTimeout):
230+
self.session.execute(query_to_prime_simple)
231+
#CDC should be ignored
232+
self.session.execute(query_to_prime_cdc)

0 commit comments

Comments
 (0)