Skip to content

Commit 16158dd

Browse files
committed
add asyncioreactor
1 parent ec325b9 commit 16158dd

13 files changed

Lines changed: 298 additions & 5 deletions

File tree

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Features
3838
* Add NO_COMPACT startup option (PYTHON-839)
3939
* Add new exception type for CDC (PYTHON-837)
4040
* Allow 0ms in ConstantSpeculativeExecutionPolicy (PYTHON-836)
41+
* Add asyncio reactor (PYTHON-507)
4142

4243
Bug Fixes
4344
---------

appveyor/appveyor.ps1

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ $env:JAVA_HOME="C:\Program Files\Java\jdk1.8.0"
22
$env:PATH="$($env:JAVA_HOME)\bin;$($env:PATH)"
33
$env:CCM_PATH="C:\Users\appveyor\ccm"
44
$env:CASSANDRA_VERSION=$env:cassandra_version
5-
$env:EVENT_LOOP_MANAGER="async"
5+
$env:EVENT_LOOP_MANAGER="asyncore"
66
$env:SIMULACRON_JAR="C:\Users\appveyor\simulacron-standalone-0.7.0.jar"
77

88
python --version
@@ -77,4 +77,4 @@ If (!(Test-Path C:\Users\appveyor\.ccm\repository\$env:cassandra_version)) {
7777
}
7878

7979
Start-Process python -ArgumentList "-m pip install -r test-requirements.txt" -Wait -NoNewWindow
80-
Start-Process python -ArgumentList "-m pip install nose-ignore-docstring" -Wait -NoNewWindow
80+
Start-Process python -ArgumentList "-m pip install nose-ignore-docstring" -Wait -NoNewWindow

benchmarks/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@
5757
except ImportError as exc:
5858
pass
5959

