From 0696c399d7163819c384270f79f4e3b44d46f6f6 Mon Sep 17 00:00:00 2001 From: David Rusu Date: Wed, 27 Nov 2019 17:02:12 -0500 Subject: [PATCH 1/2] Configurable client id --- lib/kafka_ex/config.ex | 5 +++++ lib/kafka_ex/new/client.ex | 4 ++-- lib/kafka_ex/server.ex | 10 +++++----- lib/kafka_ex/server_0_p_10_and_later.ex | 9 +++++---- lib/kafka_ex/server_0_p_8_p_2.ex | 7 ++++--- lib/kafka_ex/server_0_p_9_p_0.ex | 5 +++-- 6 files changed, 24 insertions(+), 16 deletions(-) diff --git a/lib/kafka_ex/config.ex b/lib/kafka_ex/config.ex index fa3f4e2f..177fe853 100644 --- a/lib/kafka_ex/config.ex +++ b/lib/kafka_ex/config.ex @@ -16,6 +16,11 @@ defmodule KafkaEx.Config do Application.get_env(:kafka_ex, :disable_default_worker, false) end + @doc false + def client_id do + Application.get_env(:kafka_ex, :client_id, "kafka_ex") + end + @doc false def consumer_group do Application.get_env(:kafka_ex, :consumer_group, "kafka_ex") diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 3287ad56..638339e4 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -11,6 +11,7 @@ defmodule KafkaEx.New.Client do the legacy KafkaEx API. """ + alias KafkaEx.Config alias KafkaEx.NetworkClient alias KafkaEx.New.Broker @@ -69,7 +70,6 @@ defmodule KafkaEx.New.Client do # Default from GenServer @default_call_timeout 5_000 - @client_id "kafka_ex" @retry_count 3 @sync_timeout 1_000 @@ -364,7 +364,7 @@ defmodule KafkaEx.New.Client do defp client_request(request, state) do %{ request - | client_id: @client_id, + | client_id: Config.client_id(), correlation_id: state.correlation_id } end diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 19461656..5612fbd8 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -3,6 +3,7 @@ defmodule KafkaEx.Server do Defines the KafkaEx.Server behavior that all Kafka API servers must implement, this module also provides some common callback functions that are injected into the servers that `use` it. """ + alias KafkaEx.Config alias KafkaEx.NetworkClient alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest @@ -272,7 +273,6 @@ defmodule KafkaEx.Server do alias KafkaEx.NetworkClient alias KafkaEx.Protocol.Offset - @client_id "kafka_ex" @retry_count 3 @wait_time 10 @min_bytes 1 @@ -392,7 +392,7 @@ defmodule KafkaEx.Server do produce_request_data = try do - Produce.create_request(correlation_id, @client_id, produce_request) + Produce.create_request(correlation_id, Config.client_id(), produce_request) rescue e in FunctionClauseError -> nil end @@ -507,7 +507,7 @@ defmodule KafkaEx.Server do offset_request = Offset.create_request( state.correlation_id, - @client_id, + Config.client_id(), topic, partition, time @@ -690,7 +690,7 @@ defmodule KafkaEx.Server do metadata_request = Metadata.create_request( correlation_id, - @client_id, + Config.client_id(), topic, api_version ) @@ -815,7 +815,7 @@ defmodule KafkaEx.Server do defp client_request(request, state) do %{ request - | client_id: @client_id, + | client_id: Config.client_id(), correlation_id: state.correlation_id } end diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 0e6ccbe2..b596bce9 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -3,6 +3,7 @@ defmodule KafkaEx.Server0P10AndLater do Implements kafkaEx.Server behaviors for kafka 0.10.1 API. """ use KafkaEx.Server + alias KafkaEx.Config alias KafkaEx.Protocol.CreateTopics alias KafkaEx.Protocol.DeleteTopics alias KafkaEx.Protocol.ApiVersions @@ -185,7 +186,7 @@ defmodule KafkaEx.Server0P10AndLater do def kafka_server_api_versions(state) do response = state.correlation_id - |> ApiVersions.create_request(@client_id) + |> ApiVersions.create_request(Config.client_id()) |> first_broker_response(state) |> ApiVersions.parse_response() @@ -205,7 +206,7 @@ defmodule KafkaEx.Server0P10AndLater do main_request = DeleteTopics.create_request( state.correlation_id, - @client_id, + Config.client_id(), %DeleteTopics.Request{ topics: topics, timeout: config_sync_timeout(network_timeout) @@ -259,7 +260,7 @@ defmodule KafkaEx.Server0P10AndLater do main_request = CreateTopics.create_request( state.correlation_id, - @client_id, + Config.client_id(), create_topics_request, api_version ) @@ -318,7 +319,7 @@ defmodule KafkaEx.Server0P10AndLater do ) do response = correlation_id - |> ConsumerMetadata.create_request(@client_id, consumer_group) + |> ConsumerMetadata.create_request(Config.client_id(), consumer_group) |> first_broker_response(state) |> ConsumerMetadata.parse_response() diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 53007a6e..3b74350b 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -15,6 +15,7 @@ defmodule KafkaEx.Server0P8P2 do ] use KafkaEx.Server + alias KafkaEx.Config alias KafkaEx.ConsumerGroupRequiredError alias KafkaEx.InvalidConsumerGroupError alias KafkaEx.Protocol.ConsumerMetadata @@ -138,7 +139,7 @@ defmodule KafkaEx.Server0P8P2 do offset_fetch = %{offset_fetch | consumer_group: consumer_group} offset_fetch_request = - OffsetFetch.create_request(state.correlation_id, @client_id, offset_fetch) + OffsetFetch.create_request(state.correlation_id, Config.client_id(), offset_fetch) {response, state} = case broker do @@ -235,7 +236,7 @@ defmodule KafkaEx.Server0P8P2 do ) do response = correlation_id - |> ConsumerMetadata.create_request(@client_id, consumer_group) + |> ConsumerMetadata.create_request(Config.client_id(), consumer_group) |> first_broker_response(state) |> ConsumerMetadata.parse_response() @@ -302,7 +303,7 @@ defmodule KafkaEx.Server0P8P2 do offset_commit_request_payload = OffsetCommit.create_request( state.correlation_id, - @client_id, + Config.client_id(), offset_commit_request ) diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 178de9c5..2d391ae0 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -12,6 +12,7 @@ defmodule KafkaEx.Server0P9P0 do ] use KafkaEx.Server + alias KafkaEx.Config alias KafkaEx.ConsumerGroupRequiredError alias KafkaEx.InvalidConsumerGroupError alias KafkaEx.Protocol.ConsumerMetadata @@ -212,7 +213,7 @@ defmodule KafkaEx.Server0P9P0 do wire_request = protocol_module.create_request( state.correlation_id, - @client_id, + Config.client_id(), request ) @@ -271,7 +272,7 @@ defmodule KafkaEx.Server0P9P0 do ) do response = correlation_id - |> ConsumerMetadata.create_request(@client_id, consumer_group) + |> ConsumerMetadata.create_request(Config.client_id(), consumer_group) |> first_broker_response(state) |> ConsumerMetadata.parse_response() From 3829e3a9c884e32c10fc87ec9bb798c4a766b39b Mon Sep 17 00:00:00 2001 From: David Rusu Date: Wed, 27 Nov 2019 17:36:28 -0500 Subject: [PATCH 2/2] Add the client_id option to /config/config.exs with some commentary --- config/config.exs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/config.exs b/config/config.exs index a1678e23..cc549ec5 100644 --- a/config/config.exs +++ b/config/config.exs @@ -33,6 +33,8 @@ config :kafka_ex, # consumer groups, set this to :no_consumer_group (this is the # only exception to the requirement that this value be a binary) consumer_group: "kafka_ex", + # The client_id is the logical grouping of a set of kafka clients. + client_id: "kafka_ex", # Set this value to true if you do not want the default # `KafkaEx.Server` worker to start during application start-up - # i.e., if you want to start your own set of named workers