Skip to content

Commit

Permalink
Check process and propagate notify errors
Browse files Browse the repository at this point in the history
The `Notifier.notify/1` spec showed it would always return `:ok`, but
that wasn't the case when the notifier was disconnected or the process
was no longer running. Now an error tuple is returned when a notifier
process isn't running.

This situation happened most frequently during shutdown, particularly
from external usage of the Notifier like an application or the
`oban_met` package.

In addition, the errors bubble up through top level `Oban` functions
like `scale_queue/1`, `pause_queue/1`, etc. to indicate that the
operation won't actually succeed.

Closes #1060
  • Loading branch information
sorentwo committed Mar 27, 2024
1 parent fa67438 commit a5aa0a8
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 25 deletions.
30 changes: 17 additions & 13 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@ defmodule Oban do
"""
@type name :: term()

@type oban_node :: String.t()

@type queue_name :: atom() | binary()

@type queue_option ::
{:queue, queue_name()}
| {:limit, pos_integer()}
| {:local_only, boolean()}
| {:node, String.t()}
{:local_only, boolean()}
| {:node, oban_node()}
| {:queue, queue_name()}

@type queue_all_option :: {:local_only, boolean()} | {:node, oban_node()}

@type queue_state :: %{
:limit => pos_integer(),
:node => binary(),
:node => oban_node(),
:paused => boolean(),
:queue => queue_name(),
:running => [pos_integer()],
Expand All @@ -45,7 +48,7 @@ defmodule Oban do
| {:get_dynamic_repo, nil | (-> pid() | atom())}
| {:log, false | Logger.level()}
| {:name, name()}
| {:node, String.t()}
| {:node, oban_node()}
| {:notifier, module() | {module(), Keyword.t()}}
| {:peer, false | module() | {module(), Keyword.t()}}
| {:plugins, false | [module() | {module() | Keyword.t()}]}
Expand Down Expand Up @@ -825,7 +828,7 @@ defmodule Oban do
:ok
"""
@doc since: "0.12.0"
@spec start_queue(name(), opts :: Keyword.t()) :: :ok
@spec start_queue(name(), opts :: Keyword.t()) :: :ok | {:error, Exception.t()}
def start_queue(name \\ __MODULE__, [_ | _] = opts) do
conf = config(name)

Expand Down Expand Up @@ -877,7 +880,7 @@ defmodule Oban do
:ok
"""
@doc since: "0.2.0"
@spec pause_queue(name(), opts :: [queue_option()]) :: :ok
@spec pause_queue(name(), opts :: [queue_option()]) :: :ok | {:error, Exception.t()}
def pause_queue(name \\ __MODULE__, [_ | _] = opts) do
validate_queue_opts!(opts, [:queue, :local_only, :node])
validate_queue_exists!(name, opts)
Expand Down Expand Up @@ -912,7 +915,7 @@ defmodule Oban do
Oban.pause_all_queues(MyOban)
"""
@doc since: "2.17.0"
@spec pause_all_queues(name(), opts :: [local_only: boolean(), node: String.t()]) :: :ok
@spec pause_all_queues(name(), opts :: [queue_all_option()]) :: :ok | {:error, Exception.t()}
def pause_all_queues(name, opts) do
pause_queue(name, Keyword.put(opts, :queue, :*))
end
Expand Down Expand Up @@ -951,7 +954,7 @@ defmodule Oban do
Oban.resume_queue(queue: :default, node: "worker.1")
"""
@doc since: "0.2.0"
@spec resume_queue(name(), opts :: [queue_option()]) :: :ok
@spec resume_queue(name(), opts :: [queue_option()]) :: :ok | {:error, Exception.t()}
def resume_queue(name \\ __MODULE__, [_ | _] = opts) do
validate_queue_opts!(opts, [:queue, :local_only, :node])
validate_queue_exists!(name, opts)
Expand Down Expand Up @@ -986,7 +989,7 @@ defmodule Oban do
Oban.resume_all_queues(MyOban)
"""
@doc since: "2.17.0"
@spec resume_all_queues(name(), opts :: [local_only: boolean(), node: String.t()]) :: :ok
@spec resume_all_queues(name(), opts :: [queue_all_option()]) :: :ok | {:error, Exception.t()}
def resume_all_queues(name, opts) do
resume_queue(name, Keyword.put(opts, :queue, :*))
end
Expand Down Expand Up @@ -1037,7 +1040,8 @@ defmodule Oban do
:ok
"""
@doc since: "0.2.0"
@spec scale_queue(name(), opts :: [queue_option()]) :: :ok
@spec scale_queue(name(), opts :: [queue_option() | {:limit, pos_integer()}]) ::
:ok | {:error, Exception.t()}
def scale_queue(name \\ __MODULE__, [_ | _] = opts) do
conf = config(name)

Expand Down Expand Up @@ -1094,7 +1098,7 @@ defmodule Oban do
:ok
"""
@doc since: "0.12.0"
@spec stop_queue(name(), opts :: [queue_option()]) :: :ok
@spec stop_queue(name(), opts :: [queue_option()]) :: :ok | {:error, Exception.t()}
def stop_queue(name \\ __MODULE__, [_ | _] = opts) do
validate_queue_opts!(opts, [:queue, :local_only, :node])
validate_queue_exists!(name, opts)
Expand Down
16 changes: 9 additions & 7 deletions lib/oban/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ defmodule Oban.Notifier do
@doc """
Broadcast a notification to all subscribers of a channel.
"""
@callback notify(name_or_conf(), channel(), payload()) :: :ok | {:error, any()}
@callback notify(name_or_conf(), channel(), payload()) :: :ok | {:error, Exception.t()}

