Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh metadata after creating or deleting a topic #349

Merged
merged 5 commits into from
Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ KakfaEx supports the following Kafka features:
* Message Compression with Snappy and gzip
* Offset Management (fetch / commit / autocommit)
* Consumer Groups
* Topics Management (create / delete)

See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and
[A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
Expand Down
2 changes: 1 addition & 1 deletion all_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

# WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh

mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0
mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0
2 changes: 1 addition & 1 deletion lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.Produce.Message
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse
alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.TopicRequest, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse
alias KafkaEx.Protocol.DeleteTopics.Response, as: DeleteTopicsResponse
alias KafkaEx.Protocol.ApiVersions.Response, as: ApiVersionsResponse
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ defmodule KafkaEx.ConsumerGroup do
* `:max_restarts`, `:max_seconds` - Supervisor restart policy parameters
* `:partition_assignment_callback` - See
`t:KafkaEx.ConsumerGroup.PartitionAssignment.callback/0`
* `:uris` - See `KafkaEx.create_worker/2`

Note `:session_timeout` is registered with the broker and determines how long
before the broker will de-register a consumer from which it has not heard a
Expand All @@ -111,6 +112,7 @@ defmodule KafkaEx.ConsumerGroup do
| {:name, Supervisor.name()}
| {:max_restarts, non_neg_integer}
| {:max_seconds, non_neg_integer}
| {:uris, KafkaEx.uri()}

@type options :: [option]

Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule KafkaEx.GenConsumer.Supervisor do
returns `{:ok, pid}`, where `pid` is the PID of the supervisor.
"""
@spec start_link(
{gen_consumer_module ::module, consumer_module :: module},
{gen_consumer_module :: module, consumer_module :: module},
consumer_group_name :: binary,
assigned_partitions :: [
{topic_name :: binary, partition_id :: non_neg_integer}
Expand Down
8 changes: 4 additions & 4 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ defmodule KafkaEx.Protocol.CreateTopics do
topic: binary,
num_partitions: integer,
replication_factor: integer,
replica_assignment: [ReplicaAssignment],
config_entries: [ConfigEntry]
replica_assignment: [ReplicaAssignment.t()],
config_entries: [ConfigEntry.t()]
}
end

defmodule Request do
@moduledoc false
defstruct create_topic_requests: nil, timeout: nil
@type t :: %Request{create_topic_requests: [TopicRequest], timeout: integer}
@type t :: %Request{create_topic_requests: [TopicRequest.t()], timeout: integer}
end

defmodule TopicError do
Expand All @@ -66,7 +66,7 @@ defmodule KafkaEx.Protocol.CreateTopics do
defmodule Response do
@moduledoc false
defstruct topic_errors: nil
@type t :: %Response{topic_errors: [TopicError]}
@type t :: %Response{topic_errors: [TopicError.t()]}
end

def api_version(api_versions) do
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/delete_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule KafkaEx.Protocol.DeleteTopics do
defmodule Response do
@moduledoc false
defstruct topic_errors: nil
@type t :: %Response{topic_errors: [TopicError]}
@type t :: %Response{topic_errors: [TopicError.t()]}
end

def api_version(api_versions) do
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/produce.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule KafkaEx.Protocol.Produce do

@type t :: %Request{
topic: binary,
partition: integer,
partition: integer | nil,
required_acks: integer,
timeout: integer,
compression: atom,
Expand Down
15 changes: 8 additions & 7 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule KafkaEx.Server do
alias KafkaEx.Protocol.Produce
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.TopicRequest, as: CreateTopicsRequest
alias KafkaEx.Socket

defmodule State do
Expand Down Expand Up @@ -206,19 +206,20 @@ defmodule KafkaEx.Server do
| {:stop, reason, reply, new_state}
| {:stop, reason, new_state}
when reply: term, new_state: term, reason: term
@callback kafka_create_topics(
@callback kafka_server_create_topics(
[CreateTopicsRequest.t()],
network_timeout :: integer,
state :: State.t()
) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_delete_topics(
@callback kafka_server_delete_topics(
[String.t()],
network_timeout :: integer,
state :: State.t()
) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_api_versions(state :: State.t()) :: {:reply, reply, new_state}
@callback kafka_server_api_versions(state :: State.t()) ::
{:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_server_update_metadata(state :: State.t()) ::
{:noreply, new_state}
Expand Down Expand Up @@ -337,15 +338,15 @@ defmodule KafkaEx.Server do
end

def handle_call({:create_topics, requests, network_timeout}, _from, state) do
kafka_create_topics(requests, network_timeout, state)
kafka_server_create_topics(requests, network_timeout, state)
end

def handle_call({:delete_topics, topics, network_timeout}, _from, state) do
kafka_delete_topics(topics, network_timeout, state)
kafka_server_delete_topics(topics, network_timeout, state)
end

def handle_call({:api_versions}, _from, state) do
kafka_api_versions(state)
kafka_server_api_versions(state)
end

def handle_info(:update_metadata, state) do
Expand Down
13 changes: 9 additions & 4 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ defmodule KafkaEx.Server0P10AndLater do
check_brokers_sockets!(brokers)

{_,

%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_api_versions(%State{brokers: brokers})
}, state} = kafka_server_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
raise "Brokers sockets are closed"
Expand Down Expand Up @@ -181,7 +182,7 @@ defmodule KafkaEx.Server0P10AndLater do
{:noreply, update_metadata(state)}
end

def kafka_api_versions(state) do
def kafka_server_api_versions(state) do
response =
state.correlation_id
|> ApiVersions.create_request(@client_id)
Expand All @@ -191,7 +192,7 @@ defmodule KafkaEx.Server0P10AndLater do
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end

def kafka_delete_topics(topics, network_timeout, state) do
def kafka_server_delete_topics(topics, network_timeout, state) do
api_version =
case DeleteTopics.api_version(state.api_versions) do
{:ok, api_version} ->
Expand Down Expand Up @@ -235,10 +236,12 @@ defmodule KafkaEx.Server0P10AndLater do
{response, %{state | correlation_id: state.correlation_id + 1}}
end

state = update_metadata(state)

jbruggem marked this conversation as resolved.
Show resolved Hide resolved
{:reply, response, state}
end

def kafka_create_topics(requests, network_timeout, state) do
def kafka_server_create_topics(requests, network_timeout, state) do
api_version =
case CreateTopics.api_version(state.api_versions) do
{:ok, api_version} ->
Expand Down Expand Up @@ -284,6 +287,8 @@ defmodule KafkaEx.Server0P10AndLater do
{response, %{state | correlation_id: state.correlation_id + 1}}
end

state = update_metadata(state)

jbruggem marked this conversation as resolved.
Show resolved Hide resolved
{:reply, response, state}
end

Expand Down
12 changes: 6 additions & 6 deletions lib/kafka_ex/server_0_p_8_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ defmodule KafkaEx.Server0P8P0 do
{:nowarn_function, kafka_server_consumer_group: 1},
{:nowarn_function, kafka_server_offset_commit: 2},
{:nowarn_function, kafka_server_offset_fetch: 2},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_delete_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
{:nowarn_function, kafka_server_create_topics: 3},
{:nowarn_function, kafka_server_delete_topics: 3},
{:nowarn_function, kafka_server_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -89,13 +89,13 @@ defmodule KafkaEx.Server0P8P0 do
"Consumer Group Metadata is not supported in 0.8.0 version of kafka"
)

def kafka_api_versions(_state),
def kafka_server_api_versions(_state),
do: raise("ApiVersions is not supported in 0.8.0 version of kafka")

def kafka_create_topics(_, _, _state),
def kafka_server_create_topics(_, _, _state),
do: raise("CreateTopic is not supported in 0.8.0 version of kafka")

def kafka_delete_topics(_, _, _state),
def kafka_server_delete_topics(_, _, _state),
do: raise("DeleteTopic is not supported in 0.8.0 version of kafka")

defp fetch(request, state) do
Expand Down
12 changes: 6 additions & 6 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ defmodule KafkaEx.Server0P8P2 do
{:nowarn_function, kafka_server_sync_group: 3},
{:nowarn_function, kafka_server_join_group: 3},
{:nowarn_function, kafka_server_leave_group: 3},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_delete_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
{:nowarn_function, kafka_server_create_topics: 3},
{:nowarn_function, kafka_server_delete_topics: 3},
{:nowarn_function, kafka_server_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -200,13 +200,13 @@ defmodule KafkaEx.Server0P8P2 do
def kafka_server_heartbeat(_, _, _state),
do: raise("Heartbeat is not supported in 0.8.2 version of kafka")

def kafka_api_versions(_state),
def kafka_server_api_versions(_state),
do: raise("ApiVersions is not supported in 0.8.2 version of kafka")

def kafka_create_topics(_, _, _state),
def kafka_server_create_topics(_, _, _state),
do: raise("CreateTopic is not supported in 0.8.2 version of kafka")

def kafka_delete_topics(_, _, _state),
def kafka_server_delete_topics(_, _, _state),
do: raise("DeleteTopic is not supported in 0.8.2 version of kafka")

defp update_consumer_metadata(state),
Expand Down
12 changes: 6 additions & 6 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ defmodule KafkaEx.Server0P9P0 do

# these functions aren't implemented for 0.9.0
@dialyzer [
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_delete_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
{:nowarn_function, kafka_server_create_topics: 3},
{:nowarn_function, kafka_server_delete_topics: 3},
{:nowarn_function, kafka_server_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -52,13 +52,13 @@ defmodule KafkaEx.Server0P9P0 do
defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2

def kafka_api_versions(_state),
def kafka_server_api_versions(_state),
do: raise("ApiVersions is not supported in 0.9.0 version of kafka")

def kafka_create_topics(_, _, _state),
def kafka_server_create_topics(_, _, _state),
do: raise("CreateTopic is not supported in 0.9.0 version of kafka")

def kafka_delete_topics(_, _, _state),
def kafka_server_delete_topics(_, _, _state),
do: raise("DeleteTopic is not supported in 0.9.0 version of kafka")

def kafka_server_init([args]) do
Expand Down
21 changes: 10 additions & 11 deletions test/integration/server0_p_10_and_later_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
import TestHelper

@moduletag :server_0_p_10_and_later
@num_partitions 10

@tag :create_topic
test "can create a topic" do
Expand All @@ -19,11 +20,14 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
resp = create_topic(name, config)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

wait_for(fn ->
Enum.member?(existing_topics(), name)
end)

assert Enum.member?(existing_topics(), name)

assert @num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
KafkaEx.metadata(),
name
)
|> Enum.count()
end

@tag :delete_topic
Expand All @@ -39,11 +43,6 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do

resp = KafkaEx.delete_topics([name], timeout: 5_000)
assert {:no_error, name} = parse_delete_topic_resp(resp)

wait_for(fn ->
not Enum.member?(existing_topics(), name)
end)

assert not Enum.member?(existing_topics(), name)
end

Expand Down Expand Up @@ -78,13 +77,13 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
[
%{
topic: name,
num_partitions: 10,
num_partitions: @num_partitions,
replication_factor: 1,
replica_assignment: [],
config_entries: config
}
],
timeout: 5_000
timeout: 10_000
)
end

Expand Down