Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions lib/sage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ defmodule Sage do

defguardp is_transaction(value) when is_function(value, 2) or is_mfa(value)

@typedoc """
Just like a transaction, although it receives extra argument of the name of the previous transaction
"""
@type intermediate_transaction ::
(effects_so_far :: effects(), attrs :: any(), previous_stage :: stage_name() ->
{:ok | :error | :abort, any()})
| mfa()

defguardp is_intermediate_transaction(value) when is_function(value, 3) or is_mfa(value)

@typedoc """
Tracer callback, can be a module that implements `Sage.Tracer` behaviour, an anonymous function, or an
`{module, function, [arguments]}` tuple.
Expand Down Expand Up @@ -437,6 +447,57 @@ defmodule Sage do
def run_async(sage, name, transaction, compensation, opts \\ []),
do: add_stage(sage, name, build_operation!(:run_async, transaction, compensation, opts))

@doc """
For a given Sage S with transactions :t1 -> :t2 -> :t3, a call to `interleave(S, :name, f)`
will yield a saga with transactions :t1 -> :name_1 -> :t2 -> :name_2 -> :t3 -> :name_3.

This can be useful if you are trying to do a long computation and want to do something with
the intermediate results, such as logging or persistence.

Note:
- This isn't strict interleaving because a transaction is still appended at the end.
- Calling this function twice with the same name will give a `Sage.DuplicateStageError`, but
calling the function twice with a different name will compound its effects.
- Calling this function before the end of your saga definition will mean any stages you add after the
call to `interleave` will not have the intermediate stages added after them
"""
@spec interleave(
sage :: t(),
name :: stage_name(),
intermediate_transaction :: intermediate_transaction(),
compensation :: compensation()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want compensations to be reported in your use case? If so we'll need to add a type for it too or reuse intermediate_transaction here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. My use case actually doesn't require compensations at all, I just added them for completeness/generality.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add it for completeness. If there is some reporter on the saga progress, it should also be able to report compensation progress.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So are you saying that an interleaved compensation should be passed the previously run compensation, to mirror the intermediate_transaction?

) :: t()
def interleave(sage, name, intermediate_transaction, compensation \\ :noop)
when is_intermediate_transaction(intermediate_transaction) and is_compensation(compensation) do
sage.stages
|> Enum.reverse()
|> Enum.with_index(1)
|> Enum.flat_map(fn {stage, index} ->
name = String.to_atom("#{name}_#{index}")
{stage_name, _} = stage

transaction =
case intermediate_transaction do
{m, f, a} ->
{m, f, [stage_name | a]}

_ ->
&intermediate_transaction.(&1, &2, stage_name)
end

[stage, {name, build_operation!(:run, transaction, compensation)}]
Comment thread
apreifsteck marked this conversation as resolved.
Outdated
end)
|> Enum.reduce(new(), fn stage, sage ->
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need to reconstruct the Sage completely, just update the existing one like so: %{sage | steps: steps}. Struct can be extended in the future and we don't want to re-apply everything else that is in there or lose it if we forget about it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did this because I didn't want to lose the duplicate stage name checking that add_step gives. But I can refactor it for sure.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a stab at a refactor, but I'm not sold on it. I am leaning towards thinking that the best approach is to rebuild the saga and refactor later if need be, but let me know what you think of the refactor attempt.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apreifsteck if we use the suggestion below and namespace the interleaves then there can't be a collisition with the regular stage names, so we can do something like this:
%{sage | stage_names: MapSet.add(sage.stage_names, {:interleave, name}), steps: steps)
then just add a manual check for another interleave not colliding with the {:interleave, name} right in this function.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented your suggestion to use tuples as the stage names. I'm not sure if I understand the advantage of adding a check in this function to look for duplicate stage names. That seems like a separate responsibility that interleave and run could both take advantage of.

case stage do
{name, {:run, transaction, comp, _opts}} ->
run(sage, name, transaction, comp)

{name, {:run_async, transaction, comp, opts}} ->
run_async(sage, name, transaction, comp, opts)
end
end)
end

@doc """
Executes a Sage.

Expand Down
64 changes: 64 additions & 0 deletions test/sage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,70 @@ defmodule SageTest do
end
end

describe "interleaves/3" do
test "adds a step between every transaction" do
sage =
new()
|> run(:t1, transaction(:t1))
|> run(:t2, transaction(:t2))
|> run_async(:t_async, transaction(:t_async), :noop)
|> run(:t3, transaction(:t3))
|> interleave(:i, fn _effects, _args, previous_stage_name -> {:ok, previous_stage_name} end)

assert [i_4: _, t3: _, i_3: _, t_async: _, i_2: _, t2: _, i_1: _, t1: _] = sage.stages
assert {:ok, _, %{i_4: :t3, i_3: :t_async, i_2: :t2, i_1: :t1}} = execute(sage)
end

test "adds nothing if there are no transactions" do
assert [] =
new()
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
|> Map.get(:stages)
Comment thread
apreifsteck marked this conversation as resolved.
Outdated
end

test "adds a transaction at the end if there is one transaction" do
assert [i_1: _, t1: _] =
new()
|> run(:t1, transaction(:t1))
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
|> Map.get(:stages)
end

test "works with mfa" do
sage =
new()
|> run(:t1, transaction(:t1))
|> run(:t2, transaction(:t2))
|> run(:t3, transaction(:t3))
|> interleave(:i, {TestIntermediateTransactionHandler, :intermediate_transaction_handler, [:foo]})

assert {:ok, _, %{i_3: {:t3, :foo}, i_2: {:t2, :foo}, i_1: {:t1, :foo}}} = execute(sage)
end

test "can run a compensations" do
new()
|> run(:t1, transaction(:t1))
|> run(:t2, transaction(:t2))
|> run(:t3, transaction_with_error(:t3))
|> interleave(:i, fn _effects, _args, _previous_stage_name -> {:ok, nil} end, fn _errored_effect, _effects_so_far, _attrs ->
send(self(), :compensating)
:ok
end)
|> execute()

for _ <- 1..2, do: assert_received(:compensating)
end

test "errors if used more than once" do
assert_raise Sage.DuplicateStageError, fn ->
new()
|> run(:t1, transaction(:t1))
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
|> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end)
end
end
end

def dummy_transaction_for_mfa(_effects_so_far, _opts), do: raise("Not implemented")
def dummy_compensation_for_mfa(_effect_to_compensate, _opts), do: raise("Not implemented")
def dummy_final_cb(_status, _opts, _return), do: raise("Not implemented")
Expand Down
5 changes: 5 additions & 0 deletions test/support/test_intermediate_transaction_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule TestIntermediateTransactionHandler do
def intermediate_transaction_handler(_effects, _args, previous_stage_name, something_else) do
{:ok, {previous_stage_name, something_else}}
end
end