Skip to content

Commit

Permalink
Add the conventional GenConsumer.Supervisor.start_link/1
Browse files Browse the repository at this point in the history
This enables the usage of `child_spec/1` and removal of
the deprecated `Supervisor.Spec.supervisor/3` call.

Fixes #390.
  • Loading branch information
b1az committed Aug 29, 2022
1 parent 71eef86 commit b025ac6
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
14 changes: 11 additions & 3 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,17 @@ defmodule KafkaEx.ConsumerGroup do
opts
) do
child =
supervisor(
KafkaEx.GenConsumer.Supervisor,
[{gen_consumer_module, consumer_module}, group_name, assignments, opts],
Supervisor.child_spec(
{
KafkaEx.GenConsumer.Supervisor,
%{
gen_consumer_module: gen_consumer_module,
consumer_module: consumer_module,
group_name: group_name,
assignments: assignments,
opts: opts
}
},
id: :consumer
)

Expand Down
73 changes: 58 additions & 15 deletions lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@ defmodule KafkaEx.GenConsumer.Supervisor do

use DynamicSupervisor

if Version.match?(System.version(), ">= 1.7.0") do
@doc since: "0.14.0"
end

@doc """
Starts a `GenConsumer.Supervisor` process linked to the current process.
`gen_consumer_module` is a module that implements the `GenServer` behaviour
which consumes events from Kafka.
`consumer_module` is a module that implements the `GenConsumer` behaviour.
`group_name` is the name of a consumer group, and `assignments` is a list of
partitions for the `GenConsumer`s to consume. `opts` accepts the same
options as `KafkaEx.GenConsumer.start_link/5`.
`group_name` is the name of a consumer group.
`assignments` is a list of partitions for the `GenConsumer`s to consume.
`opts` accepts the same options as `KafkaEx.GenConsumer.start_link/5`.
### Return Values
Expand All @@ -32,20 +40,22 @@ defmodule KafkaEx.GenConsumer.Supervisor do
If the supervisor and its consumers are successfully created, this function
returns `{:ok, pid}`, where `pid` is the PID of the supervisor.
"""
@spec start_link(
{gen_consumer_module :: module, consumer_module :: module},
consumer_group_name :: binary,
assigned_partitions :: [
@spec start_link(%{
gen_consumer_module: module,
consumer_module: module,
group_name: binary,
assignments: [
{topic_name :: binary, partition_id :: non_neg_integer}
],
KafkaEx.GenConsumer.options()
) :: Elixir.Supervisor.on_start()
def start_link(
{gen_consumer_module, consumer_module},
group_name,
assignments,
opts \\ []
) do
opts: KafkaEx.GenConsumer.options()
}) :: Supervisor.on_start()
def start_link(%{
gen_consumer_module: gen_consumer_module,
consumer_module: consumer_module,
group_name: group_name,
assignments: assignments,
opts: opts
}) do
start_link_result =
DynamicSupervisor.start_link(
__MODULE__,
Expand All @@ -71,6 +81,39 @@ defmodule KafkaEx.GenConsumer.Supervisor do
end
end

@deprecated "Use start_link/1 instead"
@doc """
Starts a `GenConsumer.Supervisor` process linked to the current process.
Refer to `start_link/1` for documentation of each parameter.
### Return Values
Same as `start_link/1`.
"""
@spec start_link(
{gen_consumer_module :: module, consumer_module :: module},
consumer_group_name :: binary,
assigned_partitions :: [
{topic_name :: binary, partition_id :: non_neg_integer}
],
KafkaEx.GenConsumer.options()
) :: Elixir.Supervisor.on_start()
def start_link(
{gen_consumer_module, consumer_module},
group_name,
assignments,
opts \\ []
) do
start_link(%{
gen_consumer_module: gen_consumer_module,
consumer_module: consumer_module,
group_name: group_name,
assignments: assignments,
opts: opts
})
end

@doc """
Returns a list of child pids
Expand Down

0 comments on commit b025ac6

Please sign in to comment.