diff --git a/lib/oban/engines/basic.ex b/lib/oban/engines/basic.ex index 638e09dc..1ff70b18 100644 --- a/lib/oban/engines/basic.ex +++ b/lib/oban/engines/basic.ex @@ -17,6 +17,14 @@ defmodule Oban.Engines.Basic do alias Ecto.Changeset alias Oban.{Config, Engine, Job, Repo} + # This is a replacement for `push`, which uses `array_append` and isn't compatible with jsonb + # arrays. The `||` operator works with both arrays and jsonb. + defmacrop concat_errors(column, error) do + quote do + fragment("? || ?", unquote(column), unquote(error)) + end + end + @impl Engine def init(%Config{} = conf, opts) do if Keyword.has_key?(opts, :limit) do @@ -194,24 +202,36 @@ defmodule Oban.Engines.Basic do @impl Engine def discard_job(%Config{} = conf, %Job{} = job) do - updates = [ - set: [state: "discarded", discarded_at: utc_now()], - push: [errors: Job.format_attempt(job)] - ] + query = + Job + |> where(id: ^job.id) + |> update([j], + set: [ + state: "discarded", + discarded_at: ^utc_now(), + errors: concat_errors(j.errors, ^[Job.format_attempt(job)]) + ] + ) - Repo.update_all(conf, where(Job, id: ^job.id), updates) + Repo.update_all(conf, query, []) :ok end @impl Engine def error_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do - updates = [ - set: [state: "retryable", scheduled_at: seconds_from_now(seconds)], - push: [errors: Job.format_attempt(job)] - ] + query = + Job + |> where(id: ^job.id) + |> update([j], + set: [ + state: "retryable", + scheduled_at: ^seconds_from_now(seconds), + errors: concat_errors(j.errors, ^[Job.format_attempt(job)]) + ] + ) - Repo.update_all(conf, where(Job, id: ^job.id), updates) + Repo.update_all(conf, query, []) :ok end @@ -235,8 +255,11 @@ defmodule Oban.Engines.Basic do query = if is_map(job.unsaved_error) do update(query, [j], - set: [state: "cancelled", cancelled_at: ^utc_now()], - push: [errors: ^Job.format_attempt(job)] + set: [ + state: "cancelled", + cancelled_at: ^utc_now(), + errors: concat_errors(j.errors, ^[Job.format_attempt(job)]) + ] ) else query