Skip to content

Commit

Permalink
Accept enumerable output instead of exact stream
Browse files Browse the repository at this point in the history
Some Stream operations, such as `concat/2`, output an enumerable
function rather than a Stream struct.
  • Loading branch information
sorentwo committed Jul 25, 2024
1 parent b6d83e6 commit 78d9ee7
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
3 changes: 2 additions & 1 deletion lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ defmodule Oban do
is_struct(cw, Stream) or
is_function(cw, 1) or
(is_map_key(cw, :changesets) and is_list(cw.changesets)) or
(is_map_key(cw, :changesets) and is_struct(cw.changesets, Stream))
(is_map_key(cw, :changesets) and is_struct(cw.changesets, Stream)) or
(is_map_key(cw, :changesets) and is_function(cw.changesets))

@doc """
Creates a facade for `Oban` functions and automates fetching configuration from the application
Expand Down
3 changes: 1 addition & 2 deletions lib/oban/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,7 @@ defmodule Oban.Engine do

defp expand(fun, changes) when is_function(fun, 1), do: expand(fun.(changes), changes)
defp expand(%{changesets: changesets}, _), do: expand(changesets, %{})
defp expand(changesets, _) when is_struct(changesets, Stream), do: changesets
defp expand(changesets, _) when is_list(changesets), do: changesets
defp expand(changesets, _), do: changesets

defp with_span(event, %Config{} = conf, base_meta, fun) do
base_meta = Map.merge(base_meta, %{conf: conf, engine: conf.engine})
Expand Down
3 changes: 2 additions & 1 deletion test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,9 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do

test "inserting multiple jobs from a changeset wrapped stream", %{name: name} do
changesets = Stream.map(0..1, &Worker.new(%{ref: &1}))
changesets = Stream.concat(changesets, changesets)

[_job_1, _job_2] = Oban.insert_all(name, %{changesets: changesets})
[_ | _] = Oban.insert_all(name, %{changesets: changesets})
end

test "handling empty changesets list from a wrapper", %{name: name} do
Expand Down

0 comments on commit 78d9ee7

Please sign in to comment.