Skip to content

Commit

Permalink
Merge pull request #349 from euranova/fix_bug_create_topics
Browse files Browse the repository at this point in the history
Refresh metadata after creating or deleting a topic
  • Loading branch information
joshuawscott authored Jun 10, 2019
2 parents 8ff19af + df72ac5 commit 7655de9
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 49 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ KakfaEx supports the following Kafka features:
* Message Compression with Snappy and gzip
* Offset Management (fetch / commit / autocommit)
* Consumer Groups
* Topics Management (create / delete)

See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and
[A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
Expand Down
2 changes: 1 addition & 1 deletion all_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

# WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh

mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0
mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0
2 changes: 1 addition & 1 deletion lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule KafkaEx do
alias KafkaEx.Protocol.Produce.Message
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse
alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.TopicRequest, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse
alias KafkaEx.Protocol.DeleteTopics.Response, as: DeleteTopicsResponse
alias KafkaEx.Protocol.ApiVersions.Response, as: ApiVersionsResponse
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ defmodule KafkaEx.ConsumerGroup do
* `:max_restarts`, `:max_seconds` - Supervisor restart policy parameters
* `:partition_assignment_callback` - See
`t:KafkaEx.ConsumerGroup.PartitionAssignment.callback/0`
* `:uris` - See `KafkaEx.create_worker/2`
Note `:session_timeout` is registered with the broker and determines how long
before the broker will de-register a consumer from which it has not heard a
Expand All @@ -111,6 +112,7 @@ defmodule KafkaEx.ConsumerGroup do
| {:name, Supervisor.name()}
| {:max_restarts, non_neg_integer}
| {:max_seconds, non_neg_integer}
| {:uris, KafkaEx.uri()}

@type options :: [option]

Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule KafkaEx.GenConsumer.Supervisor do
returns `{:ok, pid}`, where `pid` is the PID of the supervisor.
"""
@spec start_link(
{gen_consumer_module ::module, consumer_module :: module},
{gen_consumer_module :: module, consumer_module :: module},
consumer_group_name :: binary,
assigned_partitions :: [
{topic_name :: binary, partition_id :: non_neg_integer}
Expand Down
8 changes: 4 additions & 4 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ defmodule KafkaEx.Protocol.CreateTopics do
topic: binary,
num_partitions: integer,
replication_factor: integer,
replica_assignment: [ReplicaAssignment],
config_entries: [ConfigEntry]
replica_assignment: [ReplicaAssignment.t()],
config_entries: [ConfigEntry.t()]
}
end

defmodule Request do
@moduledoc false
defstruct create_topic_requests: nil, timeout: nil
@type t :: %Request{create_topic_requests: [TopicRequest], timeout: integer}
@type t :: %Request{create_topic_requests: [TopicRequest.t()], timeout: integer}
end

defmodule TopicError do
Expand All @@ -66,7 +66,7 @@ defmodule KafkaEx.Protocol.CreateTopics do
defmodule Response do
@moduledoc false
defstruct topic_errors: nil
@type t :: %Response{topic_errors: [TopicError]}
@type t :: %Response{topic_errors: [TopicError.t()]}
end

def api_version(api_versions) do
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/delete_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule KafkaEx.Protocol.DeleteTopics do
defmodule Response do
@moduledoc false
defstruct topic_errors: nil
@type t :: %Response{topic_errors: [TopicError]}
@type t :: %Response{topic_errors: [TopicError.t()]}
end

def api_version(api_versions) do
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/produce.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule KafkaEx.Protocol.Produce do

@type t :: %Request{
topic: binary,
partition: integer,
partition: integer | nil,
required_acks: integer,
timeout: integer,
compression: atom,
Expand Down
15 changes: 8 additions & 7 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule KafkaEx.Server do
alias KafkaEx.Protocol.Produce
alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest
alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest
alias KafkaEx.Protocol.CreateTopics.TopicRequest, as: CreateTopicsRequest
alias KafkaEx.Socket

defmodule State do
Expand Down Expand Up @@ -206,19 +206,20 @@ defmodule KafkaEx.Server do
| {:stop, reason, reply, new_state}
| {:stop, reason, new_state}
when reply: term, new_state: term, reason: term
@callback kafka_create_topics(
@callback kafka_server_create_topics(
[CreateTopicsRequest.t()],
network_timeout :: integer,
state :: State.t()
) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_delete_topics(
@callback kafka_server_delete_topics(
[String.t()],
network_timeout :: integer,
state :: State.t()
) :: {:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_api_versions(state :: State.t()) :: {:reply, reply, new_state}
@callback kafka_server_api_versions(state :: State.t()) ::
{:reply, reply, new_state}
when reply: term, new_state: term
@callback kafka_server_update_metadata(state :: State.t()) ::
{:noreply, new_state}
Expand Down Expand Up @@ -337,15 +338,15 @@ defmodule KafkaEx.Server do
end

def handle_call({:create_topics, requests, network_timeout}, _from, state) do
kafka_create_topics(requests, network_timeout, state)
kafka_server_create_topics(requests, network_timeout, state)
end

def handle_call({:delete_topics, topics, network_timeout}, _from, state) do
kafka_delete_topics(topics, network_timeout, state)
kafka_server_delete_topics(topics, network_timeout, state)
end

def handle_call({:api_versions}, _from, state) do
kafka_api_versions(state)
kafka_server_api_versions(state)
end

def handle_info(:update_metadata, state) do
Expand Down
13 changes: 9 additions & 4 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ defmodule KafkaEx.Server0P10AndLater do
check_brokers_sockets!(brokers)

{_,

%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_api_versions(%State{brokers: brokers})
}, state} = kafka_server_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
raise "Brokers sockets are closed"
Expand Down Expand Up @@ -181,7 +182,7 @@ defmodule KafkaEx.Server0P10AndLater do
{:noreply, update_metadata(state)}
end

def kafka_api_versions(state) do
def kafka_server_api_versions(state) do
response =
state.correlation_id
|> ApiVersions.create_request(@client_id)
Expand All @@ -191,7 +192,7 @@ defmodule KafkaEx.Server0P10AndLater do
{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
end

def kafka_delete_topics(topics, network_timeout, state) do
def kafka_server_delete_topics(topics, network_timeout, state) do
api_version =
case DeleteTopics.api_version(state.api_versions) do
{:ok, api_version} ->
Expand Down Expand Up @@ -235,10 +236,12 @@ defmodule KafkaEx.Server0P10AndLater do
{response, %{state | correlation_id: state.correlation_id + 1}}
end

state = update_metadata(state)

{:reply, response, state}
end

def kafka_create_topics(requests, network_timeout, state) do
def kafka_server_create_topics(requests, network_timeout, state) do
api_version =
case CreateTopics.api_version(state.api_versions) do
{:ok, api_version} ->
Expand Down Expand Up @@ -284,6 +287,8 @@ defmodule KafkaEx.Server0P10AndLater do
{response, %{state | correlation_id: state.correlation_id + 1}}
end

state = update_metadata(state)

{:reply, response, state}
end

Expand Down
12 changes: 6 additions & 6 deletions lib/kafka_ex/server_0_p_8_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ defmodule KafkaEx.Server0P8P0 do
{:nowarn_function, kafka_server_consumer_group: 1},
{:nowarn_function, kafka_server_offset_commit: 2},
{:nowarn_function, kafka_server_offset_fetch: 2},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_delete_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
{:nowarn_function, kafka_server_create_topics: 3},
{:nowarn_function, kafka_server_delete_topics: 3},
{:nowarn_function, kafka_server_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -89,13 +89,13 @@ defmodule KafkaEx.Server0P8P0 do
"Consumer Group Metadata is not supported in 0.8.0 version of kafka"
)

def kafka_api_versions(_state),
def kafka_server_api_versions(_state),
do: raise("ApiVersions is not supported in 0.8.0 version of kafka")

def kafka_create_topics(_, _, _state),
def kafka_server_create_topics(_, _, _state),
do: raise("CreateTopic is not supported in 0.8.0 version of kafka")

def kafka_delete_topics(_, _, _state),
def kafka_server_delete_topics(_, _, _state),
do: raise("DeleteTopic is not supported in 0.8.0 version of kafka")

defp fetch(request, state) do
Expand Down
12 changes: 6 additions & 6 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ defmodule KafkaEx.Server0P8P2 do
{:nowarn_function, kafka_server_sync_group: 3},
{:nowarn_function, kafka_server_join_group: 3},
{:nowarn_function, kafka_server_leave_group: 3},
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_delete_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
{:nowarn_function, kafka_server_create_topics: 3},
{:nowarn_function, kafka_server_delete_topics: 3},
{:nowarn_function, kafka_server_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -200,13 +200,13 @@ defmodule KafkaEx.Server0P8P2 do
def kafka_server_heartbeat(_, _, _state),
do: raise("Heartbeat is not supported in 0.8.2 version of kafka")

def kafka_api_versions(_state),
def kafka_server_api_versions(_state),
do: raise("ApiVersions is not supported in 0.8.2 version of kafka")

def kafka_create_topics(_, _, _state),
def kafka_server_create_topics(_, _, _state),
do: raise("CreateTopic is not supported in 0.8.2 version of kafka")

def kafka_delete_topics(_, _, _state),
def kafka_server_delete_topics(_, _, _state),
do: raise("DeleteTopic is not supported in 0.8.2 version of kafka")

defp update_consumer_metadata(state),
Expand Down
12 changes: 6 additions & 6 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ defmodule KafkaEx.Server0P9P0 do

# these functions aren't implemented for 0.9.0
@dialyzer [
{:nowarn_function, kafka_create_topics: 3},
{:nowarn_function, kafka_delete_topics: 3},
{:nowarn_function, kafka_api_versions: 1}
{:nowarn_function, kafka_server_create_topics: 3},
{:nowarn_function, kafka_server_delete_topics: 3},
{:nowarn_function, kafka_server_api_versions: 1}
]

use KafkaEx.Server
Expand Down Expand Up @@ -52,13 +52,13 @@ defmodule KafkaEx.Server0P9P0 do
defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2

def kafka_api_versions(_state),
def kafka_server_api_versions(_state),
do: raise("ApiVersions is not supported in 0.9.0 version of kafka")

def kafka_create_topics(_, _, _state),
def kafka_server_create_topics(_, _, _state),
do: raise("CreateTopic is not supported in 0.9.0 version of kafka")

def kafka_delete_topics(_, _, _state),
def kafka_server_delete_topics(_, _, _state),
do: raise("DeleteTopic is not supported in 0.9.0 version of kafka")

def kafka_server_init([args]) do
Expand Down
21 changes: 10 additions & 11 deletions test/integration/server0_p_10_and_later_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
import TestHelper

@moduletag :server_0_p_10_and_later
@num_partitions 10

@tag :create_topic
test "can create a topic" do
Expand All @@ -19,11 +20,14 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
resp = create_topic(name, config)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

wait_for(fn ->
Enum.member?(existing_topics(), name)
end)

assert Enum.member?(existing_topics(), name)

assert @num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
KafkaEx.metadata(),
name
)
|> Enum.count()
end

@tag :delete_topic
Expand All @@ -39,11 +43,6 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do

resp = KafkaEx.delete_topics([name], timeout: 5_000)
assert {:no_error, name} = parse_delete_topic_resp(resp)

wait_for(fn ->
not Enum.member?(existing_topics(), name)
end)

assert not Enum.member?(existing_topics(), name)
end

Expand Down Expand Up @@ -78,13 +77,13 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
[
%{
topic: name,
num_partitions: 10,
num_partitions: @num_partitions,
replication_factor: 1,
replica_assignment: [],
config_entries: config
}
],
timeout: 5_000
timeout: 10_000
)
end

Expand Down

0 comments on commit 7655de9

Please sign in to comment.