Skip to content

Commit

Permalink
Add Repo.stream callback wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
sorentwo committed Sep 13, 2021
1 parent 8fdc314 commit 5c64333
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions lib/oban/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Oban.Repo do
def all(conf, queryable, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.all(queryable, query_opts(opts, conf)) end
fn -> conf.repo.all(queryable, query_opts(conf, opts)) end
)
end

Expand All @@ -26,7 +26,7 @@ defmodule Oban.Repo do
def checkout(conf, function, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.checkout(function, query_opts(opts, conf)) end
fn -> conf.repo.checkout(function, query_opts(conf, opts)) end
)
end

Expand All @@ -45,7 +45,7 @@ defmodule Oban.Repo do
def delete(conf, struct_or_changeset, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.delete(struct_or_changeset, query_opts(opts, conf)) end
fn -> conf.repo.delete(struct_or_changeset, query_opts(conf, opts)) end
)
end

Expand All @@ -55,7 +55,7 @@ defmodule Oban.Repo do
def delete_all(conf, queryable, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.delete_all(queryable, query_opts(opts, conf)) end
fn -> conf.repo.delete_all(queryable, query_opts(conf, opts)) end
)
end

Expand All @@ -66,7 +66,7 @@ defmodule Oban.Repo do
def insert(conf, struct_or_changeset, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.insert(struct_or_changeset, query_opts(opts, conf)) end
fn -> conf.repo.insert(struct_or_changeset, query_opts(conf, opts)) end
)
end

Expand All @@ -81,7 +81,7 @@ defmodule Oban.Repo do
def insert_all(conf, schema_or_source, entries, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.insert_all(schema_or_source, entries, query_opts(opts, conf)) end
fn -> conf.repo.insert_all(schema_or_source, entries, query_opts(conf, opts)) end
)
end

Expand All @@ -91,7 +91,7 @@ defmodule Oban.Repo do
def one(conf, queryable, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.one(queryable, query_opts(opts, conf)) end
fn -> conf.repo.one(queryable, query_opts(conf, opts)) end
)
end

Expand All @@ -106,10 +106,14 @@ defmodule Oban.Repo do
}}
| {:error, Exception.t()}
def query(conf, sql, params \\ [], opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.query(sql, params, query_opts(opts, conf)) end
)
with_dynamic_repo(conf, fn -> conf.repo.query(sql, params, query_opts(conf, opts)) end)
end

@doc "Wraps `c:Ecto.Repo.stream/2`"
@doc since: "2.9.0"
@spec stream(Config.t(), Queryable.t(), Keyword.t()) :: Enum.t()
def stream(conf, queryable, opts \\ []) do
with_dynamic_repo(conf, fn -> conf.repo.stream(queryable, query_opts(conf, opts)) end)
end

@doc "Wraps `c:Ecto.Repo.transaction/2`."
Expand All @@ -121,7 +125,7 @@ defmodule Oban.Repo do
def transaction(conf, fun_or_multi, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.transaction(fun_or_multi, default_opts(opts, conf)) end
fn -> conf.repo.transaction(fun_or_multi, default_opts(conf, opts)) end
)
end

Expand All @@ -144,10 +148,7 @@ defmodule Oban.Repo do
@spec update(Config.t(), Changeset.t(), Keyword.t()) ::
{:ok, Schema.t()} | {:error, Changeset.t()}
def update(conf, changeset, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.update(changeset, query_opts(opts, conf)) end
)
with_dynamic_repo(conf, fn -> conf.repo.update(changeset, query_opts(conf, opts)) end)
end

@doc "Wraps `c:Ecto.Repo.update_all/3`."
Expand All @@ -157,7 +158,7 @@ defmodule Oban.Repo do
def update_all(conf, queryable, updates, opts \\ []) do
with_dynamic_repo(
conf,
fn -> conf.repo.update_all(queryable, updates, query_opts(opts, conf)) end
fn -> conf.repo.update_all(queryable, updates, query_opts(conf, opts)) end
)
end

Expand Down Expand Up @@ -201,11 +202,11 @@ defmodule Oban.Repo do

defp in_transaction?(_, _), do: false

defp default_opts(opts, conf) do
defp default_opts(conf, opts) do
Keyword.put(opts, :log, conf.log)
end

defp query_opts(opts, conf) do
defp query_opts(conf, opts) do
opts
|> Keyword.put(:log, conf.log)
|> Keyword.put(:prefix, conf.prefix)
Expand Down

0 comments on commit 5c64333

Please sign in to comment.