
Kafka 11 - record headers support #414

Merged: 9 commits, merged Nov 8, 2021
20 changes: 20 additions & 0 deletions lib/kafka_ex/new/adapter.ex
@@ -36,6 +36,7 @@ defmodule KafkaEx.New.Adapter do
alias Kayrock.MessageSet.Message
alias Kayrock.RecordBatch
alias Kayrock.RecordBatch.Record
alias Kayrock.RecordBatch.RecordHeader

def list_offsets_request(topic, partition, time) do
time = Offset.parse_time(time)
@@ -557,6 +558,7 @@ defmodule KafkaEx.New.Adapter do
crc: nil,
key: record.key,
value: record.value,
headers: build_fetch_message_headers(record.headers),
offset: record.offset,
topic: topic,
partition: partition,
@@ -671,6 +673,7 @@ defmodule KafkaEx.New.Adapter do
%Record{
key: msg.key,
value: msg.value,
headers: build_record_headers(msg.headers),
timestamp: minus_one_if_nil(msg.timestamp)
}
end
@@ -684,4 +687,21 @@
defp millis_timestamp_now do
:os.system_time(:millisecond)
end

defp build_record_headers(nil), do: []

defp build_record_headers(headers) when is_list(headers) do
Enum.map(headers, fn header ->
{key, value} = header
%RecordHeader{key: key, value: value}
end)
end

defp build_fetch_message_headers(nil), do: []

defp build_fetch_message_headers(record_headers) do
Enum.map(record_headers, fn header ->
{header.key, header.value}
end)
end
end
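
For context, here is a minimal sketch of the round trip the two new private helpers perform: on produce, the {key, value} tuples from the public API become Kayrock RecordHeader structs (build_record_headers/1), and on fetch the RecordHeader structs are flattened back into tuples (build_fetch_message_headers/1). The header values below are hypothetical and the snippet is illustrative, not part of the diff.

# Illustrative only; assumes the kayrock dependency is available.
alias Kayrock.RecordBatch.RecordHeader

headers = [{"source", "System-X"}, {"type", "HeaderCreatedEvent"}]

# Produce path: tuples -> RecordHeader structs
record_headers =
  Enum.map(headers, fn {key, value} -> %RecordHeader{key: key, value: value} end)

# Fetch path: RecordHeader structs -> tuples (round-trips to the original list)
^headers = Enum.map(record_headers, fn h -> {h.key, h.value} end)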
2 changes: 2 additions & 0 deletions lib/kafka_ex/protocol/fetch.ex
@@ -56,6 +56,7 @@ defmodule KafkaEx.Protocol.Fetch do
offset: nil,
key: nil,
value: nil,
headers: nil,
topic: nil,
partition: nil,
timestamp: nil
@@ -66,6 +67,7 @@ defmodule KafkaEx.Protocol.Fetch do
offset: integer,
key: binary,
value: binary,
headers: [{key::binary, value::binary}],
topic: binary,
partition: integer,
# timestamp supported for `kafka_version: "kayrock"` ONLY
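
For illustration, a message fetched with the new field populated would look roughly like this (all values hypothetical):

%KafkaEx.Protocol.Fetch.Message{
  offset: 42,
  key: "theKey",
  value: "theValue",
  headers: [{"source", "System-X"}, {"type", "HeaderCreatedEvent"}],
  topic: "food",
  partition: 0,
  timestamp: nil
}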
11 changes: 9 additions & 2 deletions lib/kafka_ex/protocol/produce.ex
@@ -51,10 +51,17 @@ defmodule KafkaEx.Protocol.Produce do
- key: is used for partition assignment; can be nil, and defaults to nil when none is provided
- value: is the message to be written to Kafka logs.
- headers: is a list of key/value pairs that adds metadata to the message
- timestamp: timestamp (`kafka_version: "kayrock"` ONLY)
"""
defstruct key: nil, value: nil, timestamp: nil
@type t :: %Message{key: binary, value: binary, timestamp: integer}
defstruct key: nil, value: nil, headers: nil, timestamp: nil

@type t :: %Message{
key: binary,
value: binary,
headers: [{key::binary, value::binary}],
timestamp: integer
}
end

defmodule Response do
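
As a quick usage sketch tying the struct to the public API (topic, values, and the client variable are hypothetical; this mirrors the record batch test further below and assumes a kayrock-based worker):

message = %KafkaEx.Protocol.Produce.Message{
  key: "theKey",
  value: "theValue",
  headers: [{"source", "System-X"}, {"type", "HeaderCreatedEvent"}]
}

request = %KafkaEx.Protocol.Produce.Request{
  topic: "food",
  partition: 0,
  required_acks: 1,
  messages: [message],
  api_version: 3
}

{:ok, _offset} = KafkaEx.produce(request, worker_name: client, api_version: 3)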
3 changes: 1 addition & 2 deletions test/integration/kayrock/compatibility_streaming_test.exs
@@ -12,8 +12,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do
@moduletag :new_client

setup do
{:ok, pid} =
KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
{:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: Client)

{:ok, %{client: pid}}
end
73 changes: 63 additions & 10 deletions test/integration/kayrock/record_batch_test.exs
@@ -186,6 +186,59 @@ defmodule KafkaEx.KayrockRecordBatchTest do
assert message.offset == offset
end

test "can specify protocol version for produce - v5 with headers", %{
client: client
} do
topic = "food"

record_headers = [
{"theHeaderKeyI", "theHeaderValueI"},
{"theHeaderKeyII", "theHeaderValueII"}
]

msg = %KafkaEx.Protocol.Produce.Message{
key: "theKey",
value: "theValue",
headers: record_headers,
timestamp: nil
}

request = %KafkaEx.Protocol.Produce.Request{
topic: topic,
partition: 0,
required_acks: 1,
timeout: 100,
compression: :none,
messages: [msg],
api_version: 3
}

{:ok, offset} =
KafkaEx.produce(
request,
worker_name: client,
required_acks: 1,
api_version: 3
)

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 5
)

[fetch_response | _] = fetch_responses
[partition_response | _] = fetch_response.partitions
message = List.last(partition_response.message_set)

assert message.key == "theKey"
assert message.value == "theValue"
assert message.headers == record_headers
assert message.offset == offset
end

test "gzip compression - produce v0, fetch v3", %{client: client} do
topic = "food"
msg = TestHelper.generate_random_string()
@@ -203,7 +256,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 3
@@ -234,7 +287,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
Review thread on this change:

Collaborator: I don't remember exactly why this was set this way - probably some kind of bizarre test behavior. Did you have a compelling reason to change it?

@habutre (Contributor, author), Nov 6, 2020: At first I tried to understand the reason for fetching a previous offset instead of the one returned by the producer, but since it had always worked before, I kept it as-is. After I introduced the headers, the Kayrock tests started failing randomly, always for the same reason: a nil message. I watched the topic with a Kafka tool and figured out that offset - 2 does not match the expected message, so I tried removing the max(offset - 2, 0) and got success on every execution, which makes me think that was the cause; no other test failed either. I don't have a rationale for the old behavior, only that in my mind the offset should not be manipulated to get the desired message; it was just iterative trial and error.

Collaborator: @dantswain in case you're interested, here is the commit where you added this max(offset - 2, 0): 7197a90. I didn't see anything in particular to help me understand. IMO the new changed code makes more sense 🤷

auto_commit: false,
worker_name: client,
api_version: 5
@@ -265,7 +318,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 0
@@ -296,7 +349,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 0
@@ -327,7 +380,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 0
@@ -358,7 +411,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 3
@@ -389,7 +442,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 5
@@ -420,7 +473,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 0
@@ -451,7 +504,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 0
@@ -482,7 +535,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do

fetch_responses =
KafkaEx.fetch(topic, 0,
offset: max(offset - 2, 0),
offset: offset,
auto_commit: false,
worker_name: client,
api_version: 0
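
A short sketch of the offset semantics discussed in the review thread above: when a single message is produced with required_acks: 1, the offset returned by the producer is the offset of that message, so fetching at exactly that offset yields it back, whereas offset - 2 can land on an older record (or on nothing for a fresh topic). The snippet is illustrative; it assumes a running worker bound to client and a produce call that returned offset, as in the tests above.

[fetch_response | _] =
  KafkaEx.fetch("food", 0,
    offset: offset,
    auto_commit: false,
    worker_name: client,
    api_version: 3
  )

[partition_response | _] = fetch_response.partitions

# The just-produced record comes back as the last message in the set,
# at exactly the offset the producer returned.
^offset = List.last(partition_response.message_set).offset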
58 changes: 58 additions & 0 deletions test/integration/new_client_test.exs
@@ -9,6 +9,8 @@ defmodule KafkaEx.New.Client.Test do
alias KafkaEx.New.NodeSelector

alias Kayrock.RecordBatch
alias Kayrock.RecordBatch.Record
alias Kayrock.RecordBatch.RecordHeader

@moduletag :new_client

@@ -102,6 +104,62 @@ defmodule KafkaEx.New.Client.Test do
assert offset_after == offset_before + 3
end

test "produce with record headers (new message format)", %{client: client} do
topic = "test0p8p0"
partition = 1

{:ok, offset_before} = KafkaExAPI.latest_offset(client, topic, partition)

headers = [
%RecordHeader{key: "source", value: "System-X"},
%RecordHeader{key: "type", value: "HeaderCreatedEvent"}
]

records = [
%Record{
headers: headers,
key: "key-0001",
value: "msg value for key 0001"
}
]

record_batch = %RecordBatch{
attributes: 0,
records: records
}

request = %Kayrock.Produce.V1.Request{
acks: -1,
timeout: 1000,
topic_data: [
%{
topic: topic,
data: [
%{partition: partition, record_set: record_batch}
]
}
]
}

{:ok, response} =
Client.send_request(
client,
request,
NodeSelector.topic_partition(topic, partition)
)

%Kayrock.Produce.V1.Response{responses: [topic_response]} = response
assert topic_response.topic == topic

[%{partition: ^partition, error_code: error_code}] =
topic_response.partition_responses

assert error_code == 0

{:ok, offset_after} = KafkaExAPI.latest_offset(client, topic, partition)
assert offset_after == offset_before + 1
end

test "client can receive {:ssl_closed, _}", %{client: client} do
send(client, {:ssl_closed, :unused})
