Skip to content

Commit

Permalink
Start all queues in parallel on init
Browse files Browse the repository at this point in the history
The midwife now starts queues using an async stream to parallelize
startup and minimize boot time for applications with many queues.
  • Loading branch information
sorentwo committed Aug 21, 2024
1 parent e59973f commit 54679a1
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions lib/oban/midwife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Oban.Midwife do
GenServer.start_link(__MODULE__, struct!(State, opts), name: name)
end

@spec start_queue(Config.t(), Keyword.t()) :: DynamicSupervisor.on_start_child()
@spec start_queue(Config.t(), Keyword.t() | {String.t(), Keyword.t()}) ::
DynamicSupervisor.on_start_child()
def start_queue(conf, opts) when is_list(opts) do
queue =
opts
Expand All @@ -33,6 +34,12 @@ defmodule Oban.Midwife do
|> DynamicSupervisor.start_child({Queue.Supervisor, opts})
end

def start_queue(conf, {queue, opts}) do
opts
|> Keyword.put(:queue, queue)
|> then(&start_queue(conf, &1))
end

@spec stop_queue(Config.t(), atom() | String.t()) :: :ok | {:error, :not_found}
def stop_queue(conf, queue) do
case Registry.whereis(conf.name, {:queue, queue}) do
Expand All @@ -48,17 +55,13 @@ defmodule Oban.Midwife do

@impl GenServer
def init(state) do
start_all_queues(state.conf)
state.conf.queues
|> Task.async_stream(fn opts -> {:ok, _} = start_queue(state.conf, opts) end)
|> Stream.run()

{:ok, state, {:continue, :start}}
end

defp start_all_queues(conf) do
for {queue, opts} <- conf.queues do
{:ok, _pid} = start_queue(conf, Keyword.put(opts, :queue, queue))
end
end

@impl GenServer
def handle_continue(:start, %State{conf: conf} = state) do
Notifier.listen(conf.name, :signal)
Expand Down

0 comments on commit 54679a1

Please sign in to comment.