Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve docker-compose by adding healthcheck condition at startup #420

Merged
merged 2 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use Mix.Config

config :ex_unit, capture_log: true
config :ex_unit, capture_log: is_nil(System.get_env("SHOW_LOGS"))

config :kafka_ex,
sync_timeout: 60_000

# Help debug tests that are tricky to understand
config :logger, :console,
format: "$time [$level] [$metadata] $message\n",
metadata: [:module, :function, :pid]
2 changes: 1 addition & 1 deletion docker-compose-kafka.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS=6000

# alternative to KAFKA_ADVERTISED_HOST_NAME is: HOSTNAME_COMMAND: ip addr | grep -o "inet [0-9.]*" | grep -v "127\.0\.0\.1" | grep -o "[0-9.]*"
KAFKA_ADVERTISED_HOST_NAME=localhost
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:SSL,OUTSIDE:SSL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:SSL
KAFKA_ADVERTISED_PROTOCOL_NAME=OUTSIDE
KAFKA_PROTOCOL_NAME=INSIDE

Expand Down
61 changes: 60 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,45 +1,104 @@
version: '3.2'
version: '2.4'

services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
healthcheck:
test: ["CMD-SHELL", "echo ruok | nc -w 2 127.0.0.1 2181 | grep -q imok"]
interval: 5s
timeout: 10s
retries: 15
start_period: 10s

kafka1:
hostname: kafka1
image: wurstmeister/kafka:0.11.0.1
ports:
- "9093:9093"
depends_on:
- zookeeper
volumes:
- ./ssl:/ssl
- ./scripts/kafka_check_health:/kafka_check_health
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9092
KAFKA_HOST_NAME: kafka1
healthcheck:
test: /kafka_check_health
interval: 10s
timeout: 10s
retries: 30
start_period: 10s
depends_on:
zookeeper:
condition: service_healthy

kafka2:
hostname: kafka2
image: wurstmeister/kafka:0.11.0.1
ports:
- "9094:9094"
depends_on:
- zookeeper
volumes:
- ./ssl:/ssl
- ./scripts/kafka_check_health:/kafka_check_health
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9092
KAFKA_HOST_NAME: kafka2
healthcheck:
test: /kafka_check_health
interval: 10s
timeout: 10s
retries: 30
start_period: 10s
depends_on:
zookeeper:
condition: service_healthy

kafka3:
hostname: kafka3
image: wurstmeister/kafka:0.11.0.1
ports:
- "9095:9095"
depends_on:
- zookeeper
volumes:
- ./ssl:/ssl
- ./scripts/kafka_check_health:/kafka_check_health
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_PORT: 9095
KAFKA_PORT: 9092
KAFKA_HOST_NAME: kafka3
healthcheck:
test: /kafka_check_health
interval: 10s
timeout: 10s
retries: 30
start_period: 10s
depends_on:
zookeeper:
condition: service_healthy

# This is a dummy service that forces all other services to be healthy before
# docker-compose up can be considered successful.
ready:
image: busybox:1.31-musl
command: tail -f /dev/null
depends_on:
kafka1:
condition: service_healthy
kafka2:
condition: service_healthy
kafka3:
condition: service_healthy
3 changes: 1 addition & 2 deletions lib/kafka_ex/consumer_group/heartbeat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ defmodule KafkaEx.ConsumerGroup.Heartbeat do
{:stop, {:shutdown, {:error, error_code}}, state}

{:error, reason} ->
Logger.warn("Heartbeat failed, got error reason #{inspect reason}")
Logger.warn("Heartbeat failed, got error reason #{inspect(reason)}")
{:stop, {:shutdown, {:error, reason}}, state}

end
end
end
4 changes: 3 additions & 1 deletion lib/kafka_ex/new/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule KafkaEx.New.Broker do
broker.socket != nil && Socket.open?(broker.socket)
end

def has_socket?(%__MODULE__{socket: %Socket{socket: socket}}, socket), do: true
def has_socket?(%__MODULE__{socket: %Socket{socket: socket}}, socket),
do: true

def has_socket?(_, _), do: false
end
4 changes: 3 additions & 1 deletion lib/kafka_ex/protocol/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ defmodule KafkaEx.Protocol.Common do
mod
) do
struct_module = Module.concat(mod, Response)
{partitions, topics_data} = mod.parse_partitions(partitions_size, rest, [], topic)

{partitions, topics_data} =
mod.parse_partitions(partitions_size, rest, [], topic)

