Skip to content

Commit

Permalink
Include process memory and reductions in telemetry
Browse files Browse the repository at this point in the history
Job `stop` and `exception` telemetry now includes the reported memory
and total reductions from the job's process. The values are pulled from
`Process.info/2`, and may be 0 if the job's process has already exited, e.g.
after a timeout.
  • Loading branch information
sorentwo committed Jul 18, 2024
1 parent 7601b8e commit b6d83e6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
26 changes: 20 additions & 6 deletions lib/oban/queue/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule Oban.Queue.Executor do
:error,
:job,
:meta,
:pid,
:result,
:snooze,
:start_mono,
Expand All @@ -58,6 +59,7 @@ defmodule Oban.Queue.Executor do
conf: conf,
job: %{job | conf: conf},
meta: event_metadata(conf, job),
pid: self(),
safe: Keyword.get(opts, :safe, true),
start_mono: System.monotonic_time(),
start_time: System.system_time()
Expand Down Expand Up @@ -260,8 +262,6 @@ defmodule Oban.Queue.Executor do

@spec emit_event(t()) :: t()
def emit_event(%__MODULE__{state: state} = exec) when state in [:failure, :exhausted] do
measurements = %{duration: exec.duration, queue_time: exec.queue_time}

kind =
case exec.kind do
{:EXIT, _pid} -> :exit
Expand All @@ -281,23 +281,21 @@ defmodule Oban.Queue.Executor do
state: state
})

:telemetry.execute([:oban, :job, :exception], measurements, meta)
:telemetry.execute([:oban, :job, :exception], measurements(exec), meta)

exec
end

def emit_event(%__MODULE__{state: state} = exec)
when state in [:cancelled, :success, :snoozed, :discard] do
measurements = %{duration: exec.duration, queue_time: exec.queue_time}

meta =
Map.merge(exec.meta, %{
job: exec.job,
state: exec.state,
result: exec.result
})

:telemetry.execute([:oban, :job, :stop], measurements, meta)
:telemetry.execute([:oban, :job, :stop], measurements(exec), meta)

exec
end
Expand All @@ -321,6 +319,22 @@ defmodule Oban.Queue.Executor do
|> Map.merge(%{conf: conf, job: job, prefix: conf.prefix})
end

# Builds the measurements map sent with job `:stop` and `:exception` telemetry
# events: the timing values already recorded on the executor plus the `:memory`
# and `:reductions` values looked up for the executor's process via `info_for/2`.
defp measurements(exec) do
  timing = %{duration: exec.duration, queue_time: exec.queue_time}

  # Each process item is looked up separately through `info_for/2`.
  process_stats = Map.new([:memory, :reductions], &{&1, info_for(exec, &1)})

  Map.merge(timing, process_stats)
end

# Looks up a single `Process.info/2` item for the pid stored on the executor.
#
# `Process.info/2` returns `nil` when the process is no longer alive; in that
# case the measurement is reported as 0 instead of failing the telemetry call.
defp info_for(%__MODULE__{} = exec, item) do
  exec.pid
  |> Process.info(item)
  |> case do
    nil -> 0
    {^item, amount} -> amount
  end
end

defp log_warning(%__MODULE__{safe: true, worker: worker}, returned) do
Logger.warning(fn ->
"""
Expand Down
12 changes: 6 additions & 6 deletions lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ defmodule Oban.Telemetry do
provide the error type, the error itself, and the stacktrace. The following chart shows which
metadata you can expect for each event:
| event | measures | metadata |
| ------------ | -------------------------- | ----------------------------------------------------------------------- |
| `:start` | `:system_time` | `:conf`, `:job` |
| `:stop` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:result` |
| `:exception` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:kind`, `:reason`, `:result`, `:stacktrace` |
| event | measures | metadata |
| ------------ | ---------------------------------------------------- | ----------------------------------------------------------------------- |
| `:start` | `:system_time` | `:conf`, `:job` |
| `:stop` | `:duration`, `:memory`, `:queue_time`, `:reductions` | `:conf`, `:job`, `:state`, `:result` |
| `:exception` | `:duration`, `:memory`, `:queue_time`, `:reductions` | `:conf`, `:job`, `:state`, `:kind`, `:reason`, `:result`, `:stacktrace` |
#### Metadata
* `:conf` — the executing Oban instance's config
* `:job` — the executing `Oban.Job`
* `:state` — one of `:success`, `:failure`, `:cancelled`, `:discard` or `:snoozed`
* `:result` — the `perform/1` return value, always `nil` for an exception or crash
* `:state` — one of `:success`, `:failure`, `:cancelled`, `:discard` or `:snoozed`
For `:exception` events the metadata also includes details about what caused the failure. The
`:kind` value is determined by how an error occurred. Here are the possible kinds:
Expand Down
10 changes: 8 additions & 2 deletions test/oban/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ defmodule Oban.TelemetryTest do
%Job{id: error_id} = insert!([ref: 2, action: "ERROR"], tags: ["foo"])

assert_receive {:event, :start, started_time, start_meta}
assert_receive {:event, :stop, %{duration: stop_duration, queue_time: queue_time}, stop_meta}
assert_receive {:event, :exception, error_duration, %{kind: :error} = error_meta}
assert_receive {:event, :stop, stop_meas, stop_meta}
assert_receive {:event, :exception, error_meas, %{kind: :error} = error_meta}

assert %{duration: stop_duration, queue_time: queue_time} = stop_meas
assert %{memory: stop_memory, reductions: stop_reductions} = stop_meas
assert %{duration: error_duration, queue_time: _} = error_meas

assert started_time > 0
assert stop_duration > 0
assert stop_memory > 0
assert stop_reductions > 0
assert queue_time > 0
assert error_duration > 0

Expand Down
8 changes: 2 additions & 6 deletions test/support/telemetry_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,8 @@ defmodule Oban.TelemetryHandler do
send(pid, {:event, :start, start_time, meta})
end

def handle([:oban, :job, :stop], measure, meta, pid) do
send(pid, {:event, :stop, measure, meta})
end

def handle([:oban, :job, event], %{duration: duration}, meta, pid) do
send(pid, {:event, event, duration, meta})
def handle([:oban, :job, event], measure, meta, pid) do
send(pid, {:event, event, measure, meta})
end

def handle([:oban, :engine | event], measure, meta, pid) do
Expand Down

0 comments on commit b6d83e6

Please sign in to comment.