From 9571a2cc0a71726ee86a17cc24cfbff01935241e Mon Sep 17 00:00:00 2001 From: Dan Swain Date: Sun, 24 Nov 2019 09:49:32 -0500 Subject: [PATCH] Fix kayrock auto commit behavior on empty results Was trying to auto commit when the message set was empty, which would result in the 'last offset' being nil. --- lib/kafka_ex/new/client_compatibility.ex | 2 +- .../compatibility_consumer_group_test.exs | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/lib/kafka_ex/new/client_compatibility.ex b/lib/kafka_ex/new/client_compatibility.ex index 1ec3abc1..2027b61f 100644 --- a/lib/kafka_ex/new/client_compatibility.ex +++ b/lib/kafka_ex/new/client_compatibility.ex @@ -104,7 +104,7 @@ defmodule KafkaEx.New.ClientCompatibility do {adapted_resp, last_offset} = Adapter.fetch_response(resp) state_out = - if fetch_request.auto_commit do + if last_offset && fetch_request.auto_commit do consumer_group = state.consumer_group_for_auto_commit commit_request = %OffsetCommitRequest{ diff --git a/test/integration/kayrock/compatibility_consumer_group_test.exs b/test/integration/kayrock/compatibility_consumer_group_test.exs index e9a32f83..81c46956 100644 --- a/test/integration/kayrock/compatibility_consumer_group_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_test.exs @@ -28,6 +28,29 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do KafkaEx.consumer_group(client) end + test "fetch with auto_commit doesn't blow up on no messages", %{ + client: client + } do + topic = TestHelper.generate_random_string() + consumer_group = "auto_commit_consumer_group" + + KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) + + {:ok, offset_before} = KafkaExAPI.latest_offset(client, topic, 0) + assert offset_before == 0 + + [logs] = + KafkaEx.fetch( + topic, + 0, + offset: offset_before, + worker_name: client + ) + + [partition] = logs.partitions + assert partition.message_set == [] + end + test "fetch auto_commits offset by default", %{client: client} do topic = "kafka_ex_consumer_group_test" consumer_group = "auto_commit_consumer_group"