Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting the offset commit api version on fetch requests #370

Merged
merged 1 commit into from
Oct 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ defmodule KafkaEx do
- max_bytes: maximum bytes to include in the message set for this partition. This helps bound the size of the response. Default is 1,000,000
- auto_commit: specifies if the last offset should be commited or not. Default is true. You must set this to false when using Kafka < 0.8.2 or `:no_consumer_group`.
- api_version: Version of the Fetch API message to send (Kayrock client only, default: 0)
- offset_commit_api_version: Version of the OffsetCommit API message to send
(Kayrock client only, only relevant for auto commit, default: 0, use 2+ to
store offsets in Kafka instead of Zookeeper)

## Example

Expand Down Expand Up @@ -275,6 +278,8 @@ defmodule KafkaEx do
# compatibility with newer message formats and is ignored by the legacy
# server implementations.
api_version = Keyword.get(opts, :api_version, 0)
# same for offset_commit_api_version
offset_commit_api_version = Keyword.get(opts, :offset_commit_api_version, 0)

retrieved_offset =
current_offset(supplied_offset, partition, topic, worker_name)
Expand All @@ -290,7 +295,8 @@ defmodule KafkaEx do
wait_time: wait_time,
min_bytes: min_bytes,
max_bytes: max_bytes,
api_version: api_version
api_version: api_version,
offset_commit_api_version: offset_commit_api_version
}},
opts
)
Expand Down Expand Up @@ -552,7 +558,8 @@ defmodule KafkaEx do
wait_time: wait_time,
min_bytes: min_bytes,
max_bytes: max_bytes,
api_version: Map.fetch!(api_versions, :fetch)
api_version: Map.fetch!(api_versions, :fetch),
offset_commit_api_version: Map.fetch!(api_versions, :offset_commit)
}

%Stream{
Expand Down
27 changes: 13 additions & 14 deletions lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule KafkaEx.New.ClientCompatibility do

alias KafkaEx.New.Client.State

alias KafkaEx.Protocol.OffsetCommit.Request, as: OffsetCommitRequest

# it's a mixin module...
# credo:disable-for-this-file Credo.Check.Refactor.LongQuoteBlocks

Expand Down Expand Up @@ -101,22 +103,19 @@ defmodule KafkaEx.New.ClientCompatibility do
if fetch_request.auto_commit do
consumer_group = state.consumer_group_for_auto_commit

commit_request = %Kayrock.OffsetCommit.V0.Request{
group_id: consumer_group,
topics: [
%{
topic: topic,
partitions: [
%{
partition: partition,
offset: last_offset,
metadata: ""
}
]
}
]
commit_request = %OffsetCommitRequest{
topic: topic,
partition: partition,
offset: last_offset,
api_version: fetch_request.offset_commit_api_version
}

{commit_request, ^consumer_group} =
Adapter.offset_commit_request(
commit_request,
consumer_group
)

{_, updated_state} =
kayrock_network_request(
commit_request,
Expand Down
7 changes: 5 additions & 2 deletions lib/kafka_ex/protocol/fetch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ defmodule KafkaEx.Protocol.Fetch do
max_bytes: nil,
auto_commit: nil,
# NOTE api_version only used in new client
api_version: 0
api_version: 0,
# NOTE offset_commit_api_version only used in new client with auto_commit
offset_commit_api_version: 0

@type t :: %Request{
correlation_id: integer,
Expand All @@ -30,7 +32,8 @@ defmodule KafkaEx.Protocol.Fetch do
wait_time: integer,
min_bytes: integer,
max_bytes: integer,
api_version: integer
api_version: integer,
offset_commit_api_version: integer
}
end

Expand Down
60 changes: 60 additions & 0 deletions test/integration/kayrock/compatibility_consumer_group_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,66 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do
assert offset_of_last_message == offset_fetch_response_offset
end

test "fetch auto_commits offset by default - specify commit request version",
%{
client: client
} do
topic = "kafka_ex_consumer_group_test"
consumer_group = "auto_commit_consumer_group_store_kafka"

KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group)

{:ok, offset_before} = KafkaExAPI.latest_offset(client, topic, 0)

Enum.each(1..10, fn _ ->
msg = %Proto.Produce.Message{value: "hey #{inspect(:os.timestamp())}"}

KafkaEx.produce(
%Proto.Produce.Request{
topic: topic,
partition: 0,
required_acks: 1,
messages: [msg]
},
worker_name: client
)
end)

{:ok, offset_after} = KafkaExAPI.latest_offset(client, topic, 0)
assert offset_after == offset_before + 10

[logs] =
KafkaEx.fetch(
topic,
0,
offset: offset_before,
worker_name: client,
offset_commit_api_version: 3
)

[partition] = logs.partitions
message_set = partition.message_set
assert 10 == length(message_set)

last_message = List.last(message_set)
offset_of_last_message = last_message.offset

offset_request = %Proto.OffsetFetch.Request{
topic: topic,
partition: 0,
consumer_group: consumer_group,
api_version: 3
}

[offset_fetch_response] = KafkaEx.offset_fetch(client, offset_request)
[partition] = offset_fetch_response.partitions
error_code = partition.error_code
offset_fetch_response_offset = partition.offset

assert error_code == :no_error
assert offset_of_last_message == offset_fetch_response_offset
end

test "fetch starts consuming from last committed offset", %{client: client} do
random_string = TestHelper.generate_random_string()
consumer_group = "auto_commit_consumer_group"
Expand Down