Skip to content

Commit

Permalink
Add information on leadership changes to oban.peer.election event (#…
Browse files Browse the repository at this point in the history
…1148)

Include `was_leader?` in `[:oban, :peer, :election | _]` event metadata
  • Loading branch information
notslang authored Sep 16, 2024
1 parent 83d2fc1 commit b003cf2
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
4 changes: 2 additions & 2 deletions lib/oban/peers/global.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ defmodule Oban.Peers.Global do

@impl GenServer
def handle_info(:election, %State{} = state) do
meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__}
meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__, was_leader?: nil}

locked? =
:telemetry.span([:oban, :peer, :election], meta, fn ->
locked? = :global.set_lock(key(state), nodes(), 0)

{locked?, %{meta | leader: locked?}}
{locked?, %{meta | leader: locked?, was_leader?: meta.leader}}
end)

if locked?, do: notify_lock(state.conf)
Expand Down
6 changes: 3 additions & 3 deletions lib/oban/peers/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ defmodule Oban.Peers.Postgres do

@impl GenServer
def handle_info(:election, %State{} = state) do
meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__}
meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__, was_leader?: nil}

state =
:telemetry.span([:oban, :peer, :election], meta, fn ->
Expand All @@ -106,14 +106,14 @@ defmodule Oban.Peers.Postgres do

case Repo.transaction(state.conf, fun, retry: 1) do
{:ok, state} ->
{state, %{meta | leader: state.leader?}}
{state, %{meta | leader: state.leader?, was_leader?: meta.leader}}

{:error, :rollback} ->
# The peer maintains its current `leader?` status on rollback—this may cause
# inconsistency if the leader encounters an error and multiple rollbacks happen in
# sequence. That tradeoff is acceptable because the situation is unlikely and less of
# an issue than crashing the peer.
{state, meta}
{state, %{meta | was_leader?: meta.leader}}
end
end)

Expand Down
3 changes: 2 additions & 1 deletion lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,14 @@ defmodule Oban.Telemetry do
| event | measures | metadata |
| ------------ | -------------- | -------------------------------------------------------------- |
| `:start` | `:system_time` | `:conf`, `:leader`, `:peer`, |
| `:stop` | `:duration` | `:conf`, `:leader`, `:peer`, |
| `:stop` | `:duration` | `:conf`, `:leader`, `:peer`, `:was_leader?` |
| `:exception` | `:duration` | `:conf`, `:leader`, `:peer`, `:kind`, `:reason`, `:stacktrace` |
#### Metadata
* `:conf`, `:kind`, `:reason`, `:stacktrace` — see the explanation in notifier metadata above
* `:leader` — whether the peer is the current leader
* `:was_leader?` — whether the peer was the leader before the election occurred
* `:peer` — the module used for peering
## Queue Shutdown Events
Expand Down
7 changes: 5 additions & 2 deletions test/oban/peers/global_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ defmodule Oban.Peers.GlobalTest do

start_supervised_oban!(peer: Global, node: "worker.1")

assert_receive {:event, [:election, :start], _measure, %{leader: _, peer: Oban.Peers.Global}}
assert_receive {:event, [:election, :stop], _measure, %{leader: _, peer: Oban.Peers.Global}}
assert_receive {:event, [:election, :start], _measure,
%{leader: _, peer: Oban.Peers.Global, was_leader?: nil}}

assert_receive {:event, [:election, :stop], _measure,
%{leader: _, peer: Oban.Peers.Global, was_leader?: false}}
end
end
7 changes: 5 additions & 2 deletions test/oban/peers/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ defmodule Oban.Peers.PostgresTest do
|> Enum.map(&start_supervised!({Peer, conf: conf, name: &1}))
|> Enum.filter(&Postgres.leader?/1)

assert_received {:event, [:election, :start], _measure, %{leader: _, peer: Postgres}}
assert_received {:event, [:election, :stop], _measure, %{leader: _, peer: Postgres}}
assert_received {:event, [:election, :start], _measure,
%{leader: _, peer: Postgres, was_leader?: nil}}

assert_received {:event, [:election, :stop], _measure,
%{leader: _, peer: Postgres, was_leader?: false}}
end

test "gracefully handling a missing oban_peers table" do
Expand Down

0 comments on commit b003cf2

Please sign in to comment.