Skip to content

Commit

Permalink
Emit insert notification directly from Engine
Browse files Browse the repository at this point in the history
Notifications are sent from the engine, within the `insert_*` telemetry
block, so the timing impact is visible. In addition, notifications
aren't emitted for `scheduled` jobs, as there's nothing ready for
producers to fetch.
  • Loading branch information
sorentwo committed Feb 25, 2024
1 parent fb87366 commit a3e8a99
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 49 deletions.
17 changes: 16 additions & 1 deletion lib/oban/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Oban.Engine do
"""

alias Ecto.{Changeset, Multi}
alias Oban.{Config, Job}
alias Oban.{Config, Job, Notifier}

@type conf :: Config.t()
@type job :: Job.t()
Expand Down Expand Up @@ -179,6 +179,8 @@ defmodule Oban.Engine do
def insert_job(%Config{} = conf, %Changeset{} = changeset, opts) do
with_span(:insert_job, conf, %{changeset: changeset, opts: opts}, fn engine ->
with {:ok, job} <- engine.insert_job(conf, changeset, opts) do
notify_trigger(conf, [job])

{:meta, {:ok, job}, %{job: job}}
end
end)
Expand All @@ -202,6 +204,8 @@ defmodule Oban.Engine do
with_span(:insert_all_jobs, conf, %{changesets: changesets, opts: opts}, fn engine ->
jobs = engine.insert_all_jobs(conf, expand(changesets, %{}), opts)

notify_trigger(conf, jobs)

{:meta, jobs, %{jobs: jobs}}
end)
end
Expand Down Expand Up @@ -331,4 +335,15 @@ defmodule Oban.Engine do
%Config{conf | engine: Oban.Engines.Basic}
end
end

defp notify_trigger(%{insert_trigger: true} = conf, jobs) do
payload = for job <- jobs, job.state == "available", uniq: true, do: %{queue: job.queue}

unless payload == [], do: Notifier.notify(conf, :insert, payload)
catch
# Insert notification timeouts aren't worth failing inserts over.
:exit, {:timeout, _} -> :ok
end

defp notify_trigger(_conf, _queue), do: :skip
end
5 changes: 3 additions & 2 deletions lib/oban/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ defmodule Oban.Notifier do

@type channel :: atom()
@type name_or_conf :: Oban.name() | Config.t()
@type payload :: map() | [map()]
@type pubsub_status :: :unknown | :isolated | :solitary | :clustered

@doc """
Expand All @@ -99,7 +100,7 @@ defmodule Oban.Notifier do
@doc """
Broadcast a notification to all subscribers of a channel.
"""
@callback notify(name_or_conf(), channel :: channel(), payload :: [map()]) :: :ok
@callback notify(name_or_conf(), channel(), payload()) :: :ok

