From 54679a1bf6b0fe582578d8c3c297ff83cd35a359 Mon Sep 17 00:00:00 2001 From: Parker Selbert Date: Wed, 21 Aug 2024 16:34:36 -0500 Subject: [PATCH] Start all queues in parallel on init The midwife now starts queues using an async stream to parallelize startup and minimize boot time for applications with many queues. --- lib/oban/midwife.ex | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/oban/midwife.ex b/lib/oban/midwife.ex index 5e9d7781..b715bb07 100644 --- a/lib/oban/midwife.ex +++ b/lib/oban/midwife.ex @@ -15,7 +15,8 @@ defmodule Oban.Midwife do GenServer.start_link(__MODULE__, struct!(State, opts), name: name) end - @spec start_queue(Config.t(), Keyword.t()) :: DynamicSupervisor.on_start_child() + @spec start_queue(Config.t(), Keyword.t() | {String.t(), Keyword.t()}) :: + DynamicSupervisor.on_start_child() def start_queue(conf, opts) when is_list(opts) do queue = opts @@ -33,6 +34,12 @@ defmodule Oban.Midwife do |> DynamicSupervisor.start_child({Queue.Supervisor, opts}) end + def start_queue(conf, {queue, opts}) do + opts + |> Keyword.put(:queue, queue) + |> then(&start_queue(conf, &1)) + end + @spec stop_queue(Config.t(), atom() | String.t()) :: :ok | {:error, :not_found} def stop_queue(conf, queue) do case Registry.whereis(conf.name, {:queue, queue}) do @@ -48,17 +55,13 @@ defmodule Oban.Midwife do @impl GenServer def init(state) do - start_all_queues(state.conf) + state.conf.queues + |> Task.async_stream(fn opts -> {:ok, _} = start_queue(state.conf, opts) end) + |> Stream.run() {:ok, state, {:continue, :start}} end - defp start_all_queues(conf) do - for {queue, opts} <- conf.queues do - {:ok, _pid} = start_queue(conf, Keyword.put(opts, :queue, queue)) - end - end - @impl GenServer def handle_continue(:start, %State{conf: conf} = state) do Notifier.listen(conf.name, :signal)