|
4 | 4 | import unittest # noqa |
5 | 5 |
|
6 | 6 | import errno |
| 7 | +import os |
7 | 8 | from StringIO import StringIO |
8 | 9 | from socket import error as socket_error |
9 | 10 |
|
|
15 | 16 |
|
16 | 17 | from cassandra.decoder import (write_stringmultimap, write_int, write_string, |
17 | 18 | SupportedMessage, ReadyMessage, ServerError) |
18 | | -from cassandra.marshal import uint8_pack, uint32_pack |
| 19 | +from cassandra.marshal import uint8_pack, uint32_pack, int32_pack |
19 | 20 |
|
20 | 21 | try: |
21 | 22 | from cassandra.io.libevreactor import LibevConnection |
@@ -85,6 +86,40 @@ def test_successful_connection(self, *args): |
85 | 86 | c.handle_read(None, 0) |
86 | 87 |
|
87 | 88 | self.assertTrue(c.connected_event.is_set()) |
| 89 | + return c |
| 90 | + |
| 91 | + def test_egain_on_buffer_size(self, *args): |
| 92 | + # get a connection that's already fully started |
| 93 | + c = self.test_successful_connection() |
| 94 | + |
| 95 | + header = '\x00\x00\x00\x00' + int32_pack(20000) |
| 96 | + responses = [ |
| 97 | + header + ('a' * (4096 - len(header))), |
| 98 | + 'a' * 4096, |
| 99 | + socket_error(errno.EAGAIN), |
| 100 | + 'a' * 100, |
| 101 | + socket_error(errno.EAGAIN)] |
| 102 | + |
| 103 | + def side_effect(*args): |
| 104 | + response = responses.pop(0) |
| 105 | + if isinstance(response, socket_error): |
| 106 | + raise response |
| 107 | + else: |
| 108 | + return response |
| 109 | + |
| 110 | + c._socket.recv.side_effect = side_effect |
| 111 | + c.handle_read(None, 0) |
| 112 | + self.assertEquals(c._total_reqd_bytes, 20000 + len(header)) |
| 113 | + # the EAGAIN prevents it from reading the last 100 bytes |
| 114 | + c._iobuf.seek(0, os.SEEK_END) |
| 115 | + pos = c._iobuf.tell() |
| 116 | + self.assertEquals(pos, 4096 + 4096) |
| 117 | + |
| 118 | + # now tell it to read the last 100 bytes |
| 119 | + c.handle_read(None, 0) |
| 120 | + c._iobuf.seek(0, os.SEEK_END) |
| 121 | + pos = c._iobuf.tell() |
| 122 | + self.assertEquals(pos, 4096 + 4096 + 100) |
88 | 123 |
|
89 | 124 | def test_protocol_error(self, *args): |
90 | 125 | c = self.make_connection() |
|
0 commit comments