@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
Expand Down Expand Up @@ -185,7 +186,7 @@ defmodule Oban.Notifier do
Oban.Notifier.notify(MyOban, :my_channel, %{message: "hi!"})
"""
@spec notify(name_or_conf(), channel :: channel(), payload :: map() | [map()]) :: :ok
@spec notify(name_or_conf(), channel(), payload()) :: :ok
def notify(name_or_conf \\ Oban, channel, payload) when is_atom(channel) do
conf = if is_struct(name_or_conf, Config), do: name_or_conf, else: Oban.config(name_or_conf)
meta = %{conf: conf, channel: channel, payload: payload}
Expand Down
27 changes: 0 additions & 27 deletions lib/oban/stager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ defmodule Oban.Stager do

@impl GenServer
def handle_continue(:start, %State{} = state) do
if state.conf.insert_trigger do
:telemetry.attach_many(
"oban-stager",
[[:oban, :engine, :insert_job, :stop], [:oban, :engine, :insert_all_jobs, :stop]],
&__MODULE__.handle_insert/4,
[]
)
end

state =
state
|> schedule_staging()
Expand All @@ -66,8 +57,6 @@ defmodule Oban.Stager do
def terminate(_reason, %State{timer: timer}) do
if is_reference(timer), do: Process.cancel_timer(timer)

:telemetry.detach("oban-stager")

:ok
end

Expand All @@ -93,22 +82,6 @@ defmodule Oban.Stager do
{:noreply, state}
end

def handle_insert(_event, _measure, meta, _) do
payload =
case meta do
%{job: %{queue: queue}} ->
[%{queue: queue}]

%{jobs: jobs} ->
for %{queue: queue} <- jobs, uniq: true, do: %{queue: queue}

_ ->
[]
end

Notifier.notify(meta.conf, :insert, payload)
end

defp stage_and_notify(true = _leader, state) do
Repo.transaction(state.conf, fn ->
{:ok, staged} = Engine.stage_jobs(state.conf, Job, limit: state.limit)
Expand Down
25 changes: 24 additions & 1 deletion test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
alias Ecto.Adapters.SQL.Sandbox
alias Ecto.Multi
alias Oban.Engines.Lite
alias Oban.TelemetryHandler
alias Oban.{Notifier, TelemetryHandler}

@engine engine
@repo if engine == Lite, do: LiteRepo, else: Repo
Expand All @@ -30,6 +30,16 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
assert_receive {:event, [:insert_job, :stop], _, %{job: ^job, opts: []}}
end

test "broadcasting an insert event", %{name: name} do
Notifier.listen(name, :insert)

Oban.insert(name, Worker.new(%{action: "OK", ref: 0}, queue: :alpha))
Oban.insert(name, Worker.new(%{action: "OK", ref: 0}, queue: :gamma, schedule_in: 1))

assert_received {:notification, :insert, %{"queue" => "alpha"}}
refute_received {:notification, :insert, %{"queue" => "gamma"}}
end

@tag :unique
test "inserting a job with uniqueness applied", %{name: name} do
changeset = Worker.new(%{ref: 1}, unique: [period: 60])
Expand Down Expand Up @@ -361,6 +371,19 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
Oban.insert_all(name, changesets)
end
end

test "broadcasting an insert event for all queues", %{name: name} do
Notifier.listen(name, :insert)

Oban.insert_all(name, [
Worker.new(%{action: "OK", ref: 1}, queue: :gamma),
Worker.new(%{action: "OK", ref: 2}, queue: :gamma),
Worker.new(%{action: "OK", ref: 3}, queue: :delta)
])

assert_receive {:notification, :insert, %{"queue" => "gamma"}}
assert_receive {:notification, :insert, %{"queue" => "delta"}}
end
end

describe "insert_all/3" do
Expand Down
19 changes: 1 addition & 18 deletions test/oban/stager_test.exs
Original file line number Diff line number Diff line change
@@ -1,26 +1,9 @@
defmodule Oban.StagerTest do
use Oban.Case, async: true

alias Oban.{Notifier, Stager}
alias Oban.Stager
alias Oban.TelemetryHandler

test "broadcasting insert events on job insertion" do
name = start_supervised_oban!(stage_interval: 10_000)

Notifier.listen(name, :insert)

Oban.insert(name, Worker.new(%{action: "OK", ref: 0}, queue: :alpha))

Oban.insert_all(name, [
Worker.new(%{action: "OK", ref: 1}, queue: :gamma),
Worker.new(%{action: "OK", ref: 2}, queue: :delta)
])

assert_receive {:notification, :insert, %{"queue" => "alpha"}}
assert_receive {:notification, :insert, %{"queue" => "gamma"}}
assert_receive {:notification, :insert, %{"queue" => "delta"}}
end

test "descheduling jobs to make them available for execution" do
job_1 = insert!(%{}, state: "scheduled", scheduled_at: seconds_ago(10))
job_2 = insert!(%{}, state: "scheduled", scheduled_at: seconds_from_now(10))
Expand Down

0 comments on commit a3e8a99

Please sign in to comment.