@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
Expand Down Expand Up @@ -186,7 +186,7 @@ defmodule Oban.Notifier do
Oban.Notifier.notify(MyOban, :my_channel, %{message: "hi!"})
"""
@spec notify(name_or_conf(), channel(), payload()) :: :ok
@spec notify(name_or_conf(), channel(), payload()) :: :ok | {:error, Exception.t()}
def notify(name_or_conf \\ Oban, channel, payload) when is_atom(channel) do
conf = if is_struct(name_or_conf, Config), do: name_or_conf, else: Oban.config(name_or_conf)
meta = %{conf: conf, channel: channel, payload: payload}
Expand All @@ -197,9 +197,7 @@ defmodule Oban.Notifier do
|> List.wrap()
|> Enum.map(&encode/1)

apply_callback(conf, :notify, [channel, payload])

{:ok, meta}
{apply_callback(conf, :notify, [channel, payload]), meta}
end)
end

Expand Down Expand Up @@ -271,9 +269,13 @@ defmodule Oban.Notifier do

%{name: name, notifier: {notifier, _}} = conf

pid = Registry.whereis(name, __MODULE__)
case Registry.whereis(name, __MODULE__) do
pid when is_pid(pid) ->
apply(notifier, callback, [pid | args])

apply(notifier, callback, [pid | args])
_ ->
{:error, RuntimeError.exception("no notifier running for instance #{inspect(name)}")}
end
end

defp normalize_channels(channels) do
Expand Down
4 changes: 2 additions & 2 deletions lib/oban/notifiers/isolated.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ defmodule Oban.Notifiers.Isolated do
for {pid, channels} <- state.listeners, message <- payload, channel in channels do
Notifier.relay(state.conf, [pid], channel, message)
end

:ok
end

:ok
end

@impl GenServer
Expand Down
2 changes: 1 addition & 1 deletion lib/oban/notifiers/pg.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ defmodule Oban.Notifiers.PG do

case Oban.Registry.lookup(name) do
{_pid, state} -> state
nil -> :error
nil -> {:error, RuntimeError.exception("no notifier running as #{inspect(name)}")}
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/oban/notifiers/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ if Code.ensure_loaded?(Postgrex) do

case Oban.Registry.lookup(name) do
{_pid, state} -> state
nil -> :error
nil -> {:error, RuntimeError.exception("no notifier running as #{inspect(name)}")}
end
end

Expand Down
8 changes: 7 additions & 1 deletion test/oban/notifier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ for notifier <- [Oban.Notifiers.Isolated, Oban.Notifiers.PG, Oban.Notifiers.Post
use Oban.Case, async: notifier != Oban.Notifiers.Postgres

alias Ecto.Adapters.SQL.Sandbox
alias Oban.Notifier
alias Oban.{Config, Notifier}

@notifier notifier

Expand All @@ -19,6 +19,12 @@ for notifier <- [Oban.Notifiers.Isolated, Oban.Notifiers.PG, Oban.Notifiers.Post
end)
end

test "returning an error without a live notifier process" do
conf = Config.new(repo: Repo, notifier: @notifier)

assert {:error, %RuntimeError{}} = Notifier.notify(conf, :signal, %{})
end

test "notifying with complex types" do
unboxed_run(fn ->
name = start_supervised_oban!(notifier: @notifier)
Expand Down

0 comments on commit a5aa0a8

Please sign in to comment.