Skip to content

Commit f9d69f3

Browse files
committed
PYTHON-1196: Add test to verify we can handle TCP backpressure
1 parent a6a66ca commit f9d69f3

4 files changed

Lines changed: 208 additions & 1 deletion

File tree

Jenkinsfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def executeEventLoopTests() {
184184
"tests/integration/standard/test_connection.py"
185185
"tests/integration/standard/test_control_connection.py"
186186
"tests/integration/standard/test_metrics.py"
187+
"tests/integration/simulacron/test_backpressure.py"
187188
"tests/integration/standard/test_query.py"
188189
"tests/integration/simulacron/test_endpoint.py"
189190
"tests/integration/long/test_ssl.py"
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import time
15+
16+
from cassandra import OperationTimedOut
17+
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
18+
from cassandra.policies import RoundRobinPolicy, WhiteListRoundRobinPolicy
19+
from tests.integration import requiressimulacron, libevtest
20+
from tests.integration.simulacron import SimulacronBase, PROTOCOL_VERSION
21+
from tests.integration.simulacron.utils import ResumeReads, PauseReads, prime_request, start_and_prime_singledc
22+
23+
24+
@requiressimulacron
@libevtest
class TCPBackpressureTests(SimulacronBase):
    """
    Verify driver behaviour when Simulacron nodes stop reading from their
    sockets, so that outgoing requests pile up in the TCP send buffer.

    Every test primes a single-DC Simulacron cluster, connects with a
    round-robin default execution profile, and then issues ``PauseReads``
    (and, where relevant, ``ResumeReads``) requests against Simulacron.
    """

    def setUp(self):
        # Counters updated by callback_success / callback_error, which
        # test_queued_requests_timeout attaches to its response futures.
        self.callback_successes = 0
        self.callback_errors = 0

    def callback_success(self, results):
        """Future callback: count one successful response."""
        self.callback_successes += 1

    def callback_error(self, results):
        """Future errback: count one failed response."""
        self.callback_errors += 1

    def _connect_with_default_profile(self, profile):
        """
        Connect to the cluster using `profile` as the default execution profile.

        The cluster shutdown is registered as a test cleanup *before*
        connecting, so the cluster is still shut down when ``connect()``
        raises part-way through.

        :param profile: ExecutionProfile installed under EXEC_PROFILE_DEFAULT.
        :return: the connected session; its cluster is reachable through
            ``session.cluster``.
        """
        cluster = Cluster(
            protocol_version=PROTOCOL_VERSION,
            compression=False,
            execution_profiles={EXEC_PROFILE_DEFAULT: profile},
        )
        self.addCleanup(cluster.shutdown)
        return cluster.connect(wait_for_all_pools=True)

    def _fill_buffers(self, session, query, expected_blocked=3, **execute_kwargs):
        """
        Send large asynchronous requests until `expected_blocked` pools report
        a connection whose socket is no longer writable.

        :param session: session used to send the requests.
        :param query: statement executed with one large string parameter.
        :param expected_blocked: number of pools that must report a
            non-writable socket before we stop sending.
        :param execute_kwargs: extra keyword arguments forwarded to
            ``session.execute_async``.
        :return: list of every response future created while filling.
        :raises Exception: if the expected number of blocked pools is not
            observed after 100000 requests.
        """
        futures = []
        buffer = '1' * 50000
        for _ in range(100000):
            futures.append(session.execute_async(query, [buffer], **execute_kwargs))

            # Re-count after every request so we stop at the first moment the
            # expected number of connections are blocked.
            total_blocked = sum(
                1 for pool in session.get_pools()
                if not pool._connection._socket_writable
            )
            if total_blocked == expected_blocked:
                break
        else:
            raise Exception("Unable to fill TCP send buffer on expected number of nodes")
        return futures

    def test_paused_connections(self):
        """ Verify all requests come back as expected if node resumes within query timeout """
        start_and_prime_singledc()
        profile = ExecutionProfile(request_timeout=500, load_balancing_policy=RoundRobinPolicy())
        session = self._connect_with_default_profile(profile)

        query = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads())
        futures = self._fill_buffers(session, query)

        # Make sure we actually have some stuck in-flight requests
        for pool in session.get_pools():
            self.assertGreater(pool._connection.in_flight, 100)
        # ... and that they are still stuck a little later
        time.sleep(.5)
        for pool in session.get_pools():
            self.assertGreater(pool._connection.in_flight, 100)

        prime_request(ResumeReads())

        for future in futures:
            try:
                future.result()
            except NoHostAvailable as e:
                # We shouldn't have any timeouts here, but all of the queries beyond what can fit
                # in the tcp buffer will have returned with a ConnectionBusy exception
                self.assertIn("ConnectionBusy", str(e))

        # Verify that we can continue sending queries without any problems
        for host in session.cluster.metadata.all_hosts():
            session.execute(query, ["a"], host=host)

    def test_queued_requests_timeout(self):
        """ Verify that queued requests timeout as expected """
        start_and_prime_singledc()
        profile = ExecutionProfile(request_timeout=.1, load_balancing_policy=RoundRobinPolicy())
        session = self._connect_with_default_profile(profile)

        query = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads())

        futures = []
        for i in range(1000):
            future = session.execute_async(query, [str(i)])
            future.add_callbacks(callback=self.callback_success, errback=self.callback_error)
            futures.append(future)

        successes = 0
        for future in futures:
            try:
                future.result()
                successes += 1
            except OperationTimedOut:
                pass

        # Simulacron will respond to a couple queries before cutting off reads, so we'll just verify
        # that only "a few" successes happened here
        self.assertLess(successes, 50)
        self.assertLess(self.callback_successes, 50)
        self.assertEqual(self.callback_errors, len(futures) - self.callback_successes)

    def test_cluster_busy(self):
        """ Verify that once TCP buffer is full we get busy exceptions rather than timeouts """
        start_and_prime_singledc()
        profile = ExecutionProfile(load_balancing_policy=RoundRobinPolicy())
        session = self._connect_with_default_profile(profile)

        query = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads())

        # These requests will get stuck in the TCP buffer and we have no choice but to let them time out
        self._fill_buffers(session, query, expected_blocked=3)

        # Now that our send buffer is completely full, verify we immediately get busy exceptions rather than timing out
        for i in range(1000):
            with self.assertRaises(NoHostAvailable) as e:
                session.execute(query, [str(i)])
            self.assertIn("ConnectionBusy", str(e.exception))

    def test_node_busy(self):
        """ Verify that once TCP buffer is full, queries continue to get re-routed to other nodes """
        start_and_prime_singledc()
        profile = ExecutionProfile(load_balancing_policy=RoundRobinPolicy())
        session = self._connect_with_default_profile(profile)

        query = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        # Only pause a single node; the others keep reading.
        prime_request(PauseReads(dc_id=0, node_id=0))

        # Profile that pins requests to 127.0.0.1, used to fill that one
        # node's buffer (presumably the paused node - confirm against the
        # Simulacron address layout).
        blocked_profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(["127.0.0.1"]))
        session.cluster.add_execution_profile('blocked_profile', blocked_profile)

        # Fill our blocked node's tcp buffer until we get a busy exception
        self._fill_buffers(session, query, expected_blocked=1, execution_profile='blocked_profile')

        # Now that our send buffer is completely full on one node,
        # verify queries get re-routed to other nodes and queries complete successfully
        for i in range(1000):
            session.execute(query, [str(i)])
