@@ -621,6 +621,7 @@ class _ConnectionIOBuffer(object):
621621 _io_buffer = None
622622 _cql_frame_buffer = None
623623 _connection = None
624+ _segment_consumed = False
624625
625626 def __init__ (self , connection ):
626627 self ._io_buffer = io .BytesIO ()
@@ -643,6 +644,10 @@ def set_checksumming_buffer(self):
643644 def is_checksumming_enabled (self ):
644645 return self ._connection ._is_checksumming_enabled
645646
647+ @property
648+ def has_consumed_segment (self ):
649+ return self ._segment_consumed ;
650+
646651 def readable_io_bytes (self ):
647652 return self .io_buffer .tell ()
648653
@@ -1118,23 +1123,33 @@ def _process_segment_buffer(self):
11181123 try :
11191124 self ._io_buffer .io_buffer .seek (0 )
11201125 segment_header = self ._segment_codec .decode_header (self ._io_buffer .io_buffer )
1126+
11211127 if readable_bytes >= segment_header .segment_length :
11221128 segment = self ._segment_codec .decode (self ._iobuf , segment_header )
1129+ self ._io_buffer ._segment_consumed = True
11231130 self ._io_buffer .cql_frame_buffer .write (segment .payload )
11241131 else :
11251132 # not enough data to read the segment. reset the buffer pointer at the
11261133 # beginning to not lose what we previously read (header).
1134+ self ._io_buffer ._segment_consumed = False
11271135 self ._io_buffer .io_buffer .seek (0 )
11281136 except CrcException as exc :
11291137 # re-raise an exception that inherits from ConnectionException
11301138 raise CrcMismatchException (str (exc ), self .endpoint )
1139+ else :
1140+ self ._io_buffer ._segment_consumed = False
11311141
11321142 def process_io_buffer (self ):
11331143 while True :
1134- if self ._is_checksumming_enabled :
1144+ if self ._is_checksumming_enabled and self . _io_buffer . readable_io_bytes () :
11351145 self ._process_segment_buffer ()
11361146 self ._io_buffer .reset_io_buffer ()
11371147
1148+ if self ._is_checksumming_enabled and not self ._io_buffer .has_consumed_segment :
1149+ # We couldn't read an entire segment from the io buffer, so return
1150+ # control to allow more bytes to be read off the wire
1151+ return
1152+
11381153 if not self ._current_frame :
11391154 pos = self ._read_frame_header ()
11401155 else :
0 commit comments