diff --git a/pykafka/protocol.py b/pykafka/protocol.py index ad46035dd..54f9e058e 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -61,7 +61,7 @@ from zlib import crc32 from .common import CompressionType, Message -from .exceptions import ERROR_CODES, NoMessagesConsumedError +from .exceptions import ERROR_CODES, MessageSizeTooLarge from .utils import Serializable, compression, struct_helpers from .utils.compat import iteritems, itervalues, buffer @@ -308,11 +308,13 @@ def decode(cls, buff, partition_id=-1): offset = 0 attempted = False while offset < len(buff): + # if the buffer is not large enough to contain the offset information if len(buff) - offset < 12: break msg_offset, size = struct.unpack_from('!qi', buff, offset) offset += 12 attempted = True + # if the buffer is not large enough to contain the full message if len(buff) - offset < size: break # TODO: Check we have all the requisite bytes @@ -323,7 +325,7 @@ def decode(cls, buff, partition_id=-1): messages.append(message) offset += size if len(messages) == 0 and attempted: - raise NoMessagesConsumedError() + raise MessageSizeTooLarge(size) return MessageSet(messages=messages) def pack_into(self, buff, offset):