179+

tests/integration/simulacron/test_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
start_and_prime_cluster_defaults,
4040
start_and_prime_singledc,
4141
clear_queries, RejectConnections,
42-
RejectType, AcceptConnections)
42+
RejectType, AcceptConnections, PauseReads, ResumeReads)
4343

4444

4545
class TrackDownListener(HostStateListener):

tests/integration/simulacron/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,33 @@ def method(self):
338338
return "DELETE"
339339

340340

341+
class _PauseOrResumeReads(SimulacronRequest):
    """
    Common base for requests against Simulacron's ``pause-reads`` path.

    The request path is ``pause-reads/<cluster_name>``, optionally narrowed
    to ``/<dc_id>`` and then ``/<node_id>``. Subclasses supply the HTTP
    verb through the ``method`` property.

    :param cluster_name: name of the Simulacron cluster to target.
    :param dc_id: optional data center id; ``None`` targets the whole cluster.
    :param node_id: optional node id within `dc_id`; requires `dc_id`.
    :raises Exception: if `node_id` is given without `dc_id`.
    """

    def __init__(self, cluster_name=DEFAULT_CLUSTER, dc_id=None, node_id=None):
        # Compare against None rather than truthiness: 0 is a valid node id,
        # and it must not slip through as "no node given" and silently widen
        # the request to the whole cluster.
        if dc_id is None and node_id is not None:
            raise Exception("Can't set node_id without dc_id")

        self.path = "pause-reads/{}".format(cluster_name)
        if dc_id is not None:
            self.path += "/{}".format(dc_id)
            if node_id is not None:
                self.path += "/{}".format(node_id)

    @property
    def method(self):
        """HTTP verb for the request; must be provided by subclasses."""
        raise NotImplementedError()
354+
355+
356+
class PauseReads(_PauseOrResumeReads):
    """Sends ``PUT`` to the ``pause-reads`` path, asking Simulacron to stop reading from the targeted connections."""

    @property
    def method(self):
        """HTTP verb used for this request."""
        return "PUT"
360+
361+
362+
class ResumeReads(_PauseOrResumeReads):
    """Sends ``DELETE`` to the ``pause-reads`` path, undoing an earlier ``PUT`` on the same target."""

    @property
    def method(self):
        """HTTP verb used for this request."""
        return "DELETE"
366+
367+
341368
def prime_driver_defaults():
342369
"""
343370
Function to prime the necessary queries so the test harness can run

0 commit comments

Comments
 (0)