Use api_versions to control the version of Metadata being called.
jbruggem committed Nov 6, 2018
1 parent 820bd32 commit 468557e
Showing 12 changed files with 133 additions and 70 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -357,7 +357,7 @@ The 0.9 client includes functionality that cannot be tested with older
clusters.

```
mix test --include integration --include consumer_group --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0
./all_tests.sh
```

##### Kafka >= 0.9.0
5 changes: 5 additions & 0 deletions all_tests.sh
@@ -0,0 +1,5 @@
#! /bin/sh

# WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh

mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0
1 change: 0 additions & 1 deletion lib/kafka_ex/api_versions.ex
@@ -20,6 +20,5 @@ defmodule KafkaEx.ApiVersions do
end
end
end

end
end
2 changes: 1 addition & 1 deletion lib/kafka_ex/config.ex
@@ -81,7 +81,7 @@ defmodule KafkaEx.Config do
defp server("0.8.0"), do: KafkaEx.Server0P8P0
defp server("0.8.2"), do: KafkaEx.Server0P8P2
defp server("0.9.0"), do: KafkaEx.Server0P9P0
defp server(_), do: KafkaEx.Server0P10P1
defp server(_), do: KafkaEx.Server0P10AndLater


# ssl_options should be an empty list by default if use_ssl is false
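With this change, any `kafka_version` value other than the three pinned strings falls through to the new `KafkaEx.Server0P10AndLater` module. A hedged configuration sketch (the `:kafka_ex` / `kafka_version` keys are the library's existing settings; the broker list and version string are illustrative):

```elixir
# config/config.exs
use Mix.Config

# "0.8.0", "0.8.2" and "0.9.0" keep their dedicated server modules; anything
# else (e.g. "0.10.1", "1.1.0") now resolves to KafkaEx.Server0P10AndLater.
config :kafka_ex,
  brokers: [{"localhost", 9092}],
  kafka_version: "0.10.1"
```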
12 changes: 12 additions & 0 deletions lib/kafka_ex/protocol/metadata.ex
@@ -2,6 +2,7 @@ defmodule KafkaEx.Protocol.Metadata do
alias KafkaEx.Protocol
import KafkaEx.Protocol.Common

@supported_versions_range {0, 1}
@default_api_version 0

@moduledoc """
@@ -97,8 +98,19 @@ defmodule KafkaEx.Protocol.Metadata do
}
end

def api_version(api_versions) do
case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do
{:ok, version} -> version
_ -> @default_api_version
end
end

def create_request(correlation_id, client_id, topics, api_version \\ @default_api_version)

def create_request(correlation_id, client_id, nil, api_version) do
create_request(correlation_id, client_id, "", api_version)
end

def create_request(correlation_id, client_id, "", api_version) do
topic_count = if 0 == api_version, do: 0, else: -1
KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, api_version) <> << topic_count :: 32-signed >>
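The new `api_version/1` helper picks the Metadata version from the broker-reported versions map, constrained to the supported range `{0, 1}`, and falls back to version 0 when the lookup fails. A minimal sketch of that selection, assuming `find_api_version/3` returns something other than `{:ok, version}` for the `[:unsupported]` sentinel kept by pre-0.10 servers (the correlation id and client id below are illustrative):

```elixir
alias KafkaEx.Protocol.Metadata

# No usable version information -> fall back to the v0 default.
0 = Metadata.api_version([:unsupported])

# Build an "all topics" request directly; for api_version 1 the topic count
# is encoded as -1, for api_version 0 it stays 0.
request = Metadata.create_request(42, "kafka_ex", nil, 1)
true = is_binary(request)
```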
43 changes: 24 additions & 19 deletions lib/kafka_ex/server.ex
@@ -37,7 +37,8 @@ defmodule KafkaEx.Server do
consumer_group_update_interval: nil,
worker_name: KafkaEx.Server,
ssl_options: [],
use_ssl: false
use_ssl: false,
api_versions: []
)

@type t :: %State{
@@ -51,6 +52,7 @@
worker_name: atom,
ssl_options: KafkaEx.ssl_options,
use_ssl: boolean,
api_versions: [KafkaEx.Protocol.ApiVersions.ApiVersion],
}

@spec increment_correlation_id(t) :: t
@@ -313,8 +315,8 @@
def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do
{broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do
nil ->
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic)
state = %{update_metadata(state) | correlation_id: retrieved_corr_id}
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic, state.api_versions)
state = update_metadata(%{state | correlation_id: retrieved_corr_id})
{
MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition),
state,
@@ -383,7 +385,7 @@
end

def kafka_server_metadata(topic, state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic)
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, state.api_versions)
updated_state = %{state | metadata: metadata, correlation_id: correlation_id}
{:reply, metadata, updated_state}
end
@@ -392,10 +394,8 @@
{:noreply, update_metadata(state)}
end

def update_metadata(state), do: update_metadata(state, 0)

def update_metadata(state, api_version) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), [], api_version)
def update_metadata(state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), nil, state.api_versions)
metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id}))
brokers = state.brokers
|> remove_stale_brokers(metadata_brokers)
@@ -404,30 +404,27 @@
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], api_version \\ 0) do
retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, api_version)
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], server_api_versions \\ [:unsupported]) do
retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, server_api_versions)
end

def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version \\ 0)

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(_, correlation_id, _sync_timeout, topic, 0, error_code, api_version) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, server_api_versions \\ [:unsupported]) do
api_version = Metadata.api_version(server_api_versions)
retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version)
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version)
data = first_broker_response(metadata_request, brokers, sync_timeout)
if data do
response = Metadata.parse_response(data, api_version)

case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do
nil -> {correlation_id + 1, response}
topic_metadata ->
:timer.sleep(300)
retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version)
retrieve_metadata_with_version(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version)
end
else
message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
@@ -437,6 +434,13 @@
end
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end


defoverridable [
kafka_server_produce: 2, kafka_server_offset: 4,
kafka_server_metadata: 2, kafka_server_update_metadata: 1,
@@ -470,7 +474,8 @@
metadata_update_interval: metadata_update_interval,
ssl_options: ssl_options,
use_ssl: use_ssl,
worker_name: name
worker_name: name,
api_versions: [:unsupported]
}

state = update_metadata(state)
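From a caller's perspective nothing changes: the negotiated versions live in the server state, and `retrieve_metadata/5` resolves the concrete wire version internally via `Metadata.api_version/1`. A hedged usage sketch (the worker and topic names are made up for illustration):

```elixir
# Start a worker and request metadata; the server chooses Metadata v0 or v1
# from the api_versions stored in its state.
{:ok, _pid} = KafkaEx.create_worker(:metadata_demo)

metadata = KafkaEx.metadata(worker_name: :metadata_demo, topic: "example-topic")
metadata.topic_metadatas |> Enum.map(& &1.topic) |> IO.inspect(label: "topics")
```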
lib/kafka_ex/server_0_p_10_p_1.ex → lib/kafka_ex/server_0_p_10_and_later.ex (renamed)
@@ -1,4 +1,4 @@
defmodule KafkaEx.Server0P10P1 do
defmodule KafkaEx.Server0P10AndLater do
@moduledoc """
Implements kafkaEx.Server behaviors for kafka 0.10.1 API.
"""
@@ -16,7 +16,6 @@ defmodule KafkaEx.Server0P10P1 do

require Logger

@metadata_api_version 1
@consumer_group_update_interval 30_000


@@ -66,10 +65,26 @@ defmodule KafkaEx.Server0P10P1 do

brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end)

{correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout(), [], @metadata_api_version)
state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name, ssl_options: ssl_options, use_ssl: use_ssl}
{ _, %KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: :no_error }, state } = kafka_api_versions(%State{brokers: brokers})
api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} = retrieve_metadata(brokers, state.correlation_id, config_sync_timeout(), [], api_versions)

state = %State{
metadata: metadata,
brokers: brokers,
correlation_id: correlation_id,
consumer_group: consumer_group,
metadata_update_interval: metadata_update_interval,
consumer_group_update_interval: consumer_group_update_interval,
worker_name: name,
ssl_options: ssl_options,
use_ssl: use_ssl,
api_versions: api_versions,
}

# Get the initial "real" broker list and start a regular refresh cycle.
state = update_metadata(state, @metadata_api_version)
state = update_metadata(state)
{:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)

state =
@@ -86,13 +101,13 @@ defmodule KafkaEx.Server0P10P1 do
end

def kafka_server_metadata(topic, state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, @metadata_api_version)
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, state.api_versions)
updated_state = %{state | metadata: metadata, correlation_id: correlation_id}
{:reply, metadata, updated_state}
end

def kafka_server_update_metadata(state) do
{:noreply, update_metadata(state, @metadata_api_version)}
{:noreply, update_metadata(state)}
end

def kafka_api_versions(state) do
11 changes: 10 additions & 1 deletion lib/kafka_ex/server_0_p_8_p_2.ex
@@ -53,7 +53,16 @@ defmodule KafkaEx.Server0P8P2 do

brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port)} end)
{correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout())
state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name}
state = %State{
metadata: metadata,
brokers: brokers,
correlation_id: correlation_id,
consumer_group: consumer_group,
metadata_update_interval: metadata_update_interval,
consumer_group_update_interval: consumer_group_update_interval,
worker_name: name,
api_versions: [:unsupported]
}
# Get the initial "real" broker list and start a regular refresh cycle.
state = update_metadata(state)
{:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)
13 changes: 12 additions & 1 deletion lib/kafka_ex/server_0_p_9_p_0.ex
@@ -69,7 +69,18 @@ defmodule KafkaEx.Server0P9P0 do

brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end)
{correlation_id, metadata} = retrieve_metadata(brokers, 0, config_sync_timeout())
state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name, ssl_options: ssl_options, use_ssl: use_ssl}
state = %State{
metadata: metadata,
brokers: brokers,
correlation_id: correlation_id,
consumer_group: consumer_group,
metadata_update_interval: metadata_update_interval,
consumer_group_update_interval: consumer_group_update_interval,
worker_name: name,
ssl_options: ssl_options,
use_ssl: use_ssl,
api_versions: [:unsupported]
}
# Get the initial "real" broker list and start a regular refresh cycle.
state = update_metadata(state)
{:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)
2 changes: 1 addition & 1 deletion scripts/ci_tests.sh
@@ -16,7 +16,7 @@ else
TEST_COMMAND=test
fi

INCLUDED_TESTS="--include integration --include consumer_group --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0"
INCLUDED_TESTS="--include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0"

mix "$TEST_COMMAND" $INCLUDED_TESTS

43 changes: 43 additions & 0 deletions test/integration/server0_p_10_and_later_test.exs
@@ -0,0 +1,43 @@
defmodule KafkaEx.Server0P10P1AndLater.Test do
use ExUnit.Case
import TestHelper

@moduletag :server_0_p_10_and_later

@tag :create_topic
test "can create a topic" do
name = "create_topic_#{:rand.uniform(2000000)}"

request = %{
topic: name,
num_partitions: 10,
replication_factor: 1,
replica_assignment: [],
config_entries: [
%{config_name: "cleanup.policy", config_value: "compact"},
%{config_name: "min.compaction.lag.ms", config_value: "0"}
]}

resp = KafkaEx.create_topics([request], timeout: 2000)
assert {:no_error, name} == parse_create_topic_resp(resp)

resp = KafkaEx.create_topics([request], timeout: 2000)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

wait_for(fn ->
topics = KafkaEx.metadata.topic_metadatas |> Enum.map(&(&1.topic))
assert Enum.member?(topics, name)
end)
end

def parse_create_topic_resp(response) do
%KafkaEx.Protocol.CreateTopics.Response{
topic_errors: [
%KafkaEx.Protocol.CreateTopics.TopicError{
error_code: error_code,
topic_name: topic_name
}
]} = response
{error_code, topic_name}
end
end
40 changes: 2 additions & 38 deletions test/integration/server0_p_10_p_1_test.exs
@@ -1,46 +1,10 @@
defmodule KafkaEx.Server0P10P1.Test do
use ExUnit.Case
import TestHelper

@moduletag :server_0_p_10_and_later
@moduletag :server_0_p_10_p_1

@tag :create_topic
test "can create a topic" do
name = "create_topic_#{:rand.uniform(2000000)}"

request = %{
topic: name,
num_partitions: 10,
replication_factor: 1,
replica_assignment: [],
config_entries: [
%{config_name: "cleanup.policy", config_value: "compact"},
%{config_name: "min.compaction.lag.ms", config_value: "0"}
]}

resp = KafkaEx.create_topics([request], timeout: 2000)
assert {:no_error, name} == parse_create_topic_resp(resp)

resp = KafkaEx.create_topics([request], timeout: 2000)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

wait_for(fn ->
topics = KafkaEx.metadata.topic_metadatas |> Enum.map(&(&1.topic))
assert Enum.member?(topics, name)
end)
end

def parse_create_topic_resp(response) do
%KafkaEx.Protocol.CreateTopics.Response{
topic_errors: [
%KafkaEx.Protocol.CreateTopics.TopicError{
error_code: error_code,
topic_name: topic_name
}
]} = response
{error_code, topic_name}
end

# specific to this server version because we want to test that the api_versions list is exact
@tag :api_version
test "can retrieve api versions" do
