Skip to content

Commit b5cb3bf

Browse files
authored
Merge pull request apache#819 from datastax/python-simulacron-tests
Added some tests around network partitioning, closing connections and …
2 parents d81a515 + 5c77d93 commit b5cb3bf

5 files changed

Lines changed: 228 additions & 30 deletions

File tree

tests/integration/simulacron/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,19 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License
14+
try:
15+
import unittest2 as unittest
16+
except ImportError:
17+
import unittest # noqa
18+
19+
from tests.integration.simulacron.utils import stop_simulacron, clear_queries
1420

15-
from tests.integration.simulacron.utils import stop_simulacron
1621

1722
def teardown_package():
18-
stop_simulacron()
23+
stop_simulacron()
24+
25+
26+
class SimulacronBase(unittest.TestCase):
27+
def tearDown(self):
28+
clear_queries()
29+
stop_simulacron()

tests/integration/simulacron/test_connection.py

Lines changed: 183 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,22 @@
2525

2626
from cassandra import OperationTimedOut
2727
from cassandra.cluster import (EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile,
28-
_Scheduler)
28+
_Scheduler, NoHostAvailable)
2929
from cassandra.policies import HostStateListener, RoundRobinPolicy
3030
from tests.integration import (CASSANDRA_VERSION, PROTOCOL_VERSION,
3131
requiressimulacron)
3232
from tests.integration.util import assert_quiescent_pool_state
33+
from tests.integration.simulacron import SimulacronBase
3334
from tests.integration.simulacron.utils import (NO_THEN, PrimeOptions,
3435
prime_query, prime_request,
3536
start_and_prime_cluster_defaults,
3637
start_and_prime_singledc,
37-
stop_simulacron)
38+
clear_queries)
3839

3940

4041
class TrackDownListener(HostStateListener):
41-
hosts_marked_down = []
42+
def __init__(self):
43+
self.hosts_marked_down = []
4244

4345
def on_down(self, host):
4446
self.hosts_marked_down.append(host)
@@ -50,8 +52,21 @@ def submit(self, fn, *args, **kwargs):
5052
self.called_functions.append(fn.__name__)
5153
return super(ThreadTracker, self).submit(fn, *args, **kwargs)
5254

55+
56+
class OrderedRoundRobinPolicy(RoundRobinPolicy):
57+
58+
def make_query_plan(self, working_keyspace=None, query=None):
59+
self._position += 1
60+
61+
hosts = []
62+
for _ in range(10):
63+
hosts.extend(sorted(self._live_hosts, key=lambda x : x.address))
64+
65+
return hosts
66+
67+
5368
@requiressimulacron
54-
class ConnectionTest(unittest.TestCase):
69+
class ConnectionTests(SimulacronBase):
5570

5671
def test_heart_beat_timeout(self):
5772
"""
@@ -64,24 +79,23 @@ def test_heart_beat_timeout(self):
6479
@test_category metadata
6580
"""
6681
number_of_dcs = 3
67-
nodes_per_dc = 100
82+
nodes_per_dc = 20
6883

6984
query_to_prime = "INSERT INTO test3rf.test (k, v) VALUES (0, 1);"
7085

7186
idle_heartbeat_timeout = 5
7287
idle_heartbeat_interval = 1
7388

7489
start_and_prime_cluster_defaults(number_of_dcs, nodes_per_dc, CASSANDRA_VERSION)
75-
self.addCleanup(stop_simulacron)
7690

7791
listener = TrackDownListener()
78-
executor = ThreadTracker(max_workers=16)
92+
executor = ThreadTracker(max_workers=8)
7993

8094
# We need to disable compression since it's not supported in simulacron
8195
cluster = Cluster(compression=False,
8296
idle_heartbeat_interval=idle_heartbeat_interval,
8397
idle_heartbeat_timeout=idle_heartbeat_timeout,
84-
executor_threads=16,
98+
executor_threads=8,
8599
execution_profiles={
86100
EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=RoundRobinPolicy())})
87101
self.addCleanup(cluster.shutdown)
@@ -112,7 +126,7 @@ def test_heart_beat_timeout(self):
112126

113127
# We allow for some extra time for all the hosts to be set to on_down
114128
# The callbacks should start happening after idle_heartbeat_timeout + idle_heartbeat_interval
115-
time.sleep((idle_heartbeat_timeout + idle_heartbeat_interval) * 2)
129+
time.sleep((idle_heartbeat_timeout + idle_heartbeat_interval) * 2.5)
116130

