Skip to content

Commit

Permalink
Introduce check_available/1 engine callback
Browse files Browse the repository at this point in the history
The `check_available/1` callback allows engines to customize the query
used to find jobs in the `available` state. The callback is optional,
and will default to the `Basic` implementation when an engine doesn't
support it.
  • Loading branch information
sorentwo committed Jun 3, 2024
1 parent b2e1782 commit 56cada9
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 17 deletions.
36 changes: 29 additions & 7 deletions lib/oban/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ defmodule Oban.Engine do
"""
@callback prune_jobs(conf(), queryable(), opts()) :: {:ok, [Job.t()]}

@doc """
Check for a list of queues with available jobs.
"""
@callback check_available(conf()) :: {:ok, [String.t()]}

@doc """
Record that a job completed successfully.
"""
Expand Down Expand Up @@ -138,7 +143,13 @@ defmodule Oban.Engine do
"""
@callback retry_all_jobs(conf(), queryable()) :: {:ok, [Job.t()]}

@optional_callbacks [insert_all_jobs: 5, insert_job: 5, prune_jobs: 3, stage_jobs: 3]
@optional_callbacks [
check_available: 1,
insert_all_jobs: 5,
insert_job: 5,
prune_jobs: 3,
stage_jobs: 3
]

@doc false
def init(%Config{} = conf, [_ | _] = opts) do
Expand Down Expand Up @@ -227,7 +238,7 @@ defmodule Oban.Engine do

@doc false
def stage_jobs(%Config{} = conf, queryable, opts) do
conf = with_compatible_engine(conf, :stage_jobs)
conf = with_compatible_engine(conf, :stage_jobs, 3)

with_span(:stage_jobs, conf, %{opts: opts, queryable: queryable}, fn engine ->
{:ok, jobs} = engine.stage_jobs(conf, queryable, opts)
Expand All @@ -237,14 +248,24 @@ defmodule Oban.Engine do

@doc false
def prune_jobs(%Config{} = conf, queryable, opts) do
conf = with_compatible_engine(conf, :prune_jobs)
conf = with_compatible_engine(conf, :prune_jobs, 3)

with_span(:prune_jobs, conf, %{opts: opts, queryable: queryable}, fn engine ->
{:ok, jobs} = engine.prune_jobs(conf, queryable, opts)
{:meta, {:ok, jobs}, %{jobs: jobs}}
end)
end

@doc false
def check_available(%Config{} = conf) do
conf = with_compatible_engine(conf, :check_available, 1)

with_span(:check_available, conf, %{}, fn engine ->
{:ok, queues} = engine.check_available(conf)
{:meta, {:ok, queues}, %{queues: queues}}
end)
end

@doc false
def complete_job(%Config{} = conf, %Job{} = job) do
with_span(:complete_job, conf, %{job: job}, fn engine ->
Expand Down Expand Up @@ -326,10 +347,11 @@ defmodule Oban.Engine do
end)
end

# External engines aren't guaranteed to implement stage_jobs/3 or prune_jobs/3, but we can
# assume they were built for Postgres and safely fall back to the Basic engine.
defp with_compatible_engine(%{engine: engine} = conf, function) do
if function_exported?(engine, function, 3) do
# External engines aren't guaranteed to implement `stage_jobs/3,` `prune_jobs/3`, or
# `check_available/1`, but we can assume they were built for Postgres and safely fall back to
# the Basic engine.
defp with_compatible_engine(%{engine: engine} = conf, function, arity) do
if function_exported?(engine, function, arity) do
conf
else
%Config{conf | engine: Oban.Engines.Basic}
Expand Down
12 changes: 12 additions & 0 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ defmodule Oban.Engines.Basic do
{:ok, pruned}
end

@impl Engine
def check_available(%Config{} = conf) do
query =
Job
|> where([j], j.state == "available")
|> where([j], not is_nil(j.queue))
|> select([j], j.queue)
|> distinct(true)

{:ok, Repo.all(conf, query)}
end

@impl Engine
def complete_job(%Config{} = conf, %Job{} = job) do
Repo.update_all(
Expand Down
13 changes: 3 additions & 10 deletions lib/oban/stager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ defmodule Oban.Stager do

use GenServer

import Ecto.Query, only: [distinct: 2, select: 3, where: 3]

alias Oban.{Engine, Job, Notifier, Peer, Plugin, Repo}
alias __MODULE__, as: State

Expand Down Expand Up @@ -87,14 +85,9 @@ defmodule Oban.Stager do
end

defp notify_queues(%{conf: conf, mode: :global}) do
query =
Job
|> where([j], j.state == "available")
|> where([j], not is_nil(j.queue))
|> select([j], %{queue: j.queue})
|> distinct(true)

payload = Repo.all(conf, query)
{:ok, queues} = Engine.check_available(conf)

payload = Enum.map(queues, &%{queue: &1})

Notifier.notify(conf, :insert, payload)
end
Expand Down
1 change: 1 addition & 0 deletions lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ defmodule Oban.Telemetry do
* `[:oban, :engine, :init, :start | :stop | :exception]`
* `[:oban, :engine, :refresh, :start | :stop | :exception]`
* `[:oban, :engine, :put_meta, :start | :stop | :exception]`
* `[:oban, :engine, :check_available, :start | :stop | :exception]`
| event | measures | metadata |
| ------------ | -------------- | ----------------------------------------------------- |
Expand Down

0 comments on commit 56cada9

Please sign in to comment.