Skip to content

Commit 10ecc37

Browse files
author
bjmb
committed
Move calls to asyncore to the loop thread
1 parent 0e65b33 commit 10ecc37

7 files changed

Lines changed: 62 additions & 19 deletions

File tree

build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ schedules:
1414
branches:
1515
include: [/python.*/]
1616
env_vars: |
17-
EVENT_LOOP_MANAGER='libev'
17+
EVENT_LOOP_MANAGER='async'
1818
matrix:
1919
exclude:
2020
- python: [3.4, 3.6]
21-
- cassandra: ['2.0', '2.1', '3.0']
21+
- cassandra: ['2.0', '2.1']
2222

2323
weekly_libev:
2424
schedule: 0 10 * * 6

cassandra/io/asyncorereactor.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
import os
1919
import socket
2020
import sys
21-
from threading import Lock, Thread
21+
from threading import Lock, Thread, Event
2222
import time
2323
import weakref
24+
import sys
2425

2526
from six.moves import range
2627

@@ -51,6 +52,33 @@ def _cleanup(loop_weakref):
5152
loop._cleanup()
5253

5354

55+
class WaitableTimer(Timer):
56+
def __init__(self, timeout, callback):
57+
Timer.__init__(self, timeout, callback)
58+
self.callback = callback
59+
self.event = Event()
60+
61+
self.final_exception = None
62+
63+
def finish(self, time_now):
64+
try:
65+
finished = Timer.finish(self, time_now)
66+
if finished:
67+
self.event.set()
68+
return True
69+
return False
70+
71+
except Exception as e:
72+
self.final_exception = e
73+
self.event.set()
74+
return True
75+
76+
def wait(self, timeout=None):
77+
self.event.wait(timeout)
78+
if self.final_exception:
79+
raise self.final_exception
80+
81+
5482
class _PipeWrapper(object):
5583

5684
def __init__(self, fd):
@@ -239,6 +267,11 @@ def _run_loop(self):
239267
def add_timer(self, timer):
240268
self._timers.add_timer(timer)
241269

270+
# This function is called from a different thread than the event loop
271+
# thread, so for this call to be thread safe, we must wake up the loop
272+
# in case it's stuck at a select
273+
self.wake_loop()
274+
242275
def _cleanup(self):
243276
global _dispatcher_map
244277

@@ -305,16 +338,23 @@ def __init__(self, *args, **kwargs):
305338
self.deque_lock = Lock()
306339

307340
self._connect_socket()
308-
asyncore.dispatcher.__init__(self, self._socket, _dispatcher_map)
341+
342+
# start the event loop if needed
343+
self._loop.maybe_start()
344+
345+
init_handler = WaitableTimer(
346+
timeout=0,
347+
callback=partial(asyncore.dispatcher.__init__,
348+
self, self._socket, _dispatcher_map)
349+
)
350+
AsyncoreConnection._loop.add_timer(init_handler)
351+
init_handler.wait(kwargs["connect_timeout"])
309352

310353
self._writable = True
311354
self._readable = True
312355

313356
self._send_options_message()
314357

315-
# start the event loop if needed
316-
self._loop.maybe_start()
317-
318358
def close(self):
319359
with self.lock:
320360
if self.is_closed:
@@ -324,7 +364,10 @@ def close(self):
324364
log.debug("Closing connection (%s) to %s", id(self), self.host)
325365
self._writable = False
326366
self._readable = False
327-
asyncore.dispatcher.close(self)
367+
368+
# We don't have to wait for this to be closed, we can just schedule it
369+
AsyncoreConnection.create_timer(0, partial(asyncore.dispatcher.close, self))
370+
328371
log.debug("Closed socket to %s", self.host)
329372

330373
if not self.is_defunct:

tests/integration/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,8 @@ def get_unsupported_upper_protocol():
265265
dseonly = unittest.skipUnless(DSE_VERSION, "Test is only applicalbe to DSE clusters")
266266
pypy = unittest.skipUnless(platform.python_implementation() == "PyPy", "Test is skipped unless it's on PyPy")
267267
notpy3 = unittest.skipIf(sys.version_info >= (3, 0), "Test not applicable for Python 3.x runtime")
268+
requiresmallclockgranularity = unittest.skipIf("Windows" in platform.system() or "async" in EVENT_LOOP_MANAGER,
269+
"This test is not suitible for environments with large clock granularity")
268270

269271

270272
def wait_for_node_socket(node, timeout):

tests/integration/long/test_failure_types.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy
2121
from cassandra.concurrent import execute_concurrent_with_args
2222
from cassandra.query import SimpleStatement
23-
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node
23+
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace, remove_cluster, get_node, \
24+
requiresmallclockgranularity
2425
from mock import Mock
2526

2627
try:
@@ -318,6 +319,7 @@ def test_user_function_failure(self):
318319
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
319320

320321

322+
@requiresmallclockgranularity
321323
class TimeoutTimerTest(unittest.TestCase):
322324
def setUp(self):
323325
"""

tests/integration/standard/test_connection.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@
3232
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, HostStateListener
3333
from cassandra.pool import HostConnectionPool
3434

35-
from tests import is_monkey_patched, notwindows
36-
from tests.integration import use_singledc, PROTOCOL_VERSION, get_node, CASSANDRA_IP, local
37-
35+
from tests import is_monkey_patched
36+
from tests.integration import use_singledc, PROTOCOL_VERSION, get_node, CASSANDRA_IP, local, requiresmallclockgranularity
3837
try:
3938
from cassandra.io.libevreactor import LibevConnection
4039
except ImportError:
@@ -364,9 +363,7 @@ def send_msgs(conn, event):
364363
for t in threads:
365364
t.join()
366365

367-
# We skip this one for windows because the clock is not as
368-
# granular as in linux
369-
@notwindows
366+
@requiresmallclockgranularity
370367
def test_connect_timeout(self):
371368
# Underlying socket implementations don't always throw a socket timeout even with min float
372369
# This can be timing sensitive, added retry to ensure failure occurs if it can

tests/integration/standard/test_policies.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from cassandra.pool import Host
2929

3030
from tests.integration import BasicSharedKeyspaceUnitTestCase, greaterthancass21, PROTOCOL_VERSION
31-
from tests import notwindows
31+
from tests.integration import requiresmallclockgranularity
3232

3333
from mock import patch
3434

@@ -92,8 +92,7 @@ def test_predicate_changes(self):
9292
self.assertEqual(queried_hosts, all_hosts)
9393

9494

95-
# This doesn't work well with Windows clock granularity
96-
@notwindows
95+
@requiresmallclockgranularity
9796
class SpecExecTest(BasicSharedKeyspaceUnitTestCase):
9897

9998
@classmethod

tests/unit/io/test_asyncorereactor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def setUp(self):
6363
raise unittest.SkipTest("Can't test asyncore with monkey patching")
6464

6565
def make_connection(self):
66-
c = AsyncoreConnection('1.2.3.4', cql_version='3.0.1')
66+
c = AsyncoreConnection('1.2.3.4', cql_version='3.0.1', connect_timeout=5)
6767
c.socket = Mock()
6868
c.socket.send.side_effect = lambda x: len(x)
6969
return c

0 commit comments

Comments
 (0)