diff --git a/README.md b/README.md index f6b2a149..1c575ac1 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ KakfaEx supports the following Kafka features: * Message Compression with Snappy and gzip * Offset Management (fetch / commit / autocommit) * Consumer Groups +* Topics Management (create / delete) See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and [A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) diff --git a/all_tests.sh b/all_tests.sh index f0d1d2c4..95d299b9 100755 --- a/all_tests.sh +++ b/all_tests.sh @@ -2,4 +2,4 @@ # WARN: when changing something here, there should probably also be a change in scripts/ci_tests.sh -mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_9_p_0 --include server_0_p_8_p_0 +mix test --include integration --include consumer_group --include server_0_p_10_and_later --include server_0_p_10_p_1 --include server_0_p_9_p_0 --include server_0_p_8_p_0 diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index b678bacd..4dd1273c 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -22,7 +22,7 @@ defmodule KafkaEx do alias KafkaEx.Protocol.Produce.Message alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest alias KafkaEx.Protocol.SyncGroup.Response, as: SyncGroupResponse - alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest + alias KafkaEx.Protocol.CreateTopics.TopicRequest, as: CreateTopicsRequest alias KafkaEx.Protocol.CreateTopics.Response, as: CreateTopicsResponse alias KafkaEx.Protocol.DeleteTopics.Response, as: DeleteTopicsResponse alias KafkaEx.Protocol.ApiVersions.Response, as: ApiVersionsResponse diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index dd1909a4..41f69368 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -94,6 +94,7 @@ defmodule KafkaEx.ConsumerGroup do * `:max_restarts`, `:max_seconds` - Supervisor restart policy parameters * `:partition_assignment_callback` - See `t:KafkaEx.ConsumerGroup.PartitionAssignment.callback/0` + * `:uris` - See `KafkaEx.create_worker/2` Note `:session_timeout` is registered with the broker and determines how long before the broker will de-register a consumer from which it has not heard a @@ -111,6 +112,7 @@ defmodule KafkaEx.ConsumerGroup do | {:name, Supervisor.name()} | {:max_restarts, non_neg_integer} | {:max_seconds, non_neg_integer} + | {:uris, KafkaEx.uri()} @type options :: [option] diff --git a/lib/kafka_ex/gen_consumer/supervisor.ex b/lib/kafka_ex/gen_consumer/supervisor.ex index 767d7d2f..cbe7ce39 100644 --- a/lib/kafka_ex/gen_consumer/supervisor.ex +++ b/lib/kafka_ex/gen_consumer/supervisor.ex @@ -33,7 +33,7 @@ defmodule KafkaEx.GenConsumer.Supervisor do returns `{:ok, pid}`, where `pid` is the PID of the supervisor. """ @spec start_link( - {gen_consumer_module ::module, consumer_module :: module}, + {gen_consumer_module :: module, consumer_module :: module}, consumer_group_name :: binary, assigned_partitions :: [ {topic_name :: binary, partition_id :: non_neg_integer} diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index 106d1976..0a447dfb 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -46,15 +46,15 @@ defmodule KafkaEx.Protocol.CreateTopics do topic: binary, num_partitions: integer, replication_factor: integer, - replica_assignment: [ReplicaAssignment], - config_entries: [ConfigEntry] + replica_assignment: [ReplicaAssignment.t()], + config_entries: [ConfigEntry.t()] } end defmodule Request do @moduledoc false defstruct create_topic_requests: nil, timeout: nil - @type t :: %Request{create_topic_requests: [TopicRequest], timeout: integer} + @type t :: %Request{create_topic_requests: [TopicRequest.t()], timeout: integer} end defmodule TopicError do @@ -66,7 +66,7 @@ defmodule KafkaEx.Protocol.CreateTopics do defmodule Response do @moduledoc false defstruct topic_errors: nil - @type t :: %Response{topic_errors: [TopicError]} + @type t :: %Response{topic_errors: [TopicError.t()]} end def api_version(api_versions) do diff --git a/lib/kafka_ex/protocol/delete_topics.ex b/lib/kafka_ex/protocol/delete_topics.ex index 71bf714d..53f662e9 100644 --- a/lib/kafka_ex/protocol/delete_topics.ex +++ b/lib/kafka_ex/protocol/delete_topics.ex @@ -33,7 +33,7 @@ defmodule KafkaEx.Protocol.DeleteTopics do defmodule Response do @moduledoc false defstruct topic_errors: nil - @type t :: %Response{topic_errors: [TopicError]} + @type t :: %Response{topic_errors: [TopicError.t()]} end def api_version(api_versions) do diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 710c2468..1236839b 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -31,7 +31,7 @@ defmodule KafkaEx.Protocol.Produce do @type t :: %Request{ topic: binary, - partition: integer, + partition: integer | nil, required_acks: integer, timeout: integer, compression: atom, diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index bb3fda5b..19461656 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -17,7 +17,7 @@ defmodule KafkaEx.Server do alias KafkaEx.Protocol.Produce alias KafkaEx.Protocol.Produce.Request, as: ProduceRequest alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest - alias KafkaEx.Protocol.CreateTopics.Request, as: CreateTopicsRequest + alias KafkaEx.Protocol.CreateTopics.TopicRequest, as: CreateTopicsRequest alias KafkaEx.Socket defmodule State do @@ -206,19 +206,20 @@ defmodule KafkaEx.Server do | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term - @callback kafka_create_topics( + @callback kafka_server_create_topics( [CreateTopicsRequest.t()], network_timeout :: integer, state :: State.t() ) :: {:reply, reply, new_state} when reply: term, new_state: term - @callback kafka_delete_topics( + @callback kafka_server_delete_topics( [String.t()], network_timeout :: integer, state :: State.t() ) :: {:reply, reply, new_state} when reply: term, new_state: term - @callback kafka_api_versions(state :: State.t()) :: {:reply, reply, new_state} + @callback kafka_server_api_versions(state :: State.t()) :: + {:reply, reply, new_state} when reply: term, new_state: term @callback kafka_server_update_metadata(state :: State.t()) :: {:noreply, new_state} @@ -337,15 +338,15 @@ defmodule KafkaEx.Server do end def handle_call({:create_topics, requests, network_timeout}, _from, state) do - kafka_create_topics(requests, network_timeout, state) + kafka_server_create_topics(requests, network_timeout, state) end def handle_call({:delete_topics, topics, network_timeout}, _from, state) do - kafka_delete_topics(topics, network_timeout, state) + kafka_server_delete_topics(topics, network_timeout, state) end def handle_call({:api_versions}, _from, state) do - kafka_api_versions(state) + kafka_server_api_versions(state) end def handle_info(:update_metadata, state) do diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 6cf2cd07..0e6ccbe2 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -95,10 +95,11 @@ defmodule KafkaEx.Server0P10AndLater do check_brokers_sockets!(brokers) {_, + %KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: error_code - }, state} = kafka_api_versions(%State{brokers: brokers}) + }, state} = kafka_server_api_versions(%State{brokers: brokers}) if error_code == :no_response do sleep_for_reconnect() raise "Brokers sockets are closed" @@ -181,7 +182,7 @@ defmodule KafkaEx.Server0P10AndLater do {:noreply, update_metadata(state)} end - def kafka_api_versions(state) do + def kafka_server_api_versions(state) do response = state.correlation_id |> ApiVersions.create_request(@client_id) @@ -191,7 +192,7 @@ defmodule KafkaEx.Server0P10AndLater do {:reply, response, %{state | correlation_id: state.correlation_id + 1}} end - def kafka_delete_topics(topics, network_timeout, state) do + def kafka_server_delete_topics(topics, network_timeout, state) do api_version = case DeleteTopics.api_version(state.api_versions) do {:ok, api_version} -> @@ -235,10 +236,12 @@ defmodule KafkaEx.Server0P10AndLater do {response, %{state | correlation_id: state.correlation_id + 1}} end + state = update_metadata(state) + {:reply, response, state} end - def kafka_create_topics(requests, network_timeout, state) do + def kafka_server_create_topics(requests, network_timeout, state) do api_version = case CreateTopics.api_version(state.api_versions) do {:ok, api_version} -> @@ -284,6 +287,8 @@ defmodule KafkaEx.Server0P10AndLater do {response, %{state | correlation_id: state.correlation_id + 1}} end + state = update_metadata(state) + {:reply, response, state} end diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index 6e1a176a..b92f0499 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -14,9 +14,9 @@ defmodule KafkaEx.Server0P8P0 do {:nowarn_function, kafka_server_consumer_group: 1}, {:nowarn_function, kafka_server_offset_commit: 2}, {:nowarn_function, kafka_server_offset_fetch: 2}, - {:nowarn_function, kafka_create_topics: 3}, - {:nowarn_function, kafka_delete_topics: 3}, - {:nowarn_function, kafka_api_versions: 1} + {:nowarn_function, kafka_server_create_topics: 3}, + {:nowarn_function, kafka_server_delete_topics: 3}, + {:nowarn_function, kafka_server_api_versions: 1} ] use KafkaEx.Server @@ -89,13 +89,13 @@ defmodule KafkaEx.Server0P8P0 do "Consumer Group Metadata is not supported in 0.8.0 version of kafka" ) - def kafka_api_versions(_state), + def kafka_server_api_versions(_state), do: raise("ApiVersions is not supported in 0.8.0 version of kafka") - def kafka_create_topics(_, _, _state), + def kafka_server_create_topics(_, _, _state), do: raise("CreateTopic is not supported in 0.8.0 version of kafka") - def kafka_delete_topics(_, _, _state), + def kafka_server_delete_topics(_, _, _state), do: raise("DeleteTopic is not supported in 0.8.0 version of kafka") defp fetch(request, state) do diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index c407cc4b..53007a6e 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -9,9 +9,9 @@ defmodule KafkaEx.Server0P8P2 do {:nowarn_function, kafka_server_sync_group: 3}, {:nowarn_function, kafka_server_join_group: 3}, {:nowarn_function, kafka_server_leave_group: 3}, - {:nowarn_function, kafka_create_topics: 3}, - {:nowarn_function, kafka_delete_topics: 3}, - {:nowarn_function, kafka_api_versions: 1} + {:nowarn_function, kafka_server_create_topics: 3}, + {:nowarn_function, kafka_server_delete_topics: 3}, + {:nowarn_function, kafka_server_api_versions: 1} ] use KafkaEx.Server @@ -200,13 +200,13 @@ defmodule KafkaEx.Server0P8P2 do def kafka_server_heartbeat(_, _, _state), do: raise("Heartbeat is not supported in 0.8.2 version of kafka") - def kafka_api_versions(_state), + def kafka_server_api_versions(_state), do: raise("ApiVersions is not supported in 0.8.2 version of kafka") - def kafka_create_topics(_, _, _state), + def kafka_server_create_topics(_, _, _state), do: raise("CreateTopic is not supported in 0.8.2 version of kafka") - def kafka_delete_topics(_, _, _state), + def kafka_server_delete_topics(_, _, _state), do: raise("DeleteTopic is not supported in 0.8.2 version of kafka") defp update_consumer_metadata(state), diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 8d305172..178de9c5 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -6,9 +6,9 @@ defmodule KafkaEx.Server0P9P0 do # these functions aren't implemented for 0.9.0 @dialyzer [ - {:nowarn_function, kafka_create_topics: 3}, - {:nowarn_function, kafka_delete_topics: 3}, - {:nowarn_function, kafka_api_versions: 1} + {:nowarn_function, kafka_server_create_topics: 3}, + {:nowarn_function, kafka_server_delete_topics: 3}, + {:nowarn_function, kafka_server_api_versions: 1} ] use KafkaEx.Server @@ -52,13 +52,13 @@ defmodule KafkaEx.Server0P9P0 do defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2 defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2 - def kafka_api_versions(_state), + def kafka_server_api_versions(_state), do: raise("ApiVersions is not supported in 0.9.0 version of kafka") - def kafka_create_topics(_, _, _state), + def kafka_server_create_topics(_, _, _state), do: raise("CreateTopic is not supported in 0.9.0 version of kafka") - def kafka_delete_topics(_, _, _state), + def kafka_server_delete_topics(_, _, _state), do: raise("DeleteTopic is not supported in 0.9.0 version of kafka") def kafka_server_init([args]) do diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs index c90ed70d..b8e32737 100644 --- a/test/integration/server0_p_10_and_later_test.exs +++ b/test/integration/server0_p_10_and_later_test.exs @@ -3,6 +3,7 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do import TestHelper @moduletag :server_0_p_10_and_later + @num_partitions 10 @tag :create_topic test "can create a topic" do @@ -19,11 +20,14 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do resp = create_topic(name, config) assert {:topic_already_exists, name} == parse_create_topic_resp(resp) - wait_for(fn -> - Enum.member?(existing_topics(), name) - end) - assert Enum.member?(existing_topics(), name) + + assert @num_partitions == + KafkaEx.Protocol.Metadata.Response.partitions_for_topic( + KafkaEx.metadata(), + name + ) + |> Enum.count() end @tag :delete_topic @@ -39,11 +43,6 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do resp = KafkaEx.delete_topics([name], timeout: 5_000) assert {:no_error, name} = parse_delete_topic_resp(resp) - - wait_for(fn -> - not Enum.member?(existing_topics(), name) - end) - assert not Enum.member?(existing_topics(), name) end @@ -78,13 +77,13 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do [ %{ topic: name, - num_partitions: 10, + num_partitions: @num_partitions, replication_factor: 1, replica_assignment: [], config_entries: config } ], - timeout: 5_000 + timeout: 10_000 ) end