Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
raise proper exception type when message is too large to fetch. fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Mar 11, 2016
1 parent 59c94b5 commit 1fcf609
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pykafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
from zlib import crc32

from .common import CompressionType, Message
from .exceptions import ERROR_CODES, NoMessagesConsumedError
from .exceptions import ERROR_CODES, MessageSizeTooLarge
from .utils import Serializable, compression, struct_helpers
from .utils.compat import iteritems, itervalues, buffer

Expand Down Expand Up @@ -308,11 +308,13 @@ def decode(cls, buff, partition_id=-1):
offset = 0
attempted = False
while offset < len(buff):
# if the buffer is not large enough to contain the offset information
if len(buff) - offset < 12:
break
msg_offset, size = struct.unpack_from('!qi', buff, offset)
offset += 12
attempted = True
# if the buffer is not large enough to contain the full message
if len(buff) - offset < size:
break
# TODO: Check we have all the requisite bytes
Expand All @@ -323,7 +325,7 @@ def decode(cls, buff, partition_id=-1):
messages.append(message)
offset += size
if len(messages) == 0 and attempted:
raise NoMessagesConsumedError()
raise MessageSizeTooLarge(size)
return MessageSet(messages=messages)

def pack_into(self, buff, offset):
Expand Down

0 comments on commit 1fcf609

Please sign in to comment.