From 98e4b39bf7f98a7560901bd5d727217f998e17cf Mon Sep 17 00:00:00 2001 From: Parker Selbert Date: Wed, 10 Apr 2024 20:56:39 +0100 Subject: [PATCH] Support alternative namespacing in PG notifier By default, all Oban instances using the same `prefix` option would receive notifications from each other. Now you can use the `namespace` option to separate instances that are in the same cluster _without_ changing the `prefix`. Addresses #1065 --- lib/oban/notifiers/pg.ex | 23 +++++++++++++++++++---- test/oban/notifiers/pg_test.exs | 28 ++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 test/oban/notifiers/pg_test.exs diff --git a/lib/oban/notifiers/pg.ex b/lib/oban/notifiers/pg.ex index 7ad746a7..0cee3edd 100644 --- a/lib/oban/notifiers/pg.ex +++ b/lib/oban/notifiers/pg.ex @@ -18,6 +18,18 @@ defmodule Oban.Notifiers.PG do ... ``` + By default, all Oban instances using the same `prefix` option will receive notifications from + each other. You can use the `namespace` option to separate instances that are in the same + cluster _without_ changing the prefix: + + ```elixir + config :my_app, Oban, + notifier: {Oban.Notifiers.PG, namespace: :custom_namespace} + ... + ``` + + The namespace may be any term. + [pg]: https://www.erlang.org/doc/man/pg.html [de]: https://elixir-lang.org/getting-started/mix-otp/distributed-tasks.html#our-first-distributed-code """ @@ -28,12 +40,15 @@ defmodule Oban.Notifiers.PG do alias Oban.Notifier - defstruct [:conf, listeners: %{}] + defstruct [:conf, :namespace, listeners: %{}] @impl Notifier def start_link(opts) do {name, opts} = Keyword.pop(opts, :name) + conf = Keyword.fetch!(opts, :conf) + opts = Keyword.put_new(opts, :namespace, conf.prefix) + GenServer.start_link(__MODULE__, struct!(__MODULE__, opts), name: name) end @@ -49,8 +64,8 @@ defmodule Oban.Notifiers.PG do @impl Notifier def notify(server, channel, payload) do - with %{conf: conf} <- get_state(server) do - pids = :pg.get_members(__MODULE__, conf.prefix) + with %{namespace: namespace} <- get_state(server) do + pids = :pg.get_members(__MODULE__, namespace) for pid <- pids, message <- payload_to_messages(channel, payload) do send(pid, message) @@ -65,7 +80,7 @@ defmodule Oban.Notifiers.PG do put_state(state) :pg.start_link(__MODULE__) - :pg.join(__MODULE__, state.conf.prefix, self()) + :pg.join(__MODULE__, state.namespace, self()) {:ok, state} end diff --git a/test/oban/notifiers/pg_test.exs b/test/oban/notifiers/pg_test.exs new file mode 100644 index 00000000..8cb8e4cb --- /dev/null +++ b/test/oban/notifiers/pg_test.exs @@ -0,0 +1,28 @@ +defmodule Oban.Notifiers.PGTest do + use Oban.Case, async: true + + alias Oban.Notifier + alias Oban.Notifiers.PG + + describe "namespacing" do + test "namespacing by configured prefix without an override" do + name_1 = start_supervised_oban!(notifier: PG, prefix: "pg_test") + name_2 = start_supervised_oban!(notifier: PG, prefix: "pg_test") + + :ok = Notifier.listen(name_1, :signal) + :ok = Notifier.notify(name_2, :signal, %{incoming: "message"}) + + assert_receive {:notification, :signal, %{"incoming" => "message"}} + end + + test "overriding the default namespace" do + name_1 = start_supervised_oban!(notifier: {PG, namespace: :pg_test}, prefix: "pg_a") + name_2 = start_supervised_oban!(notifier: {PG, namespace: :pg_test}, prefix: "pg_b") + + :ok = Notifier.listen(name_1, :signal) + :ok = Notifier.notify(name_2, :signal, %{incoming: "message"}) + + assert_receive {:notification, :signal, %{"incoming" => "message"}} + end + end +end