Skip to content

Commit

Permalink
Merge pull request #388 from davidrusu/configurable-client-id
Browse files Browse the repository at this point in the history
Configurable client id
  • Loading branch information
joshuawscott authored Nov 29, 2019
2 parents 188da6d + 3829e3a commit f587e30
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 16 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ config :kafka_ex,
# consumer groups, set this to :no_consumer_group (this is the
# only exception to the requirement that this value be a binary)
consumer_group: "kafka_ex",
# The client_id is the logical grouping of a set of kafka clients.
client_id: "kafka_ex",
# Set this value to true if you do not want the default
# `KafkaEx.Server` worker to start during application start-up -
# i.e., if you want to start your own set of named workers
Expand Down
5 changes: 5 additions & 0 deletions lib/kafka_ex/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ defmodule KafkaEx.Config do
Application.get_env(:kafka_ex, :disable_default_worker, false)
end

@doc false
def client_id do
Application.get_env(:kafka_ex, :client_id, "kafka_ex")
end

@doc false
def consumer_group do
Application.get_env(:kafka_ex, :consumer_group, "kafka_ex")
Expand Down
4 changes: 2 additions & 2 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule KafkaEx.New.Client do
the legacy KafkaEx API.
"""

alias KafkaEx.Config
alias KafkaEx.NetworkClient

alias KafkaEx.New.Broker
Expand Down Expand Up @@ -69,7 +70,6 @@ defmodule KafkaEx.New.Client do

# Default from GenServer
@default_call_timeout 5_000
@client_id "kafka_ex"
@retry_count 3
@sync_timeout 1_000

Expand Down Expand Up @@ -364,7 +364,7 @@ defmodule KafkaEx.New.Client do
defp client_request(request, state) do
%{
request
| client_id: @client_id,
| client_id: Config.client_id(),
correlation_id: state.correlation_id
}
end
Expand Down
10 changes: 5 additions & 5 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule KafkaEx.Server do
Defines the KafkaEx.Server behavior that all Kafka API servers must implement, this module also provides some common callback functions that are injected into the servers that `use` it.
"""

alias KafkaEx.Config
alias KafkaEx.NetworkClient
alias KafkaEx.Protocol.ConsumerMetadata
alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
Expand Down Expand Up @@ -272,7 +273,6 @@ defmodule KafkaEx.Server do
alias KafkaEx.NetworkClient
alias KafkaEx.Protocol.Offset

@client_id "kafka_ex"
@retry_count 3
@wait_time 10
@min_bytes 1
Expand Down Expand Up @@ -392,7 +392,7 @@ defmodule KafkaEx.Server do

produce_request_data =
try do
Produce.create_request(correlation_id, @client_id, produce_request)
Produce.create_request(correlation_id, Config.client_id(), produce_request)
rescue
e in FunctionClauseError -> nil
end
Expand Down Expand Up @@ -507,7 +507,7 @@ defmodule KafkaEx.Server do
offset_request =
Offset.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
topic,
partition,
time
Expand Down Expand Up @@ -690,7 +690,7 @@ defmodule KafkaEx.Server do
metadata_request =
Metadata.create_request(
correlation_id,
@client_id,
Config.client_id(),
topic,
api_version
)
Expand Down Expand Up @@ -815,7 +815,7 @@ defmodule KafkaEx.Server do
defp client_request(request, state) do
%{
request
| client_id: @client_id,
| client_id: Config.client_id(),
correlation_id: state.correlation_id
}
end
Expand Down
9 changes: 5 additions & 4 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule KafkaEx.Server0P10AndLater do
Implements kafkaEx.Server behaviors for kafka 0.10.1 API.
"""
use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.Protocol.CreateTopics
alias KafkaEx.Protocol.DeleteTopics
alias KafkaEx.Protocol.ApiVersions
Expand Down Expand Up @@ -185,7 +186,7 @@ defmodule KafkaEx.Server0P10AndLater do
def kafka_server_api_versions(state) do
response =
state.correlation_id
|> ApiVersions.create_request(@client_id)
|> ApiVersions.create_request(Config.client_id())
|> first_broker_response(state)
|> ApiVersions.parse_response()

Expand All @@ -205,7 +206,7 @@ defmodule KafkaEx.Server0P10AndLater do
main_request =
DeleteTopics.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
%DeleteTopics.Request{
topics: topics,
timeout: config_sync_timeout(network_timeout)
Expand Down Expand Up @@ -259,7 +260,7 @@ defmodule KafkaEx.Server0P10AndLater do
main_request =
CreateTopics.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
create_topics_request,
api_version
)
Expand Down Expand Up @@ -318,7 +319,7 @@ defmodule KafkaEx.Server0P10AndLater do
) do
response =
correlation_id
|> ConsumerMetadata.create_request(@client_id, consumer_group)
|> ConsumerMetadata.create_request(Config.client_id(), consumer_group)
|> first_broker_response(state)
|> ConsumerMetadata.parse_response()

Expand Down
7 changes: 4 additions & 3 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx.Server0P8P2 do
]

use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.ConsumerGroupRequiredError
alias KafkaEx.InvalidConsumerGroupError
alias KafkaEx.Protocol.ConsumerMetadata
Expand Down Expand Up @@ -138,7 +139,7 @@ defmodule KafkaEx.Server0P8P2 do
offset_fetch = %{offset_fetch | consumer_group: consumer_group}

offset_fetch_request =
OffsetFetch.create_request(state.correlation_id, @client_id, offset_fetch)
OffsetFetch.create_request(state.correlation_id, Config.client_id(), offset_fetch)

{response, state} =
case broker do
Expand Down Expand Up @@ -235,7 +236,7 @@ defmodule KafkaEx.Server0P8P2 do
) do
response =
correlation_id
|> ConsumerMetadata.create_request(@client_id, consumer_group)
|> ConsumerMetadata.create_request(Config.client_id(), consumer_group)
|> first_broker_response(state)
|> ConsumerMetadata.parse_response()

Expand Down Expand Up @@ -302,7 +303,7 @@ defmodule KafkaEx.Server0P8P2 do
offset_commit_request_payload =
OffsetCommit.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
offset_commit_request
)

Expand Down
5 changes: 3 additions & 2 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule KafkaEx.Server0P9P0 do
]

use KafkaEx.Server
alias KafkaEx.Config
alias KafkaEx.ConsumerGroupRequiredError
alias KafkaEx.InvalidConsumerGroupError
alias KafkaEx.Protocol.ConsumerMetadata
Expand Down Expand Up @@ -212,7 +213,7 @@ defmodule KafkaEx.Server0P9P0 do
wire_request =
protocol_module.create_request(
state.correlation_id,
@client_id,
Config.client_id(),
request
)

Expand Down Expand Up @@ -271,7 +272,7 @@ defmodule KafkaEx.Server0P9P0 do
) do
response =
correlation_id
|> ConsumerMetadata.create_request(@client_id, consumer_group)
|> ConsumerMetadata.create_request(Config.client_id(), consumer_group)
|> first_broker_response(state)
|> ConsumerMetadata.parse_response()

Expand Down

0 comments on commit f587e30

Please sign in to comment.