From 1e7590101f525031a06b7c0ba17b9ade6034f2bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Wed, 22 Jul 2020 02:38:24 +0200 Subject: [PATCH 1/9] Make use of defined alias --- test/integration/kayrock/compatibility_streaming_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs index cd205a51..92613076 100644 --- a/test/integration/kayrock/compatibility_streaming_test.exs +++ b/test/integration/kayrock/compatibility_streaming_test.exs @@ -13,7 +13,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do setup do {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: KafkaEx.New.Client) + KafkaEx.start_link_worker(:no_name, server_impl: Client) {:ok, %{client: pid}} end From d641483d46855b02306d18a1513e8079b7f1d71a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Wed, 22 Jul 2020 02:40:02 +0200 Subject: [PATCH 2/9] Add test case for record headers on new client --- test/integration/new_client_test.exs | 52 ++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index 513b515c..d136a4c1 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -9,6 +9,8 @@ defmodule KafkaEx.New.Client.Test do alias KafkaEx.New.NodeSelector alias Kayrock.RecordBatch + alias Kayrock.RecordBatch.Record + alias Kayrock.RecordBatch.RecordHeader @moduletag :new_client @@ -102,6 +104,56 @@ defmodule KafkaEx.New.Client.Test do assert offset_after == offset_before + 3 end + test "produce with record headers (new message format)", %{client: client} do + topic = "test0p8p0" + partition = 1 + + {:ok, offset_before} = KafkaExAPI.latest_offset(client, topic, partition) + + headers = [ + %RecordHeader{key: "source", value: "System-X"}, + %RecordHeader{key: "type", value: "HeaderCreatedEvent"} + ] + + records = [%Record{headers: headers, key: "key-0001", value: "msg value for key 0001"}] + + record_batch = %RecordBatch{ + attributes: 0, + records: records + } + + request = %Kayrock.Produce.V1.Request{ + acks: -1, + timeout: 1000, + topic_data: [ + %{ + topic: topic, + data: [ + %{partition: partition, record_set: record_batch} + ] + } + ] + } + + {:ok, response} = + Client.send_request( + client, + request, + NodeSelector.topic_partition(topic, partition) + ) + + %Kayrock.Produce.V1.Response{responses: [topic_response]} = response + assert topic_response.topic == topic + + [%{partition: ^partition, error_code: error_code}] = + topic_response.partition_responses + + assert error_code == 0 + + {:ok, offset_after} = KafkaExAPI.latest_offset(client, topic, partition) + assert offset_after == offset_before + 1 + end + test "client can receive {:ssl_closed, _}", %{client: client} do send(client, {:ssl_closed, :unused}) From e725d88ea0512319d8f03c81c5ad402d61c8f894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Thu, 13 Aug 2020 22:17:52 +0200 Subject: [PATCH 3/9] Add header key on produce and fetch message protocol To maintain compability while using the new client the message protocol had to be changed. Only the new client is aware about such info on messages --- lib/kafka_ex/new/adapter.ex | 20 ++++++++ lib/kafka_ex/protocol/fetch.ex | 2 + lib/kafka_ex/protocol/produce.ex | 4 +- .../integration/kayrock/record_batch_test.exs | 50 +++++++++++++++++++ test/integration/new_client_test.exs | 8 ++- 5 files changed, 81 insertions(+), 3 deletions(-) diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index 9646ea89..2afa7dc7 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -36,6 +36,7 @@ defmodule KafkaEx.New.Adapter do alias Kayrock.MessageSet.Message alias Kayrock.RecordBatch alias Kayrock.RecordBatch.Record + alias Kayrock.RecordBatch.RecordHeader def list_offsets_request(topic, partition, time) do time = Offset.parse_time(time) @@ -557,6 +558,7 @@ defmodule KafkaEx.New.Adapter do crc: nil, key: record.key, value: record.value, + headers: build_fetch_message_headers(record.headers), offset: record.offset, topic: topic, partition: partition, @@ -671,6 +673,7 @@ defmodule KafkaEx.New.Adapter do %Record{ key: msg.key, value: msg.value, + headers: build_record_headers(msg.headers), timestamp: minus_one_if_nil(msg.timestamp) } end @@ -684,4 +687,21 @@ defmodule KafkaEx.New.Adapter do defp millis_timestamp_now do :os.system_time(:millisecond) end + + defp build_record_headers(nil), do: [] + + defp build_record_headers(headers) when is_list(headers) do + Enum.map(headers, fn header -> + {key, value} = header + %RecordHeader{key: key, value: value} + end) + end + + defp build_fetch_message_headers(nil), do: [] + + defp build_fetch_message_headers(record_headers) do + Enum.map(record_headers, fn header -> + {header.key, header.value} + end) + end end diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index 8a704746..a4a62977 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -56,6 +56,7 @@ defmodule KafkaEx.Protocol.Fetch do offset: nil, key: nil, value: nil, + headers: nil, topic: nil, partition: nil, timestamp: nil @@ -66,6 +67,7 @@ defmodule KafkaEx.Protocol.Fetch do offset: integer, key: binary, value: binary, + headers: list(), topic: binary, partition: integer, # timestamp supported for `kafka_version: "kayrock"` ONLY diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index b57df7cb..295a083c 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -53,8 +53,8 @@ defmodule KafkaEx.Protocol.Produce do - value: is the message to be written to Kafka logs. - timestamp: timestamp (`kafka_version: "kayrock"` ONLY) """ - defstruct key: nil, value: nil, timestamp: nil - @type t :: %Message{key: binary, value: binary, timestamp: integer} + defstruct key: nil, value: nil, headers: nil, timestamp: nil + @type t :: %Message{ key: binary, value: binary, headers: list(), timestamp: integer } end defmodule Response do diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index 5ab15bcc..e87d8fca 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -186,6 +186,56 @@ defmodule KafkaEx.KayrockRecordBatchTest do assert message.offset == offset end + test "can specify protocol version for produce - v5 with headers", %{ + client: client + } do + topic = "food" + + record_headers = [ + {"theHeaderKeyI", "theHeaderValueI"}, + {"theHeaderKeyII", "theHeaderValueII"} + ] + + msg = %KafkaEx.Protocol.Produce.Message{ + key: "theKey", + value: "theValue", + headers: record_headers + } + + request = %KafkaEx.Protocol.Produce.Request{ + topic: topic, + partition: 0, + required_acks: 1, + api_version: 3, + messages: [msg] + } + + {:ok, offset} = + KafkaEx.produce( + request, + worker_name: client, + required_acks: 1, + api_version: 3 + ) + + fetch_responses = + KafkaEx.fetch(topic, 0, + offset: offset, + auto_commit: false, + worker_name: client, + api_version: 5 + ) + + [fetch_response | _] = fetch_responses + [partition_response | _] = fetch_response.partitions + message = List.last(partition_response.message_set) + + assert message.key == "theKey" + assert message.value == "theValue" + assert message.headers == record_headers + assert message.offset == offset + end + test "gzip compression - produce v0, fetch v3", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index d136a4c1..009b594c 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -115,7 +115,13 @@ defmodule KafkaEx.New.Client.Test do %RecordHeader{key: "type", value: "HeaderCreatedEvent"} ] - records = [%Record{headers: headers, key: "key-0001", value: "msg value for key 0001"}] + records = [ + %Record{ + headers: headers, + key: "key-0001", + value: "msg value for key 0001" + } + ] record_batch = %RecordBatch{ attributes: 0, From 7fcc764aee1f6fea1ea17559efbaf0502e35af6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Thu, 13 Aug 2020 22:49:57 +0200 Subject: [PATCH 4/9] Format code according with .formatter.exs config file As usual I ran the command mix format that added changes to all these files. I don't know if run the `mix format` is a desired or common practice on _KafkaEx_ contribution, so I left it in a separated commit that can be reverted without big efforts --- lib/kafka_ex/protocol/produce.ex | 8 +++++++- test/integration/kayrock/compatibility_streaming_test.exs | 3 +-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 295a083c..4980e14c 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -54,7 +54,13 @@ defmodule KafkaEx.Protocol.Produce do - timestamp: timestamp (`kafka_version: "kayrock"` ONLY) """ defstruct key: nil, value: nil, headers: nil, timestamp: nil - @type t :: %Message{ key: binary, value: binary, headers: list(), timestamp: integer } + + @type t :: %Message{ + key: binary, + value: binary, + headers: list(), + timestamp: integer + } end defmodule Response do diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs index 92613076..0cddd4ea 100644 --- a/test/integration/kayrock/compatibility_streaming_test.exs +++ b/test/integration/kayrock/compatibility_streaming_test.exs @@ -12,8 +12,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do @moduletag :new_client setup do - {:ok, pid} = - KafkaEx.start_link_worker(:no_name, server_impl: Client) + {:ok, pid} = KafkaEx.start_link_worker(:no_name, server_impl: Client) {:ok, %{client: pid}} end From a97ba8d2cc3e9bccbbce21e3d1e8585642c18653 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Wed, 9 Sep 2020 23:35:58 +0200 Subject: [PATCH 5/9] Add default timeout to produce request --- test/integration/kayrock/record_batch_test.exs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index e87d8fca..a87bc1ed 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -199,15 +199,18 @@ defmodule KafkaEx.KayrockRecordBatchTest do msg = %KafkaEx.Protocol.Produce.Message{ key: "theKey", value: "theValue", - headers: record_headers + headers: record_headers, + timestamp: nil } request = %KafkaEx.Protocol.Produce.Request{ topic: topic, partition: 0, required_acks: 1, - api_version: 3, - messages: [msg] + timeout: 100, + compression: :none, + messages: [msg], + api_version: 3 } {:ok, offset} = From 74dd19d470a32d0b99cad6f3ba47934913266dee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos=20da=20Silva?= Date: Fri, 11 Sep 2020 16:35:00 -0300 Subject: [PATCH 6/9] Use the retrieved offset to fetch msg The offset resulting from the KafkaEx.produce sometimes was decreased causing some msg not being found or retrieved correct I don't have background to know why in the compressed msg testing the offset was subtract by -2 `max(offset-2, 0)` but the tests only passed when the retrieved offset was used to fetch messages --- .../integration/kayrock/record_batch_test.exs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index a87bc1ed..bd7cc722 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -256,7 +256,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 3 @@ -287,7 +287,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 5 @@ -318,7 +318,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 0 @@ -349,7 +349,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 0 @@ -380,7 +380,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 0 @@ -411,7 +411,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 3 @@ -442,7 +442,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 5 @@ -473,7 +473,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 0 @@ -504,7 +504,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 0 @@ -535,7 +535,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do fetch_responses = KafkaEx.fetch(topic, 0, - offset: max(offset - 2, 0), + offset: offset, auto_commit: false, worker_name: client, api_version: 0 From 36fe923505843d7b7da6c14a58f4b141a388ea8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Wed, 4 Nov 2020 23:47:41 +0100 Subject: [PATCH 7/9] Specify headers type on fetch protocol --- lib/kafka_ex/protocol/fetch.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/protocol/fetch.ex b/lib/kafka_ex/protocol/fetch.ex index a4a62977..27e19460 100644 --- a/lib/kafka_ex/protocol/fetch.ex +++ b/lib/kafka_ex/protocol/fetch.ex @@ -67,7 +67,7 @@ defmodule KafkaEx.Protocol.Fetch do offset: integer, key: binary, value: binary, - headers: list(), + headers: [{key::binary, value::binary}], topic: binary, partition: integer, # timestamp supported for `kafka_version: "kayrock"` ONLY From 77e1b0daf7e82752a78a63e11f292e0b17a42873 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Tue, 2 Nov 2021 21:20:15 +0100 Subject: [PATCH 8/9] Make producer headers type explicit Co-authored-by: Jehan Bruggeman --- lib/kafka_ex/protocol/produce.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 4980e14c..3d707188 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -58,7 +58,7 @@ defmodule KafkaEx.Protocol.Produce do @type t :: %Message{ key: binary, value: binary, - headers: list(), + headers: [{key::binary, value::binary}], timestamp: integer } end From d285306eb2c1a57dff6c4be7847d3620c52c6ef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rog=C3=A9rio=20Ramos?= Date: Tue, 2 Nov 2021 21:20:42 +0100 Subject: [PATCH 9/9] Provide header param's doc --- lib/kafka_ex/protocol/produce.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 3d707188..92203469 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -51,6 +51,7 @@ defmodule KafkaEx.Protocol.Produce do - key: is used for partition assignment, can be nil, when none is provided it is defaulted to nil - value: is the message to be written to Kafka logs. + - headers: is a list of keys and values that adds metadata to messages - timestamp: timestamp (`kafka_version: "kayrock"` ONLY) """ defstruct key: nil, value: nil, headers: nil, timestamp: nil