Skip to content

Commit ec95133

Browse files
committed
libev: handle EAGAIN when message len matches buffer size
1 parent 6bd519b commit ec95133

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ Bug Fixes
99
* Always close socket when defuncting error'ed connections to avoid a potential
1010
file descriptor leak
1111
* Handle "custom" types (such as the replaced DateType) correctly
12+
* With libevreactor, correctly handle EAGAIN/EWOULDBLOCK when the message from
13+
Cassandra is a multiple of the read buffer size. Previously, if no more data
14+
became available to read on the socket, the message would never be processed,
15+
resulting in an OperationTimedOut error.
1216

1317
Other
1418
-----

cassandra/io/libevreactor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def handle_read(self, watcher, revents, errno=None):
313313
except socket.error as err:
314314
if err.args[0] not in NONBLOCKING:
315315
self.defunct(err)
316-
return
316+
return
317317

318318
if self._iobuf.tell():
319319
while True:

tests/unit/io/test_libevreactor.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import unittest # noqa
55

66
import errno
7+
import os
78
from StringIO import StringIO
89
from socket import error as socket_error
910

@@ -15,7 +16,7 @@
1516

1617
from cassandra.decoder import (write_stringmultimap, write_int, write_string,
1718
SupportedMessage, ReadyMessage, ServerError)
18-
from cassandra.marshal import uint8_pack, uint32_pack
19+
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
1920

2021
try:
2122
from cassandra.io.libevreactor import LibevConnection
@@ -85,6 +86,40 @@ def test_successful_connection(self, *args):
8586
c.handle_read(None, 0)
8687

8788
self.assertTrue(c.connected_event.is_set())
89+
return c
90+
91+
def test_egain_on_buffer_size(self, *args):
92+
# get a connection that's already fully started
93+
c = self.test_successful_connection()
94+
95+
header = '\x00\x00\x00\x00' + int32_pack(20000)
96+
responses = [
97+
header + ('a' * (4096 - len(header))),
98+
'a' * 4096,
99+
socket_error(errno.EAGAIN),
100+
'a' * 100,
101+
socket_error(errno.EAGAIN)]
102+
103+
def side_effect(*args):
104+
response = responses.pop(0)
105+
if isinstance(response, socket_error):
106+
raise response
107+
else:
108+
return response
109+
110+
c._socket.recv.side_effect = side_effect
111+
c.handle_read(None, 0)
112+
self.assertEquals(c._total_reqd_bytes, 20000 + len(header))
113+
# the EAGAIN prevents it from reading the last 100 bytes
114+
c._iobuf.seek(0, os.SEEK_END)
115+
pos = c._iobuf.tell()
116+
self.assertEquals(pos, 4096 + 4096)
117+
118+
# now tell it to read the last 100 bytes
119+
c.handle_read(None, 0)
120+
c._iobuf.seek(0, os.SEEK_END)
121+
pos = c._iobuf.tell()
122+
self.assertEquals(pos, 4096 + 4096 + 100)
88123

89124
def test_protocol_error(self, *args):
90125
c = self.make_connection()

0 commit comments

Comments
 (0)