@@ -621,6 +621,7 @@ class _ConnectionIOBuffer(object):
621621 _io_buffer = None
622622 _cql_frame_buffer = None
623623 _connection = None
624+ _segment_consumed = False
624625
625626 def __init__ (self , connection ):
626627 self ._io_buffer = io .BytesIO ()
@@ -643,6 +644,10 @@ def set_checksumming_buffer(self):
643644 def is_checksumming_enabled (self ):
644645 return self ._connection ._is_checksumming_enabled
645646
647+ @property
648+ def has_consumed_segment (self ):
649+ return self ._segment_consumed ;
650+
646651 def readable_io_bytes (self ):
647652 return self .io_buffer .tell ()
648653
@@ -1118,22 +1123,33 @@ def _process_segment_buffer(self):
11181123 try :
11191124 self ._io_buffer .io_buffer .seek (0 )
11201125 segment_header = self ._segment_codec .decode_header (self ._io_buffer .io_buffer )
1126+
11211127 if readable_bytes >= segment_header .segment_length :
11221128 segment = self ._segment_codec .decode (self ._iobuf , segment_header )
1129+ self ._io_buffer ._segment_consumed = True
11231130 self ._io_buffer .cql_frame_buffer .write (segment .payload )
11241131 else :
1125- # not enough data to read the segment
1126- self ._io_buffer .io_buffer .seek (0 , 2 )
1132+ # not enough data to read the segment. reset the buffer pointer at the
1133+ # beginning to not lose what we previously read (header).
1134+ self ._io_buffer ._segment_consumed = False
1135+ self ._io_buffer .io_buffer .seek (0 )
11271136 except CrcException as exc :
11281137 # re-raise an exception that inherits from ConnectionException
11291138 raise CrcMismatchException (str (exc ), self .endpoint )
1139+ else :
1140+ self ._io_buffer ._segment_consumed = False
11301141
11311142 def process_io_buffer (self ):
11321143 while True :
1133- if self ._is_checksumming_enabled :
1144+ if self ._is_checksumming_enabled and self . _io_buffer . readable_io_bytes () :
11341145 self ._process_segment_buffer ()
11351146 self ._io_buffer .reset_io_buffer ()
11361147
1148+ if self ._is_checksumming_enabled and not self ._io_buffer .has_consumed_segment :
1149+ # We couldn't read an entire segment from the io buffer, so return
1150+ # control to allow more bytes to be read off the wire
1151+ return
1152+
11371153 if not self ._current_frame :
11381154 pos = self ._read_frame_header ()
11391155 else :
0 commit comments