diff --git a/config/test.exs b/config/test.exs index 5a237558..8f942991 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,6 +1,11 @@ use Mix.Config -config :ex_unit, capture_log: true +config :ex_unit, capture_log: is_nil(System.get_env("SHOW_LOGS")) config :kafka_ex, sync_timeout: 60_000 + +# Help debug tests that are tricky to understand +config :logger, :console, + format: "$time [$level] [$metadata] $message\n", + metadata: [:module, :function, :pid] diff --git a/docker-compose-kafka.env b/docker-compose-kafka.env index c3c1bf6e..f1bbcf91 100644 --- a/docker-compose-kafka.env +++ b/docker-compose-kafka.env @@ -17,7 +17,7 @@ KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS=6000 # alternative to KAFKA_ADVERTISED_HOST_NAME is: HOSTNAME_COMMAND: ip addr | grep -o "inet [0-9.]*" | grep -v "127\.0\.0\.1" | grep -o "[0-9.]*" KAFKA_ADVERTISED_HOST_NAME=localhost -KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:SSL,OUTSIDE:SSL +KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:SSL KAFKA_ADVERTISED_PROTOCOL_NAME=OUTSIDE KAFKA_PROTOCOL_NAME=INSIDE diff --git a/docker-compose.yml b/docker-compose.yml index 990d089d..700c59ac 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,19 @@ -version: '3.2' +version: '2.4' services: zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" + healthcheck: + test: ["CMD-SHELL", "echo ruok | nc -w 2 127.0.0.1 2181 | grep -q imok"] + interval: 5s + timeout: 10s + retries: 15 + start_period: 10s + kafka1: + hostname: kafka1 image: wurstmeister/kafka:0.11.0.1 ports: - "9093:9093" @@ -13,12 +21,25 @@ services: - zookeeper volumes: - ./ssl:/ssl + - ./scripts/kafka_check_health:/kafka_check_health env_file: docker-compose-kafka.env environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_PORT: 9093 + KAFKA_PORT: 9092 + KAFKA_HOST_NAME: kafka1 + healthcheck: + test: /kafka_check_health + interval: 10s + timeout: 10s + retries: 30 + start_period: 10s + depends_on: + zookeeper: + condition: service_healthy kafka2: + hostname: kafka2 image: wurstmeister/kafka:0.11.0.1 ports: - "9094:9094" @@ -26,12 +47,25 @@ services: - zookeeper volumes: - ./ssl:/ssl + - ./scripts/kafka_check_health:/kafka_check_health env_file: docker-compose-kafka.env environment: KAFKA_BROKER_ID: 2 KAFKA_ADVERTISED_PORT: 9094 + KAFKA_PORT: 9092 + KAFKA_HOST_NAME: kafka2 + healthcheck: + test: /kafka_check_health + interval: 10s + timeout: 10s + retries: 30 + start_period: 10s + depends_on: + zookeeper: + condition: service_healthy kafka3: + hostname: kafka3 image: wurstmeister/kafka:0.11.0.1 ports: - "9095:9095" @@ -39,7 +73,32 @@ services: - zookeeper volumes: - ./ssl:/ssl + - ./scripts/kafka_check_health:/kafka_check_health env_file: docker-compose-kafka.env environment: KAFKA_BROKER_ID: 3 KAFKA_ADVERTISED_PORT: 9095 + KAFKA_PORT: 9092 + KAFKA_HOST_NAME: kafka3 + healthcheck: + test: /kafka_check_health + interval: 10s + timeout: 10s + retries: 30 + start_period: 10s + depends_on: + zookeeper: + condition: service_healthy + + # This is a dummy service that forces all other services to be healthy before + # docker-compose up can be considered successful. + ready: + image: busybox:1.31-musl + command: tail -f /dev/null + depends_on: + kafka1: + condition: service_healthy + kafka2: + condition: service_healthy + kafka3: + condition: service_healthy diff --git a/lib/kafka_ex/consumer_group/heartbeat.ex b/lib/kafka_ex/consumer_group/heartbeat.ex index 056d62a5..9eee5aab 100644 --- a/lib/kafka_ex/consumer_group/heartbeat.ex +++ b/lib/kafka_ex/consumer_group/heartbeat.ex @@ -81,9 +81,8 @@ defmodule KafkaEx.ConsumerGroup.Heartbeat do {:stop, {:shutdown, {:error, error_code}}, state} {:error, reason} -> - Logger.warn("Heartbeat failed, got error reason #{inspect reason}") + Logger.warn("Heartbeat failed, got error reason #{inspect(reason)}") {:stop, {:shutdown, {:error, reason}}, state} - end end end diff --git a/lib/kafka_ex/new/broker.ex b/lib/kafka_ex/new/broker.ex index 99780a51..70a17283 100644 --- a/lib/kafka_ex/new/broker.ex +++ b/lib/kafka_ex/new/broker.ex @@ -21,6 +21,8 @@ defmodule KafkaEx.New.Broker do broker.socket != nil && Socket.open?(broker.socket) end - def has_socket?(%__MODULE__{socket: %Socket{socket: socket}}, socket), do: true + def has_socket?(%__MODULE__{socket: %Socket{socket: socket}}, socket), + do: true + def has_socket?(_, _), do: false end diff --git a/lib/kafka_ex/protocol/common.ex b/lib/kafka_ex/protocol/common.ex index db52d97c..e57d4f93 100644 --- a/lib/kafka_ex/protocol/common.ex +++ b/lib/kafka_ex/protocol/common.ex @@ -22,7 +22,9 @@ defmodule KafkaEx.Protocol.Common do mod ) do struct_module = Module.concat(mod, Response) - {partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic) + + {partitions, topics_data} = + mod.parse_partitions(partitions_size, rest, [], topic) [ %{ diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index 0a447dfb..fa3d24da 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -54,7 +54,11 @@ defmodule KafkaEx.Protocol.CreateTopics do defmodule Request do @moduledoc false defstruct create_topic_requests: nil, timeout: nil - @type t :: %Request{create_topic_requests: [TopicRequest.t()], timeout: integer} + + @type t :: %Request{ + create_topic_requests: [TopicRequest.t()], + timeout: integer + } end defmodule TopicError do diff --git a/lib/kafka_ex/protocol/join_group.ex b/lib/kafka_ex/protocol/join_group.ex index 17fd1024..cab15798 100644 --- a/lib/kafka_ex/protocol/join_group.ex +++ b/lib/kafka_ex/protocol/join_group.ex @@ -29,13 +29,15 @@ defmodule KafkaEx.Protocol.JoinGroup do member_id: nil, members: [] - @type t :: %Response{ - error_code: atom | integer, - generation_id: integer, - leader_id: binary, - member_id: binary, - members: [binary] - } | {:error, atom} + @type t :: + %Response{ + error_code: atom | integer, + generation_id: integer, + leader_id: binary, + member_id: binary, + members: [binary] + } + | {:error, atom} def leader?(%__MODULE__{member_id: member_id, leader_id: leader_id}) do member_id == leader_id diff --git a/lib/kafka_ex/protocol/leave_group.ex b/lib/kafka_ex/protocol/leave_group.ex index e81aa2dd..8f030d1f 100644 --- a/lib/kafka_ex/protocol/leave_group.ex +++ b/lib/kafka_ex/protocol/leave_group.ex @@ -14,9 +14,11 @@ defmodule KafkaEx.Protocol.LeaveGroup do defstruct error_code: nil - @type t :: %Response{ - error_code: atom | integer - } | {:error, atom} + @type t :: + %Response{ + error_code: atom | integer + } + | {:error, atom} end @spec create_request(integer, binary, Request.t()) :: binary diff --git a/lib/kafka_ex/protocol/produce.ex b/lib/kafka_ex/protocol/produce.ex index 54362f52..d420ba0c 100644 --- a/lib/kafka_ex/protocol/produce.ex +++ b/lib/kafka_ex/protocol/produce.ex @@ -103,7 +103,8 @@ defmodule KafkaEx.Protocol.Produce do {message, msize} = create_message(compressed_message_set, nil, attribute) - {[<<0::64-signed>>, <>, message], @int64_size + @int32_size + msize} + {[<<0::64-signed>>, <>, message], + @int64_size + @int32_size + msize} end defp create_message_set_uncompressed([ diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 98bbdad8..7f1f274b 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -401,7 +401,11 @@ defmodule KafkaEx.Server do produce_request_data = try do - Produce.create_request(correlation_id, Config.client_id(), produce_request) + Produce.create_request( + correlation_id, + Config.client_id(), + produce_request + ) rescue e in FunctionClauseError -> nil end @@ -762,16 +766,18 @@ defmodule KafkaEx.Server do check_brokers_sockets!(brokers) - {correlation_id, metadata} = try do - retrieve_metadata( - brokers, - 0, - config_sync_timeout() - ) - rescue e -> - sleep_for_reconnect() - Kernel.reraise(e, System.stacktrace()) - end + {correlation_id, metadata} = + try do + retrieve_metadata( + brokers, + 0, + config_sync_timeout() + ) + rescue + e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, @@ -800,8 +806,9 @@ defmodule KafkaEx.Server do end defp check_brokers_sockets!(brokers) do - any_socket_opened = brokers - |> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end) + any_socket_opened = + brokers + |> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end) if not any_socket_opened do sleep_for_reconnect() @@ -1000,7 +1007,9 @@ defmodule KafkaEx.Server do Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner) end - defp increment_state_correlation_id(%_{correlation_id: correlation_id} = state) do + defp increment_state_correlation_id( + %_{correlation_id: correlation_id} = state + ) do %{state | correlation_id: correlation_id + 1} end end diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 40f50dbc..0cfd1284 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -96,31 +96,34 @@ defmodule KafkaEx.Server0P10AndLater do check_brokers_sockets!(brokers) {_, + %KafkaEx.Protocol.ApiVersions.Response{ + api_versions: api_versions, + error_code: error_code + }, state} = kafka_server_api_versions(%State{brokers: brokers}) - %KafkaEx.Protocol.ApiVersions.Response{ - api_versions: api_versions, - error_code: error_code - }, state} = kafka_server_api_versions(%State{brokers: brokers}) if error_code == :no_response do sleep_for_reconnect() raise "Brokers sockets are closed" end + :no_error = error_code api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions) - {correlation_id, metadata} = try do - retrieve_metadata( - brokers, - state.correlation_id, - config_sync_timeout(), - [], - api_versions - ) - rescue e -> - sleep_for_reconnect() - Kernel.reraise(e, System.stacktrace()) - end + {correlation_id, metadata} = + try do + retrieve_metadata( + brokers, + state.correlation_id, + config_sync_timeout(), + [], + api_versions + ) + rescue + e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, @@ -274,18 +277,18 @@ defmodule KafkaEx.Server0P10AndLater do {:topic_not_found, state} _ -> - broker - |> NetworkClient.send_sync_request( - main_request, - config_sync_timeout() - ) - |> case do - {:error, reason} -> - {{:error, reason}, increment_state_correlation_id(state)} - - response -> - {{:ok, response}, increment_state_correlation_id(state)} - end + broker + |> NetworkClient.send_sync_request( + main_request, + config_sync_timeout() + ) + |> case do + {:error, reason} -> + {{:error, reason}, increment_state_correlation_id(state)} + + response -> + {{:ok, response}, increment_state_correlation_id(state)} + end end end diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index f7b0b184..f924870c 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -101,12 +101,14 @@ defmodule KafkaEx.Server0P9P0 do check_brokers_sockets!(brokers) - {correlation_id, metadata} = try do - retrieve_metadata(brokers, 0, config_sync_timeout()) - rescue e -> - sleep_for_reconnect() - Kernel.reraise(e, System.stacktrace()) - end + {correlation_id, metadata} = + try do + retrieve_metadata(brokers, 0, config_sync_timeout()) + rescue + e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, diff --git a/scripts/kafka_check_health b/scripts/kafka_check_health new file mode 100755 index 00000000..b8b7eba7 --- /dev/null +++ b/scripts/kafka_check_health @@ -0,0 +1,21 @@ +#! /bin/bash + +cd $(dirname $0) + +broker_ready() { + # Inspired by: https://github.com/wurstmeister/kafka-docker/issues/167#issuecomment-439849789 + current_broker=$(cat $KAFKA_HOME/config/server.properties | awk 'BEGIN{FS="="}/^broker.id=/{print $2}') + brokers=$($KAFKA_HOME/bin/zookeeper-shell.sh zookeeper:2181 <<< "ls /brokers/ids" | tail -1) + + echo "brokers: $brokers" + echo "current_broker: $current_broker" + + echo "${brokers}" | jq '.[]' | grep -q "^${current_broker}$"; + return $? +} + +####################### + +broker_ready + +####################### diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index a629e86c..513b515c 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -104,19 +104,23 @@ defmodule KafkaEx.New.Client.Test do test "client can receive {:ssl_closed, _}", %{client: client} do send(client, {:ssl_closed, :unused}) + TestHelper.wait_for(fn -> {:message_queue_len, m} = Process.info(client, :message_queue_len) m == 0 end) + assert Process.alive?(client) end test "client can receive {:tcp_closed, _}", %{client: client} do send(client, {:tcp_closed, :unused}) + TestHelper.wait_for(fn -> {:message_queue_len, m} = Process.info(client, :message_queue_len) m == 0 end) + assert Process.alive?(client) end end diff --git a/test/kafka_ex/utils/murmur_test.exs b/test/kafka_ex/utils/murmur_test.exs index 8141012f..c396d159 100644 --- a/test/kafka_ex/utils/murmur_test.exs +++ b/test/kafka_ex/utils/murmur_test.exs @@ -5,12 +5,15 @@ defmodule KafkaEx.Utils.MurmurTest do test "murmur2 correctly encodes strings" do # Taken from https://github.com/apache/kafka/blob/8ab0994919752cd4870e771221ba934a6a539a67/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java#L66-L78 - assert Murmur.murmur2("21") == -973932308 - assert Murmur.murmur2("foobar") == -790332482 - assert Murmur.murmur2("a-little-bit-long-string") == -985981536 - assert Murmur.murmur2("a-little-bit-longer-string") == -1486304829 - assert Murmur.murmur2("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8") == -58897971 - assert Murmur.murmur2("abc") == 479470107 + assert Murmur.murmur2("21") == -973_932_308 + assert Murmur.murmur2("foobar") == -790_332_482 + assert Murmur.murmur2("a-little-bit-long-string") == -985_981_536 + assert Murmur.murmur2("a-little-bit-longer-string") == -1_486_304_829 + + assert Murmur.murmur2("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8") == + -58_897_971 + + assert Murmur.murmur2("abc") == 479_470_107 end test "umurmur2 correctly encodes strings" do diff --git a/test/protocol/delete_topics_test.exs b/test/protocol/delete_topics_test.exs index c4a1bf88..28b49f7b 100644 --- a/test/protocol/delete_topics_test.exs +++ b/test/protocol/delete_topics_test.exs @@ -3,29 +3,64 @@ defmodule KafkaEx.Protocol.DeleteTopicsTest do describe "create_request/4" do test "creates a request to delete a single topic" do - expected_request = <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed, 6::16, "topic1"::binary, 100::32-signed>> - delete_request = %KafkaEx.Protocol.DeleteTopics.Request{topics: ["topic1"], timeout: 100} + expected_request = + <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 1::32-signed, + 6::16, "topic1"::binary, 100::32-signed>> - delete_response = KafkaEx.Protocol.DeleteTopics.create_request(999, "the-client-id", delete_request, 0) + delete_request = %KafkaEx.Protocol.DeleteTopics.Request{ + topics: ["topic1"], + timeout: 100 + } + + delete_response = + KafkaEx.Protocol.DeleteTopics.create_request( + 999, + "the-client-id", + delete_request, + 0 + ) assert expected_request == delete_response end test "creates a request to delete a multiple topic" do - expected_response = <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed, 6::16, "topic3"::binary, 6::16, "topic2"::binary, 6::16, "topic1"::binary, 100::32-signed>> + expected_response = + <<20::16, 0::16, 999::32, 13::16, "the-client-id"::binary, 3::32-signed, + 6::16, "topic3"::binary, 6::16, "topic2"::binary, 6::16, + "topic1"::binary, 100::32-signed>> + + delete_request = %KafkaEx.Protocol.DeleteTopics.Request{ + topics: ["topic1", "topic2", "topic3"], + timeout: 100 + } - delete_request = %KafkaEx.Protocol.DeleteTopics.Request{topics: ["topic1", "topic2", "topic3"], timeout: 100} - delete_response = KafkaEx.Protocol.DeleteTopics.create_request(999, "the-client-id", delete_request, 0) + delete_response = + KafkaEx.Protocol.DeleteTopics.create_request( + 999, + "the-client-id", + delete_request, + 0 + ) assert expected_response == delete_response end test "raise error when non-zero api_version is sent" do - delete_request = %KafkaEx.Protocol.DeleteTopics.Request{topics: ["topic1"], timeout: 100} + delete_request = %KafkaEx.Protocol.DeleteTopics.Request{ + topics: ["topic1"], + timeout: 100 + } assert_raise FunctionClauseError, - "no function clause matching in KafkaEx.Protocol.DeleteTopics.create_request/4", - fn -> KafkaEx.Protocol.DeleteTopics.create_request(999, "the-client-id", delete_request, 1) end + "no function clause matching in KafkaEx.Protocol.DeleteTopics.create_request/4", + fn -> + KafkaEx.Protocol.DeleteTopics.create_request( + 999, + "the-client-id", + delete_request, + 1 + ) + end end end end diff --git a/test/protocol/fetch_test.exs b/test/protocol/fetch_test.exs index 96f617f2..33fc7d33 100644 --- a/test/protocol/fetch_test.exs +++ b/test/protocol/fetch_test.exs @@ -169,7 +169,15 @@ defmodule KafkaEx.Protocol.Fetch.Test do hw_mark_offset: 10, last_offset: 1, message_set: [ - %Message{attributes: 0, crc: 0, key: key, offset: 1, value: nil, topic: topic, partition: 0} + %Message{ + attributes: 0, + crc: 0, + key: key, + offset: 1, + value: nil, + topic: topic, + partition: 0 + } ], partition: 0 } @@ -241,7 +249,15 @@ defmodule KafkaEx.Protocol.Fetch.Test do hw_mark_offset: 10, last_offset: 1, message_set: [ - %Message{attributes: 0, crc: 0, key: nil, offset: 1, value: "baz", topic: topic, partition: 1} + %Message{ + attributes: 0, + crc: 0, + key: nil, + offset: 1, + value: "baz", + topic: topic, + partition: 1 + } ], partition: 1 }, @@ -250,7 +266,15 @@ defmodule KafkaEx.Protocol.Fetch.Test do hw_mark_offset: 10, last_offset: 1, message_set: [ - %Message{attributes: 0, crc: 0, key: nil, offset: 1, value: "bar", topic: topic, partition: 0} + %Message{ + attributes: 0, + crc: 0, + key: nil, + offset: 1, + value: "bar", + topic: topic, + partition: 0 + } ], partition: 0 } @@ -479,6 +503,7 @@ defmodule KafkaEx.Protocol.Fetch.Test do test "parse_response correctly parses a valid response with batched snappy-encoded messages" do partition_id = 0 + response = <<0, 0, 0, 14, 0, 0, 0, 1, 0, 17, 115, 110, 97, 112, 112, 121, 95, 98, 97, 116, 99, 104, 95, 116, 101, 115, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,