Skip to content

Commit

Permalink
Support Notifier and Peer configuration options
Browse files Browse the repository at this point in the history
Config for both `:notifier` and `:peer` now accept a tuple with keyword
options in addition to simple module names. The extra options are passed
to the module's `start_link` function.

This change is to enable more configurable Notifier and Peer
implementations, neither of the built-in modules require additional
options.
  • Loading branch information
sorentwo committed Nov 5, 2023
1 parent a7d93e9 commit 652378e
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 54 deletions.
4 changes: 2 additions & 2 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ defmodule Oban do
| {:log, false | Logger.level()}
| {:name, name()}
| {:node, String.t()}
| {:notifier, module()}
| {:peer, false | module()}
| {:notifier, module() | {module(), Keyword.t()}}
| {:peer, false | module() | {module(), Keyword.t()}}
| {:plugins, false | [module() | {module() | Keyword.t()}]}
| {:prefix, String.t()}
| {:queues, false | [{queue_name(), pos_integer() | Keyword.t()}]}
Expand Down
72 changes: 55 additions & 17 deletions lib/oban/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ defmodule Oban.Config do
log: false | Logger.level(),
name: Oban.name(),
node: String.t(),
notifier: module(),
peer: false | module(),
plugins: false | [module() | {module() | Keyword.t()}],
notifier: {module(), Keyword.t()},
peer: {module(), Keyword.t()},
plugins: [module() | {module() | Keyword.t()}],
prefix: false | String.t(),
queues: false | Keyword.t(Keyword.t()),
queues: Keyword.t(Keyword.t()),
repo: module(),
shutdown_grace_period: timeout(),
stage_interval: timeout(),
Expand All @@ -34,8 +34,8 @@ defmodule Oban.Config do
log: false,
name: Oban,
node: nil,
notifier: Oban.Notifiers.Postgres,
peer: Oban.Peers.Postgres,
notifier: {Oban.Notifiers.Postgres, []},
peer: {Oban.Peers.Postgres, []},
plugins: [],
prefix: "public",
queues: [],
Expand All @@ -46,8 +46,8 @@ defmodule Oban.Config do

@cron_keys ~w(crontab timezone)a
@log_levels ~w(false emergency alert critical error warning warn notice info debug)a
@renamed [{:engine, Oban.Queue.BasicEngine}, {:notifier, {Oban.PostgresNotifier, []}}]
@testing_modes ~w(manual inline disabled)a
@renamed [{:engine, Oban.Queue.BasicEngine}, {:notifier, Oban.PostgresNotifier}]

@doc """
Generate a Config struct after normalizing and verifying Oban options.
Expand All @@ -68,18 +68,18 @@ defmodule Oban.Config do
if opts[:engine] == Oban.Engines.Lite do
opts
|> Keyword.put(:prefix, false)
|> Keyword.put_new(:notifier, Oban.Notifiers.PG)
|> Keyword.put_new(:peer, Oban.Peers.Isolated)
|> Keyword.put_new(:notifier, {Oban.Notifiers.PG, []})
|> Keyword.put_new(:peer, {Oban.Peers.Isolated, []})
else
opts
end

opts =
if opts[:testing] in [:manual, :inline] do
opts
|> Keyword.put(:queues, [])
|> Keyword.put(:peer, Oban.Peers.Disabled)
|> Keyword.put(:peer, {Oban.Peers.Disabled, []})
|> Keyword.put(:plugins, [])
|> Keyword.put(:queues, [])
|> Keyword.put(:stage_interval, :infinity)
else
opts
Expand Down Expand Up @@ -190,6 +190,12 @@ defmodule Oban.Config do
end
end

defp validate_opt(_opts, {:notifier, {notifier, opts}}) do
with :ok <- validate_opt([], {:notifier, notifier}) do
validate_keyword(:notifier, opts)
end
end

defp validate_opt(_opts, {:notifier, notifier}) do
if Code.ensure_loaded?(notifier) and function_exported?(notifier, :listen, 2) do
:ok
Expand All @@ -208,6 +214,12 @@ defmodule Oban.Config do
end
end

defp validate_opt(_opts, {:peer, {peer, opts}}) do
with :ok <- validate_opt([], {:peer, peer}) do
validate_keyword(:peer, opts)
end
end

defp validate_opt(_opts, {:peer, peer}) do
if peer == false or Code.ensure_loaded?(peer) do
:ok
Expand Down Expand Up @@ -282,6 +294,14 @@ defmodule Oban.Config do
{:unknown, option, __MODULE__}
end

defp validate_keyword(key, opts) do
if Keyword.keyword?(opts) do
:ok
else
{:error, "expected #{key} opts to be a keyword list, got: #{inspect(opts)}"}
end
end

defp validate_plugin(plugin) when not is_tuple(plugin), do: validate_plugin({plugin, []})

defp validate_plugin({plugin, opts}) do
Expand Down Expand Up @@ -339,7 +359,8 @@ defmodule Oban.Config do
defp normalize(opts) do
opts
|> crontab_to_plugin()
|> peer_to_disabled()
|> normalize_notifier()
|> normalize_peer()
|> Keyword.put_new(:node, node_name())
|> Keyword.update(:queues, [], &normalize_queues/1)
|> Keyword.update(:plugins, [], &normalize_plugins/1)
Expand Down Expand Up @@ -383,11 +404,28 @@ defmodule Oban.Config do
end
end

defp peer_to_disabled(opts) do
if opts[:peer] == false or opts[:plugins] == false do
Keyword.put(opts, :peer, Oban.Peers.Disabled)
else
opts
defp normalize_notifier(opts) do
case Keyword.get(opts, :notifier) do
module when is_atom(module) and not is_nil(module) ->
Keyword.put(opts, :notifier, {module, []})

_ ->
opts
end
end

defp normalize_peer(opts) do
peer = opts[:peer]

cond do
peer == false or opts[:plugins] == false ->
Keyword.put(opts, :peer, {Oban.Peers.Disabled, []})

is_atom(peer) and not is_nil(peer) ->
Keyword.put(opts, :peer, {peer, []})

true ->
opts
end
end

Expand Down
25 changes: 16 additions & 9 deletions lib/oban/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,16 @@ defmodule Oban.Notifier do
@callback notify(server(), channel :: channel(), payload :: [map()]) :: :ok

@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
conf = Keyword.fetch!(opts, :conf)
opts = Keyword.put_new(opts, :name, conf.notifier)
%{notifier: {notifier, note_opts}} = Keyword.fetch!(opts, :conf)

%{id: opts[:name], start: {conf.notifier, :start_link, [opts]}}
opts =
opts
|> Keyword.merge(note_opts)
|> Keyword.put_new(:name, notifier)

%{id: opts[:name], start: {notifier, :start_link, [opts]}}
end

@doc """
Expand Down Expand Up @@ -131,11 +136,11 @@ defmodule Oban.Notifier do
raise ArgumentError, "expected channels to be a list of atoms, got: #{inspect(channels)}"
end