60+
have_asyncio = False
61+
try:
62+
from cassandra.io.asyncioreactor import AsyncioConnection
63+
have_asyncio = True
64+
supported_reactors.append(AsyncioConnection)
65+
except ImportError:
66+
pass
67+
6068
have_twisted = False
6169
try:
6270
from cassandra.io.twistedreactor import TwistedConnection
@@ -216,6 +224,8 @@ def parse_options():
216224
help='number of operations [default: %default]')
217225
parser.add_option('--asyncore-only', action='store_true', dest='asyncore_only',
218226
help='only benchmark with asyncore connections')
227+
parser.add_option('--asyncio-only', action='store_true', dest='asyncio_only',
228+
help='only benchmark with asyncio connections')
219229
parser.add_option('--libev-only', action='store_true', dest='libev_only',
220230
help='only benchmark with libev connections')
221231
parser.add_option('--twisted-only', action='store_true', dest='twisted_only',
@@ -252,6 +262,8 @@ def parse_options():
252262

253263
if options.asyncore_only:
254264
options.supported_reactors = [AsyncoreConnection]
265+
elif options.asyncio_only:
266+
options.supported_reactors = [AsyncioConnection]
255267
elif options.libev_only:
256268
if not have_libev:
257269
log.error("libev is not available")

build.yaml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,22 @@ schedules:
4444
env_vars: |
4545
EVENT_LOOP_MANAGER='eventlet'
4646
47+
weekly_asyncio:
48+
schedule: 0 22 * * 6
49+
branches:
50+
include: [master]
51+
env_vars: |
52+
EVENT_LOOP_MANAGER='asyncio'
53+
matrix:
54+
exclude:
55+
- python: [2.7]
56+
4757
weekly_async:
4858
schedule: 0 10 * * 7
4959
branches:
5060
include: [master]
5161
env_vars: |
52-
EVENT_LOOP_MANAGER='async'
62+
EVENT_LOOP_MANAGER='asyncore'
5363
5464
weekly_twister:
5565
schedule: 0 14 * * 7

cassandra/cluster.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ def default_retry_policy(self, policy):
594594
* :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
595595
* :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
596596
* :class:`cassandra.io.twistedreactor.TwistedConnection`
597+
* EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`
597598
598599
By default, ``AsyncoreConnection`` will be used, which uses
599600
the ``asyncore`` module in the Python standard library.
@@ -602,6 +603,11 @@ def default_retry_policy(self, policy):
602603
603604
If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
604605
connection class will be used automatically.
606+
607+
``AsyncioConnection``, which uses the ``asyncio`` module in the Python
608+
standard library, is also available, but currently experimental. Note that
609+
it requires ``asyncio`` features that were only introduced in the 3.4 line
610+
in 3.4.6, and in the 3.5 line in 3.5.1.
605611
"""
606612

607613
control_connection_timeout = 2.0

cassandra/io/asyncioreactor.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
from cassandra.connection import Connection
2+
3+
import asyncio
4+
import logging
5+
import os
6+
import socket
7+
import ssl
8+
from threading import Lock, Thread
9+
10+
11+
log = logging.getLogger(__name__)
12+
13+
14+
# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
15+
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
16+
# managed coroutines are generator-based, not native coroutines. See PEP 492:
17+
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects
18+
19+
20+
try:
21+
asyncio.run_coroutine_threadsafe
22+
except AttributeError:
23+
raise ImportError(
24+
'Cannot use asyncioreactor without access to '
25+
'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)'
26+
)
27+
28+
29+
class AsyncioTimer(object):
30+
"""
31+
An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
32+
but with a slightly different API due to limitations in the underlying
33+
``call_later`` interface. Not meant to be used with a
34+
:class:`.connection.TimerManager`.
35+
"""
36+
37+
@property
38+
def end(self):
39+
raise NotImplementedError('{} is not compatible with TimerManager and '
40+
'does not implement .end()')
41+
42+
def __init__(self, timeout, callback, loop):
43+
delayed = self._call_delayed_coro(timeout=timeout,
44+
callback=callback,
45+
loop=loop)
46+
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)
47+
48+
@staticmethod
49+
@asyncio.coroutine
50+
def _call_delayed_coro(timeout, callback, loop):
51+
yield from asyncio.sleep(timeout, loop=loop)
52+
return callback()
53+
54+
def __lt__(self, other):
55+
try:
56+
return self._handle < other._handle
57+
except AttributeError:
58+
raise NotImplemented
59+
60+
def cancel(self):
61+
self._handle.cancel()
62+
63+
def finish(self):
64+
# connection.Timer method not implemented here because we can't inspect
65+
# the Handle returned from call_later
66+
raise NotImplementedError('{} is not compatible with TimerManager and '
67+
'does not implement .finish()')
68+
69+
70+
class AsyncioConnection(Connection):
71+
"""
72+
An experimental implementation of :class:`.Connection` that uses the
73+
``asyncio`` module in the Python standard library for its event loop.
74+
75+
Note that it requires ``asyncio`` features that were only introduced in the
76+
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
77+
"""
78+
79+
_loop = None
80+
_pid = os.getpid()
81+
82+
_lock = Lock()
83+
_loop_thread = None
84+
85+
_write_queue = None
86+
87+
def __init__(self, *args, **kwargs):
88+
Connection.__init__(self, *args, **kwargs)
89+
90+
self._connect_socket()
91+
self._socket.setblocking(0)
92+
93+
self._write_queue = asyncio.Queue(loop=self._loop)
94+
95+
# see initialize_reactor -- loop is running in a separate thread, so we
96+
# have to use a threadsafe call
97+
self._read_watcher = asyncio.run_coroutine_threadsafe(
98+
self.handle_read(), loop=self._loop
99+
)
100+
self._write_watcher = asyncio.run_coroutine_threadsafe(
101+
self.handle_write(), loop=self._loop
102+
)
103+
self._send_options_message()
104+
105+
@classmethod
106+
def initialize_reactor(cls):
107+
with cls._lock:
108+
if cls._pid != os.getpid():
109+
cls._loop = None
110+
if cls._loop is None:
111+
cls._loop = asyncio.get_event_loop()
112+
113+
if not cls._loop_thread:
114+
# daemonize so the loop will be shut down on interpreter
115+
# shutdown
116+
cls._loop_thread = Thread(target=cls._loop.run_forever,
117+
daemon=True)
118+
cls._loop_thread.start()
119+
120+
@classmethod
121+
def create_timer(cls, timeout, callback):
122+
return AsyncioTimer(timeout, callback, loop=cls._loop)
123+
124+
def close(self):
125+
log.debug("Closing connection (%s) to %s" % (id(self), self.host))
126+
with self.lock:
127+
if self.is_closed:
128+
return
129+
self.is_closed = True
130+
131+
self._write_watcher.cancel()
132+
self._read_watcher.cancel()
133+
self.connected_event.set()
134+
135+
def push(self, data):
136+
buff_size = self.out_buffer_size
137+
if len(data) > buff_size:
138+
for i in range(0, len(data), buff_size):
139+
self._push_chunk(data[i:i + buff_size])
140+
else:
141+
self._push_chunk(data)
142+
143+
def _push_chunk(self, chunk):
144+
asyncio.run_coroutine_threadsafe(
145+
self._write_queue.put(chunk),
146+
loop=self._loop
147+
)
148+
149+
@asyncio.coroutine
150+
def handle_write(self):
151+
while True:
152+
try:
153+
next_msg = yield from self._write_queue.get()
154+
if next_msg:
155+
yield from self._loop.sock_sendall(self._socket, next_msg)
156+
except socket.error as err:
157+
log.debug("Exception in send for %s: %s", self, err)
158+
self.defunct(err)
159+
return
160+
161+
@asyncio.coroutine
162+
def handle_read(self):
163+
while True:
164+
try:
165+
buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
166+
self._iobuf.write(buf)
167+
# sock_recv expects EWOULDBLOCK if socket provides no data, but
168+
# nonblocking ssl sockets raise these instead, so we handle them
169+
# ourselves by yielding to the event loop, where the socket will
170+
# get the reading/writing it "wants" before retrying
171+
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
172+
yield
173+
continue
174+
except socket.error as err:
175+
log.debug("Exception during socket recv for %s: %s",
176+
self, err)
177+
self.defunct(err)
178+
return # leave the read loop
179+
180+
if buf and self._iobuf.tell():
181+
self.process_io_buffer()
182+
else:
183+
log.debug("Connection %s closed by server", self)
184+
self.close()
185+
return
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
``cassandra.io.asyncioreactor`` - ``asyncio`` Event Loop
2+
=====================================================================
3+
4+
.. module:: cassandra.io.asyncioreactor
5+
6+
.. autoclass:: AsyncioConnection
7+
:members:

docs/api/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Core Driver
2020
cassandra/concurrent
2121
cassandra/connection
2222
cassandra/util
23+
cassandra/io/asyncioreactor
2324
cassandra/io/asyncorereactor
2425
cassandra/io/eventletreactor
2526
cassandra/io/libevreactor

test-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ gevent>=1.0
1313
eventlet
1414
cython>=0.20,<0.28
1515
packaging
16+
asynctest

tests/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,15 @@ def is_monkey_patched():
7171

7272
from cassandra.io.eventletreactor import EventletConnection
7373
connection_class = EventletConnection
74-
elif "async" in EVENT_LOOP_MANAGER:
74+
elif "asyncore" in EVENT_LOOP_MANAGER:
7575
from cassandra.io.asyncorereactor import AsyncoreConnection
7676
connection_class = AsyncoreConnection
7777
elif "twisted" in EVENT_LOOP_MANAGER:
7878
from cassandra.io.twistedreactor import TwistedConnection
7979
connection_class = TwistedConnection
80+
elif "asyncio" in EVENT_LOOP_MANAGER:
81+
from cassandra.io.asyncioreactor import AsyncioConnection
82+
connection_class = AsyncioConnection
8083

8184
else:
8285
try:

0 commit comments

Comments
 (0)