Skip to content

Commit

Permalink
Prune jobs using the scheduled_at timestamp
Browse files Browse the repository at this point in the history
The previous pruning query checked a different timestamp field for each
prunable state, e.g. `cancelled` used `cancelled_at`. There aren't any
indexes for those timestamps, let alone the combination of each state
and timestamp, which led to slow pruning queries in larger databases.

In a database with a mixture of ~1.2m prunable jobs the updated query is
130x faster, reducing the query time from 177ms down to 1.3ms.
  • Loading branch information
sorentwo committed Jul 1, 2023
1 parent 5005d5a commit 2df44ea
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 20 deletions.
10 changes: 6 additions & 4 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Oban.Engines.Basic do
subquery =
queryable
|> select([:id, :state])
|> where([j], j.state in ["scheduled", "retryable"])
|> where([j], j.state in ~w(scheduled retryable))
|> where([j], not is_nil(j.queue))
|> where([j], j.priority in [0, 1, 2, 3])
|> where([j], j.scheduled_at <= ^DateTime.utc_now())
Expand All @@ -153,9 +153,11 @@ defmodule Oban.Engines.Basic do

subquery =
queryable
|> or_where([j], j.state == "completed" and j.attempted_at < ^time)
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> select([:id, :queue, :state])
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], not is_nil(j.queue))
|> where([j], j.priority in [0, 1, 2, 3])
|> where([j], j.scheduled_at < ^time)
|> limit(^limit)

query =
Expand Down
11 changes: 5 additions & 6 deletions lib/oban/engines/lite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ defmodule Oban.Engines.Lite do

select_query =
queryable
|> where([j], j.state in ["scheduled", "retryable"])
|> select([j], map(j, [:id, :queue, :state, :worker]))
|> where([j], j.state in ~w(scheduled retryable))
|> where([j], j.scheduled_at <= ^DateTime.utc_now())
|> limit(^limit)
|> select([j], map(j, [:id, :queue, :state, :worker]))

staged = Repo.all(conf, select_query)

Expand All @@ -143,11 +143,10 @@ defmodule Oban.Engines.Lite do

select_query =
queryable
|> or_where([j], j.state == "completed" and j.attempted_at < ^time)
|> or_where([j], j.state == "cancelled" and j.cancelled_at < ^time)
|> or_where([j], j.state == "discarded" and j.discarded_at < ^time)
|> select([j], map(j, [:id, :queue, :state]))
|> where([j], j.state in ~w(completed cancelled discarded))
|> where([j], j.scheduled_at < ^time)
|> limit(^limit)
|> select([j], map(j, [:id, :queue, :state, :worker]))

pruned = Repo.all(conf, select_query)

Expand Down
12 changes: 2 additions & 10 deletions test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -376,18 +376,10 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
TelemetryHandler.attach_events(span_type: [:job, [:engine, :prune_jobs]])

for state <- Job.states(), seconds <- 59..61 do
state = to_string(state)
stamp = seconds_ago(seconds)

opts =
case state do
"cancelled" -> [state: state, cancelled_at: stamp]
"discarded" -> [state: state, discarded_at: stamp]
_all_others -> [state: state, attempted_at: stamp]
end
opts = [state: to_string(state), scheduled_at: seconds_ago(seconds)]

# Insert one job at a time to avoid a "Cell-wise defaults" error in SQLite.
Oban.insert(name, Worker.new(%{}, opts))
Oban.insert!(name, Worker.new(%{}, opts))
end

{:ok, jobs} =
Expand Down

0 comments on commit 2df44ea

Please sign in to comment.