Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically retry transactions with backoff #1134

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/oban/migrations/postgres/v08.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ defmodule Oban.Migrations.Postgres.V08 do
alter table(:oban_jobs, prefix: prefix) do
add_if_not_exists(:discarded_at, :utc_datetime_usec)
add_if_not_exists(:priority, :integer)
add_if_not_exists(:tags, {:array, :string})
add_if_not_exists(:tags, {:array, :text})
end

alter table(:oban_jobs, prefix: prefix) do
modify :priority, :integer, default: 0
modify :tags, {:array, :string}, default: []
modify :tags, {:array, :text}, default: []
end

drop_if_exists index(:oban_jobs, [:queue, :state, :scheduled_at, :id], prefix: prefix)
Expand Down
50 changes: 48 additions & 2 deletions lib/oban/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Oban.Repo do

@moduledoc since: "2.2.0"

alias Oban.Config
alias Oban.{Backoff, Config}

@callbacks_without_opts [
config: 0,
Expand Down Expand Up @@ -59,12 +59,13 @@ defmodule Oban.Repo do
reload!: 2,
reload: 2,
stream: 2,
transaction: 2,
update!: 2,
update: 2,
update_all: 3
]

@retry_opts delay: 100, retry: 5, expected_delay: 5, expected_retry: 10

for {fun, arity} <- @callbacks_without_opts do
args = [Macro.var(:conf, __MODULE__) | Macro.generate_arguments(arity, __MODULE__)]

Expand Down Expand Up @@ -132,6 +133,51 @@ defmodule Oban.Repo do
conf.repo.to_sql(kind, query)
end

@doc """
Wraps `c:Ecto.Repo.transaction/2` with an additional `Oban.Config` argument and automatic
retries with backoff.

## Options

Backoff helpers, in addition to the standard transaction options:

* `delay` — the time to sleep between retries, defaults to `100ms`
* `retry` — the number of retries for unexpected errors, defaults to `5`
* `expected_delay` — the time to sleep between expected errors, e.g. `serialization` or
`lock_not_available`, defaults to `5ms`
* `expected_retry` — the number of retries for expected errors, defaults to `10`
"""
@doc since: "2.18.1"
def transaction(conf, fun_or_multi, opts \\ []) do
transaction(conf, fun_or_multi, opts, 1)
end

defp transaction(conf, fun_or_multi, opts, attempt) do
__dispatch__(:transaction, [conf, fun_or_multi, opts])
rescue
error in [DBConnection.ConnectionError, Postgrex.Error] ->
opts = Keyword.merge(@retry_opts, opts)

cond do
expected_error?(error) and attempt < opts[:expected_retry] ->
jittery_sleep(opts[:expected_delay])

attempt < opts[:retry] ->
jittery_sleep(opts[:delay])

true ->
reraise error, __STACKTRACE__
end

transaction(conf, fun_or_multi, opts, attempt + 1)
end

defp expected_error?(%_{postgres: %{code: :lock_not_available}}), do: true
defp expected_error?(%_{postgres: %{code: :serialization_failure}}), do: true
defp expected_error?(_error), do: false

defp jittery_sleep(delay), do: delay |> Backoff.jitter() |> Process.sleep()

defp __dispatch__(name, [%Config{} = conf | args]) do
with_dynamic_repo(conf, name, args)
end
Expand Down