diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex
index 9646ea89..2afa7dc7 100644
--- a/lib/kafka_ex/new/adapter.ex
+++ b/lib/kafka_ex/new/adapter.ex
@@ -36,6 +36,7 @@ defmodule KafkaEx.New.Adapter do
   alias Kayrock.MessageSet.Message
   alias Kayrock.RecordBatch
   alias Kayrock.RecordBatch.Record
+  alias Kayrock.RecordBatch.RecordHeader
 
   def list_offsets_request(topic, partition, time) do
     time = Offset.parse_time(time)
@@ -557,6 +558,7 @@ defmodule KafkaEx.New.Adapter do
       crc: nil,
       key: record.key,
       value: record.value,
+      headers: build_fetch_message_headers(record.headers),
       offset: record.offset,
       topic: topic,
       partition: partition,
@@ -671,6 +673,7 @@ defmodule KafkaEx.New.Adapter do
     %Record{
       key: msg.key,
       value: msg.value,
+      headers: build_record_headers(msg.headers),
       timestamp: minus_one_if_nil(msg.timestamp)
     }
   end
@@ -684,4 +687,21 @@ defmodule KafkaEx.New.Adapter do
   defp millis_timestamp_now do
     :os.system_time(:millisecond)
   end
+
+  defp build_record_headers(nil), do: []
+
+  defp build_record_headers(headers) when is_list(headers) do
+    Enum.map(headers, fn header ->
+      {key, value} = header
+      %RecordHeader{key: key, value: value}
+    end)
+  end
+
+  defp build_fetch_message_headers(nil), do: []
+
+  defp build_fetch_message_headers(record_headers) do
+    Enum.map(record_headers, fn header ->
+      {header.key, header.value}
+    end)
+  end
 end
diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex
index 8a704746..27e19460 100644
--- a/lib/kafka_ex/protocol/fetch.ex
+++ b/lib/kafka_ex/protocol/fetch.ex
@@ -56,6 +56,7 @@ defmodule KafkaEx.Protocol.Fetch do
               offset: nil,
              key: nil,
               value: nil,
+              headers: nil,
               topic: nil,
               partition: nil,
               timestamp: nil
@@ -66,6 +67,7 @@ defmodule KafkaEx.Protocol.Fetch do
             offset: integer,
             key: binary,
             value: binary,
+            headers: [{key :: binary, value :: binary}],
             topic: binary,
             partition: integer,
             # timestamp supported for `kafka_version: "kayrock"` ONLY
diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex
index b57df7cb..92203469 100644
--- a/lib/kafka_ex/protocol/produce.ex
+++ b/lib/kafka_ex/protocol/produce.ex
@@ -51,10 +51,17 @@ defmodule KafkaEx.Protocol.Produce do
     - key: is used for partition assignment, can be nil, when none is provided it is defaulted to nil
     - value: is the message to be written to Kafka logs.
