Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use api versions for new api calls (depends on #319) #320

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ The 0.9 client includes functionality that cannot be tested with older
clusters.

```
mix test --include integration --include consumer_group --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0
./all_tests.sh
```

##### Kafka >= 0.9.0
Expand Down
5 changes: 5 additions & 0 deletions all_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#! /bin/sh

# WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh

mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0
26 changes: 26 additions & 0 deletions lib/kafka_ex/api_versions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule KafkaEx.ApiVersions do

def api_versions_map(api_versions) do
api_versions
|> Enum.reduce(%{}, fn version, version_map ->
version_map |> Map.put(version.api_key, version)
end)
end


def find_api_version(api_versions_map, message_type, {min_implemented_version, max_implemented_version}) do
if api_versions_map == [:unsupported] do
{:ok, min_implemented_version}
else
case KafkaEx.Protocol.api_key(message_type) do
nil -> :unknown_message_for_client
api_key -> case api_versions_map[api_key] do
%{min_version: min} when min > max_implemented_version -> :no_version_supported
%{max_version: max} when max < min_implemented_version -> :no_version_supported
%{max_version: max} -> {:ok, Enum.min([max_implemented_version, max])}
_ -> :unknown_message_for_server
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/kafka_ex/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ defmodule KafkaEx.Config do
defp server("0.8.0"), do: KafkaEx.Server0P8P0
defp server("0.8.2"), do: KafkaEx.Server0P8P2
defp server("0.9.0"), do: KafkaEx.Server0P9P0
defp server(_), do: KafkaEx.Server0P10P1
defp server(_), do: KafkaEx.Server0P10AndLater


# ssl_options should be an empty list by default if use_ssl is false
Expand Down
90 changes: 21 additions & 69 deletions lib/kafka_ex/protocol.ex
Original file line number Diff line number Diff line change
@@ -1,81 +1,33 @@
defmodule KafkaEx.Protocol do
@moduledoc false

@produce_request 0
@fetch_request 1
@offset_request 2
@metadata_request 3
@offset_commit_request 8
@offset_fetch_request 9
@consumer_metadata_request 10
@join_group_request 11
@heartbeat_request 12
@leave_group_request 13
@sync_group_request 14
@api_versions_request 18
@create_topics_request 19
@message_type_to_api_key %{
produce: 0,
fetch: 1,
offset: 2,
metadata: 3,
offset_commit: 8,
offset_fetch: 9,
consumer_metadata: 10,
join_group: 11,
heartbeat: 12,
leave_group: 13,
sync_group: 14,
api_versions: 18,
create_topics: 19,
}

# DescribeConfigs 32
# AlterConfigs 33 Valid resource types are "Topic" and "Broker".

@api_version 0

defp api_key(:produce) do
@produce_request
end

defp api_key(:fetch) do
@fetch_request
end

defp api_key(:offset) do
@offset_request
end

defp api_key(:metadata) do
@metadata_request
end

defp api_key(:offset_commit) do
@offset_commit_request
end

defp api_key(:offset_fetch) do
@offset_fetch_request
end

defp api_key(:consumer_metadata) do
@consumer_metadata_request
end

defp api_key(:join_group) do
@join_group_request
end

defp api_key(:heartbeat) do
@heartbeat_request
end

defp api_key(:leave_group) do
@leave_group_request
end

defp api_key(:sync_group) do
@sync_group_request
end

defp api_key(:api_versions) do
@api_versions_request
end

defp api_key(:create_topics) do
@create_topics_request
end
@default_api_version 0

def create_request(type, correlation_id, client_id) do
create_request(type, correlation_id, client_id, @api_version)
@spec api_key(atom) :: integer | nil
def api_key(type) do
Map.get(@message_type_to_api_key, type, nil)
end

def create_request(type, correlation_id, client_id, api_version) do
def create_request(type, correlation_id, client_id, api_version \\ @default_api_version) do
<< api_key(type) :: 16, api_version :: 16, correlation_id :: 32,
byte_size(client_id) :: 16, client_id :: binary >>
end
Expand Down
18 changes: 14 additions & 4 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol
@supported_versions_range {0, 0}
@default_api_version 0

@moduledoc """
Implementation of the Kafka CreateTopics request and response APIs
Expand Down Expand Up @@ -63,8 +65,14 @@ defmodule KafkaEx.Protocol.CreateTopics do
@type t :: %Response{topic_errors: [TopicError]}
end

@spec create_request(integer, binary, Request.t) :: binary
def create_request(correlation_id, client_id, create_topics_request) do
def api_version(api_versions) do
KafkaEx.ApiVersions.find_api_version(api_versions, :create_topics, @supported_versions_range)
end

@spec create_request(integer, binary, Request.t, integer) :: binary
def create_request(correlation_id, client_id, create_topics_request, api_version)

def create_request(correlation_id, client_id, create_topics_request, 0) do
Protocol.create_request(:create_topics, correlation_id, client_id) <>
encode_topic_requests(create_topics_request.create_topic_requests) <>
<< create_topics_request.timeout :: 32-signed >>
Expand Down Expand Up @@ -129,8 +137,10 @@ defmodule KafkaEx.Protocol.CreateTopics do
end
end

@spec parse_response(binary) :: [] | Response.t
def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do
@spec parse_response(binary, integer) :: [] | Response.t
def parse_response(message, api_version)

def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do
%Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
end

Expand Down
17 changes: 17 additions & 0 deletions lib/kafka_ex/protocol/metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule KafkaEx.Protocol.Metadata do
alias KafkaEx.Protocol
import KafkaEx.Protocol.Common

@supported_versions_range {0, 1}
@default_api_version 0

@moduledoc """
Expand Down Expand Up @@ -97,8 +98,24 @@ defmodule KafkaEx.Protocol.Metadata do
}
end

