Skip to content

Commit 8ac3fbb

Browse files
authored
Merge pull request apache#891 from datastax/python-836-2
Alternative solution for PYTHON-836
2 parents 27b8b9b + 054a183 commit 8ac3fbb

File tree

4 files changed

+54
-57
lines changed

4 files changed

+54
-57
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Features
3636
* Include hash of result set metadata in prepared stmt id (PYTHON-808)
3737
* Add NO_COMPACT startup option (PYTHON-839)
3838
* Add new exception type for CDC (PYTHON-837)
39+
* Allow 0ms in ConstantSpeculativeExecutionPolicy (PYTHON-836)
3940

4041
Bug Fixes
4142
---------

cassandra/cluster.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3527,6 +3527,18 @@ def _on_timeout(self):
35273527
def _on_speculative_execute(self):
35283528
self._timer = None
35293529
if not self._event.is_set():
3530+
3531+
# PYTHON-836, the speculative queries must be after
3532+
# the query is sent from the main thread, otherwise the
3533+
# query from the main thread may raise NoHostAvailable
3534+
# if the _query_plan has been exhausted by the specualtive queries.
3535+
# This also prevents a race condition accessing the iterator.
3536+
# We reschedule this call until the main thread has succeeded
3537+
# making a query
3538+
if not self.attempted_hosts:
3539+
self._timer = self.session.cluster.connection_class.create_timer(0.01, self._on_speculative_execute)
3540+
return
3541+
35303542
if self._time_remaining is not None:
35313543
if self._time_remaining <= 0:
35323544
self._on_timeout()
@@ -3540,10 +3552,6 @@ def _make_query_plan(self):
35403552
# calls to send_request (which retries may do) will resume where
35413553
# they last left off
35423554
self.query_plan = iter(self._load_balancer.make_query_plan(self.session.keyspace, self.query))
3543-
# Make iterator thread safe when there can be a speculative delay since it could
3544-
# from different threads
3545-
if isinstance(self._spec_execution_plan, NoSpeculativeExecutionPlan):
3546-
self.query_plan = _threadsafe_iter(self.query_plan)
35473555

