Skip to content

Commit

Permalink
Use concat operator for error concatenation
Browse files Browse the repository at this point in the history
The standard `push` operation for updates is designed for arrays and
uses `array_append` internally. This replaces all use of `push` with a
fragment that uses the `||` operator instead, which works for both
arrays and jsonb.

CRDB (Cockroach) doesn't support arrays of jsonb, but they do support
simple jsonb columns. Now we can append to the errors column in either
format for CRDB compatibility.
  • Loading branch information
sorentwo committed Sep 2, 2024
1 parent fae3762 commit a9f7a8c
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ defmodule Oban.Engines.Basic do
alias Ecto.Changeset
alias Oban.{Config, Engine, Job, Repo}

# This is a replacement for `push`, which uses `array_append` and isn't compatible with jsonb
# arrays. The `||` operator works with both arrays and jsonb.
defmacrop concat_errors(column, error) do
quote do
fragment("? || ?", unquote(column), unquote(error))
end
end

@impl Engine
def init(%Config{} = conf, opts) do
if Keyword.has_key?(opts, :limit) do
Expand Down Expand Up @@ -194,24 +202,36 @@ defmodule Oban.Engines.Basic do

@impl Engine
def discard_job(%Config{} = conf, %Job{} = job) do
updates = [
set: [state: "discarded", discarded_at: utc_now()],
push: [errors: Job.format_attempt(job)]
]
query =
Job
|> where(id: ^job.id)
|> update([j],
set: [
state: "discarded",
discarded_at: ^utc_now(),
errors: concat_errors(j.errors, ^[Job.format_attempt(job)])
]
)

Repo.update_all(conf, where(Job, id: ^job.id), updates)
Repo.update_all(conf, query, [])

:ok
end

@impl Engine
def error_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do
updates = [
set: [state: "retryable", scheduled_at: seconds_from_now(seconds)],
push: [errors: Job.format_attempt(job)]
]
query =
Job
|> where(id: ^job.id)
|> update([j],
set: [
state: "retryable",
scheduled_at: ^seconds_from_now(seconds),
errors: concat_errors(j.errors, ^[Job.format_attempt(job)])
]
)

Repo.update_all(conf, where(Job, id: ^job.id), updates)
Repo.update_all(conf, query, [])

:ok
end
Expand All @@ -235,8 +255,11 @@ defmodule Oban.Engines.Basic do
query =
if is_map(job.unsaved_error) do
update(query, [j],
set: [state: "cancelled", cancelled_at: ^utc_now()],
push: [errors: ^Job.format_attempt(job)]
set: [
state: "cancelled",
cancelled_at: ^utc_now(),
errors: concat_errors(j.errors, ^[Job.format_attempt(job)])
]
)
else
query
Expand Down

0 comments on commit a9f7a8c

Please sign in to comment.