conf = Oban.config(name)
%{notifier: {notifier, _}} = Oban.config(name)

name
|> Registry.whereis(Oban.Notifier)
|> conf.notifier.listen(channels)
|> notifier.listen(channels)
end

@doc """
Expand All @@ -153,11 +158,11 @@ defmodule Oban.Notifier do
"""
@spec unlisten(server(), [channel]) :: :ok
def unlisten(name \\ Oban, channels) when is_list(channels) do
conf = Oban.config(name)
%{notifier: {notifier, _}} = Oban.config(name)

name
|> Registry.whereis(Oban.Notifier)
|> conf.notifier.unlisten(channels)
|> notifier.unlisten(channels)
end

@doc """
Expand All @@ -179,10 +184,12 @@ defmodule Oban.Notifier do
def notify(conf_or_name \\ Oban, channel, payload)

def notify(%Config{} = conf, channel, payload) when is_atom(channel) do
%{name: name, notifier: {notifier, _}} = conf

with_span(conf, channel, payload, fn ->
conf.name
name
|> Registry.whereis(Oban.Notifier)
|> conf.notifier.notify(channel, normalize_payload(payload))
|> notifier.notify(channel, normalize_payload(payload))
end)
end

Expand Down
1 change: 1 addition & 0 deletions lib/oban/notifiers/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ if Code.ensure_loaded?(Postgrex) do

@impl Simple
def init(opts) do
IO.inspect(opts)

Check warning on line 104 in lib/oban/notifiers/postgres.ex

View workflow job for this annotation

GitHub Actions / ci (1.15, 26.0, 15.1-alpine, gossip, lint)

There should be no calls to IO.inspect/1.
{:ok, struct!(__MODULE__, opts)}
end

Expand Down
32 changes: 17 additions & 15 deletions lib/oban/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ defmodule Oban.Peer do
"""
@callback leader?(Config.t() | GenServer.server()) :: boolean()

@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
%{peer: {peer, peer_opts}} = Keyword.fetch!(opts, :conf)

opts =
opts
|> Keyword.merge(peer_opts)
|> Keyword.put_new(:name, peer)

%{id: opts[:name], start: {peer, :start_link, [opts]}}
end

