
Adds a clause to match when a partial messageSet is received. #268

Merged
merged 3 commits into from
Jan 3, 2018

Conversation

eloraburns
Contributor

@eloraburns eloraburns commented Dec 28, 2017

We had a recent production issue where our consumer was unable to progress. It turns out that our Kafka broker was sending the client a Message Set of around 1.7MB, which is incompatible with a max_bytes of 61000. We use a small max_bytes as we currently run all 64 partition consumers on a single host and don't want to OOM if it falls a bit behind. Our message producer is limited to messages <61000 bytes, so we didn't expect KafkaEx to have a problem.

This PR doesn't fix the problem, but it would have greatly reduced the amount of running around and not knowing what was going on (a FunctionClauseError really doesn't tell you what went wrong). As a stopgap until more elastic message sizes are supported, this new error message should at least give users the information they need to proceed.

Here's an example stack trace from patching this locally:

** (exit) exited in: GenServer.call(:pr, {:fetch, "ile_group_requests", 52, 128110540, 10, 1, 60000, false}, 5000)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) Too little data at offset 128110540. Message size 1712101 but only got another 59988 bytes!  Try increasing max_bytes to at least 1712113.
            (kafka_ex) lib/kafka_ex/protocol/fetch.ex:46: KafkaEx.Protocol.Fetch.parse_message_set/2
            (kafka_ex) lib/kafka_ex/protocol/fetch.ex:31: KafkaEx.Protocol.Fetch.parse_partitions/3
            (kafka_ex) lib/kafka_ex/protocol/fetch.ex:24: KafkaEx.Protocol.Fetch.parse_topics/2
            (kafka_ex) lib/kafka_ex/server.ex:324: KafkaEx.Server.fetch/8
            (kafka_ex) lib/kafka_ex/server.ex:92: KafkaEx.Server.handle_call/3
            (stdlib) gen_server.erl:615: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:647: :gen_server.handle_msg/5
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:604: GenServer.call/3

Let me know what else I can do to help!
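[Editor's note] For context on why the partial match happens: each entry in a Kafka message set is framed as an 8-byte offset and a 4-byte size, followed by the message bytes, and a broker may truncate the final entry to fit the requested max_bytes. A minimal Python sketch of that framing (illustrative only; the helper names `frame` and `parse` are made up here and are not the KafkaEx code):

```python
import struct

def frame(offset, payload):
    # Each message-set entry: 8-byte offset, 4-byte size, then the message bytes.
    return struct.pack(">qi", offset, len(payload)) + payload

def parse(buf):
    """Parse complete entries; report a partial trailing entry if present."""
    messages, partial = [], None
    while len(buf) >= 12:
        offset, size = struct.unpack(">qi", buf[:12])
        if len(buf) - 12 < size:
            # Broker truncated the response: too little data left for this entry.
            partial = (offset, size, len(buf) - 12)
            break
        messages.append((offset, buf[12:12 + size]))
        buf = buf[12 + size:]
    return messages, partial

# A small message followed by a large one, truncated by a small max_bytes.
data = frame(1, b"a" * 20) + frame(2, b"b" * 100)
truncated = data[:60]  # as if the broker cut the response at max_bytes = 60
msgs, partial = parse(truncated)
# msgs holds the one complete entry; partial describes the truncated one.
```

In the incident above, the analogous "partial" case fell through to a clause that didn't match, producing the opaque FunctionClauseError.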

@sourcelevel-bot

Hello, @taavi! This is your first Pull Request that will be reviewed by Ebert, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information.

@jdcumpson

jdcumpson commented Dec 28, 2017

Having been a part of the referenced incident, I can say that these logs would certainly have helped: if the logs had actually told us that we couldn't parse the Kafka message, action could have been taken sooner. I would love to see it go in.

Aside from that I had talked to @taavi briefly about perhaps making the function call do

defp parse_message_set(list, << offset :: 64, msg_size :: 32, msg_data :: size(msg_size)-binary, rest :: binary >>) do
  cond do
    byte_size(rest) < msg_size ->
      {:error, "Too little data fetched at offset #{offset}. "} # etc.
    true ->
      {:ok, message} = parse_message(%Message{offset: offset}, msg_data)
      parse_message_set(append_messages(message, list), rest)
  end
end

I think this follows the usual {:error, reason} return style, but I don't know if there is a preference or a good reason for either approach in this case.

@eloraburns
Contributor Author

Slight clarification: we'd still need the separate function head (because msg_data :: size(msg_size)-binary doesn't match when we don't have the full message) but it's trivial to have that other function head return {:error, "Too little data…"}. What should it return if that wasn't the case though (but somehow the other function heads still didn't match)? Another {:error, reason} or maybe it should raise FunctionClauseError at that point. :)

@bjhaid
Member

bjhaid commented Dec 29, 2017

After reading the code again: returning anything other than {:ok, ...} will result in a match error elsewhere, which would be just as confusing to the end user, so I think the exception as it stands is okay. This LGTM; if it's okay by @dantswain we can merge.

@dantswain
Collaborator

I had meant to check this, but I think you can use a function clause for the partial-message case.

Collaborator

@dantswain dantswain left a comment


We should also create an issue to track the underlying bug.

@@ -73,6 +73,13 @@ defmodule KafkaEx.Protocol.Fetch do
  defp parse_message_set([last|_] = list, _) do
    {:ok, Enum.reverse(list), last.offset}
  end
  defp parse_message_set(_, << offset :: 64, msg_size :: 32, rest :: binary >>) do
Collaborator

@dantswain dantswain Dec 29, 2017


Rather than have a fall-through try to replicate the original clause error, how about having a clause specifically for what we're trying to catch?

  defp parse_message_set(_, << offset :: 64, msg_size :: 32, rest :: binary >>)
       when byte_size(rest) < msg_size do
    raise RuntimeError,
      "Too little data fetched at offset #{offset}. Message size #{msg_size} " <>
      "but only got another #{byte_size(rest)} bytes! " <>
      "Try increasing max_bytes to at least #{msg_size + 12}."
  end

Also, can I request we clean up the error message a little? How about "Insufficient data fetched at offset #{offset}. Message size is #{msg_size} but we only received #{byte_size(rest)} bytes. Increase the value of max_bytes." (Unless you had a specific reason to suggest increasing it by 12 bytes?)

Contributor Author


The 12 byte increase takes the offset and msg_size into account. When I tried consuming with a max_bytes of 60000, there were 59988 bytes of message received. Adding 12 to the msg_size should allow the full message plus offset/size header to be received. Might be too specific, as it's unlikely that that particular size is going to prevent future errors (it really depends on the broker's max setting, as that's the size that will be rejected on publishing).
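[Editor's note] The arithmetic can be made concrete with the numbers from the stack trace above. The 12 bytes are the 8-byte offset plus the 4-byte size field that precede each message in the set (a sketch, not KafkaEx code):

```python
OFFSET_BYTES = 8   # 64-bit message offset
SIZE_BYTES = 4     # 32-bit message size field
HEADER = OFFSET_BYTES + SIZE_BYTES  # 12 bytes of framing per entry

msg_size = 1_712_101  # message size reported in the stack trace
max_bytes = 60_000    # fetch limit used in the failing consumer

# Bytes left for the message body after the 12-byte header:
received = max_bytes - HEADER  # 59_988, matching the stack trace

# Minimum max_bytes needed to receive the full entry (message + header):
recommended = msg_size + HEADER  # 1_712_113, as the new error message suggests
```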

TIL you can use integers matched out of a binary in a guard clause!
@dantswain
Collaborator

Hey @taavi, looks like just a typo in your function clause. I can merge this once we get the build and tests passing.

@eloraburns
Contributor Author

Wow, that was pretty terrible. That's what I get for trying to fix something right before going on vacation. :)

@dantswain
Collaborator

hehe no worries. I'll get it merged.

@dantswain dantswain merged commit b0d472b into kafkaex:master Jan 3, 2018
@eloraburns eloraburns deleted the helpful-error-when-max-bytes-too-small branch January 3, 2018 20:42