Skip to content

Commit e14a7a0

Browse files
committed
PYTHON-1248: Libevreactor: Raise ConnectionBusy if tcp send buffer is full
1 parent f9d69f3 commit e14a7a0

6 files changed

Lines changed: 16 additions & 5 deletions

File tree

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Features
66
--------
77
* Make geomet an optional dependency at runtime (PYTHON-1237)
88
* Add use_default_tempdir cloud config options (PYTHON-1245)
9+
* Tcp flow control for libevreactor (PYTHON-1248)
910

1011
Bug Fixes
1112
---------

Jenkinsfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ pipeline {
601601
EVENT_LOOP_MANAGER = "${params.EVENT_LOOP_MANAGER.toLowerCase()}"
602602
EXECUTE_LONG_TESTS = "${params.EXECUTE_LONG_TESTS ? 'True' : 'False'}"
603603
CCM_ENVIRONMENT_SHELL = '/usr/local/bin/ccm_environment.sh'
604-
CCM_MAX_HEAP_SIZE = '1536M'
604+
CCM_MAX_HEAP_SIZE = '1280M'
605605
}
606606

607607
stages {

cassandra/cluster.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
from cassandra.connection import (ConnectionException, ConnectionShutdown,
4949
ConnectionHeartbeat, ProtocolVersionUnsupported,
5050
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
51-
ContinuousPagingState, SniEndPointFactory)
51+
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
5252
from cassandra.cqltypes import UserType
5353
from cassandra.encoder import Encoder
5454
from cassandra.protocol import (QueryMessage, ResultMessage,
@@ -4445,15 +4445,18 @@ def _query(self, host, message=None, cb=None):
44454445
except NoConnectionsAvailable as exc:
44464446
log.debug("All connections for host %s are at capacity, moving to the next host", host)
44474447
self._errors[host] = exc
4448-
return None
4448+
except ConnectionBusy as exc:
4449+
log.debug("Connection for host %s is busy, moving to the next host", host)
4450+
self._errors[host] = exc
44494451
except Exception as exc:
44504452
log.debug("Error querying host %s", host, exc_info=True)
44514453
self._errors[host] = exc
44524454
if self._metrics is not None:
44534455
self._metrics.on_connection_error()
44544456
if connection:
44554457
pool.return_connection(connection)
4456-
return None
4458+
4459+
return None
44574460

44584461
@property
44594462
def has_more_pages(self):

cassandra/connection.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
692692
self._requests = {}
693693
self._iobuf = io.BytesIO()
694694
self._continuous_paging_sessions = {}
695+
self._socket_writable = True
695696

696697
if ssl_options:
697698
self._check_hostname = bool(self.ssl_options.pop('check_hostname', False))
@@ -926,6 +927,8 @@ def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message,
926927
raise ConnectionShutdown("Connection to %s is defunct" % self.endpoint)
927928
elif self.is_closed:
928929
raise ConnectionShutdown("Connection to %s is closed" % self.endpoint)
930+
elif not self._socket_writable:
931+
raise ConnectionBusy("Connection %s is overloaded" % self.endpoint)
929932

930933
# queue the decoder function with the request
931934
# this allows us to inject custom functions per request to encode, decode messages

cassandra/io/libevreactor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,13 +310,17 @@ def handle_write(self, watcher, revents, errno=None):
310310
with self._deque_lock:
311311
next_msg = self.deque.popleft()
312312
except IndexError:
313+
if not self._socket_writable:
314+
self._socket_writable = True
313315
return
314316

315317
try:
316318
sent = self._socket.send(next_msg)
317319
except socket.error as err:
318320
if (err.args[0] in NONBLOCKING or
319321
err.args[0] in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)):
322+
if err.args[0] in NONBLOCKING:
323+
self._socket_writable = False
320324
with self._deque_lock:
321325
self.deque.appendleft(next_msg)
322326
else:

tests/integration/simulacron/test_backpressure.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def _fill_buffers(self, session, query, expected_blocked=3, **execute_kwargs):
4545
for pool in session.get_pools():
4646
if not pool._connection._socket_writable:
4747
total_blocked += 1
48-
if total_blocked == expected_blocked:
48+
if total_blocked >= expected_blocked:
4949
break
5050
else:
5151
raise Exception("Unable to fill TCP send buffer on expected number of nodes")

0 commit comments

Comments
 (0)