|
| 1 | +# Copyright DataStax, Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | +import time |
| 15 | + |
| 16 | +from cassandra import OperationTimedOut |
| 17 | +from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable |
| 18 | +from cassandra.policies import RoundRobinPolicy, WhiteListRoundRobinPolicy |
| 19 | +from tests.integration import requiressimulacron, libevtest |
| 20 | +from tests.integration.simulacron import SimulacronBase, PROTOCOL_VERSION |
| 21 | +from tests.integration.simulacron.utils import ResumeReads, PauseReads, prime_request, start_and_prime_singledc |
| 22 | + |
| 23 | + |
@requiressimulacron
@libevtest
class TCPBackpressureTests(SimulacronBase):
    """Exercise driver behavior under TCP backpressure.

    Uses Simulacron's pause/resume-reads primitives to make nodes stop
    draining their sockets, then verifies how in-flight and newly submitted
    requests behave while send buffers are full.
    """

    def setUp(self):
        # Counters bumped by the callbacks below; cluster/simulacron cleanup
        # is presumably handled by SimulacronBase's tearDown — confirm there.
        self.callback_successes = 0
        self.callback_errors = 0

    def callback_success(self, results):
        # Success callback wired via ResponseFuture.add_callbacks.
        self.callback_successes += 1

    def callback_error(self, results):
        # Error callback wired via ResponseFuture.add_callbacks.
        self.callback_errors += 1

    def _fill_buffers(self, session, query, expected_blocked=3, **execute_kwargs):
        """Flood the cluster with large async inserts until ``expected_blocked``
        connections report an unwritable socket, then return all futures.

        Raises a plain Exception if the flood did not block enough nodes.
        """
        payload = '1' * 50000
        pending = [
            session.execute_async(query, [payload], **execute_kwargs)
            for _ in range(100000)
        ]

        unwritable = 0
        for pool in session.get_pools():
            if not pool._connection._socket_writable:
                unwritable += 1
            if unwritable == expected_blocked:
                return pending
        raise Exception("Unable to fill TCP send buffer on expected number of nodes")

    def test_paused_connections(self):
        """ Verify all requests come back as expected if node resumes within query timeout """
        start_and_prime_singledc()
        cluster = Cluster(
            protocol_version=PROTOCOL_VERSION,
            compression=False,
            execution_profiles={
                EXEC_PROFILE_DEFAULT: ExecutionProfile(
                    request_timeout=500, load_balancing_policy=RoundRobinPolicy()),
            },
        )
        session = cluster.connect(wait_for_all_pools=True)
        self.addCleanup(cluster.shutdown)

        insert = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads())
        pending = self._fill_buffers(session, insert)

        # Requests must be genuinely stuck — and stay stuck over time.
        for pool in session.get_pools():
            self.assertGreater(pool._connection.in_flight, 100)
        time.sleep(.5)
        for pool in session.get_pools():
            self.assertGreater(pool._connection.in_flight, 100)

        prime_request(ResumeReads())

        for future in pending:
            try:
                future.result()
            except NoHostAvailable as exc:
                # No timeouts expected here; everything that could not fit in
                # the TCP buffer surfaces as ConnectionBusy (wrapped in
                # NoHostAvailable).
                self.assertIn("ConnectionBusy", str(exc))

        # The cluster must remain fully usable afterwards.
        for host in session.cluster.metadata.all_hosts():
            session.execute(insert, ["a"], host=host)

    def test_queued_requests_timeout(self):
        """ Verify that queued requests timeout as expected """
        start_and_prime_singledc()
        quick_profile = ExecutionProfile(request_timeout=.1,
                                         load_balancing_policy=RoundRobinPolicy())
        cluster = Cluster(
            protocol_version=PROTOCOL_VERSION,
            compression=False,
            execution_profiles={EXEC_PROFILE_DEFAULT: quick_profile},
        )
        session = cluster.connect(wait_for_all_pools=True)
        self.addCleanup(cluster.shutdown)

        insert = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads())

        pending = []
        for i in range(1000):
            fut = session.execute_async(insert, [str(i)])
            fut.add_callbacks(callback=self.callback_success, errback=self.callback_error)
            pending.append(fut)

        completed = 0
        for fut in pending:
            try:
                fut.result()
            except OperationTimedOut:
                continue
            completed += 1

        # Simulacron answers a handful of queries before it stops reading, so
        # only "a few" successes are expected; everything else must error out.
        self.assertLess(completed, 50)
        self.assertLess(self.callback_successes, 50)
        self.assertEqual(self.callback_errors, len(pending) - self.callback_successes)

    def test_cluster_busy(self):
        """ Verify that once TCP buffer is full we get busy exceptions rather than timeouts """
        start_and_prime_singledc()
        cluster = Cluster(
            protocol_version=PROTOCOL_VERSION,
            compression=False,
            execution_profiles={
                EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=RoundRobinPolicy()),
            },
        )
        session = cluster.connect(wait_for_all_pools=True)
        self.addCleanup(cluster.shutdown)

        insert = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads())

        # Jam every node's send buffer; these writes can only time out.
        self._fill_buffers(session, insert, expected_blocked=3)

        # With all buffers full, new queries must fail fast with
        # ConnectionBusy instead of waiting for a timeout.
        for i in range(1000):
            with self.assertRaises(NoHostAvailable) as ctx:
                session.execute(insert, [str(i)])
            self.assertIn("ConnectionBusy", str(ctx.exception))

    def test_node_busy(self):
        """ Verify that once TCP buffer is full, queries continue to get re-routed to other nodes """
        start_and_prime_singledc()
        cluster = Cluster(
            protocol_version=PROTOCOL_VERSION,
            compression=False,
            execution_profiles={
                EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=RoundRobinPolicy()),
            },
        )
        session = cluster.connect(wait_for_all_pools=True)
        self.addCleanup(cluster.shutdown)

        insert = session.prepare("INSERT INTO table1 (id) VALUES (?)")

        prime_request(PauseReads(dc_id=0, node_id=0))

        pinned = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(["127.0.0.1"]))
        cluster.add_execution_profile('blocked_profile', pinned)

        # Jam only the paused node's send buffer until it reports busy.
        self._fill_buffers(session, insert, expected_blocked=1,
                           execution_profile='blocked_profile')

        # The remaining nodes must keep serving queries without errors.
        for i in range(1000):
            session.execute(insert, [str(i)])
0 commit comments