117131
for host in cluster.metadata.all_hosts():
118132
self.assertIn(host, listener.hosts_marked_down)
@@ -133,7 +147,6 @@ def test_callbacks_and_pool_when_oto(self):
133147
@test_category metadata
134148
"""
135149
start_and_prime_singledc()
136-
self.addCleanup(stop_simulacron)
137150

138151
cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False)
139152
session = cluster.connect()
@@ -155,3 +168,163 @@ def test_callbacks_and_pool_when_oto(self):
155168
# PYTHON-630 -- only the errback should be called
156169
errback.assert_called_once()
157170
callback.assert_not_called()
171+
172+
def test_close_when_query(self):
173+
"""
174+
Test to ensure the driver behaves correctly if the connection is closed
175+
just when querying
176+
@since 3.12
177+
@expected_result NoHostAvailable is raised
178+
179+
@test_category connection
180+
"""
181+
start_and_prime_singledc()
182+
183+
cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False)
184+
session = cluster.connect()
185+
self.addCleanup(cluster.shutdown)
186+
187+
query_to_prime = "SELECT * from testkesypace.testtable"
188+
189+
for close_type in ("disconnect", "shutdown_read", "shutdown_write"):
190+
then = {
191+
"result": "close_connection",
192+
"delay_in_ms": 0,
193+
"close_type": close_type,
194+
"scope": "connection"
195+
}
196+
197+
prime_query(query_to_prime, then=then)
198+
self.assertRaises(NoHostAvailable, session.execute, query_to_prime)
199+
200+
def test_retry_after_defunct(self):
201+
"""
202+
We test cluster._retry is called if the connection is defunct
203+
in the middle of a query
204+
205+
Finally we verify the driver recovers correctly in the event
206+
of a network partition
207+
208+
@since 3.12
209+
@expected_result the driver is able to query even if a host is marked
210+
as down in the middle of the query, it will go to the next one if the timeout
211+
hasn't expired
212+
213+
@test_category connection
214+
"""
215+
number_of_dcs = 3
216+
nodes_per_dc = 2
217+
218+
query_to_prime = "INSERT INTO test3rf.test (k, v) VALUES (0, 1);"
219+
220+
idle_heartbeat_timeout = 1
221+
idle_heartbeat_interval = 5
222+
223+
simulacron_cluster = start_and_prime_cluster_defaults(number_of_dcs, nodes_per_dc, CASSANDRA_VERSION)
224+
225+
dc_ids = sorted(simulacron_cluster.data_center_ids)
226+
last_host = dc_ids.pop()
227+
prime_query(query_to_prime,
228+
cluster_name="{}/{}".format(simulacron_cluster.cluster_name, last_host))
229+
230+
roundrobin_lbp = OrderedRoundRobinPolicy()
231+
cluster = Cluster(compression=False,
232+
idle_heartbeat_interval=idle_heartbeat_interval,
233+
idle_heartbeat_timeout=idle_heartbeat_timeout,
234+
execution_profiles={
235+
EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=roundrobin_lbp)})
236+
237+
session = cluster.connect(wait_for_all_pools=True)
238+
self.addCleanup(cluster.shutdown)
239+
240+
# This simulates we only have access to one DC
241+
for dc_id in dc_ids:
242+
datacenter_path = "{}/{}".format(simulacron_cluster.cluster_name, dc_id)
243+
prime_query(query_to_prime, then=NO_THEN, cluster_name=datacenter_path)
244+
prime_request(PrimeOptions(then=NO_THEN, cluster_name=datacenter_path))
245+
246+
# Only the last datacenter will respond, therefore the first host won't
247+
# We want to make sure the returned hosts are 127.0.0.1, 127.0.0.2, ... 127.0.0.8
248+
roundrobin_lbp._position = 0
249+
250+
# After 3 + 1 seconds the connection should be marked as down and another host retried
251+
response_future = session.execute_async(query_to_prime, timeout=4 * idle_heartbeat_interval
252+
+ idle_heartbeat_timeout)
253+
response_future.result()
254+
self.assertGreater(len(response_future.attempted_hosts), 1)
255+
256+
# No error should be raised here since the hosts have been marked
257+
# as down and there's still 1 DC available
258+
for _ in range(10):
259+
session.execute(query_to_prime)
260+
261+
# Might take some time to close the previous connections and reconnect
262+
time.sleep(10)
263+
assert_quiescent_pool_state(self, cluster)
264+
clear_queries()
265+
266+
time.sleep(10)
267+
assert_quiescent_pool_state(self, cluster)
268+
269+
def test_idle_connection_is_not_closed(self):
270+
"""
271+
Test to ensure that the connections aren't closed if they are idle
272+
@since 3.12
273+
@jira_ticket PYTHON-573
274+
@expected_result the connections aren't closed nor the hosts are
275+
set to down if the connection is idle
276+
277+
@test_category connection
278+
"""
279+
start_and_prime_singledc()
280+
281+
idle_heartbeat_timeout = 1
282+
idle_heartbeat_interval = 1
283+
284+
listener = TrackDownListener()
285+
cluster = Cluster(compression=False,
286+
idle_heartbeat_interval=idle_heartbeat_interval,
287+
idle_heartbeat_timeout=idle_heartbeat_timeout)
288+
session = cluster.connect(wait_for_all_pools=True)
289+
cluster.register_listener(listener)
290+
291+
self.addCleanup(cluster.shutdown)
292+
293+
time.sleep(20)
294+
295+
self.assertEqual(listener.hosts_marked_down, [])
296+
297+
def test_host_is_not_set_to_down_after_query_oto(self):
298+
"""
299+
Test to ensure that the connections aren't closed if there's an
300+
OperationTimedOut in a normal query. This should only happen from the
301+
heart beat thread (in the case of a OperationTimedOut) with the default
302+
configuration
303+
@since 3.12
304+
@expected_result the connections aren't closed nor the hosts are
305+
set to down
306+
307+
@test_category connection
308+
"""
309+
start_and_prime_singledc()
310+
311+
query_to_prime = "SELECT * FROM madeup_keyspace.madeup_table"
312+
313+
prime_query(query_to_prime, then=NO_THEN)
314+
315+
listener = TrackDownListener()
316+
cluster = Cluster(compression=False)
317+
session = cluster.connect(wait_for_all_pools=True)
318+
cluster.register_listener(listener)
319+
320+
futures = []
321+
for _ in range(10):
322+
future = session.execute_async(query_to_prime)
323+
futures.append(future)
324+
325+
for f in futures:
326+
f._event.wait()
327+
self.assertIsInstance(f._final_exception, OperationTimedOut)
328+
329+
self.assertEqual(listener.hosts_marked_down, [])
330+
assert_quiescent_pool_state(self, cluster)

tests/integration/simulacron/test_policies.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy
2323

2424
from tests.integration import PROTOCOL_VERSION, greaterthancass21, requiressimulacron, SIMULACRON_JAR
25-
from tests import notwindows
2625
from tests.integration.simulacron.utils import start_and_prime_singledc, prime_query, \
2726
stop_simulacron, NO_THEN, clear_queries
2827

tests/integration/simulacron/utils.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from tests.integration import CASSANDRA_VERSION, SIMULACRON_JAR
1818
import subprocess
1919
import time
20+
import nose
2021

2122
DEFAULT_CLUSTER = "python_simulacron_cluster"
2223

@@ -280,7 +281,7 @@ def start_and_prime_singledc(cluster_name=DEFAULT_CLUSTER):
280281
:param cluster_name: name of the cluster to start and prime
281282
:return:
282283
"""
283-
start_and_prime_cluster_defaults(number_of_dc=1, nodes_per_dc=3, cluster_name=cluster_name)
284+
return start_and_prime_cluster_defaults(number_of_dc=1, nodes_per_dc=3, cluster_name=cluster_name)
284285