+    - headers: is a list of keys and values that adds metadata to messages
     - timestamp: timestamp (`kafka_version: "kayrock"` ONLY)
     """
-    defstruct key: nil, value: nil, timestamp: nil
-    @type t :: %Message{key: binary, value: binary, timestamp: integer}
+    defstruct key: nil, value: nil, headers: nil, timestamp: nil
+
+    @type t :: %Message{
+            key: binary,
+            value: binary,
+            headers: [{key :: binary, value :: binary}],
+            timestamp: integer
+          }
   end
 
   defmodule Response do
diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs
index cd205a51..0cddd4ea 100644
--- a/test/integration/kayrock/compatibility_streaming_test.exs
+++ b/test/integration/kayrock/compatibility_streaming_test.exs
@@ -12,8 +12,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do
   @moduletag :new_client
 
   setup do
-    {:ok, pid} =
-      KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client)
+    {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: Client)
 
     {:ok, %{client: pid}}
   end
diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs
index 5ab15bcc..bd7cc722 100644
--- a/test/integration/kayrock/record_batch_test.exs
+++ b/test/integration/kayrock/record_batch_test.exs
@@ -186,6 +186,59 @@ defmodule KafkaEx.KayrockRecordBatchTest do
     assert message.offset == offset
   end
 
+  test "can specify protocol version for produce - v5 with headers", %{
+    client: client
+  } do
+    topic = "food"
+
+    record_headers = [
+      {"theHeaderKeyI", "theHeaderValueI"},
+      {"theHeaderKeyII", "theHeaderValueII"}
+    ]
+
+    msg = %KafkaEx.Protocol.Produce.Message{
+      key: "theKey",
+      value: "theValue",
+      headers: record_headers,
+      timestamp: nil
+    }
+
+    request = %KafkaEx.Protocol.Produce.Request{
+      topic: topic,
+      partition: 0,
+      required_acks: 1,
+      timeout: 100,
+      compression: :none,
+      messages: [msg],
+      api_version: 3
+    }
+
+    {:ok, offset} =
+      KafkaEx.produce(
+        request,
+        worker_name: client,
+        required_acks: 1,
+        api_version: 3
+      )
+
+    fetch_responses =
+      KafkaEx.fetch(topic, 0,
+        offset: offset,
+        auto_commit: false,
+        worker_name: client,
+        api_version: 5
+      )
+
+    [fetch_response | _] = fetch_responses
+    [partition_response | _] = fetch_response.partitions
+    message = List.last(partition_response.message_set)
+
+    assert message.key == "theKey"
+    assert message.value == "theValue"
+    assert message.headers == record_headers
+    assert message.offset == offset
+  end
+
   test "gzip compression - produce v0, fetch v3", %{client: client} do
     topic = "food"
     msg = TestHelper.generate_random_string()
@@ -203,7 +256,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 3
@@ -234,7 +287,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 5
@@ -265,7 +318,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 0
@@ -296,7 +349,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 0
@@ -327,7 +380,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 0
@@ -358,7 +411,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
      KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 3
@@ -389,7 +442,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 5
@@ -420,7 +473,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 0
@@ -451,7 +504,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 0
@@ -482,7 +535,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do
 
     fetch_responses =
       KafkaEx.fetch(topic, 0,
-        offset: max(offset - 2, 0),
+        offset: offset,
         auto_commit: false,
         worker_name: client,
         api_version: 0
diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs
index 513b515c..009b594c 100644
--- a/test/integration/new_client_test.exs
+++ b/test/integration/new_client_test.exs
@@ -9,6 +9,8 @@ defmodule KafkaEx.New.Client.Test do
   alias KafkaEx.New.NodeSelector
 
   alias Kayrock.RecordBatch
+  alias Kayrock.RecordBatch.Record
+  alias Kayrock.RecordBatch.RecordHeader
 
   @moduletag :new_client
 
@@ -102,6 +104,62 @@ defmodule KafkaEx.New.Client.Test do
     assert offset_after == offset_before + 3
   end
 
+  test "produce with record headers (new message format)", %{client: client} do
+    topic = "test0p8p0"
+    partition = 1
+
+    {:ok, offset_before} = KafkaExAPI.latest_offset(client, topic, partition)
+
+    headers = [
+      %RecordHeader{key: "source", value: "System-X"},
+      %RecordHeader{key: "type", value: "HeaderCreatedEvent"}
+    ]
+
+    records = [
+      %Record{
+        headers: headers,
+        key: "key-0001",
+        value: "msg value for key 0001"
+      }
+    ]
+
+    record_batch = %RecordBatch{
+      attributes: 0,
+      records: records
+    }
+
+    request = %Kayrock.Produce.V1.Request{
+      acks: -1,
+      timeout: 1000,
+      topic_data: [
+        %{
+          topic: topic,
+          data: [
+            %{partition: partition, record_set: record_batch}
+          ]
+        }
+      ]
+    }
+
+    {:ok, response} =
+      Client.send_request(
+        client,
+        request,
+        NodeSelector.topic_partition(topic, partition)
+      )
+
+    %Kayrock.Produce.V1.Response{responses: [topic_response]} = response
+    assert topic_response.topic == topic
+
+    [%{partition: ^partition, error_code: error_code}] =
+      topic_response.partition_responses
+
+    assert error_code == 0
+
+    {:ok, offset_after} = KafkaExAPI.latest_offset(client, topic, partition)
+    assert offset_after == offset_before + 1
+  end
+
   test "client can receive {:ssl_closed, _}", %{client: client} do
     send(client, {:ssl_closed, :unused})