Skip to content

Commit

Permalink
Merge pull request #473 from b1az/warnings
Browse files Browse the repository at this point in the history
Fix compiler warnings
  • Loading branch information
joshuawscott authored Dec 14, 2022
2 parents 97b577f + d7552e7 commit 4344b02
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ doc

.elixir_ls
cover

# Dialyzer's Persistent Lookup Table
priv/plts/
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ KafkaEx
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](http://hexdocs.pm/kafka_ex/)

KafkaEx is an Elixir client for [Apache Kafka](http://kafka.apache.org/) with
support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.5+ and
support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.6+ and
Erlang OTP 19+.

See [http://hexdocs.pm/kafka_ex/](http://hexdocs.pm/kafka_ex/) for
documentation,
[https://github.com/kafkaex/kafka_ex/](https://github.com/kafkaex/kafka_ex/)
for code.

KakfaEx supports the following Kafka features:
KafkaEx supports the following Kafka features:

* Broker and Topic Metadata
* Produce Messages
Expand Down
26 changes: 15 additions & 11 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,9 @@ defmodule KafkaEx.ConsumerGroup do
def active?(supervisor_pid, timeout \\ 5000) do
consumer_supervisor = consumer_supervisor_pid(supervisor_pid, timeout)

if consumer_supervisor && Process.alive?(consumer_supervisor) do
consumer_supervisor &&
Process.alive?(consumer_supervisor) &&
GenConsumer.Supervisor.active?(consumer_supervisor)
else
false
end
end

@doc """
Expand Down Expand Up @@ -334,9 +332,17 @@ defmodule KafkaEx.ConsumerGroup do
opts
) do
child =
supervisor(
KafkaEx.GenConsumer.Supervisor,
[{gen_consumer_module, consumer_module}, group_name, assignments, opts],
Supervisor.child_spec(
{
KafkaEx.GenConsumer.Supervisor,
%{
gen_consumer_module: gen_consumer_module,
consumer_module: consumer_module,
group_name: group_name,
assignments: assignments,
opts: opts
}
},
id: :consumer
)

Expand All @@ -363,10 +369,8 @@ defmodule KafkaEx.ConsumerGroup do
opts = Keyword.put(opts, :supervisor_pid, self())

children = [
worker(
KafkaEx.ConsumerGroup.Manager,
[{gen_consumer_module, consumer_module}, group_name, topics, opts]
)
{KafkaEx.ConsumerGroup.Manager,
{{gen_consumer_module, consumer_module}, group_name, topics, opts}}
]

Supervisor.init(children,
Expand Down
10 changes: 5 additions & 5 deletions lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ defmodule KafkaEx.ConsumerGroup.Manager do

@doc false
# use `KafkaEx.ConsumerGroup.start_link/4` instead
@spec start_link(
@spec start_link({
{module, module},
binary,
[binary],
KafkaEx.GenConsumer.options()
) :: GenServer.on_start()
def start_link(
}) :: GenServer.on_start()
def start_link({
{gen_consumer_module, consumer_module},
group_name,
topics,
opts \\ []
) do
opts
}) do
gen_server_opts = Keyword.get(opts, :gen_server_opts, [])
consumer_opts = Keyword.drop(opts, [:gen_server_opts])

Expand Down
73 changes: 58 additions & 15 deletions lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@ defmodule KafkaEx.GenConsumer.Supervisor do

use DynamicSupervisor

if Version.match?(System.version(), ">= 1.7.0") do
@doc since: "0.14.0"
end

@doc """
Starts a `GenConsumer.Supervisor` process linked to the current process.
`gen_consumer_module` is a module that implements the `GenServer` behaviour
which consumes events from Kafka.
`consumer_module` is a module that implements the `GenConsumer` behaviour.
`group_name` is the name of a consumer group, and `assignments` is a list of
partitions for the `GenConsumer`s to consume. `opts` accepts the same
options as `KafkaEx.GenConsumer.start_link/5`.
`group_name` is the name of a consumer group.
`assignments` is a list of partitions for the `GenConsumer`s to consume.
`opts` accepts the same options as `KafkaEx.GenConsumer.start_link/5`.
### Return Values
Expand All @@ -32,20 +40,22 @@ defmodule KafkaEx.GenConsumer.Supervisor do
If the supervisor and its consumers are successfully created, this function
returns `{:ok, pid}`, where `pid` is the PID of the supervisor.
"""
@spec start_link(
{gen_consumer_module :: module, consumer_module :: module},
consumer_group_name :: binary,
assigned_partitions :: [
@spec start_link(%{
gen_consumer_module: module,
consumer_module: module,
group_name: binary,
assignments: [
{topic_name :: binary, partition_id :: non_neg_integer}
],
KafkaEx.GenConsumer.options()
) :: Elixir.Supervisor.on_start()
def start_link(
{gen_consumer_module, consumer_module},
group_name,
assignments,
opts \\ []
) do
opts: KafkaEx.GenConsumer.options()
}) :: Supervisor.on_start()
def start_link(%{
gen_consumer_module: gen_consumer_module,
consumer_module: consumer_module,
group_name: group_name,
assignments: assignments,
opts: opts
}) do
start_link_result =
DynamicSupervisor.start_link(
__MODULE__,
Expand All @@ -71,6 +81,39 @@ defmodule KafkaEx.GenConsumer.Supervisor do
end
end

@deprecated "Use start_link/1 instead"
@doc """
Starts a `GenConsumer.Supervisor` process linked to the current process.
Refer to `start_link/1` for documentation of each parameter.
### Return Values
Same as `start_link/1`.
"""
@spec start_link(
{gen_consumer_module :: module, consumer_module :: module},
consumer_group_name :: binary,
assigned_partitions :: [
{topic_name :: binary, partition_id :: non_neg_integer}
],
KafkaEx.GenConsumer.options()
) :: Elixir.Supervisor.on_start()
def start_link(
{gen_consumer_module, consumer_module},
group_name,
assignments,
opts \\ []
) do
start_link(%{
gen_consumer_module: gen_consumer_module,
consumer_module: consumer_module,
group_name: group_name,
assignments: assignments,
opts: opts
})
end

@doc """
Returns a list of child pids
Expand Down

0 comments on commit 4344b02

Please sign in to comment.