285286

286287
def start_and_prime_cluster_defaults(number_of_dc=1, nodes_per_dc=3, version=None, cluster_name=DEFAULT_CLUSTER):
@@ -291,9 +292,11 @@ def start_and_prime_cluster_defaults(number_of_dc=1, nodes_per_dc=3, version=Non
291292
"""
292293
start_simulacron()
293294
data_centers = ",".join([str(nodes_per_dc)] * number_of_dc)
294-
prime_cluster(data_centers=data_centers, version=version, cluster_name=cluster_name)
295+
simulacron_cluster = prime_cluster(data_centers=data_centers, version=version, cluster_name=cluster_name)
295296
prime_driver_defaults()
296297

298+
return simulacron_cluster
299+
297300

298301
default_column_types = {
299302
"key": "bigint",
@@ -316,6 +319,13 @@ def prime_query(query, rows=default_rows, column_types=default_column_types, whe
316319
Shortcut function for priming a query
317320
:return:
318321
"""
322+
# If then is set, then rows and column_types should not be set
323+
if then:
324+
nose.tools.assert_equal(rows, default_rows)
325+
nose.tools.assert_equal(column_types, default_column_types)
326+
rows=None
327+
column_types=None
328+
319329
query = PrimeQuery(query, rows=rows, column_types=column_types, when=when, then=then, cluster_name=cluster_name)
320330
response = prime_request(query)
321331
return response

0 commit comments

Comments
 (0)