From a3e8a9914c337cdeaccad033ea7ab55589033111 Mon Sep 17 00:00:00 2001 From: Parker Selbert Date: Sun, 25 Feb 2024 20:14:32 +0000 Subject: [PATCH] Emit insert notification directly from Engine Notifications are sent from the engine, within the `insert_*` telemetry block, so the timing impact is visible. In addition, notifications aren't emitted for `scheduled` jobs, as there's nothing ready for producers to fetch. --- lib/oban/engine.ex | 17 ++++++++++++++++- lib/oban/notifier.ex | 5 +++-- lib/oban/stager.ex | 27 --------------------------- test/oban/engine_test.exs | 25 ++++++++++++++++++++++++- test/oban/stager_test.exs | 19 +------------------ 5 files changed, 44 insertions(+), 49 deletions(-) diff --git a/lib/oban/engine.ex b/lib/oban/engine.ex index 4c2e35d7..77bde12f 100644 --- a/lib/oban/engine.ex +++ b/lib/oban/engine.ex @@ -20,7 +20,7 @@ defmodule Oban.Engine do """ alias Ecto.{Changeset, Multi} - alias Oban.{Config, Job} + alias Oban.{Config, Job, Notifier} @type conf :: Config.t() @type job :: Job.t() @@ -179,6 +179,8 @@ defmodule Oban.Engine do def insert_job(%Config{} = conf, %Changeset{} = changeset, opts) do with_span(:insert_job, conf, %{changeset: changeset, opts: opts}, fn engine -> with {:ok, job} <- engine.insert_job(conf, changeset, opts) do + notify_trigger(conf, [job]) + {:meta, {:ok, job}, %{job: job}} end end) @@ -202,6 +204,8 @@ defmodule Oban.Engine do with_span(:insert_all_jobs, conf, %{changesets: changesets, opts: opts}, fn engine -> jobs = engine.insert_all_jobs(conf, expand(changesets, %{}), opts) + notify_trigger(conf, jobs) + {:meta, jobs, %{jobs: jobs}} end) end @@ -331,4 +335,15 @@ defmodule Oban.Engine do %Config{conf | engine: Oban.Engines.Basic} end end + + defp notify_trigger(%{insert_trigger: true} = conf, jobs) do + payload = for job <- jobs, job.state == "available", uniq: true, do: %{queue: job.queue} + + unless payload == [], do: Notifier.notify(conf, :insert, payload) + catch + # Insert notification timeouts aren't worth failing inserts over. + :exit, {:timeout, _} -> :ok + end + + defp notify_trigger(_conf, _queue), do: :skip end diff --git a/lib/oban/notifier.ex b/lib/oban/notifier.ex index b2b8f519..b3225a23 100644 --- a/lib/oban/notifier.ex +++ b/lib/oban/notifier.ex @@ -79,6 +79,7 @@ defmodule Oban.Notifier do @type channel :: atom() @type name_or_conf :: Oban.name() | Config.t() + @type payload :: map() | [map()] @type pubsub_status :: :unknown | :isolated | :solitary | :clustered @doc """ @@ -99,7 +100,7 @@ defmodule Oban.Notifier do @doc """ Broadcast a notification to all subscribers of a channel. """ - @callback notify(name_or_conf(), channel :: channel(), payload :: [map()]) :: :ok + @callback notify(name_or_conf(), channel(), payload()) :: :ok @doc false @spec child_spec(Keyword.t()) :: Supervisor.child_spec() @@ -185,7 +186,7 @@ defmodule Oban.Notifier do Oban.Notifier.notify(MyOban, :my_channel, %{message: "hi!"}) """ - @spec notify(name_or_conf(), channel :: channel(), payload :: map() | [map()]) :: :ok + @spec notify(name_or_conf(), channel(), payload()) :: :ok def notify(name_or_conf \\ Oban, channel, payload) when is_atom(channel) do conf = if is_struct(name_or_conf, Config), do: name_or_conf, else: Oban.config(name_or_conf) meta = %{conf: conf, channel: channel, payload: payload} diff --git a/lib/oban/stager.ex b/lib/oban/stager.ex index 4b7f1e9e..089ca12c 100644 --- a/lib/oban/stager.ex +++ b/lib/oban/stager.ex @@ -45,15 +45,6 @@ defmodule Oban.Stager do @impl GenServer def handle_continue(:start, %State{} = state) do - if state.conf.insert_trigger do - :telemetry.attach_many( - "oban-stager", - [[:oban, :engine, :insert_job, :stop], [:oban, :engine, :insert_all_jobs, :stop]], - &__MODULE__.handle_insert/4, - [] - ) - end - state = state |> schedule_staging() @@ -66,8 +57,6 @@ defmodule Oban.Stager do def terminate(_reason, %State{timer: timer}) do if is_reference(timer), do: Process.cancel_timer(timer) - :telemetry.detach("oban-stager") - :ok end @@ -93,22 +82,6 @@ defmodule Oban.Stager do {:noreply, state} end - def handle_insert(_event, _measure, meta, _) do - payload = - case meta do - %{job: %{queue: queue}} -> - [%{queue: queue}] - - %{jobs: jobs} -> - for %{queue: queue} <- jobs, uniq: true, do: %{queue: queue} - - _ -> - [] - end - - Notifier.notify(meta.conf, :insert, payload) - end - defp stage_and_notify(true = _leader, state) do Repo.transaction(state.conf, fn -> {:ok, staged} = Engine.stage_jobs(state.conf, Job, limit: state.limit) diff --git a/test/oban/engine_test.exs b/test/oban/engine_test.exs index 62e141ee..36c900ba 100644 --- a/test/oban/engine_test.exs +++ b/test/oban/engine_test.exs @@ -5,7 +5,7 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do alias Ecto.Adapters.SQL.Sandbox alias Ecto.Multi alias Oban.Engines.Lite - alias Oban.TelemetryHandler + alias Oban.{Notifier, TelemetryHandler} @engine engine @repo if engine == Lite, do: LiteRepo, else: Repo @@ -30,6 +30,16 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do assert_receive {:event, [:insert_job, :stop], _, %{job: ^job, opts: []}} end + test "broadcasting an insert event", %{name: name} do + Notifier.listen(name, :insert) + + Oban.insert(name, Worker.new(%{action: "OK", ref: 0}, queue: :alpha)) + Oban.insert(name, Worker.new(%{action: "OK", ref: 0}, queue: :gamma, schedule_in: 1)) + + assert_received {:notification, :insert, %{"queue" => "alpha"}} + refute_received {:notification, :insert, %{"queue" => "gamma"}} + end + @tag :unique test "inserting a job with uniqueness applied", %{name: name} do changeset = Worker.new(%{ref: 1}, unique: [period: 60]) @@ -361,6 +371,19 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do Oban.insert_all(name, changesets) end end + + test "broadcasting an insert event for all queues", %{name: name} do + Notifier.listen(name, :insert) + + Oban.insert_all(name, [ + Worker.new(%{action: "OK", ref: 1}, queue: :gamma), + Worker.new(%{action: "OK", ref: 2}, queue: :gamma), + Worker.new(%{action: "OK", ref: 3}, queue: :delta) + ]) + + assert_receive {:notification, :insert, %{"queue" => "gamma"}} + assert_receive {:notification, :insert, %{"queue" => "delta"}} + end end describe "insert_all/3" do diff --git a/test/oban/stager_test.exs b/test/oban/stager_test.exs index ce61fd79..655d77e7 100644 --- a/test/oban/stager_test.exs +++ b/test/oban/stager_test.exs @@ -1,26 +1,9 @@ defmodule Oban.StagerTest do use Oban.Case, async: true - alias Oban.{Notifier, Stager} + alias Oban.Stager alias Oban.TelemetryHandler - test "broadcasting insert events on job insertion" do - name = start_supervised_oban!(stage_interval: 10_000) - - Notifier.listen(name, :insert) - - Oban.insert(name, Worker.new(%{action: "OK", ref: 0}, queue: :alpha)) - - Oban.insert_all(name, [ - Worker.new(%{action: "OK", ref: 1}, queue: :gamma), - Worker.new(%{action: "OK", ref: 2}, queue: :delta) - ]) - - assert_receive {:notification, :insert, %{"queue" => "alpha"}} - assert_receive {:notification, :insert, %{"queue" => "gamma"}} - assert_receive {:notification, :insert, %{"queue" => "delta"}} - end - test "descheduling jobs to make them available for execution" do job_1 = insert!(%{}, state: "scheduled", scheduled_at: seconds_ago(10)) job_2 = insert!(%{}, state: "scheduled", scheduled_at: seconds_from_now(10))