35483556
def send_request(self, error_no_hosts=True):
35493557
""" Internal """
@@ -4316,16 +4324,3 @@ def paging_state(self):
43164324
avoid sending this to untrusted parties.
43174325
"""
43184326
return self.response_future._paging_state
4319-
4320-
4321-
class _threadsafe_iter(six.Iterator):
4322-
def __init__(self, it):
4323-
self.it = it
4324-
self.lock = Lock()
4325-
4326-
def __iter__(self):
4327-
return self
4328-
4329-
def __next__(self):
4330-
with self.lock:
4331-
return next(self.it)

tests/integration/simulacron/test_policies.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import unittest # noqa
1818

1919
from cassandra import OperationTimedOut, WriteTimeout
20-
from cassandra.cluster import Cluster, ExecutionProfile
20+
from cassandra.cluster import Cluster, ExecutionProfile, ResponseFuture
2121
from cassandra.query import SimpleStatement
2222
from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy, RetryPolicy, WriteType
2323

@@ -168,6 +168,45 @@ def test_speculative_and_timeout(self):
168168
# This is because 14 / 4 + 1 = 4
169169
self.assertEqual(len(response_future.attempted_hosts), 4)
170170

171+
def test_delay_can_be_0(self):
172+
"""
173+
Test to validate that the delay can be zero for the ConstantSpeculativeExecutionPolicy
174+
@since 3.13
175+
@jira_ticket PYTHON-836
176+
@expected_result all the queries are executed immediately
177+
@test_category policy
178+
"""
179+
query_to_prime = "INSERT INTO madeup_keyspace.madeup_table(k, v) VALUES (1, 2)"
180+
prime_query(query_to_prime, then={"delay_in_ms": 5000})
181+
number_of_requests = 4
182+
spec = ExecutionProfile(load_balancing_policy=RoundRobinPolicy(),
183+
speculative_execution_policy=ConstantSpeculativeExecutionPolicy(0, number_of_requests))
184+
185+
cluster = Cluster()
186+
cluster.add_execution_profile("spec", spec)
187+
session = cluster.connect(wait_for_all_pools=True)
188+
self.addCleanup(cluster.shutdown)
189+
190+
counter = count()
191+
192+
def patch_and_count(f):
193+
def patched(*args, **kwargs):
194+
next(counter)
195+
print("patched")
196+
f(*args, **kwargs)
197+
return patched
198+
199+
self.addCleanup(setattr, ResponseFuture, "send_request", ResponseFuture.send_request)
200+
ResponseFuture.send_request = patch_and_count(ResponseFuture.send_request)
201+
stmt = SimpleStatement(query_to_prime)
202+
stmt.is_idempotent = True
203+
results = session.execute(stmt, execution_profile="spec")
204+
self.assertEqual(len(results.response_future.attempted_hosts), 3)
205+
206+
# send_request is called number_of_requests times for the speculative request
207+
# plus one for the call from the main thread.
208+
self.assertEqual(next(counter), number_of_requests + 1)
209+
171210

172211
class CustomRetryPolicy(RetryPolicy):
173212
def on_write_timeout(self, query, consistency, write_type,
@@ -193,7 +232,6 @@ def on_read_timeout(self, query, consistency, required_responses,
193232

194233
def on_write_timeout(self, query, consistency, write_type,
195234
required_responses, received_responses, retry_num):
196-
print("counter on_write_timeout")
197235
next(self.write_timeout)
198236
return self.IGNORE, None
199237

tests/integration/standard/test_policies.py

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919

2020
from cassandra.cluster import Cluster, ExecutionProfile, ResponseFuture
2121
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, SimpleConvictionPolicy, \
22-
WhiteListRoundRobinPolicy, ConstantSpeculativeExecutionPolicy
22+
WhiteListRoundRobinPolicy
2323
from cassandra.pool import Host
24-
from cassandra.query import SimpleStatement
2524

2625
from tests.integration import PROTOCOL_VERSION, local, use_singledc
27-
from tests import notwindows
2826

29-
from itertools import count
3027
from concurrent.futures import wait as wait_futures
3128

3229
def setup_module():
@@ -92,37 +89,3 @@ def test_only_connects_to_subset(self):
9289
queried_hosts.update(response.response_future.attempted_hosts)
9390
queried_hosts = set(host.address for host in queried_hosts)
9491
self.assertEqual(queried_hosts, only_connect_hosts)
95-
96-
97-
class SpeculativeExecutionPolicy(unittest.TestCase):
98-
@notwindows
99-
def test_delay_can_be_0(self):
100-
"""
101-
Test to validate that the delay can be zero for the ConstantSpeculativeExecutionPolicy
102-
@since 3.13
103-
@jira_ticket PYTHON-836
104-
@expected_result all the queries are executed immediately
105-
@test_category policy
106-
"""
107-
number_of_requests = 4
108-
spec = ExecutionProfile(speculative_execution_policy=ConstantSpeculativeExecutionPolicy(0, number_of_requests))
109-
110-
cluster = Cluster()
111-
cluster.add_execution_profile("spec", spec)
112-
session = cluster.connect(wait_for_all_pools=True)
113-
self.addCleanup(cluster.shutdown)
114-
115-
counter = count()
116-
117-
def patch_and_count(f):
118-
def patched(*args, **kwargs):
119-
next(counter)
120-
f(*args, **kwargs)
121-
return patched
122-
123-
ResponseFuture._on_speculative_execute = patch_and_count(ResponseFuture._on_speculative_execute)
124-
stmt = SimpleStatement("INSERT INTO test3rf.test(k, v) VALUES (1, 2)")
125-
stmt.is_idempotent = True
126-
results = session.execute(stmt, execution_profile="spec")
127-
self.assertEqual(len(results.response_future.attempted_hosts), 3)
128-
self.assertEqual(next(counter), number_of_requests)

0 commit comments

Comments
 (0)