Skip to content

Commit

Permalink
Log when jobs don't finish executing within :shutdown_grace_period
Browse files Browse the repository at this point in the history
If jobs take longer than `:shutdown_grace_period` (by default 15
seconds) then they are brutally killed. Log a message when this occurs.
The message looks like:

> Oban's :alpha queue was unable to cleanly shutdown in allotted time (:shutdown_grace_period was 10). Remaining job ids: (job ids: [1941])
  • Loading branch information
axelson committed Apr 28, 2024
1 parent abcffec commit 3826c61
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
2 changes: 2 additions & 0 deletions lib/oban/queue/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ defmodule Oban.Queue.Supervisor do
|> Keyword.put_new(:dispatch_cooldown, conf.dispatch_cooldown)

watch_opts = [
conf_name: conf.name,
foreman: fore_name,
name: watch_name,
producer: prod_name,
queue: queue,
shutdown: conf.shutdown_grace_period
]

Expand Down
50 changes: 45 additions & 5 deletions lib/oban/queue/watchman.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,28 @@ defmodule Oban.Queue.Watchman do
alias Oban.Queue.Producer
alias __MODULE__, as: State

require Logger

# Give a little extra shutdown time so we can report an unclean shutdown that
# might leave orphaned jobs
@extra_shutdown_time 250

@type option ::
{:foreman, GenServer.name()}
| {:conf_name, Oban.name()}
| {:name, module()}
| {:producer, GenServer.name()}
| {:queue, Oban.queue_name()}
| {:shutdown, timeout()}

defstruct [:foreman, :producer, :shutdown, interval: 10]
defstruct [:foreman, :producer, :queue, :conf_name, :shutdown, interval: 10]

@spec child_spec([option]) :: Supervisor.child_spec()
def child_spec(opts) do
shutdown =
case opts[:shutdown] do
0 -> :brutal_kill
value -> value
value when is_integer(value) -> value + @extra_shutdown_time
end

%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, shutdown: shutdown}
Expand Down Expand Up @@ -53,15 +61,47 @@ defmodule Oban.Queue.Watchman do
:ok
end

defp wait_for_executing(state) do
defp wait_for_executing(state, start_time \\ :erlang.monotonic_time(:millisecond)) do
case DynamicSupervisor.count_children(state.foreman) do
%{active: 0} ->
:ok

_ ->
:ok = Process.sleep(state.interval)
if shutdown_time_exceeded?(state, :erlang.monotonic_time(:millisecond) - start_time) do
print_non_clean_exit_message(state)
:ok
else
:ok = Process.sleep(state.interval)

wait_for_executing(state, start_time)
end
end
end

defp print_non_clean_exit_message(%State{conf_name: nil}), do: :ok

wait_for_executing(state)
defp print_non_clean_exit_message(%State{} = state) do
case Oban.check_queue(state.conf_name, queue: state.queue) do
%{running: []} ->
:ok

%{running: running_job_ids} ->
jobs_message = "(job ids: [#{Enum.join(running_job_ids, ", ")}])"

Logger.warning(
"Oban's :#{state.queue} queue was unable to cleanly shutdown in allotted time " <>
"(:shutdown_grace_period was #{state.shutdown}). Remaining job ids: #{jobs_message}"
)
end
catch
:exit, reason ->
Logger.warning("watchman error reason: #{inspect(reason, pretty: true)}")
end

defp shutdown_time_exceeded?(%State{shutdown: shutdown}, elapsed_time)
when is_integer(shutdown) and elapsed_time > shutdown do
true
end

defp shutdown_time_exceeded?(_, _), do: false
end
21 changes: 18 additions & 3 deletions test/oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ defmodule ObanTest do
assert_receive {:started, 1}
assert_receive {:started, 2}

:ok = stop_supervised(name)
log =
ExUnit.CaptureLog.capture_log(fn ->
:ok = stop_supervised(name)
end)

assert log =~ "Oban's :alpha queue was unable to cleanly shutdown in allotted time"
assert log =~ "job ids: [#{id_2}]"

assert_receive {:ok, 1}
refute_receive {:ok, 2}, 20
Expand All @@ -64,7 +70,9 @@ defmodule ObanTest do

assert_receive {:started, 1}

:ok = stop_supervised(name)
ExUnit.CaptureLog.capture_log(fn ->
:ok = stop_supervised(name)
end)

insert!(ref: 2, sleep: 50)

Expand All @@ -86,7 +94,10 @@ defmodule ObanTest do
assert_receive {:started, 1}
assert_receive {:started, 3}

{time, _} = :timer.tc(fn -> stop_supervised(name) end)
{{time, _}, _log} =
ExUnit.CaptureLog.with_log(fn ->
:timer.tc(fn -> stop_supervised(name) end)
end)

assert System.convert_time_unit(time, :microsecond, :millisecond) >= 10

Expand Down Expand Up @@ -115,6 +126,10 @@ defmodule ObanTest do

assert %{limit: 2, queue: "gamma", running: [_]} = Oban.check_queue(name, queue: :gamma)
assert %{paused: true, queue: "delta", running: []} = Oban.check_queue(name, queue: :delta)

ExUnit.CaptureLog.capture_log(fn ->
:ok = stop_supervised(name)
end)
end

test "checking an unknown or invalid queue" do
Expand Down

0 comments on commit 3826c61

Please sign in to comment.