diff --git a/lib/workflow_metal/case/case.ex b/lib/workflow_metal/case/case.ex index 86a376d..07896ef 100644 --- a/lib/workflow_metal/case/case.ex +++ b/lib/workflow_metal/case/case.ex @@ -1,6 +1,6 @@ defmodule WorkflowMetal.Case.Case do @moduledoc """ - `GenStateMachine` process to present a workflow case. + `GenServer` process to present a workflow case. ## Storage The data of `:token_table` is stored in ETS in the following format: @@ -27,9 +27,7 @@ defmodule WorkflowMetal.Case.Case do require Logger - use GenStateMachine, - callback_mode: [:handle_event_function, :state_enter], - restart: :transient + use GenServer, restart: :transient use TypedStruct @@ -46,6 +44,7 @@ defmodule WorkflowMetal.Case.Case do typedstruct do field :application, application field :case_schema, case_schema + field :state, WorkflowMetal.Storage.Schema.Case.state() field :start_place, place_schema field :end_place, place_schema field :token_table, :ets.tid() @@ -63,7 +62,7 @@ defmodule WorkflowMetal.Case.Case do name = Keyword.fetch!(options, :name) case_schema = Keyword.fetch!(options, :case_schema) - GenStateMachine.start_link(__MODULE__, {workflow_identifier, case_schema}, name: name) + GenServer.start_link(__MODULE__, {workflow_identifier, case_schema}, name: name) end @doc false @@ -106,7 +105,7 @@ defmodule WorkflowMetal.Case.Case do """ @spec terminate(:gen_statem.server_ref()) :: :ok def terminate(case_server) do - GenStateMachine.cast(case_server, :terminate) + GenServer.cast(case_server, :terminate) end @doc """ @@ -115,7 +114,7 @@ defmodule WorkflowMetal.Case.Case do @spec issue_tokens(:gen_statem.server_ref(), nonempty_list(token_schema)) :: {:ok, nonempty_list(token_schema)} def issue_tokens(case_server, [_ | _] = token_schema_list) do - GenStateMachine.call(case_server, {:issue_tokens, token_schema_list}) + GenServer.call(case_server, {:issue_tokens, token_schema_list}) end @doc """ @@ -125,7 +124,7 @@ defmodule WorkflowMetal.Case.Case do {:ok, nonempty_list(token_schema)} | {:error, :tokens_not_available} def lock_tokens(case_server, [_ | _] = token_ids, task_id) do - GenStateMachine.call(case_server, {:lock_tokens, token_ids, task_id}) + GenServer.call(case_server, {:lock_tokens, token_ids, task_id}) end @doc """ @@ -134,7 +133,7 @@ defmodule WorkflowMetal.Case.Case do @spec consume_tokens(:gen_statem.server_ref(), task_id) :: {:ok, nonempty_list(token_schema)} | {:error, :tokens_not_available} def consume_tokens(case_server, task_id) do - GenStateMachine.call(case_server, {:consume_tokens, task_id}) + GenServer.call(case_server, {:consume_tokens, task_id}) end @doc """ @@ -144,7 +143,7 @@ defmodule WorkflowMetal.Case.Case do """ @spec offer_tokens_to_task(:gen_statem.server_ref(), task_id) :: {:ok, [token_schema]} def offer_tokens_to_task(case_server, task_id) do - GenStateMachine.call(case_server, {:offer_tokens_to_task, task_id}) + GenServer.call(case_server, {:offer_tokens_to_task, task_id}) end @doc """ @@ -154,7 +153,7 @@ defmodule WorkflowMetal.Case.Case do """ @spec free_tokens_from_task(:gen_statem.server_ref(), task_id) :: :ok def free_tokens_from_task(case_server, task_id) do - GenStateMachine.call(case_server, {:free_tokens_from_task, task_id}) + GenServer.call(case_server, {:free_tokens_from_task, task_id}) end @doc """ @@ -164,157 +163,102 @@ defmodule WorkflowMetal.Case.Case do """ @spec fetch_locked_tokens_from_task(:gen_statem.server_ref(), task_id) :: {:ok, [token_schema]} def fetch_locked_tokens_from_task(case_server, task_id) do - GenStateMachine.call(case_server, {:fetch_locked_tokens_from_task, task_id}) + GenServer.call(case_server, {:fetch_locked_tokens_from_task, task_id}) end # Server (callbacks) @impl true def init({{application, _workflow_id}, case_schema}) do - %{ - state: state - } = case_schema + case case_schema.state do + state when state in [:terminated, :finished] -> + {:stop, :case_not_available} - if state in [:terminated, :finished] do - {:stop, :case_not_available} - else - { - :ok, - state, - %__MODULE__{ - application: application, - case_schema: case_schema, - token_table: :ets.new(:token_table, [:set, :private]) - } - } - end - end - - @impl GenStateMachine - # init - def handle_event(:enter, state, state, %__MODULE__{} = data) do - {:ok, data} = fetch_edge_places(data) - - case state do - :created -> + state -> { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } - - :active -> - {:ok, data} = fetch_unconsumed_tokens(data) - - { - :keep_state, - data, - {:state_timeout, 1, :after_start} + :ok, + %__MODULE__{ + application: application, + case_schema: case_schema, + state: state, + token_table: :ets.new(:token_table, [:set, :private]) + }, + {:continue, :after_start} } end end - @impl GenStateMachine - def handle_event(:enter, old_state, state, %__MODULE__{} = data) do - case {old_state, state} do - {:created, :active} -> - Logger.debug(fn -> "#{describe(data)} is activated." end) - - {:keep_state, data} - - {:active, :finished} -> - Logger.debug(fn -> "#{describe(data)} is finished." end) + @impl GenServer + def handle_continue(:after_start, %{state: :created} = data) do + data = + data + |> fetch_edge_places() + |> do_activate_case() + |> update_case(%{state: :active}) - {:stop, :normal} + {:noreply, data, {:continue, :offer_tokens}} + end - {from, :terminated} -> - Logger.debug(fn -> "#{describe(data)} is terminated from #{from}." end) + @impl GenServer + def handle_continue(:after_start, %{state: :active} = data) do + data = + data + |> fetch_edge_places() + |> fetch_unconsumed_tokens() - {:stop, :normal} - end + {:noreply, data, {:continue, :offer_tokens}} end - @impl GenStateMachine - def handle_event(:state_timeout, :after_start, state, %__MODULE__{} = data) do - case state do - :created -> - {:ok, data} = do_activate_case(data) - {:ok, data} = update_case(%{state: :active}, data) + @impl GenServer + def handle_continue(:offer_tokens, %{state: :active} = data) do + data = do_offer_tokens(data) - { - :next_state, - :active, - data, - {:next_event, :internal, :offer_tokens} - } - - :active -> - { - :keep_state_and_data, - {:next_event, :internal, :offer_tokens} - } - end + {:noreply, data, {:continue, :try_finish}} end - @impl GenStateMachine - def handle_event(:internal, :offer_tokens, :active, %__MODULE__{} = data) do - {:ok, data} = do_offer_tokens(data) + @impl GenServer + def handle_continue({:offer_tokens, tokens}, %{state: :active} = data) do + data = do_offer_tokens(tokens, data) - { - :keep_state, - data, - {:next_event, :internal, :try_finish} - } + {:noreply, data, {:continue, :try_finish}} end - @impl GenStateMachine - def handle_event(:internal, {:offer_tokens, tokens}, :active, %__MODULE__{} = data) do - {:ok, data} = do_offer_tokens(tokens, data) + @impl GenServer + def handle_continue(:try_finish, %{state: :active} = data) do + case case_finishment(data) do + {:finished, data} -> + data = + data + |> do_finish_case() + |> update_case(%{state: :finished}) - { - :keep_state, - data, - {:next_event, :internal, :try_finish} - } + {:noreply, data, {:continue, :stop_server}} + + _error -> + {:noreply, data} + end end - @impl GenStateMachine - def handle_event( - :internal, + @impl GenServer + def handle_continue( {:revoke_tokens, locked_token_schemas, except_task_id}, - :active, - %__MODULE__{} = data + %__MODULE__{state: :active} = data ) do - {:ok, data} = do_revoke_tokens(locked_token_schemas, except_task_id, data) + data = do_revoke_tokens(locked_token_schemas, except_task_id, data) - {:keep_state, data} + {:noreply, data} end - @impl GenStateMachine - def handle_event(:internal, :try_finish, :active, %__MODULE__{} = data) do - with( - {:finished, data} <- case_finishment(data), - {:ok, data} <- do_finish_case(data) - ) do - {:ok, data} = update_case(%{state: :finished}, data) - - { - :next_state, - :finished, - data - } - else - _ -> - :keep_state_and_data - end + @impl GenServer + def handle_continue(:stop_server, data) do + {:stop, :normal, data} end - @impl GenStateMachine - def handle_event( - {:call, from}, + @impl GenServer + def handle_call( {:lock_tokens, token_ids, task_id}, - :active, - %__MODULE__{} = data + _from, + %{state: :active} = data ) do case do_lock_tokens(MapSet.new(token_ids), task_id, data) do {:ok, locked_token_schemas, data} -> @@ -324,29 +268,16 @@ defmodule WorkflowMetal.Case.Case do })" end) - { - :keep_state, - data, - {:reply, from, {:ok, locked_token_schemas}} - } + {:reply, {:ok, locked_token_schemas}, data} - error -> - { - :keep_state, - data, - {:reply, from, error} - } + {:error, _reason} = error -> + {:reply, error, data} end end - @impl GenStateMachine - def handle_event( - {:call, from}, - {:consume_tokens, task_id}, - :active, - %__MODULE__{} = data - ) do - {:ok, tokens, data} = do_consume_tokens(task_id, data) + @impl GenServer + def handle_call({:consume_tokens, task_id}, _from, %{state: :active} = data) do + {tokens, data} = do_consume_tokens(task_id, data) Logger.debug(fn -> "#{describe(data)}: tokens(#{tokens |> Enum.map_join(", ", & &1.id)}) have been consumed by the task(#{ @@ -354,69 +285,47 @@ defmodule WorkflowMetal.Case.Case do })" end) - { - :keep_state, - data, - [ - {:reply, from, {:ok, tokens}}, - {:next_event, :internal, {:revoke_tokens, tokens, task_id}} - ] - } + {:reply, {:ok, tokens}, data, {:continue, {:revoke_tokens, tokens, task_id}}} end - @impl GenStateMachine - def handle_event( - {:call, from}, - {:issue_tokens, token_schema_list}, - :active, - %__MODULE__{} = data - ) do - {:ok, tokens, data} = do_issue_tokens(token_schema_list, data) + @impl GenServer + def handle_call({:issue_tokens, token_schema_list}, _from, %{state: :active} = data) do + {tokens, data} = do_issue_tokens(token_schema_list, data) Logger.debug(fn -> "#{describe(data)}: tokens(#{tokens |> Enum.map_join(", ", & &1.id)}) have been issued" end) - { - :keep_state, - data, - [ - {:reply, from, {:ok, tokens}}, - {:next_event, :internal, {:offer_tokens, tokens}} - ] - } + {:reply, {:ok, tokens}, data, {:continue, {:offer_tokens, tokens}}} end - @impl GenStateMachine - def handle_event({:call, from}, {:offer_tokens_to_task, task_id}, :active, %__MODULE__{} = data) do - {:ok, tokens, data} = do_offer_tokens_to_task(task_id, data) + @impl GenServer + def handle_call({:offer_tokens_to_task, task_id}, _from, %{state: :active} = data) do + {tokens, data} = do_offer_tokens_to_task(task_id, data) - {:keep_state, data, {:reply, from, {:ok, tokens}}} + {:reply, {:ok, tokens}, data} end - @impl GenStateMachine - def handle_event( - {:call, from}, + @impl GenServer + def handle_call( {:fetch_locked_tokens_from_task, task_id}, - :active, - %__MODULE__{} = data + _from, + %{state: :active} = data ) do - {:ok, tokens} = do_fetch_locked_tokens_from_task(task_id, data) + tokens = do_fetch_locked_tokens_from_task(task_id, data) - {:keep_state_and_data, {:reply, from, {:ok, tokens}}} + {:reply, {:ok, tokens}, data} end - @impl GenStateMachine - def handle_event({:call, from}, _event_content, _state, %__MODULE__{}) do - {:keep_state_and_data, {:reply, from, {:error, :case_not_available}}} + @impl GenServer + def handle_call(_msg, _from, %{} = data) do + {:reply, {:error, :case_not_available}, data} end - @impl GenStateMachine - def handle_event( - :cast, + @impl GenServer + def handle_cast( {:free_tokens_from_task, task_id}, - :active, - %__MODULE__{} = data + %{state: :active} = data ) do token_ids = find_locked_token_ids(task_id, data) @@ -429,26 +338,24 @@ defmodule WorkflowMetal.Case.Case do end) { - :keep_state, + :noreply, data, - {:next_event, :internal, {:offer_tokens, tokens}} + {:continue, {:offer_tokens, tokens}} } end - @impl GenStateMachine - def handle_event(:cast, :terminate, state, %__MODULE__{} = data) + @impl GenServer + def handle_cast(:terminate, %__MODULE__{state: state} = data) when state in [:created, :active] do - {:ok, data} = force_abandon_tasks(data) - {:ok, data} = update_case(%{state: :terminated}, data) - - {:next_state, :terminated, data} - end + data = + data + |> force_abandon_tasks() + |> update_case(%{state: :terminated}) - @impl GenStateMachine - def format_status(_reason, [_pdict, state, data]) do - {:state, %{current_state: state, data: data}} + {:noreply, data, {:continue, :stop_server}} end + @spec fetch_unconsumed_tokens(t()) :: t() defp fetch_unconsumed_tokens(%__MODULE__{} = data) do %{ application: application, @@ -457,29 +364,27 @@ defmodule WorkflowMetal.Case.Case do } } = data - with({:ok, tokens} <- WorkflowMetal.Storage.fetch_unconsumed_tokens(application, case_id)) do - free_token_ids = - Enum.reduce(tokens, MapSet.new(), fn - %Schema.Token{state: :free} = token, acc -> - {:ok, _data} = upsert_ets_token(token, data) - MapSet.put(acc, token.id) + {:ok, tokens} = WorkflowMetal.Storage.fetch_unconsumed_tokens(application, case_id) - %Schema.Token{state: :locked} = token, acc -> - {:ok, _data} = upsert_ets_token(token, data) - acc - end) + free_token_ids = + Enum.reduce(tokens, MapSet.new(), fn + %Schema.Token{state: :free} = token, acc -> + {:ok, _data} = upsert_ets_token(token, data) + MapSet.put(acc, token.id) - { - :ok, - Map.update!( - data, - :free_token_ids, - &MapSet.union(&1, MapSet.new(free_token_ids)) - ) - } - end + %Schema.Token{state: :locked} = token, acc -> + {:ok, _data} = upsert_ets_token(token, data) + acc + end) + + Map.update!( + data, + :free_token_ids, + &MapSet.union(&1, MapSet.new(free_token_ids)) + ) end + @spec fetch_edge_places(t()) :: t() defp fetch_edge_places(%__MODULE__{} = data) do %{ application: application, @@ -491,10 +396,11 @@ defmodule WorkflowMetal.Case.Case do {:ok, {start_place, end_place}} = WorkflowMetal.Storage.fetch_edge_places(application, workflow_id) - {:ok, %{data | start_place: start_place, end_place: end_place}} + %{data | start_place: start_place, end_place: end_place} end - defp update_case(params, %__MODULE__{} = data) do + @spec update_case(t(), map()) :: t() + defp update_case(%__MODULE__{} = data, params) do %{ application: application, case_schema: case_schema @@ -507,9 +413,30 @@ defmodule WorkflowMetal.Case.Case do params ) - {:ok, %{data | case_schema: case_schema}} + new_data = %{data | case_schema: case_schema, state: case_schema.state} + + log_state_change(new_data, data.state) + + new_data end + defp log_state_change(data, old_state) do + case {old_state, data.state} do + {:created, :active} -> + Logger.debug(fn -> "#{describe(data)} is activated." end) + + {:active, :finished} -> + Logger.debug(fn -> "#{describe(data)} is finished." end) + + {from, :terminated} -> + Logger.debug(fn -> "#{describe(data)} is terminated from #{from}." end) + + _other -> + :ok + end + end + + @spec do_activate_case(t()) :: t() defp do_activate_case(%__MODULE__{} = data) do %{ start_place: %Schema.Place{ @@ -536,10 +463,7 @@ defmodule WorkflowMetal.Case.Case do {:ok, token_schema} = do_issue_token(genesis_token_schema, data) - { - :ok, - %{data | free_token_ids: MapSet.put(free_token_ids, token_schema.id)} - } + %{data | free_token_ids: MapSet.put(free_token_ids, token_schema.id)} end defp do_issue_token(token_schema, %__MODULE__{} = data) do @@ -554,6 +478,7 @@ defmodule WorkflowMetal.Case.Case do {:ok, token_schema} end + @spec do_issue_tokens([token_schema()], t()) :: {[token_schema()], t()} defp do_issue_tokens(token_schema_list, %__MODULE__{} = data) do new_tokens = Enum.map(token_schema_list, fn token_schema -> @@ -563,7 +488,6 @@ defmodule WorkflowMetal.Case.Case do end) { - :ok, new_tokens, Map.update!( data, @@ -578,6 +502,7 @@ defmodule WorkflowMetal.Case.Case do } end + @spec do_offer_tokens(t()) :: t() defp do_offer_tokens(%__MODULE__{} = data) do %{ token_table: token_table @@ -589,16 +514,17 @@ defmodule WorkflowMetal.Case.Case do do_offer_token(token_schema, data) end) - {:ok, data} + data end + @spec do_offer_tokens([token_schema()], t()) :: t() defp do_offer_tokens(tokens, %__MODULE__{} = data) do tokens |> Enum.each(fn token_schema -> do_offer_token(token_schema, data) end) - {:ok, data} + data end defp do_offer_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do @@ -654,6 +580,8 @@ defmodule WorkflowMetal.Case.Case do end end + @spec do_lock_tokens(MapSet.t(token_id()), task_id(), t()) :: + {:ok, [token_schema()], t()} | {:error, :tokens_not_available} defp do_lock_tokens(token_ids, task_id, %__MODULE__{} = data) do %{ application: application, @@ -685,6 +613,7 @@ defmodule WorkflowMetal.Case.Case do end end + @spec do_consume_tokens(task_id(), t()) :: {[token_schema()], t()} defp do_consume_tokens(task_id, %__MODULE__{} = data) do %{ application: application @@ -699,9 +628,10 @@ defmodule WorkflowMetal.Case.Case do upsert_ets_token(token, data) end) - {:ok, tokens, data} + {tokens, data} end + @spec case_finishment(t()) :: {:finished, t()} | {:active, t()} defp case_finishment(%__MODULE__{} = data) do %{ end_place: %Schema.Place{ @@ -723,6 +653,7 @@ defmodule WorkflowMetal.Case.Case do end end + @spec do_finish_case(t()) :: t() defp do_finish_case(%__MODULE__{} = data) do %{ application: application, @@ -740,15 +671,16 @@ defmodule WorkflowMetal.Case.Case do {:ok, data} = upsert_ets_token(termination_token, data) - {:ok, data} + data end + @spec do_revoke_tokens([token_schema()], task_id(), t()) :: t() defp do_revoke_tokens(locked_token_schemas, except_task_id, %__MODULE__{} = data) do Enum.each(locked_token_schemas, fn locked_token -> :ok = do_revoke_token(locked_token, except_task_id, data) end) - {:ok, data} + data end defp do_revoke_token(%Schema.Token{} = token_schema, except_task_id, %__MODULE__{} = data) do @@ -824,6 +756,7 @@ defmodule WorkflowMetal.Case.Case do ]) end + @spec do_offer_tokens_to_task(task_id(), t()) :: {[token_schema()], t()} defp do_offer_tokens_to_task(task_id, %__MODULE__{} = data) do %{ application: application, @@ -848,7 +781,7 @@ defmodule WorkflowMetal.Case.Case do |> Enum.map(& &1.place_id) |> case do [] -> - {:ok, [], data} + {[], data} [_ | _] = place_ids -> tokens = @@ -872,10 +805,11 @@ defmodule WorkflowMetal.Case.Case do } ]) - {:ok, tokens, data} + {tokens, data} end end + @spec do_fetch_locked_tokens_from_task(task_id(), t()) :: [token_schema()] defp do_fetch_locked_tokens_from_task(task_id, %__MODULE__{} = data) do %{ application: application @@ -883,12 +817,16 @@ defmodule WorkflowMetal.Case.Case do token_ids = find_locked_token_ids(task_id, data) - WorkflowMetal.Storage.fetch_tokens( - application, - token_ids - ) + {:ok, tokens} = + WorkflowMetal.Storage.fetch_tokens( + application, + token_ids + ) + + tokens end + @spec force_abandon_tasks(t()) :: t() defp force_abandon_tasks(%__MODULE__{} = data) do %{ application: application, @@ -909,7 +847,7 @@ defmodule WorkflowMetal.Case.Case do WorkflowMetal.Task.Supervisor.force_abandon_task(application, task.id) end) - {:ok, data} + data end defp upsert_ets_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do diff --git a/test/workflow_metal/case/case_test.exs b/test/workflow_metal/case/case_test.exs index 3cd97eb..75c3edf 100644 --- a/test/workflow_metal/case/case_test.exs +++ b/test/workflow_metal/case/case_test.exs @@ -127,19 +127,6 @@ defmodule WorkflowMetal.Case.CaseTest do [a_transition: a_transition, case_schema: case_schema] end - test "restore from created state", %{case_schema: case_schema} do - assert {:ok, _pid} = CaseSupervisor.open_case(DummyApplication, case_schema.id) - - until(fn -> assert_receive :a_completed end) - until(fn -> assert_receive :b_completed end) - - until(fn -> - {:ok, case_schema} = WorkflowMetal.Storage.fetch_case(DummyApplication, case_schema.id) - - assert case_schema.state === :active - end) - end - test "restore from active state", %{a_transition: a_transition, case_schema: case_schema} do {:ok, _task_schema} = WorkflowMetal.Storage.insert_task(