[
%{
Expand Down
6 changes: 5 additions & 1 deletion lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ defmodule KafkaEx.Protocol.CreateTopics do
defmodule Request do
@moduledoc false
defstruct create_topic_requests: nil, timeout: nil
@type t :: %Request{create_topic_requests: [TopicRequest.t()], timeout: integer}

@type t :: %Request{
create_topic_requests: [TopicRequest.t()],
timeout: integer
}
end

defmodule TopicError do
Expand Down
16 changes: 9 additions & 7 deletions lib/kafka_ex/protocol/join_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ defmodule KafkaEx.Protocol.JoinGroup do
member_id: nil,
members: []

@type t :: %Response{
error_code: atom | integer,
generation_id: integer,
leader_id: binary,
member_id: binary,
members: [binary]
} | {:error, atom}
@type t ::
%Response{
error_code: atom | integer,
generation_id: integer,
leader_id: binary,
member_id: binary,
members: [binary]
}
| {:error, atom}

def leader?(%__MODULE__{member_id: member_id, leader_id: leader_id}) do
member_id == leader_id
Expand Down
8 changes: 5 additions & 3 deletions lib/kafka_ex/protocol/leave_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ defmodule KafkaEx.Protocol.LeaveGroup do

defstruct error_code: nil

@type t :: %Response{
error_code: atom | integer
} | {:error, atom}
@type t ::
%Response{
error_code: atom | integer
}
| {:error, atom}
end

@spec create_request(integer, binary, Request.t()) :: binary
Expand Down
3 changes: 2 additions & 1 deletion lib/kafka_ex/protocol/produce.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ defmodule KafkaEx.Protocol.Produce do

{message, msize} = create_message(compressed_message_set, nil, attribute)

{[<<0::64-signed>>, <<msize::32-signed>>, message], @int64_size + @int32_size + msize}
{[<<0::64-signed>>, <<msize::32-signed>>, message],
@int64_size + @int32_size + msize}
end

defp create_message_set_uncompressed([
Expand Down
37 changes: 23 additions & 14 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,11 @@ defmodule KafkaEx.Server do

produce_request_data =
try do
Produce.create_request(correlation_id, Config.client_id(), produce_request)
Produce.create_request(
correlation_id,
Config.client_id(),
produce_request
)
rescue
e in FunctionClauseError -> nil
end
Expand Down Expand Up @@ -762,16 +766,18 @@ defmodule KafkaEx.Server do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(
brokers,
0,
config_sync_timeout()
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end
{correlation_id, metadata} =
try do
retrieve_metadata(
brokers,
0,
config_sync_timeout()
)
rescue
e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down Expand Up @@ -800,8 +806,9 @@ defmodule KafkaEx.Server do
end

defp check_brokers_sockets!(brokers) do
any_socket_opened = brokers
|> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end)
any_socket_opened =
brokers
|> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end)

if not any_socket_opened do
sleep_for_reconnect()
Expand Down Expand Up @@ -1000,7 +1007,9 @@ defmodule KafkaEx.Server do
Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner)
end

defp increment_state_correlation_id(%_{correlation_id: correlation_id} = state) do
defp increment_state_correlation_id(
%_{correlation_id: correlation_id} = state
) do
%{state | correlation_id: correlation_id + 1}
end
end
Expand Down
59 changes: 31 additions & 28 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,31 +96,34 @@ defmodule KafkaEx.Server0P10AndLater do
check_brokers_sockets!(brokers)

{_,
%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_server_api_versions(%State{brokers: brokers})

%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_server_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
raise "Brokers sockets are closed"
end

:no_error = error_code

api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} = try do
retrieve_metadata(
brokers,
state.correlation_id,
config_sync_timeout(),
[],
api_versions
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end
{correlation_id, metadata} =
try do
retrieve_metadata(
brokers,
state.correlation_id,
config_sync_timeout(),
[],
api_versions
)
rescue
e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down Expand Up @@ -274,18 +277,18 @@ defmodule KafkaEx.Server0P10AndLater do
{:topic_not_found, state}

_ ->
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} ->
{{:error, reason}, increment_state_correlation_id(state)}

response ->
{{:ok, response}, increment_state_correlation_id(state)}
end
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} ->
{{:error, reason}, increment_state_correlation_id(state)}

response ->
{{:ok, response}, increment_state_correlation_id(state)}
end
end
end

Expand Down
14 changes: 8 additions & 6 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ defmodule KafkaEx.Server0P9P0 do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end
{correlation_id, metadata} =
try do
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue
e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
Loading