@doc """
Check whether the current instance leads the cluster.
Expand All @@ -93,19 +106,17 @@ defmodule Oban.Peer do
@spec leader?(Config.t() | GenServer.server()) :: boolean()
def leader?(conf_or_name \\ Oban, timeout \\ 5_000)

def leader?(%Config{} = conf, timeout) do
case Registry.whereis(conf.name, Oban.Peer) do
def leader?(%Config{name: name, peer: {peer, _}}, timeout) do
case Registry.whereis(name, Oban.Peer) do
pid when is_pid(pid) ->
conf.peer.leader?(pid, timeout)
peer.leader?(pid, timeout)

nil ->
false
end
catch
:exit, {:timeout, _} = reason ->
Logger.warning("""
Oban.Peer.leader?/2 check failed due to #{inspect(reason)}.
""")
Logger.warning("Oban.Peer.leader?/2 check failed due to #{inspect(reason)}.")

false
end
Expand All @@ -115,13 +126,4 @@ defmodule Oban.Peer do
|> Oban.config()
|> leader?(timeout)
end

@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
conf = Keyword.fetch!(opts, :conf)
opts = Keyword.put_new(opts, :name, conf.peer)

%{id: opts[:name], start: {conf.peer, :start_link, [opts]}}
end
end
21 changes: 14 additions & 7 deletions test/oban/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ defmodule Oban.ConfigTest do
test ":notifier is validated as a notifier module" do
refute_valid(notifier: nil)
refute_valid(notifier: Repo)
refute_valid(notifier: {Oban.Notifiers.Postgres, true})

assert_valid(notifier: Oban.Notifiers.Postgres)
assert_valid(notifier: {Oban.Notifiers.Postgres, [some: :opt]})
end

test ":node is validated as a binary" do
Expand All @@ -53,10 +55,12 @@ defmodule Oban.ConfigTest do
test ":peer is validated as false or a peer module" do
refute_valid(peer: nil)
refute_valid(peer: Fake)
refute_valid(peer: {Oban.Peers.Global, false})

assert_valid(peer: false)
assert_valid(peer: Oban.Peers.Global)
assert_valid(peer: Oban.Peers.Postgres)
assert_valid(peer: {Oban.Peers.Postgres, [some: :opt]})
end

test ":plugins are validated as complete plugins with possible options" do
Expand Down Expand Up @@ -130,7 +134,8 @@ defmodule Oban.ConfigTest do
end

test ":notifier translates to the correct postgres module" do
assert %Config{notifier: Oban.Notifiers.Postgres} = conf(notifier: Oban.PostgresNotifier)
assert %Config{notifier: {Oban.Notifiers.Postgres, []}} =
conf(notifier: Oban.PostgresNotifier)
end

test ":engine translates to the correct basic module" do
Expand All @@ -145,7 +150,7 @@ defmodule Oban.ConfigTest do
assert conf = conf(queues: [alpha: 1], plugins: [Pruner], testing: :manual)

assert %{queues: [], plugins: []} = conf
assert %{peer: Oban.Peers.Disabled, stage_interval: :infinity} = conf
assert %{peer: {Oban.Peers.Disabled, []}, stage_interval: :infinity} = conf
end

test "normalizing plugins as a module options tuple" do
Expand Down Expand Up @@ -176,17 +181,19 @@ defmodule Oban.ConfigTest do
end

test "translating peer false to the disabled module" do
assert %Config{peer: Oban.Peers.Disabled} = conf(peer: false)
assert %Config{peer: Oban.Peers.Disabled} = conf(plugins: false)
assert %Config{peer: Oban.Peers.Disabled} = conf(peer: Oban.Peers.Global, plugins: false)
assert %Config{peer: {Oban.Peers.Disabled, []}} = conf(peer: false)
assert %Config{peer: {Oban.Peers.Disabled, []}} = conf(plugins: false)

assert %Config{peer: {Oban.Peers.Disabled, []}} =
conf(peer: Oban.Peers.Global, plugins: false)
end

test "setting sane defaults for the Lite engine" do
conf = conf(engine: Oban.Engines.Lite)

refute conf.prefix
assert conf.notifier == Oban.Notifiers.PG
assert conf.peer == Oban.Peers.Isolated
assert {Oban.Notifiers.PG, []} = conf.notifier
assert {Oban.Peers.Isolated, []} = conf.peer
end
end

Expand Down
11 changes: 9 additions & 2 deletions test/oban/peer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Oban.PeerTest do
use Oban.Case

alias Oban.{Peer, Registry}
alias Oban.Peers.{Global, Postgres}

describe "configuration" do
test "leadership is disabled when peer is false" do
Expand All @@ -17,10 +18,16 @@ defmodule Oban.PeerTest do
end
end

for peer <- [Oban.Peers.Global, Oban.Peers.Postgres] do
for peer <- [Global, Postgres] do
@peer peer

describe "using #{peer}" do
test "forwarding opts to peer instances" do
assert_raise RuntimeError, ~r/key :unknown not found/, fn ->
start_supervised_oban!(peer: {@peer, unknown: :option})
end
end

test "a single node acquires leadership" do
name = start_supervised_oban!(peer: @peer, poll_interval: 250)

Expand All @@ -29,7 +36,7 @@ defmodule Oban.PeerTest do

test "leadership transfers to another peer when the leader exits" do
name = start_supervised_oban!(plugins: false)
conf = %{Oban.config(name) | peer: @peer}
conf = %{Oban.config(name) | peer: {@peer, []}}

peer_a = start_supervised!({Peer, conf: conf, name: Peer.A})

Expand Down
2 changes: 1 addition & 1 deletion test/oban/peers/global_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Oban.Peers.GlobalTest do
[peer: false, node: "worker.1"]
|> start_supervised_oban!()
|> Oban.config()
|> Map.put(:peer, Global)
|> Map.put(:peer, {Global, []})

peer_1 = start_supervised!({Peer, name: A, conf: %{conf | name: Oban, node: "web.1"}})
peer_2 = start_supervised!({Peer, name: B, conf: %{conf | name: Oban, node: "web.2"}})
Expand Down
Loading

0 comments on commit 652378e

Please sign in to comment.