def api_version(api_versions) do
case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do
{:ok, version} -> version
# those three should never happen since :metadata is part of the protocol since the beginning.
# they are left here as this will server as reference implementation
# :unknown_message_for_server ->
# :unknown_message_for_client ->
# :no_version_supported ->
_ -> @default_api_version
end
end

def create_request(correlation_id, client_id, topics, api_version \\ @default_api_version)

def create_request(correlation_id, client_id, nil, api_version) do
create_request(correlation_id, client_id, "", api_version)
end

def create_request(correlation_id, client_id, "", api_version) do
topic_count = if 0 == api_version, do: 0, else: -1
KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, api_version) <> << topic_count :: 32-signed >>
Expand Down
43 changes: 24 additions & 19 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ defmodule KafkaEx.Server do
consumer_group_update_interval: nil,
worker_name: KafkaEx.Server,
ssl_options: [],
use_ssl: false
use_ssl: false,
api_versions: []
)

@type t :: %State{
Expand All @@ -51,6 +52,7 @@ defmodule KafkaEx.Server do
worker_name: atom,
ssl_options: KafkaEx.ssl_options,
use_ssl: boolean,
api_versions: [KafkaEx.Protocol.ApiVersions.ApiVersion],
}

@spec increment_correlation_id(t) :: t
Expand Down Expand Up @@ -313,8 +315,8 @@ defmodule KafkaEx.Server do
def kafka_server_produce_send_request(correlation_id, produce_request, produce_request_data, state) do
{broker, state, corr_id} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition) do
nil ->
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic)
state = %{update_metadata(state) | correlation_id: retrieved_corr_id}
{retrieved_corr_id, _} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), produce_request.topic, state.api_versions)
state = update_metadata(%{state | correlation_id: retrieved_corr_id})
{
MetadataResponse.broker_for_topic(state.metadata, state.brokers, produce_request.topic, produce_request.partition),
state,
Expand Down Expand Up @@ -383,7 +385,7 @@ defmodule KafkaEx.Server do
end

def kafka_server_metadata(topic, state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic)
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), topic, state.api_versions)
updated_state = %{state | metadata: metadata, correlation_id: correlation_id}
{:reply, metadata, updated_state}
end
Expand All @@ -392,10 +394,8 @@ defmodule KafkaEx.Server do
{:noreply, update_metadata(state)}
end

def update_metadata(state), do: update_metadata(state, 0)

def update_metadata(state, api_version) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), [], api_version)
def update_metadata(state) do
{correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout(), nil, state.api_versions)
metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id}))
brokers = state.brokers
|> remove_stale_brokers(metadata_brokers)
Expand All @@ -404,30 +404,27 @@ defmodule KafkaEx.Server do
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], api_version \\ 0) do
retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, api_version)
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ [], server_api_versions \\ [:unsupported]) do
retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0, server_api_versions)
end

def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version \\ 0)

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(_, correlation_id, _sync_timeout, topic, 0, error_code, api_version) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code, server_api_versions \\ [:unsupported]) do
api_version = Metadata.api_version(server_api_versions)
retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, error_code, api_version)
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, topic, retry, _error_code, api_version) do
metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version)
data = first_broker_response(metadata_request, brokers, sync_timeout)
if data do
response = Metadata.parse_response(data, api_version)

case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do
nil -> {correlation_id + 1, response}
topic_metadata ->
:timer.sleep(300)
retrieve_metadata(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version)
retrieve_metadata_with_version(brokers, correlation_id + 1, sync_timeout, topic, retry - 1, topic_metadata.error_code, api_version)
end
else
message = "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."
Expand All @@ -437,6 +434,13 @@ defmodule KafkaEx.Server do
end
end

# credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
def retrieve_metadata_with_version(_, correlation_id, _sync_timeout, topic, 0, error_code, server_api_versions) do
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}")
{correlation_id, %Metadata.Response{}}
end


defoverridable [
kafka_server_produce: 2, kafka_server_offset: 4,
kafka_server_metadata: 2, kafka_server_update_metadata: 1,
Expand Down Expand Up @@ -470,7 +474,8 @@ defmodule KafkaEx.Server do
metadata_update_interval: metadata_update_interval,
ssl_options: ssl_options,
use_ssl: use_ssl,
worker_name: name
worker_name: name,
api_versions: [:unsupported]
}

state = update_metadata(state)
Expand Down
Loading