Skip to content

Commit

Permalink
Flatten notifier state structs into single module
Browse files Browse the repository at this point in the history
  • Loading branch information
sorentwo committed Nov 5, 2023
1 parent c84ea3e commit a7d93e9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 34 deletions.
16 changes: 6 additions & 10 deletions lib/oban/notifiers/pg.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,7 @@ defmodule Oban.Notifiers.PG do

alias Oban.Notifier

defmodule State do
@moduledoc false

defstruct [:conf, :name, listeners: %{}]
end
defstruct [:conf, listeners: %{}]

@impl Notifier
def start_link(opts) do
Expand All @@ -81,7 +77,7 @@ defmodule Oban.Notifiers.PG do

@impl Notifier
def notify(server, channel, payload) do
with %State{conf: conf} <- get_state(server) do
with %{conf: conf} <- get_state(server) do
pids = :pg.get_members(__MODULE__, conf.prefix)

for pid <- pids, message <- payload_to_messages(channel, payload) do
Expand All @@ -94,7 +90,7 @@ defmodule Oban.Notifiers.PG do

@impl GenServer
def init(opts) do
state = struct!(State, opts)
state = struct!(__MODULE__, opts)

put_state(state)

Expand All @@ -118,7 +114,7 @@ defmodule Oban.Notifiers.PG do
end

@impl GenServer
def handle_call({:listen, channels}, {pid, _}, %State{listeners: listeners} = state) do
def handle_call({:listen, channels}, {pid, _}, %{listeners: listeners} = state) do
if Map.has_key?(listeners, pid) do
{:reply, :ok, state}
else
Expand All @@ -128,7 +124,7 @@ defmodule Oban.Notifiers.PG do
end
end

def handle_call({:unlisten, channels}, {pid, _}, %State{listeners: listeners} = state) do
def handle_call({:unlisten, channels}, {pid, _}, %{listeners: listeners} = state) do
orig_channels = Map.get(listeners, pid, [])

listeners =
Expand All @@ -141,7 +137,7 @@ defmodule Oban.Notifiers.PG do
end

@impl GenServer
def handle_info({:notification, channel, payload}, %State{} = state) do
def handle_info({:notification, channel, payload}, state) do
listeners = for {pid, channels} <- state.listeners, channel in channels, do: pid

Notifier.relay(state.conf, listeners, channel, payload)
Expand Down
41 changes: 18 additions & 23 deletions lib/oban/notifiers/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,14 @@ if Code.ensure_loaded?(Postgrex) do
alias Oban.{Config, Notifier, Repo}
alias Postgrex.SimpleConnection, as: Simple

defmodule State do
@moduledoc false

@enforce_keys [:conf]
defstruct [
:conf,
:from,
:key,
channels: %{},
connected?: false,
listeners: %{}
]
end
defstruct [
:conf,
:from,
:key,
channels: %{},
connected?: false,
listeners: %{}
]

@doc """
Start the notifier.
Expand Down Expand Up @@ -106,19 +101,19 @@ if Code.ensure_loaded?(Postgrex) do

@impl Simple
def init(opts) do
{:ok, struct!(State, opts)}
{:ok, struct!(__MODULE__, opts)}
end

@impl Simple
def notify(full_channel, payload, %State{} = state) when is_binary(full_channel) do
def notify(full_channel, payload, state) when is_binary(full_channel) do
listeners = Map.get(state.channels, full_channel, [])

Notifier.relay(state.conf, listeners, reverse_channel(full_channel), payload)
end

# This is a Notifier callback, but it has the same name and arity as SimpleConnection
def notify(server, channel, payload) when is_atom(channel) do
with %State{conf: conf} <- Simple.call(server, :get_state) do
with %{conf: conf} <- Simple.call(server, :get_state) do
full_channel = to_full_channel(channel, conf)

Repo.query(
Expand All @@ -132,7 +127,7 @@ if Code.ensure_loaded?(Postgrex) do
end

@impl Simple
def handle_connect(%State{channels: channels} = state) do
def handle_connect(%{channels: channels} = state) do
state = %{state | connected?: true}

if map_size(channels) > 0 do
Expand All @@ -150,18 +145,18 @@ if Code.ensure_loaded?(Postgrex) do
end

@impl Simple
def handle_disconnect(%State{} = state) do
def handle_disconnect(%{} = state) do
{:noreply, %{state | connected?: false}}
end

@impl Simple
def handle_call(:get_state, from, %State{} = state) do
def handle_call(:get_state, from, state) do
Simple.reply(from, state)

{:noreply, state}
end

def handle_call({:listen, pid, channels}, from, %State{} = state) do
def handle_call({:listen, pid, channels}, from, state) do
channels = Enum.map(channels, &to_full_channel(&1, state.conf))
new_channels = channels -- Map.keys(state.channels)

Expand All @@ -182,7 +177,7 @@ if Code.ensure_loaded?(Postgrex) do
end
end

def handle_call({:unlisten, pid, channels}, from, %State{} = state) do
def handle_call({:unlisten, pid, channels}, from, state) do
channels = Enum.map(channels, &to_full_channel(&1, state.conf))

state =
Expand All @@ -205,7 +200,7 @@ if Code.ensure_loaded?(Postgrex) do
end

@impl Simple
def handle_info({:DOWN, _ref, :process, pid, _reason}, %State{} = state) do
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
case Map.pop(state.listeners, pid) do
{{_ref, channel_set}, listeners} ->
state =
Expand All @@ -225,7 +220,7 @@ if Code.ensure_loaded?(Postgrex) do
end

@impl Simple
def handle_result(_results, %State{from: from} = state) do
def handle_result(_results, %{from: from} = state) do
from && Simple.reply(from, :ok)

{:noreply, %{state | from: nil}}
Expand Down
3 changes: 2 additions & 1 deletion test/oban/notifier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ defmodule Oban.NotifierTest do

alias Ecto.Adapters.SQL.Sandbox
alias Oban.Notifier
alias Oban.Notifiers.{Isolated, PG, Postgres}

for notifier <- [Oban.Notifiers.Isolated, Oban.Notifiers.PG, Oban.Notifiers.Postgres] do
for notifier <- [Isolated, PG, Postgres] do
@notifier notifier

describe "with #{inspect(notifier)}" do
Expand Down

0 comments on commit a7d93e9

Please sign in to comment.