This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Consumer param fetch_message_max_bytes should be a soft limit #696

Open
jeffwidman opened this issue Jul 1, 2017 · 8 comments
Comments

@jeffwidman
Contributor

jeffwidman commented Jul 1, 2017

We hit a production issue with a stuck pykafka-based consumer that I tracked down to a single message that was larger than the consumer's fetch_message_max_bytes, but smaller than the broker's message.max.bytes (meaning the broker didn't reject the message when it was originally produced).

In the Java reference implementation, the fetch.max.bytes param is a soft limit on both the broker and the new consumer: https://kafka.apache.org/documentation/#newconsumerconfigs

It needs to be a soft limit to avoid exactly this scenario of a single message that is smaller than the broker's message.max.bytes but larger than the consumer's fetch.max.bytes stopping the consumer.

So the broker returned the message, but then when pykafka tried to process the message it blew up here: https://github.com/Parsely/pykafka/blame/c80f66c0d0b11d830aa333ed486967b5f242bc2f/pykafka/protocol.py#L383

I think pykafka should mimic the upstream behavior of treating this as a soft limit rather than a hard limit. Perhaps emit a warning, but still process the message.
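To make the proposal concrete, here is a minimal sketch of what "warn, but still process" could look like. The helper name `check_message_size` is hypothetical and is not part of pykafka; only the parameter name `fetch_message_max_bytes` comes from the library.

```python
import warnings


def check_message_size(message_size, fetch_message_max_bytes):
    """Hypothetical helper (not pykafka API): treat the limit as soft.

    Returns True when the message fits within the limit. When it does not,
    emits a warning and returns False instead of raising, so the caller can
    still go on to decode the message.
    """
    if message_size > fetch_message_max_bytes:
        warnings.warn(
            "Message of %d bytes exceeds fetch_message_max_bytes=%d; "
            "processing it anyway" % (message_size, fetch_message_max_bytes),
            RuntimeWarning,
        )
        return False
    return True
```

A caller would run this check before decoding and continue either way, which only works if the decode path can handle a message larger than the configured limit.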

@jeffwidman
Contributor Author

One design consideration when treating this as a soft limit is the memory pressure on the client. I didn't dive deep enough into the code to check whether the decode buffer is fixed-size or dynamic. It needs to be dynamic for this to be a soft limit.

@amontalenti
Contributor

@jeffwidman Likewise, thanks for this -- assigning to @emmett9001.

@emmettbutler
Contributor

Thanks @jeffwidman for the investigation, and for opening this as a separate ticket from #697.

@emmettbutler
Contributor

Pykafka treats fetch_message_max_bytes as a hard limit specifically because the decode buffer is statically allocated by BrokerConnection on initialization. fetch_message_max_bytes is used as a safeguard against overflowing this buffer. It's a bug that this implementation leads to the problem mentioned by @jeffwidman above in which a message from the broker is larger than fetch_message_max_bytes but smaller than the broker's message.max.bytes.

If my understanding is correct, this problem only happens when fetch_message_max_bytes < message.max.bytes. If this condition doesn't hold, the bug will never appear. Thus, the workaround in current master is to ensure that fetch_message_max_bytes is always at least message.max.bytes.
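As a rough illustration of that workaround, the consumer-side value can be clamped so it never falls below the broker's limit. The function name `effective_fetch_max_bytes` is hypothetical, and the sketch assumes the broker's message.max.bytes value is supplied by the caller (for example, copied from the broker configuration) rather than discovered automatically.

```python
def effective_fetch_max_bytes(fetch_message_max_bytes, broker_message_max_bytes):
    """Hypothetical helper (not pykafka API).

    Returns a consumer fetch limit that is never smaller than the broker's
    message.max.bytes, so any message the broker accepted fits in one fetch.
    """
    return max(fetch_message_max_bytes, broker_message_max_bytes)


# Illustrative values only; substitute the real broker setting.
limit = effective_fetch_max_bytes(512 * 1024, 2 * 1024 * 1024)
```

The resulting value would then be passed as fetch_message_max_bytes when creating the consumer. The cost is that the client buffer is sized for the largest message the broker allows, even if such messages are rare.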

There may have been a pull request at one point to dynamically allocate the decode buffer, but I can't find it. I remember we ran into some issues last time we tried to make that change.
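For reference, a dynamically sized decode buffer could look roughly like the sketch below. This is not the lost pull request and does not reflect how BrokerConnection is written; the class `GrowableBuffer` and its methods are hypothetical names used only to illustrate the idea.

```python
class GrowableBuffer(object):
    """Hypothetical decode buffer that grows on demand (not pykafka code)."""

    def __init__(self, initial_size):
        self._buf = bytearray(initial_size)

    @property
    def capacity(self):
        return len(self._buf)

    def view(self, needed):
        """Return a writable view of exactly `needed` bytes.

        If the current buffer is too small it is replaced by a larger one.
        Replacing (rather than resizing in place) avoids a BufferError when
        an earlier memoryview of the old buffer is still alive.
        """
        if needed > len(self._buf):
            self._buf = bytearray(needed)
        return memoryview(self._buf)[:needed]


buf = GrowableBuffer(8)
small = buf.view(4)    # fits in the initial allocation
large = buf.view(32)   # forces a larger allocation
```

In this sketch the buffer never shrinks, so one unusually large message keeps the larger allocation around, which is the client memory concern raised earlier in the thread.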

@jeffwidman
Contributor Author

> Thus, the workaround in current master is to ensure that fetch_message_max_bytes is always at least message.max.bytes

Yes, I implemented that as a temporary workaround. But that isn't ideal in the long term in certain scenarios, such as worrying about memory usage on the client.

@jeffwidman
Contributor Author

Update: I noticed that this behavior changed upstream as part of KIP-74 / KAFKA-2063, so likely the original implementation was actually the expected behavior at the time in the Java world.

@emmettbutler
Contributor

@jeffwidman It's hard for me to tell from the breadcrumb trail of KIPs - does pykafka's current behavior now match the reference implementation?

@jeffwidman
Contributor Author

Afraid not. The original issue description is still correct. My addendum was only noting that pykafka matched the expected behavior at the time it was implemented, but it no longer does. It should now be a soft limit.
