diff --git a/.formatter.exs b/.formatter.exs index 376fc86..958614f 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,5 +1,6 @@ # Used by "mix format" [ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], - import_deps: [:typed_struct] + import_deps: [:typed_struct], + plugins: [Styler] ] diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 6fd4ad8..5ffcc41 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -58,18 +58,19 @@ jobs: - name: Test run: mix test - credo: - runs-on: ubuntu-latest - needs: mix - steps: - - name: Setup - uses: byzanteam/jet-actions/setup-elixir@main - with: - otp-version: ${{ env.OTP_VERSION }} - elixir-version: ${{ env.ELIXIR_VERSION }} + # FIXME: fix all credo issues + # credo: + # runs-on: ubuntu-latest + # needs: mix + # steps: + # - name: Setup + # uses: byzanteam/jet-actions/setup-elixir@main + # with: + # otp-version: ${{ env.OTP_VERSION }} + # elixir-version: ${{ env.ELIXIR_VERSION }} - - name: Credo - run: mix credo --strict + # - name: Credo + # run: mix credo --strict dialyzer: runs-on: ubuntu-latest diff --git a/lib/workflow_metal/application/config.ex b/lib/workflow_metal/application/config.ex index 50f90ae..8bb2d26 100644 --- a/lib/workflow_metal/application/config.ex +++ b/lib/workflow_metal/application/config.ex @@ -1,11 +1,15 @@ defmodule WorkflowMetal.Application.Config do @moduledoc false - @type t :: map() - @type application :: WorkflowMetal.Application.t() + @type t() :: map() + @type application() :: WorkflowMetal.Application.t() + + @typep key() :: atom() + @typep value() :: term() + # credo:disable-for-previous-line JetCredo.Checks.ExplicitAnyType @doc false - @spec start_link(application, t) :: Agent.on_start() + @spec start_link(application(), t()) :: Agent.on_start() def start_link(application, initial_value) do Agent.start_link(fn -> initial_value end, name: config_name(application)) end @@ -13,7 +17,7 @@ defmodule WorkflowMetal.Application.Config do @doc """ Store settings of an application. """ - @spec set(application, atom, term) :: :ok + @spec set(application(), key(), value()) :: :ok def set(application, key, value) when is_atom(application) do Agent.update(config_name(application), fn state -> Keyword.put(state, key, value) @@ -23,7 +27,7 @@ defmodule WorkflowMetal.Application.Config do @doc """ Retrieve settings of an application. """ - @spec get(application, atom) :: term + @spec get(application(), key()) :: value() def get(application, key) when is_atom(application) and is_atom(key) do Agent.get(config_name(application), fn state -> Keyword.get(state, key) @@ -33,11 +37,13 @@ defmodule WorkflowMetal.Application.Config do @doc """ Retrieves the compile time configuration. """ + @spec compile_config(application(), Keyword.t()) :: Keyword.t() def compile_config(_application, config) do Keyword.take(config, [:name, :registry, :storage]) end defp config_name(application) do + # credo:disable-for-next-line Credo.Check.Warning.UnsafeToAtom Module.concat(application, Config) end end diff --git a/lib/workflow_metal/application/supervisor.ex b/lib/workflow_metal/application/supervisor.ex index d61cf8a..fb981bd 100644 --- a/lib/workflow_metal/application/supervisor.ex +++ b/lib/workflow_metal/application/supervisor.ex @@ -25,14 +25,12 @@ defmodule WorkflowMetal.Application.Supervisor do config_child_spec = config_child_spec(application, config) - children = - [ - config_child_spec, - registry_child_spec, - storage_child_spec, - {WorkflowMetal.Application.WorkflowsSupervisor, application} - ] - |> Enum.filter(& &1) + children = [ + config_child_spec, + registry_child_spec, + storage_child_spec, + {WorkflowMetal.Application.WorkflowsSupervisor, application} + ] Supervisor.init(children, strategy: :one_for_one) end diff --git a/lib/workflow_metal/application/workflows_supervisor.ex b/lib/workflow_metal/application/workflows_supervisor.ex index 7234d48..205ae1d 100644 --- a/lib/workflow_metal/application/workflows_supervisor.ex +++ b/lib/workflow_metal/application/workflows_supervisor.ex @@ -40,9 +40,7 @@ defmodule WorkflowMetal.Application.WorkflowsSupervisor do WorkflowMetal.Registration.Adapter.on_start_child() | {:error, :workflow_not_found} def open_workflow(application, workflow_id) do - with( - {:ok, workflow_schema} <- WorkflowMetal.Storage.fetch_workflow(application, workflow_id) - ) do + with({:ok, workflow_schema} <- WorkflowMetal.Storage.fetch_workflow(application, workflow_id)) do workflows_supervisor = supervisor_name(application) workflow_supervisor = {WorkflowMetal.Workflow.Supervisor, workflow_id: workflow_schema.id} diff --git a/lib/workflow_metal/case/case.ex b/lib/workflow_metal/case/case.ex index 44733cd..5aecfc4 100644 --- a/lib/workflow_metal/case/case.ex +++ b/lib/workflow_metal/case/case.ex @@ -1,6 +1,6 @@ defmodule WorkflowMetal.Case.Case do @moduledoc """ - `GenStateMachine` process to present a workflow case. + `GenServer` process to present a workflow case. ## Storage The data of `:token_table` is stored in ETS in the following format: @@ -21,18 +21,14 @@ defmodule WorkflowMetal.Case.Case do Restore a case while offering tokens. """ - alias WorkflowMetal.Utils.ETS, as: ETSUtil + use GenServer, restart: :transient + use TypedStruct alias WorkflowMetal.Storage.Schema + alias WorkflowMetal.Utils.ETS, as: ETSUtil require Logger - use GenStateMachine, - callback_mode: [:handle_event_function, :state_enter], - restart: :transient - - use TypedStruct - @type application :: WorkflowMetal.Application.t() @type workflow_identifier :: WorkflowMetal.Workflow.Supervisor.workflow_identifier() @type workflow_id :: WorkflowMetal.Storage.Schema.Workflow.id() @@ -46,6 +42,7 @@ defmodule WorkflowMetal.Case.Case do typedstruct do field :application, application field :case_schema, case_schema + field :state, WorkflowMetal.Storage.Schema.Case.state() field :start_place, place_schema field :end_place, place_schema field :token_table, :ets.tid() @@ -63,7 +60,7 @@ defmodule WorkflowMetal.Case.Case do name = Keyword.fetch!(options, :name) case_schema = Keyword.fetch!(options, :case_schema) - GenStateMachine.start_link(__MODULE__, {workflow_identifier, case_schema}, name: name) + GenServer.start_link(__MODULE__, {workflow_identifier, case_schema}, name: name) end @doc false @@ -106,7 +103,7 @@ defmodule WorkflowMetal.Case.Case do """ @spec terminate(:gen_statem.server_ref()) :: :ok def terminate(case_server) do - GenStateMachine.cast(case_server, :terminate) + GenServer.cast(case_server, :terminate) end @doc """ @@ -115,7 +112,7 @@ defmodule WorkflowMetal.Case.Case do @spec issue_tokens(:gen_statem.server_ref(), nonempty_list(token_schema)) :: {:ok, nonempty_list(token_schema)} def issue_tokens(case_server, [_ | _] = token_schema_list) do - GenStateMachine.call(case_server, {:issue_tokens, token_schema_list}) + GenServer.call(case_server, {:issue_tokens, token_schema_list}) end @doc """ @@ -125,7 +122,7 @@ defmodule WorkflowMetal.Case.Case do {:ok, nonempty_list(token_schema)} | {:error, :tokens_not_available} def lock_tokens(case_server, [_ | _] = token_ids, task_id) do - GenStateMachine.call(case_server, {:lock_tokens, token_ids, task_id}) + GenServer.call(case_server, {:lock_tokens, token_ids, task_id}) end @doc """ @@ -134,7 +131,7 @@ defmodule WorkflowMetal.Case.Case do @spec consume_tokens(:gen_statem.server_ref(), task_id) :: {:ok, nonempty_list(token_schema)} | {:error, :tokens_not_available} def consume_tokens(case_server, task_id) do - GenStateMachine.call(case_server, {:consume_tokens, task_id}) + GenServer.call(case_server, {:consume_tokens, task_id}) end @doc """ @@ -144,7 +141,7 @@ defmodule WorkflowMetal.Case.Case do """ @spec offer_tokens_to_task(:gen_statem.server_ref(), task_id) :: {:ok, [token_schema]} def offer_tokens_to_task(case_server, task_id) do - GenStateMachine.call(case_server, {:offer_tokens_to_task, task_id}) + GenServer.call(case_server, {:offer_tokens_to_task, task_id}) end @doc """ @@ -154,7 +151,7 @@ defmodule WorkflowMetal.Case.Case do """ @spec free_tokens_from_task(:gen_statem.server_ref(), task_id) :: :ok def free_tokens_from_task(case_server, task_id) do - GenStateMachine.call(case_server, {:free_tokens_from_task, task_id}) + GenServer.call(case_server, {:free_tokens_from_task, task_id}) end @doc """ @@ -164,285 +161,178 @@ defmodule WorkflowMetal.Case.Case do """ @spec fetch_locked_tokens_from_task(:gen_statem.server_ref(), task_id) :: {:ok, [token_schema]} def fetch_locked_tokens_from_task(case_server, task_id) do - GenStateMachine.call(case_server, {:fetch_locked_tokens_from_task, task_id}) + GenServer.call(case_server, {:fetch_locked_tokens_from_task, task_id}) end # Server (callbacks) @impl true def init({{application, _workflow_id}, case_schema}) do - %{ - state: state - } = case_schema + case case_schema.state do + state when state in [:terminated, :finished] -> + {:stop, :case_not_available} - if state in [:terminated, :finished] do - {:stop, :case_not_available} - else - { - :ok, - state, - %__MODULE__{ - application: application, - case_schema: case_schema, - token_table: :ets.new(:token_table, [:set, :private]) - } - } - end - end - - @impl GenStateMachine - # init - def handle_event(:enter, state, state, %__MODULE__{} = data) do - {:ok, data} = fetch_edge_places(data) - - case state do - :created -> + state -> { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } - - :active -> - {:ok, data} = fetch_unconsumed_tokens(data) - - { - :keep_state, - data, - {:state_timeout, 1, :after_start} + :ok, + %__MODULE__{ + application: application, + case_schema: case_schema, + state: state, + token_table: :ets.new(:token_table, [:set, :private]) + }, + {:continue, :after_start} } end end - @impl GenStateMachine - def handle_event(:enter, old_state, state, %__MODULE__{} = data) do - case {old_state, state} do - {:created, :active} -> - Logger.debug(fn -> "#{describe(data)} is activated." end) - - {:keep_state, data} - - {:active, :finished} -> - Logger.debug(fn -> "#{describe(data)} is finished." end) + @impl GenServer + def handle_continue(:after_start, %{state: :created} = data) do + data = + data + |> fetch_edge_places() + |> do_activate_case() + |> update_case(%{state: :active}) - {:stop, :normal} + {:noreply, data, {:continue, :offer_tokens}} + end - {from, :terminated} -> - Logger.debug(fn -> "#{describe(data)} is terminated from #{from}." end) + @impl GenServer + def handle_continue(:after_start, %{state: :active} = data) do + data = + data + |> fetch_edge_places() + |> fetch_unconsumed_tokens() - {:stop, :normal} - end + {:noreply, data, {:continue, :offer_tokens}} end - @impl GenStateMachine - def handle_event(:state_timeout, :after_start, state, %__MODULE__{} = data) do - case state do - :created -> - {:ok, data} = do_activate_case(data) - {:ok, data} = update_case(%{state: :active}, data) + @impl GenServer + def handle_continue(:offer_tokens, %{state: :active} = data) do + data = do_offer_tokens(data) - { - :next_state, - :active, - data, - {:next_event, :internal, :offer_tokens} - } - - :active -> - { - :keep_state_and_data, - {:next_event, :internal, :offer_tokens} - } - end + {:noreply, data, {:continue, :try_finish}} end - @impl GenStateMachine - def handle_event(:internal, :offer_tokens, :active, %__MODULE__{} = data) do - {:ok, data} = do_offer_tokens(data) + @impl GenServer + def handle_continue({:offer_tokens, tokens}, %{state: :active} = data) do + data = do_offer_tokens(tokens, data) - { - :keep_state, - data, - {:next_event, :internal, :try_finish} - } + {:noreply, data, {:continue, :try_finish}} end - @impl GenStateMachine - def handle_event(:internal, {:offer_tokens, tokens}, :active, %__MODULE__{} = data) do - {:ok, data} = do_offer_tokens(tokens, data) + @impl GenServer + def handle_continue(:try_finish, %{state: :active} = data) do + case case_finishment(data) do + {:finished, data} -> + data = + data + |> do_finish_case() + |> update_case(%{state: :finished}) - { - :keep_state, - data, - {:next_event, :internal, :try_finish} - } + {:noreply, data, {:continue, :stop_server}} + + _error -> + {:noreply, data} + end end - @impl GenStateMachine - def handle_event( - :internal, - {:revoke_tokens, locked_token_schemas, except_task_id}, - :active, - %__MODULE__{} = data - ) do - {:ok, data} = do_revoke_tokens(locked_token_schemas, except_task_id, data) + @impl GenServer + def handle_continue({:revoke_tokens, locked_token_schemas, except_task_id}, %__MODULE__{state: :active} = data) do + data = do_revoke_tokens(locked_token_schemas, except_task_id, data) - {:keep_state, data} + {:noreply, data} end - @impl GenStateMachine - def handle_event(:internal, :try_finish, :active, %__MODULE__{} = data) do - with( - {:finished, data} <- case_finishment(data), - {:ok, data} <- do_finish_case(data) - ) do - {:ok, data} = update_case(%{state: :finished}, data) - - { - :next_state, - :finished, - data - } - else - _ -> - :keep_state_and_data - end + @impl GenServer + def handle_continue(:stop_server, data) do + {:stop, :normal, data} end - @impl GenStateMachine - def handle_event( - {:call, from}, - {:lock_tokens, token_ids, task_id}, - :active, - %__MODULE__{} = data - ) do + @impl GenServer + def handle_call({:lock_tokens, token_ids, task_id}, _from, %{state: :active} = data) do case do_lock_tokens(MapSet.new(token_ids), task_id, data) do {:ok, locked_token_schemas, data} -> Logger.debug(fn -> - "#{describe(data)}: tokens(#{token_ids |> Enum.join(", ")}) have been locked by the task(#{task_id})" + "#{describe(data)}: tokens(#{Enum.join(token_ids, ", ")}) have been locked by the task(#{task_id})" end) - { - :keep_state, - data, - {:reply, from, {:ok, locked_token_schemas}} - } + {:reply, {:ok, locked_token_schemas}, data} - error -> - { - :keep_state, - data, - {:reply, from, error} - } + {:error, _reason} = error -> + {:reply, error, data} end end - @impl GenStateMachine - def handle_event( - {:call, from}, - {:consume_tokens, task_id}, - :active, - %__MODULE__{} = data - ) do - {:ok, tokens, data} = do_consume_tokens(task_id, data) + @impl GenServer + def handle_call({:consume_tokens, task_id}, _from, %{state: :active} = data) do + {tokens, data} = do_consume_tokens(task_id, data) Logger.debug(fn -> - "#{describe(data)}: tokens(#{tokens |> Enum.map_join(", ", & &1.id)}) have been consumed by the task(#{task_id})" + "#{describe(data)}: tokens(#{Enum.map_join(tokens, ", ", & &1.id)}) have been consumed by the task(#{task_id})" end) - { - :keep_state, - data, - [ - {:reply, from, {:ok, tokens}}, - {:next_event, :internal, {:revoke_tokens, tokens, task_id}} - ] - } + {:reply, {:ok, tokens}, data, {:continue, {:revoke_tokens, tokens, task_id}}} end - @impl GenStateMachine - def handle_event( - {:call, from}, - {:issue_tokens, token_schema_list}, - :active, - %__MODULE__{} = data - ) do - {:ok, tokens, data} = do_issue_tokens(token_schema_list, data) + @impl GenServer + def handle_call({:issue_tokens, token_schema_list}, _from, %{state: :active} = data) do + {tokens, data} = do_issue_tokens(token_schema_list, data) Logger.debug(fn -> - "#{describe(data)}: tokens(#{tokens |> Enum.map_join(", ", & &1.id)}) have been issued" + "#{describe(data)}: tokens(#{Enum.map_join(tokens, ", ", & &1.id)}) have been issued" end) - { - :keep_state, - data, - [ - {:reply, from, {:ok, tokens}}, - {:next_event, :internal, {:offer_tokens, tokens}} - ] - } + {:reply, {:ok, tokens}, data, {:continue, {:offer_tokens, tokens}}} end - @impl GenStateMachine - def handle_event({:call, from}, {:offer_tokens_to_task, task_id}, :active, %__MODULE__{} = data) do - {:ok, tokens, data} = do_offer_tokens_to_task(task_id, data) + @impl GenServer + def handle_call({:offer_tokens_to_task, task_id}, _from, %{state: :active} = data) do + {tokens, data} = do_offer_tokens_to_task(task_id, data) - {:keep_state, data, {:reply, from, {:ok, tokens}}} + {:reply, {:ok, tokens}, data} end - @impl GenStateMachine - def handle_event( - {:call, from}, - {:fetch_locked_tokens_from_task, task_id}, - :active, - %__MODULE__{} = data - ) do - {:ok, tokens} = do_fetch_locked_tokens_from_task(task_id, data) + @impl GenServer + def handle_call({:fetch_locked_tokens_from_task, task_id}, _from, %{state: :active} = data) do + tokens = do_fetch_locked_tokens_from_task(task_id, data) - {:keep_state_and_data, {:reply, from, {:ok, tokens}}} + {:reply, {:ok, tokens}, data} end - @impl GenStateMachine - def handle_event({:call, from}, _event_content, _state, %__MODULE__{}) do - {:keep_state_and_data, {:reply, from, {:error, :case_not_available}}} + @impl GenServer + def handle_call(_msg, _from, %{} = data) do + {:reply, {:error, :case_not_available}, data} end - @impl GenStateMachine - def handle_event( - :cast, - {:free_tokens_from_task, task_id}, - :active, - %__MODULE__{} = data - ) do + @impl GenServer + def handle_cast({:free_tokens_from_task, task_id}, %{state: :active} = data) do token_ids = find_locked_token_ids(task_id, data) {:ok, tokens} = WorkflowMetal.Storage.unlock_tokens(data.application, token_ids) Logger.debug(fn -> - "#{describe(data)}: tokens(#{tokens |> Enum.map(& &1.id) |> Enum.join(", ")}) have been freed by the task(#{task_id})" + "#{describe(data)}: tokens(#{Enum.map_join(tokens, ", ", & &1.id)}) have been freed by the task(#{task_id})" end) { - :keep_state, + :noreply, data, - {:next_event, :internal, {:offer_tokens, tokens}} + {:continue, {:offer_tokens, tokens}} } end - @impl GenStateMachine - def handle_event(:cast, :terminate, state, %__MODULE__{} = data) - when state in [:created, :active] do - {:ok, data} = force_abandon_tasks(data) - {:ok, data} = update_case(%{state: :terminated}, data) + @impl GenServer + def handle_cast(:terminate, %__MODULE__{state: state} = data) when state in [:created, :active] do + data = + data + |> force_abandon_tasks() + |> update_case(%{state: :terminated}) - {:next_state, :terminated, data} - end - - @impl GenStateMachine - def format_status(_reason, [_pdict, state, data]) do - {:state, %{current_state: state, data: data}} + {:noreply, data, {:continue, :stop_server}} end + @spec fetch_unconsumed_tokens(t()) :: t() defp fetch_unconsumed_tokens(%__MODULE__{} = data) do %{ application: application, @@ -451,29 +341,27 @@ defmodule WorkflowMetal.Case.Case do } } = data - with({:ok, tokens} <- WorkflowMetal.Storage.fetch_unconsumed_tokens(application, case_id)) do - free_token_ids = - Enum.reduce(tokens, MapSet.new(), fn - %Schema.Token{state: :free} = token, acc -> - {:ok, _data} = upsert_ets_token(token, data) - MapSet.put(acc, token.id) + {:ok, tokens} = WorkflowMetal.Storage.fetch_unconsumed_tokens(application, case_id) - %Schema.Token{state: :locked} = token, acc -> - {:ok, _data} = upsert_ets_token(token, data) - acc - end) + free_token_ids = + Enum.reduce(tokens, MapSet.new(), fn + %Schema.Token{state: :free} = token, acc -> + {:ok, _data} = upsert_ets_token(token, data) + MapSet.put(acc, token.id) - { - :ok, - Map.update!( - data, - :free_token_ids, - &MapSet.union(&1, MapSet.new(free_token_ids)) - ) - } - end + %Schema.Token{state: :locked} = token, acc -> + {:ok, _data} = upsert_ets_token(token, data) + acc + end) + + Map.update!( + data, + :free_token_ids, + &MapSet.union(&1, MapSet.new(free_token_ids)) + ) end + @spec fetch_edge_places(t()) :: t() defp fetch_edge_places(%__MODULE__{} = data) do %{ application: application, @@ -482,13 +370,13 @@ defmodule WorkflowMetal.Case.Case do } } = data - {:ok, {start_place, end_place}} = - WorkflowMetal.Storage.fetch_edge_places(application, workflow_id) + {:ok, {start_place, end_place}} = WorkflowMetal.Storage.fetch_edge_places(application, workflow_id) - {:ok, %{data | start_place: start_place, end_place: end_place}} + %{data | start_place: start_place, end_place: end_place} end - defp update_case(params, %__MODULE__{} = data) do + @spec update_case(t(), map()) :: t() + defp update_case(%__MODULE__{} = data, params) do %{ application: application, case_schema: case_schema @@ -501,9 +389,30 @@ defmodule WorkflowMetal.Case.Case do params ) - {:ok, %{data | case_schema: case_schema}} + new_data = %{data | case_schema: case_schema, state: case_schema.state} + + log_state_change(new_data, data.state) + + new_data end + defp log_state_change(data, old_state) do + case {old_state, data.state} do + {:created, :active} -> + Logger.debug(fn -> "#{describe(data)} is activated." end) + + {:active, :finished} -> + Logger.debug(fn -> "#{describe(data)} is finished." end) + + {from, :terminated} -> + Logger.debug(fn -> "#{describe(data)} is terminated from #{from}." end) + + _other -> + :ok + end + end + + @spec do_activate_case(t()) :: t() defp do_activate_case(%__MODULE__{} = data) do %{ start_place: %Schema.Place{ @@ -530,10 +439,7 @@ defmodule WorkflowMetal.Case.Case do {:ok, token_schema} = do_issue_token(genesis_token_schema, data) - { - :ok, - %{data | free_token_ids: MapSet.put(free_token_ids, token_schema.id)} - } + %{data | free_token_ids: MapSet.put(free_token_ids, token_schema.id)} end defp do_issue_token(token_schema, %__MODULE__{} = data) do @@ -548,6 +454,7 @@ defmodule WorkflowMetal.Case.Case do {:ok, token_schema} end + @spec do_issue_tokens([token_schema()], t()) :: {[token_schema()], t()} defp do_issue_tokens(token_schema_list, %__MODULE__{} = data) do new_tokens = Enum.map(token_schema_list, fn token_schema -> @@ -557,7 +464,6 @@ defmodule WorkflowMetal.Case.Case do end) { - :ok, new_tokens, Map.update!( data, @@ -572,6 +478,7 @@ defmodule WorkflowMetal.Case.Case do } end + @spec do_offer_tokens(t()) :: t() defp do_offer_tokens(%__MODULE__{} = data) do %{ token_table: token_table @@ -583,16 +490,13 @@ defmodule WorkflowMetal.Case.Case do do_offer_token(token_schema, data) end) - {:ok, data} + data end + @spec do_offer_tokens([token_schema()], t()) :: t() defp do_offer_tokens(tokens, %__MODULE__{} = data) do - tokens - |> Enum.each(fn token_schema -> - do_offer_token(token_schema, data) - end) - - {:ok, data} + Enum.each(tokens, fn token_schema -> do_offer_token(token_schema, data) end) + data end defp do_offer_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do @@ -648,6 +552,8 @@ defmodule WorkflowMetal.Case.Case do end end + @spec do_lock_tokens(MapSet.t(token_id()), task_id(), t()) :: + {:ok, [token_schema()], t()} | {:error, :tokens_not_available} defp do_lock_tokens(token_ids, task_id, %__MODULE__{} = data) do %{ application: application, @@ -679,6 +585,7 @@ defmodule WorkflowMetal.Case.Case do end end + @spec do_consume_tokens(task_id(), t()) :: {[token_schema()], t()} defp do_consume_tokens(task_id, %__MODULE__{} = data) do %{ application: application @@ -693,9 +600,10 @@ defmodule WorkflowMetal.Case.Case do upsert_ets_token(token, data) end) - {:ok, tokens, data} + {tokens, data} end + @spec case_finishment(t()) :: {:finished, t()} | {:active, t()} defp case_finishment(%__MODULE__{} = data) do %{ end_place: %Schema.Place{ @@ -717,6 +625,7 @@ defmodule WorkflowMetal.Case.Case do end end + @spec do_finish_case(t()) :: t() defp do_finish_case(%__MODULE__{} = data) do %{ application: application, @@ -734,15 +643,16 @@ defmodule WorkflowMetal.Case.Case do {:ok, data} = upsert_ets_token(termination_token, data) - {:ok, data} + data end + @spec do_revoke_tokens([token_schema()], task_id(), t()) :: t() defp do_revoke_tokens(locked_token_schemas, except_task_id, %__MODULE__{} = data) do Enum.each(locked_token_schemas, fn locked_token -> :ok = do_revoke_token(locked_token, except_task_id, data) end) - {:ok, data} + data end defp do_revoke_token(%Schema.Token{} = token_schema, except_task_id, %__MODULE__{} = data) do @@ -806,16 +716,10 @@ defmodule WorkflowMetal.Case.Case do token_table: token_table } = data - token_table - |> :ets.select([ - { - {:"$1", :_, :locked, :_, task_id, :_}, - [], - [:"$1"] - } - ]) + :ets.select(token_table, [{{:"$1", :_, :locked, :_, task_id, :_}, [], [:"$1"]}]) end + @spec do_offer_tokens_to_task(task_id(), t()) :: {[token_schema()], t()} defp do_offer_tokens_to_task(task_id, %__MODULE__{} = data) do %{ application: application, @@ -840,34 +744,31 @@ defmodule WorkflowMetal.Case.Case do |> Enum.map(& &1.place_id) |> case do [] -> - {:ok, [], data} + {[], data} [_ | _] = place_ids -> tokens = - token_table - |> :ets.select([ - { - {:_, :"$1", :"$2", :"$3", :"$4", :_}, - [ - ETSUtil.make_or([ - ETSUtil.make_and([ - ETSUtil.make_condition(:locked, :"$2", :"=:="), - ETSUtil.make_condition(task_id, :"$4", :"=:=") - ]), - ETSUtil.make_and([ - ETSUtil.make_condition(:free, :"$2", :"=:="), - ETSUtil.make_condition(place_ids, :"$3", :in) - ]) - ]) - ], - [:"$1"] - } + :ets.select(token_table, [ + {{:_, :"$1", :"$2", :"$3", :"$4", :_}, + [ + ETSUtil.make_or([ + ETSUtil.make_and([ + ETSUtil.make_condition(:locked, :"$2", :"=:="), + ETSUtil.make_condition(task_id, :"$4", :"=:=") + ]), + ETSUtil.make_and([ + ETSUtil.make_condition(:free, :"$2", :"=:="), + ETSUtil.make_condition(place_ids, :"$3", :in) + ]) + ]) + ], [:"$1"]} ]) - {:ok, tokens, data} + {tokens, data} end end + @spec do_fetch_locked_tokens_from_task(task_id(), t()) :: [token_schema()] defp do_fetch_locked_tokens_from_task(task_id, %__MODULE__{} = data) do %{ application: application @@ -875,12 +776,16 @@ defmodule WorkflowMetal.Case.Case do token_ids = find_locked_token_ids(task_id, data) - WorkflowMetal.Storage.fetch_tokens( - application, - token_ids - ) + {:ok, tokens} = + WorkflowMetal.Storage.fetch_tokens( + application, + token_ids + ) + + tokens end + @spec force_abandon_tasks(t()) :: t() defp force_abandon_tasks(%__MODULE__{} = data) do %{ application: application, @@ -896,12 +801,11 @@ defmodule WorkflowMetal.Case.Case do state: [:started, :allocated, :executing] ) - tasks - |> Enum.each(fn task -> + Enum.each(tasks, fn task -> WorkflowMetal.Task.Supervisor.force_abandon_task(application, task.id) end) - {:ok, data} + data end defp upsert_ets_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do diff --git a/lib/workflow_metal/controller/join/and.ex b/lib/workflow_metal/controller/join/and.ex index 05c51b4..2fa2a13 100644 --- a/lib/workflow_metal/controller/join/and.ex +++ b/lib/workflow_metal/controller/join/and.ex @@ -3,10 +3,10 @@ defmodule WorkflowMetal.Controller.Join.And do All incoming branch are active. """ - alias WorkflowMetal.Storage.Schema - @behaviour WorkflowMetal.Controller.Join + alias WorkflowMetal.Storage.Schema + @impl WorkflowMetal.Controller.Join def task_enablement(task_data) do case get_tokens(task_data) do diff --git a/lib/workflow_metal/controller/join/none.ex b/lib/workflow_metal/controller/join/none.ex index 27af727..c01f7d5 100644 --- a/lib/workflow_metal/controller/join/none.ex +++ b/lib/workflow_metal/controller/join/none.ex @@ -5,10 +5,10 @@ defmodule WorkflowMetal.Controller.Join.None do There is only one branch of the transition. """ - alias WorkflowMetal.Storage.Schema - @behaviour WorkflowMetal.Controller.Join + alias WorkflowMetal.Storage.Schema + @impl WorkflowMetal.Controller.Join def task_enablement(task_data) do case get_token(task_data) do @@ -29,8 +29,7 @@ defmodule WorkflowMetal.Controller.Join.None do token_table: token_table } = task_data - {:ok, [%Schema.Place{id: place_id}]} = - WorkflowMetal.Storage.fetch_places(application, transition_schema.id, :in) + {:ok, [%Schema.Place{id: place_id}]} = WorkflowMetal.Storage.fetch_places(application, transition_schema.id, :in) match_spec = [ { @@ -44,7 +43,7 @@ defmodule WorkflowMetal.Controller.Join.None do [token_id | _rest] -> {:ok, [token_id]} - _ -> + _other -> {:error, :task_not_enabled} end end diff --git a/lib/workflow_metal/controller/split/and.ex b/lib/workflow_metal/controller/split/and.ex index 2df0d5f..26b6920 100644 --- a/lib/workflow_metal/controller/split/and.ex +++ b/lib/workflow_metal/controller/split/and.ex @@ -1,10 +1,10 @@ defmodule WorkflowMetal.Controller.Split.And do @moduledoc false - alias WorkflowMetal.Storage.Schema - @behaviour WorkflowMetal.Controller.Split + alias WorkflowMetal.Storage.Schema + @impl true def issue_tokens(task_data, token_payload) do %{ diff --git a/lib/workflow_metal/controller/split/none.ex b/lib/workflow_metal/controller/split/none.ex index ecbb0c1..a23230b 100644 --- a/lib/workflow_metal/controller/split/none.ex +++ b/lib/workflow_metal/controller/split/none.ex @@ -1,10 +1,10 @@ defmodule WorkflowMetal.Controller.Split.None do @moduledoc false - alias WorkflowMetal.Storage.Schema - @behaviour WorkflowMetal.Controller.Split + alias WorkflowMetal.Storage.Schema + @impl true def issue_tokens(task_data, token_payload) do %{ diff --git a/lib/workflow_metal/executor.ex b/lib/workflow_metal/executor.ex index 68e1ab5..7bd74ca 100644 --- a/lib/workflow_metal/executor.ex +++ b/lib/workflow_metal/executor.ex @@ -79,12 +79,12 @@ defmodule WorkflowMetal.Executor do application = Keyword.get(opts, :application) quote do + @behaviour WorkflowMetal.Executor + alias WorkflowMetal.Storage.Schema @application unquote(application) - @behaviour WorkflowMetal.Executor - @before_compile unquote(__MODULE__) @impl WorkflowMetal.Executor diff --git a/lib/workflow_metal/storage.ex b/lib/workflow_metal/storage.ex index 848d9ea..5266de9 100644 --- a/lib/workflow_metal/storage.ex +++ b/lib/workflow_metal/storage.ex @@ -1,11 +1,11 @@ defmodule WorkflowMetal.Storage do - alias WorkflowMetal.Application - alias WorkflowMetal.Storage.Adapter - @moduledoc """ Use the storage configured for a WorkflowMetal application. """ + alias WorkflowMetal.Application + alias WorkflowMetal.Storage.Adapter + @type application :: WorkflowMetal.Application.t() @type workflow_id :: WorkflowMetal.Storage.Schema.Workflow.id() diff --git a/lib/workflow_metal/storage/adapter.ex b/lib/workflow_metal/storage/adapter.ex index 02aab15..1a23181 100644 --- a/lib/workflow_metal/storage/adapter.ex +++ b/lib/workflow_metal/storage/adapter.ex @@ -233,7 +233,7 @@ defmodule WorkflowMetal.Storage.Adapter do @type token_payload :: WorkflowMetal.Storage.Schema.Token.payload() @type on_issue_token :: {:ok, token_schema} - @type on_lock_tokens :: {:ok, token_schema} + @type on_lock_tokens :: {:ok, [token_schema]} @type on_unlock_tokens :: {:ok, [token_schema]} @type on_consume_tokens :: {:ok, [token_schema]} @type on_fetch_tokens :: {:ok, [token_schema]} diff --git a/lib/workflow_metal/storage/adapters/in_memory.ex b/lib/workflow_metal/storage/adapters/in_memory.ex index 1caba57..76e80b9 100644 --- a/lib/workflow_metal/storage/adapters/in_memory.ex +++ b/lib/workflow_metal/storage/adapters/in_memory.ex @@ -28,14 +28,13 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do {workitem_id, workitem_schema, {workflow_id, case_id, task_id}} """ - alias WorkflowMetal.Utils.ETS, as: ETSUtil - - alias WorkflowMetal.Storage.Schema - @behaviour WorkflowMetal.Storage.Adapter use GenServer + alias WorkflowMetal.Storage.Schema + alias WorkflowMetal.Utils.ETS, as: ETSUtil + defmodule State do @moduledoc false @@ -338,33 +337,21 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:insert_workflow, workflow_schema, params}, - _from, - %State{} = state - ) do + def handle_call({:insert_workflow, workflow_schema, params}, _from, %State{} = state) do {:ok, workflow_schema} = persist_workflow(workflow_schema, state, params) {:reply, {:ok, workflow_schema}, state} end @impl GenServer - def handle_call( - {:fetch_workflow, workflow_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_workflow, workflow_id}, _from, %State{} = state) do reply = find_workflow(workflow_id, state) {:reply, reply, state} end @impl GenServer - def handle_call( - {:delete_workflow, workflow_id}, - _from, - %State{} = state - ) do + def handle_call({:delete_workflow, workflow_id}, _from, %State{} = state) do reply = with({:ok, workflow_schema} <- find_workflow(workflow_id, state)) do :workflow @@ -378,11 +365,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_arcs, {:transition, transition_id}, arc_direction}, - _from, - %State{} = state - ) do + def handle_call({:fetch_arcs, {:transition, transition_id}, arc_direction}, _from, %State{} = state) do arc_direction = reversed_arc_direction(arc_direction) arcs = @@ -396,11 +379,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_edge_places, workflow_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_edge_places, workflow_id}, _from, %State{} = state) do reply = with( {:ok, start_place} <- find_edge_place(workflow_id, :start, state), @@ -413,11 +392,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_places, transition_id, arc_direction}, - _from, - %State{} = state - ) do + def handle_call({:fetch_places, transition_id, arc_direction}, _from, %State{} = state) do direction = reversed_arc_direction(arc_direction) reply = @@ -439,22 +414,14 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_transition, transition_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_transition, transition_id}, _from, %State{} = state) do reply = find_transition(transition_id, state) {:reply, reply, state} end @impl GenServer - def handle_call( - {:fetch_transitions, place_id, arc_direction}, - _from, - %State{} = state - ) do + def handle_call({:fetch_transitions, place_id, arc_direction}, _from, %State{} = state) do reply = :arc |> get_table(state) @@ -474,11 +441,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:insert_case, case_schema}, - _from, - %State{} = state - ) do + def handle_call({:insert_case, case_schema}, _from, %State{} = state) do case_schema = %{case_schema | id: make_id()} reply = persist_case(case_schema, state) @@ -486,22 +449,14 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_case, case_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_case, case_id}, _from, %State{} = state) do reply = find_case(case_id, state) {:reply, reply, state} end @impl GenServer - def handle_call( - {:update_case, case_id, params}, - _from, - %State{} = state - ) do + def handle_call({:update_case, case_id, params}, _from, %State{} = state) do reply = with({:ok, case_schema} <- find_case(case_id, state)) do do_update_case(case_schema, params, state) @@ -511,11 +466,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:insert_task, task_schema}, - _from, - %State{} = state - ) do + def handle_call({:insert_task, task_schema}, _from, %State{} = state) do task_schema = %{task_schema | id: make_id()} reply = persist_task(task_schema, state) @@ -523,33 +474,21 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_task, task_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_task, task_id}, _from, %State{} = state) do reply = find_task(task_id, state) {:reply, reply, state} end @impl GenServer - def handle_call( - {:fetch_tasks, case_id, options}, - _from, - %State{} = state - ) do + def handle_call({:fetch_tasks, case_id, options}, _from, %State{} = state) do reply = do_fetch_tasks(case_id, options, state) {:reply, reply, state} end @impl GenServer - def handle_call( - {:update_task, task_id, params}, - _from, - %State{} = state - ) do + def handle_call({:update_task, task_id, params}, _from, %State{} = state) do case find_task(task_id, state) do {:ok, task_schema} -> task_schema = struct(task_schema, params) @@ -564,11 +503,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:issue_token, token_schema}, - _from, - %State{} = state - ) do + def handle_call({:issue_token, token_schema}, _from, %State{} = state) do token_schema = %{token_schema | id: make_id()} {:ok, token_schema} = upsert_token(token_schema, state) @@ -576,11 +511,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:lock_tokens, token_ids, locked_by_task_id}, - _from, - %State{} = state - ) do + def handle_call({:lock_tokens, token_ids, locked_by_task_id}, _from, %State{} = state) do tokens = find_tokens(token_ids, state) reply = do_lock_tokens(tokens, locked_by_task_id, state) @@ -589,11 +520,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:unlock_tokens, token_ids}, - _from, - %State{} = state - ) do + def handle_call({:unlock_tokens, token_ids}, _from, %State{} = state) do tokens = token_ids |> find_tokens(state) @@ -609,11 +536,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:consume_tokens, token_ids, consumed_by_task_id}, - _from, - %State{} = state - ) do + def handle_call({:consume_tokens, token_ids, consumed_by_task_id}, _from, %State{} = state) do tokens = token_ids |> find_tokens(state) @@ -629,11 +552,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_unconsumed_tokens, case_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_unconsumed_tokens, case_id}, _from, %State{} = state) do tokens = :token |> get_table(state) @@ -649,22 +568,14 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_tokens, token_ids}, - _from, - %State{} = state - ) do + def handle_call({:fetch_tokens, token_ids}, _from, %State{} = state) do tokens = find_tokens(token_ids, state) {:reply, {:ok, tokens}, state} end @impl GenServer - def handle_call( - {:insert_workitem, workitem_schema}, - _from, - %State{} = state - ) do + def handle_call({:insert_workitem, workitem_schema}, _from, %State{} = state) do workitem_schema = %{workitem_schema | id: make_id()} reply = persist_workitem(workitem_schema, state) @@ -672,22 +583,14 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:fetch_workitem, workitem_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_workitem, workitem_id}, _from, %State{} = state) do reply = find_workitem(workitem_id, state) {:reply, reply, state} end @impl GenServer - def handle_call( - {:fetch_workitems, task_id}, - _from, - %State{} = state - ) do + def handle_call({:fetch_workitems, task_id}, _from, %State{} = state) do workitems = :workitem |> get_table(state) @@ -703,11 +606,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end @impl GenServer - def handle_call( - {:update_workitem, workitem_id, params}, - _from, - %State{} = state - ) do + def handle_call({:update_workitem, workitem_id, params}, _from, %State{} = state) do reply = with({:ok, workitem_schema} <- find_workitem(workitem_id, state)) do workitem_schema = struct(workitem_schema, params) @@ -819,7 +718,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end defp persist_places(places_schema, %State{} = state) do - Enum.into(places_schema, %{}, fn place_params -> + Map.new(places_schema, fn place_params -> {:ok, place_schema} = persist_place(place_params, state) {place_params.id, place_schema} @@ -852,7 +751,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end defp persist_transitions(transitions_schema, %State{} = state) do - Enum.into(transitions_schema, %{}, fn transition_schema -> + Map.new(transitions_schema, fn transition_schema -> {:ok, transition_schema} = persist_transition(transition_schema, state) {transition_schema.id, transition_schema} @@ -915,11 +814,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do end end - defp do_update_case( - %Schema.Case{} = case_schema, - %{} = params, - %State{} = state - ) do + defp do_update_case(%Schema.Case{} = case_schema, %{} = params, %State{} = state) do case_table = get_table(:case, state) case_schema = struct(case_schema, params) @@ -1111,8 +1006,7 @@ defmodule WorkflowMetal.Storage.Adapters.InMemory do Map.fetch!(state, table_name) end - defp storage_name(adapter_meta) when is_map(adapter_meta), - do: Map.get(adapter_meta, :name) + defp storage_name(adapter_meta) when is_map(adapter_meta), do: Map.get(adapter_meta, :name) defp make_id, do: :erlang.unique_integer([:positive, :monotonic]) diff --git a/lib/workflow_metal/storage/schema/case.ex b/lib/workflow_metal/storage/schema/case.ex index e82c3c5..19252ee 100644 --- a/lib/workflow_metal/storage/schema/case.ex +++ b/lib/workflow_metal/storage/schema/case.ex @@ -9,10 +9,11 @@ defmodule WorkflowMetal.Storage.Schema.Case do use TypedStruct - @type id :: term() - @type state :: :created | :active | :terminated | :finished + # credo:disable-for-next-line JetCredo.Checks.ExplicitAnyType + @type id() :: term() + @type state() :: :created | :active | :terminated | :finished - @type workflow_id :: WorkflowMetal.Storage.Schema.Workflow.id() + @type workflow_id() :: WorkflowMetal.Storage.Schema.Workflow.id() typedstruct enforce: true do field :id, id(), enforce: false diff --git a/lib/workflow_metal/task/supervisor.ex b/lib/workflow_metal/task/supervisor.ex index 3f414d3..c0ed56b 100644 --- a/lib/workflow_metal/task/supervisor.ex +++ b/lib/workflow_metal/task/supervisor.ex @@ -6,9 +6,7 @@ defmodule WorkflowMetal.Task.Supervisor do use DynamicSupervisor alias WorkflowMetal.Application.WorkflowsSupervisor - alias WorkflowMetal.Registration - alias WorkflowMetal.Storage.Schema @type application :: WorkflowMetal.Application.t() diff --git a/lib/workflow_metal/task/task.ex b/lib/workflow_metal/task/task.ex index 3218628..9456d09 100644 --- a/lib/workflow_metal/task/task.ex +++ b/lib/workflow_metal/task/task.ex @@ -1,6 +1,6 @@ defmodule WorkflowMetal.Task.Task do @moduledoc """ - A `GenStateMachine` to lock tokens and generate `workitem`. + A `GenServer` to lock tokens and generate `workitem`. ## Storage The data of `:token_table` is stored in ETS in the following format: @@ -24,13 +24,14 @@ defmodule WorkflowMetal.Task.Task do Restore a task while restoring its non-ending(`created` and `started`) workitems. """ - require Logger + use GenServer, restart: :transient + use TypedStruct - use GenStateMachine, - callback_mode: [:handle_event_function, :state_enter], - restart: :transient + alias WorkflowMetal.Controller.Join, as: JoinController + alias WorkflowMetal.Controller.Split, as: SplitController + alias WorkflowMetal.Storage.Schema - use TypedStruct + require Logger @type application :: WorkflowMetal.Application.t() @type workflow_identifier :: WorkflowMetal.Workflow.Supervisor.workflow_identifier() @@ -53,6 +54,7 @@ defmodule WorkflowMetal.Task.Task do typedstruct do field :application, application() field :task_schema, task_schema() + field :state, WorkflowMetal.Storage.Schema.Task.state() field :transition_schema, transition_schema() field :token_table, :ets.tid() field :workitem_table, :ets.tid() @@ -68,17 +70,13 @@ defmodule WorkflowMetal.Task.Task do | {:error, :tokens_not_available} | {:error, :task_not_enabled} - alias WorkflowMetal.Controller.Join, as: JoinController - alias WorkflowMetal.Controller.Split, as: SplitController - alias WorkflowMetal.Storage.Schema - @doc false @spec start_link(workflow_identifier, options) :: :gen_statem.start_ret() def start_link(workflow_identifier, options) do name = Keyword.fetch!(options, :name) task_schema = Keyword.fetch!(options, :task_schema) - GenStateMachine.start_link( + GenServer.start_link( __MODULE__, {workflow_identifier, task_schema}, name: name @@ -124,7 +122,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec force_abandon(:gen_statem.server_ref()) :: :ok def force_abandon(task_server) do - GenStateMachine.cast(task_server, :force_abandon) + GenServer.cast(task_server, :force_abandon) end @doc """ @@ -132,7 +130,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec receive_tokens(:gen_statem.server_ref(), [token_schema]) :: :ok def receive_tokens(task_server, token_schemas) do - GenStateMachine.cast(task_server, {:receive_tokens, token_schemas}) + GenServer.cast(task_server, {:receive_tokens, token_schemas}) end @doc """ @@ -142,7 +140,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec discard_tokens(:gen_statem.server_ref(), [token_schema]) :: :ok def discard_tokens(task_server, token_schemas) do - GenStateMachine.cast(task_server, {:discard_tokens, token_schemas}) + GenServer.cast(task_server, {:discard_tokens, token_schemas}) end @doc """ @@ -157,7 +155,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec preexecute(:gen_statem.server_ref()) :: on_preexecute def preexecute(task_server) do - GenStateMachine.call(task_server, :preexecute) + GenServer.call(task_server, :preexecute) end @doc """ @@ -165,339 +163,230 @@ defmodule WorkflowMetal.Task.Task do """ @spec update_workitem(:gen_statem.server_ref(), workitem_id, workitem_state) :: :ok def update_workitem(task_server, workitem_id, workitem_state) do - GenStateMachine.cast(task_server, {:update_workitem, workitem_id, workitem_state}) + GenServer.cast(task_server, {:update_workitem, workitem_id, workitem_state}) end # callbacks - @impl GenStateMachine + @impl GenServer def init({{application, _workflow_id}, task_schema}) do - %{ - state: state - } = task_schema + case task_schema.state do + state when state in [:abandoned, :completed] -> + {:stop, :task_not_available} - if state in [:abandoned, :completed] do - {:stop, :task_not_available} - else - { - :ok, - state, - %__MODULE__{ - application: application, - task_schema: task_schema, - token_table: :ets.new(:token_table, [:set, :private]), - workitem_table: :ets.new(:workitem_table, [:set, :private]) + state -> + { + :ok, + %__MODULE__{ + application: application, + task_schema: task_schema, + state: state, + token_table: :ets.new(:token_table, [:set, :private]), + workitem_table: :ets.new(:workitem_table, [:set, :private]) + }, + {:continue, :after_start} } - } end end - @impl GenStateMachine - # init - def handle_event(:enter, state, state, %__MODULE__{} = data) do - {:ok, data} = fetch_workitems(data) - {:ok, data} = fetch_transition(data) + @impl GenServer + def handle_continue(:after_start, %{state: state} = data) when state in [:started, :allocated] do + data = + data + |> fetch_workitems() + |> fetch_transition() + |> request_tokens() + |> start_created_workitems() - case state do - :started -> - {:ok, data} = request_tokens(data) + {:noreply, data} + end - { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } + def handle_continue(:after_start, %{state: :executing} = data) do + data = + data + |> fetch_workitems() + |> fetch_transition() + |> fetch_locked_tokens() + |> start_created_workitems() - :allocated -> - {:ok, data} = request_tokens(data) + {:noreply, data, {:continue, :try_complete}} + end - { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } + @impl GenServer + def handle_continue(:try_allocate, %{state: :started} = data) do + case JoinController.task_enablement(data) do + :ok -> + data = + data + |> allocate_workitem() + |> update_task(%{state: :allocated}) - :executing -> - {:ok, data} = fetch_locked_tokens(data) + {:noreply, data} - { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } + _other -> + {:noreply, data} end end - @impl GenStateMachine - def handle_event(:enter, old_state, state, %__MODULE__{} = data) do - case {old_state, state} do - {:started, :allocated} -> - Logger.debug(fn -> "#{describe(data)} allocate a workitem." end) - - {:keep_state, data} - - {:allocated, :executing} -> - Logger.debug(fn -> "#{describe(data)} start executing." end) + @impl GenServer + def handle_continue(:try_abandon_by_tokens, %{state: state} = data) when state in [:started, :allocated, :executing] do + case tokens_abandonment(data) do + {:ok, data} -> + data = + data + |> do_abandon_workitems() + |> update_task(%{state: :abandoned}) - {:keep_state, data} + {:noreply, data, {:continue, :stop_server}} - {:executing, :completed} -> - Logger.debug(fn -> "#{describe(data)} complete the execution." end) + _error -> + {:noreply, data} + end + end - {:stop, :normal} + @impl GenServer + def handle_continue(:try_complete, %{state: :executing} = data) do + case task_completion(data) do + :ok -> + {_tokens, data} = do_consume_tokens(data) + data = do_complete_task(data) - {from, :abandoned} -> - Logger.debug(fn -> "#{describe(data)} has been abandoned from #{from}." end) + {:noreply, data, {:continue, :stop_server}} - {:stop, :normal} + _error -> + {:noreply, data} end end - @impl GenStateMachine - def handle_event(:state_timeout, :after_start, state, %__MODULE__{} = data) do - {:ok, data} = start_created_workitems(data) + @impl GenServer + def handle_continue(:try_abandon_by_workitems, %{state: state} = data) when state in [:allocated, :executing] do + case workitems_abandonment(data) do + {:ok, data} -> + data = + data + |> unlock_tokens() + |> do_abandon_workitems() + |> update_task(%{state: :abandoned}) - case state do - :executing -> - { - :keep_state, - data, - {:next_event, :cast, :try_complete} - } + {:noreply, data, {:continue, :stop_server}} _ -> - :keep_state_and_data + {:noreply, data} end end - @impl GenStateMachine - def handle_event(:cast, {:receive_tokens, token_schemas}, :started, %__MODULE__{} = data) do - {:ok, data} = - Enum.reduce(token_schemas, {:ok, data}, fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end) + @impl GenServer + def handle_continue(:stop_server, data) do + {:stop, :normal, data} + end + + @impl GenServer + def handle_cast({:receive_tokens, token_schemas}, %{state: :started} = data) do + data = Enum.reduce(token_schemas, data, &upsert_ets_token/2) { - :keep_state, + :noreply, data, - {:next_event, :cast, :try_allocate} + {:continue, :try_allocate} } end - @impl GenStateMachine - def handle_event(:cast, {:receive_tokens, token_schemas}, :allocated, %__MODULE__{} = data) do - {:ok, data} = - Enum.reduce(token_schemas, {:ok, data}, fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end) + @impl GenServer + def handle_cast({:receive_tokens, token_schemas}, %{state: :allocated} = data) do + data = Enum.reduce(token_schemas, data, &upsert_ets_token/2) - { - :keep_state, - data - } + {:noreply, data} end - @impl GenStateMachine - def handle_event(:cast, {:discard_tokens, token_schemas}, state, %__MODULE__{} = data) + @impl GenServer + def handle_cast({:discard_tokens, token_schemas}, %{state: state} = data) when state in [:started, :allocated, :executing] do - {:ok, data} = - Enum.reduce(token_schemas, {:ok, data}, fn token_schema, {:ok, data} -> - remove_ets_token(token_schema, data) - end) + data = Enum.reduce(token_schemas, data, &remove_ets_token/2) { - :keep_state, + :noreply, data, - {:next_event, :cast, :try_abandon_by_tokens} + {:continue, :try_abandon_by_tokens} } end - @impl GenStateMachine - def handle_event(:cast, :try_allocate, :started, %__MODULE__{} = data) do - case JoinController.task_enablement(data) do - :ok -> - {:ok, data} = allocate_workitem(data) - {:ok, data} = update_task(%{state: :allocated}, data) - - { - :next_state, - :allocated, - data - } - - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :try_complete, :executing, %__MODULE__{} = data) do - with( - :ok <- task_completion(data), - {:ok, _tokens, data} <- do_consume_tokens(data), - {:ok, data} <- do_complete_task(data) - ) do - { - :next_state, - :completed, - data - } - else - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :try_abandon_by_workitems, state, %__MODULE__{} = data) - when state in [:allocated, :executing] do - case workitems_abandonment(data) do - {:ok, data} -> - {:ok, data} = unlock_tokens(data) - {:ok, data} = do_abandon_workitems(data) - {:ok, data} = update_task(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, - data - } - - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :try_abandon_by_tokens, state, %__MODULE__{} = data) - when state in [:started, :allocated, :executing] do - case tokens_abandonment(data) do - {:ok, data} -> - {:ok, data} = do_abandon_workitems(data) - {:ok, data} = update_task(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, - data - } - - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :force_abandon, state, %__MODULE__{} = data) - when state in [:started, :allocated, :executing] do - {:ok, data} = do_abandon_workitems(data) - {:ok, data} = update_task(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, + @impl GenServer + def handle_cast(:force_abandon, %{state: state} = data) when state in [:started, :allocated, :executing] do + data = data - } + |> do_abandon_workitems() + |> update_task(%{state: :abandoned}) + + {:noreply, data, {:continue, :stop_server}} end - @impl GenStateMachine - def handle_event( - :cast, - {:update_workitem, workitem_id, :started}, - state, - %__MODULE__{} = data - ) + @impl GenServer + def handle_cast({:update_workitem, workitem_id, :started}, %{state: state} = data) when state in [:allocated, :executing] do - {:ok, data} = upsert_ets_workitem({workitem_id, :started}, data) + data = upsert_ets_workitem({workitem_id, :started}, data) - {:keep_state, data} + {:noreply, data} end - @impl GenStateMachine - def handle_event( - :cast, - {:update_workitem, workitem_id, :completed}, - :executing, - %__MODULE__{} = data - ) do - {:ok, data} = upsert_ets_workitem({workitem_id, :completed}, data) + @impl GenServer + def handle_cast({:update_workitem, workitem_id, :completed}, %{state: :executing} = data) do + data = upsert_ets_workitem({workitem_id, :completed}, data) - {:keep_state, data, {:next_event, :cast, :try_complete}} + {:noreply, data, {:continue, :try_complete}} end - @impl GenStateMachine - def handle_event( - :cast, - {:update_workitem, workitem_id, :abandoned}, - state, - %__MODULE__{} = data - ) + @impl GenServer + def handle_cast({:update_workitem, workitem_id, :abandoned}, %{state: state} = data) when state not in [:abandoned, :completed] do - {:ok, data} = upsert_ets_workitem({workitem_id, :abandoned}, data) + data = upsert_ets_workitem({workitem_id, :abandoned}, data) - {:keep_state, data, {:next_event, :cast, :try_abandon_by_workitems}} + {:noreply, data, {:continue, :try_abandon_by_workitems}} end - @impl GenStateMachine - def handle_event(:cast, _event_content, _state, %__MODULE__{}) do - :keep_state_and_data + @impl GenServer + def handle_cast(_msg, %{} = data) do + {:noreply, data} end - @impl GenStateMachine - def handle_event({:call, from}, :preexecute, :allocated, %__MODULE__{} = data) do + @impl GenServer + def handle_call(:preexecute, _from, %__MODULE__{state: :allocated} = data) do case do_lock_tokens(data) do {:ok, locked_token_schemas, data} -> - {:ok, data} = update_task(%{state: :executing}, data) + data = update_task(data, %{state: :executing}) - { - :next_state, - :executing, - data, - {:reply, from, {:ok, locked_token_schemas}} - } + {:reply, {:ok, locked_token_schemas}, data} error -> - { - :keep_state_and_data, - {:reply, from, error} - } + {:reply, error, data} end end - @impl GenStateMachine - def handle_event({:call, from}, :preexecute, :executing, %__MODULE__{} = data) do + @impl GenServer + def handle_call(:preexecute, _from, %{state: :executing} = data) do %{ token_table: token_table } = data - reply = + tokens = token_table |> :ets.tab2list() - |> Enum.reduce({:ok, []}, fn + |> Enum.reduce([], fn {_token_id, token_schema, :locked}, {:ok, tokens} -> - {:ok, [token_schema | tokens]} + [token_schema | tokens] _, acc -> acc end) - { - :keep_state, - data, - {:reply, from, reply} - } + {:reply, {:ok, tokens}, data} end - @impl GenStateMachine - def handle_event({:call, from}, _event_content, _state, %__MODULE__{}) do - {:keep_state_and_data, {:reply, from, {:error, :task_not_available}}} - end - - @impl GenStateMachine - def format_status(_reason, [_pdict, state, data]) do - {:state, %{current_state: state, data: data}} + @impl GenServer + def handle_call(_msg, _from, %{} = data) do + {:reply, {:error, :task_not_available}, data} end + @spec fetch_workitems(t()) :: t() defp fetch_workitems(%__MODULE__{} = data) do %{ application: application, @@ -508,16 +397,12 @@ defmodule WorkflowMetal.Task.Task do {:ok, workitems} = WorkflowMetal.Storage.fetch_workitems(application, task_id) - {:ok, _data} = - Enum.reduce( - workitems, - {:ok, data}, - fn workitem, {:ok, data} -> - upsert_ets_workitem(workitem, data) - end - ) + data = Enum.reduce(workitems, data, &upsert_ets_workitem/2) + + data end + @spec fetch_transition(t()) :: t() defp fetch_transition(%__MODULE__{} = data) do %{ application: application, @@ -528,9 +413,10 @@ defmodule WorkflowMetal.Task.Task do {:ok, transition_schema} = WorkflowMetal.Storage.fetch_transition(application, transition_id) - {:ok, %{data | transition_schema: transition_schema}} + %{data | transition_schema: transition_schema} end + @spec request_tokens(t()) :: t() defp request_tokens(%__MODULE__{} = data) do %{ application: application, @@ -546,22 +432,16 @@ defmodule WorkflowMetal.Task.Task do task_id ) do {:ok, tokens} -> - {:ok, data} = - Enum.reduce( - tokens, - {:ok, data}, - fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end - ) + data = Enum.reduce(tokens, data, &upsert_ets_token/2) - {:ok, data} + data _ -> - {:ok, data} + data end end + @spec fetch_locked_tokens(t()) :: t() defp fetch_locked_tokens(%__MODULE__{} = data) do %{ application: application, @@ -578,18 +458,12 @@ defmodule WorkflowMetal.Task.Task do task_id ) - {:ok, data} = - Enum.reduce( - locked_token_schemas, - {:ok, data}, - fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end - ) + data = Enum.reduce(locked_token_schemas, data, &upsert_ets_token/2) - {:ok, data} + data end + @spec start_created_workitems(t()) :: t() defp start_created_workitems(%__MODULE__{} = data) do %{ application: application, @@ -606,10 +480,11 @@ defmodule WorkflowMetal.Task.Task do ) end) - {:ok, data} + data end - defp update_task(params, %__MODULE__{} = data) do + @spec update_task(t(), map()) :: t() + defp update_task(%__MODULE__{} = data, params) do %{ application: application, task_schema: task_schema @@ -622,9 +497,33 @@ defmodule WorkflowMetal.Task.Task do params ) - {:ok, %{data | task_schema: task_schema}} + new_data = %{data | task_schema: task_schema, state: task_schema.state} + + log_state_change(new_data, data.state) + + new_data end + defp log_state_change(data, old_state) do + case {old_state, data.state} do + {:started, :allocated} -> + Logger.debug(fn -> "#{describe(data)} allocate a workitem." end) + + {:allocated, :executing} -> + Logger.debug(fn -> "#{describe(data)} start executing." end) + + {:executing, :completed} -> + Logger.debug(fn -> "#{describe(data)} complete the execution." end) + + {from, :abandoned} -> + Logger.debug(fn -> "#{describe(data)} has been abandoned from #{from}." end) + + _other -> + :ok + end + end + + @spec allocate_workitem(t()) :: t() defp allocate_workitem(%__MODULE__{} = data) do %{ application: application, @@ -650,7 +549,7 @@ defmodule WorkflowMetal.Task.Task do {:ok, workitem_schema} = WorkflowMetal.Storage.insert_workitem(application, workitem_schema) - {:ok, data} = upsert_ets_workitem(workitem_schema, data) + data = upsert_ets_workitem(workitem_schema, data) {:ok, _} = WorkflowMetal.Workitem.Supervisor.open_workitem( @@ -658,9 +557,10 @@ defmodule WorkflowMetal.Task.Task do workitem_schema.id ) - {:ok, data} + data end + @spec do_lock_tokens(t()) :: {:ok, [token_schema()], t()} | {:error, atom()} defp do_lock_tokens(%__MODULE__{} = data) do with( {:ok, token_ids} <- JoinController.preexecute(data), @@ -676,19 +576,13 @@ defmodule WorkflowMetal.Task.Task do task_id ) ) do - {:ok, data} = - Enum.reduce( - locked_token_schemas, - {:ok, data}, - fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end - ) + data = Enum.reduce(locked_token_schemas, data, &upsert_ets_token/2) {:ok, locked_token_schemas, data} end end + @spec task_completion(t()) :: :ok | {:error, :task_not_completed} defp task_completion(%__MODULE__{} = data) do data |> Map.fetch!(:workitem_table) @@ -704,6 +598,7 @@ defmodule WorkflowMetal.Task.Task do end end + @spec do_consume_tokens(t()) :: {consumed_tokens :: [token_schema()], t()} defp do_consume_tokens(%__MODULE__{} = data) do %{ application: application, @@ -720,14 +615,12 @@ defmodule WorkflowMetal.Task.Task do task_id ) - {:ok, data} = - Enum.reduce(tokens, {:ok, data}, fn token, {:ok, data} -> - upsert_ets_token(token, data) - end) + data = Enum.reduce(tokens, data, &upsert_ets_token/2) - {:ok, tokens, data} + {tokens, data} end + @spec do_complete_task(t()) :: t() defp do_complete_task(%__MODULE__{} = data) do %{ application: application, @@ -747,7 +640,7 @@ defmodule WorkflowMetal.Task.Task do token_schema_list ) - update_task(%{state: :completed, token_payload: token_payload}, data) + update_task(data, %{state: :completed, token_payload: token_payload}) end defp build_token_payload(%__MODULE__{} = data) do @@ -791,6 +684,7 @@ defmodule WorkflowMetal.Task.Task do end end + @spec tokens_abandonment(t()) :: {:ok, t()} | {:error, :task_not_abandoned} defp tokens_abandonment(%__MODULE__{} = data) do %{ token_table: token_table @@ -804,6 +698,7 @@ defmodule WorkflowMetal.Task.Task do end end + @spec unlock_tokens(t()) :: t() defp unlock_tokens(%__MODULE__{} = data) do %{ application: application, @@ -821,9 +716,10 @@ defmodule WorkflowMetal.Task.Task do true = :ets.delete_all_objects(token_table) - {:ok, data} + data end + @spec do_abandon_workitems(t()) :: t() defp do_abandon_workitems(%__MODULE__{} = data) do %{ application: application, @@ -835,15 +731,16 @@ defmodule WorkflowMetal.Task.Task do |> Enum.each(fn {workitem_id, state} when state in [:created, :started] -> :ok = WorkflowMetal.Workitem.Supervisor.abandon_workitem(application, workitem_id) - {:ok, _data} = upsert_ets_workitem({workitem_id, :abandoned}, data) + _data = upsert_ets_workitem({workitem_id, :abandoned}, data) _ -> :skip end) - {:ok, data} + data end + @spec upsert_ets_token(token_schema(), t()) :: t() defp upsert_ets_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do %{token_table: token_table} = data @@ -854,28 +751,28 @@ defmodule WorkflowMetal.Task.Task do token_schema.state }) - {:ok, data} + data end + @spec remove_ets_token(token_schema(), t()) :: t() defp remove_ets_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do %{token_table: token_table} = data true = :ets.delete(token_table, token_schema.id) - {:ok, data} + data end + @spec upsert_ets_workitem(workitem_schema(), t()) :: t() defp upsert_ets_workitem(%Schema.Workitem{} = workitem, %__MODULE__{} = data) do - %{workitem_table: workitem_table} = data - true = :ets.insert(workitem_table, {workitem.id, workitem.state}) - - {:ok, data} + upsert_ets_workitem({workitem.id, workitem.state}, data) end + @spec upsert_ets_workitem({workitem_id, workitem_state}, t()) :: t() defp upsert_ets_workitem({workitem_id, workitem_state}, %__MODULE__{} = data) do %{workitem_table: workitem_table} = data true = :ets.insert(workitem_table, {workitem_id, workitem_state}) - {:ok, data} + data end defp describe(%__MODULE__{} = data) do diff --git a/lib/workflow_metal/workitem/supervisor.ex b/lib/workflow_metal/workitem/supervisor.ex index 6df4993..b6b2092 100644 --- a/lib/workflow_metal/workitem/supervisor.ex +++ b/lib/workflow_metal/workitem/supervisor.ex @@ -6,9 +6,7 @@ defmodule WorkflowMetal.Workitem.Supervisor do use DynamicSupervisor alias WorkflowMetal.Application.WorkflowsSupervisor - alias WorkflowMetal.Registration - alias WorkflowMetal.Storage.Schema @type application :: WorkflowMetal.Application.t() @@ -49,9 +47,7 @@ defmodule WorkflowMetal.Workitem.Supervisor do WorkflowMetal.Registration.Adapter.on_start_child() | {:error, :workitem_not_found} def open_workitem(application, %Schema.Workitem{} = workitem_schema) do - with( - {:ok, _} <- WorkflowsSupervisor.open_workflow(application, workitem_schema.workflow_id) - ) do + with({:ok, _} <- WorkflowsSupervisor.open_workflow(application, workitem_schema.workflow_id)) do workitem_supervisor = via_name(application, workitem_schema.workflow_id) workitem_spec = { @@ -69,9 +65,7 @@ defmodule WorkflowMetal.Workitem.Supervisor do end def open_workitem(application, workitem_id) do - with( - {:ok, workitem_schema} <- WorkflowMetal.Storage.fetch_workitem(application, workitem_id) - ) do + with({:ok, workitem_schema} <- WorkflowMetal.Storage.fetch_workitem(application, workitem_id)) do open_workitem(application, workitem_schema) end end diff --git a/lib/workflow_metal/workitem/workitem.ex b/lib/workflow_metal/workitem/workitem.ex index 1465663..84744ed 100644 --- a/lib/workflow_metal/workitem/workitem.ex +++ b/lib/workflow_metal/workitem/workitem.ex @@ -1,6 +1,6 @@ defmodule WorkflowMetal.Workitem.Workitem do @moduledoc """ - A `GenStateMachine` to run a workitem. + A `GenServer` to run a workitem. ## Flow @@ -29,16 +29,13 @@ defmodule WorkflowMetal.Workitem.Workitem do Restore itself only. """ - require Logger - - use GenStateMachine, - callback_mode: [:handle_event_function, :state_enter], - restart: :transient - + use GenServer, restart: :transient use TypedStruct alias WorkflowMetal.Storage.Schema + require Logger + @type application :: WorkflowMetal.Application.t() @type workflow_identifier :: WorkflowMetal.Workflow.Supervisor.workflow_identifier() @@ -53,6 +50,7 @@ defmodule WorkflowMetal.Workitem.Workitem do typedstruct do field :application, application() + field :state, WorkflowMetal.Storage.Schema.Workitem.state() field :workitem_schema, workitem_schema() end @@ -70,7 +68,7 @@ defmodule WorkflowMetal.Workitem.Workitem do name = Keyword.fetch!(options, :name) workitem_schema = Keyword.fetch!(options, :workitem_schema) - GenStateMachine.start_link( + GenServer.start_link( __MODULE__, {workflow_identifier, workitem_schema}, name: name @@ -119,7 +117,7 @@ defmodule WorkflowMetal.Workitem.Workitem do """ @spec complete(:gen_statem.server_ref(), workitem_output) :: on_complete def complete(workitem_server, output) do - GenStateMachine.call(workitem_server, {:complete, output}) + GenServer.call(workitem_server, {:complete, output}) end @doc """ @@ -127,148 +125,97 @@ defmodule WorkflowMetal.Workitem.Workitem do """ @spec abandon(:gen_statem.server_ref()) :: :ok def abandon(workitem_server) do - GenStateMachine.cast(workitem_server, :abandon) + GenServer.cast(workitem_server, :abandon) end # callbacks - @impl GenStateMachine + @impl GenServer def init({{application, _workflow_id}, workitem_schema}) do - %{ - state: state - } = workitem_schema - - if state in [:completed, :abandoned] do - {:stop, :workitem_not_available} - else - { - :ok, - state, - %__MODULE__{ - application: application, - workitem_schema: workitem_schema - } - } - end - end - - @impl GenStateMachine - def handle_event(:enter, state, state, %__MODULE__{}) do - case state do + case workitem_schema.state do :created -> { - :keep_state_and_data, - # Give some time to handle request on init - {:state_timeout, 1, :start_on_created} + :ok, + %__MODULE__{ + application: application, + state: :created, + workitem_schema: workitem_schema + }, + {:continue, :execute_on_created} } :started -> - :keep_state_and_data + { + :ok, + %__MODULE__{ + application: application, + state: :started, + workitem_schema: workitem_schema + } + } + + state when state in [:completed, :abandoned] -> + {:stop, :workitem_not_available} end end - @impl GenStateMachine - def handle_event(:enter, old_state, state, %__MODULE__{} = data) do - {:ok, data} = update_workitem_in_task_server(data) + @impl GenServer + def handle_continue(:execute_on_created, %{state: :created} = data) do + Logger.debug("#{describe(data)} start executing.") - case {old_state, state} do - {:created, :started} -> - Logger.debug(fn -> "#{describe(data)} start executing." end) - - {:keep_state, data} + case do_execute(data) do + {:ok, :started, data} -> + data = update_workitem(%{state: :started}, data) - {:started, :completed} -> - Logger.debug(fn -> "#{describe(data)} complete the execution." end) + {:noreply, data} - {:stop, :normal} + {:ok, {:completed, workitem_output}, data} -> + {:noreply, %{data | state: :started}, {:continue, {:complete, workitem_output}}} - {from, :abandoned} -> - Logger.debug(fn -> "#{describe(data)} has been abandoned from #{from}." end) + {:ok, :abandoned, data} -> + data = update_workitem(%{state: :abandoned}, data) - {:stop, :normal} + {:noreply, data} end end - @impl GenStateMachine - def handle_event({:call, from}, {:complete, output}, :started, %__MODULE__{} = data) do - {:ok, data} = update_workitem(%{state: :completed, output: output}, data) + @impl GenServer + def handle_continue({:complete, output}, %{state: :started} = data) do + data = do_complete(data, output) - { - :next_state, - :completed, - data, - {:reply, from, :ok} - } + {:noreply, data, {:continue, :stop_server}} end - @impl GenStateMachine - def handle_event({:call, from}, _event_content, _state, %__MODULE__{}) do - {:keep_state_and_data, {:reply, from, {:error, :workitem_not_available}}} + @impl GenServer + def handle_continue(:stop_server, data) do + {:stop, :normal, data} end - @impl GenStateMachine - def handle_event(:cast, {:complete, output}, :started, %__MODULE__{} = data) do - {:ok, data} = update_workitem(%{state: :completed, output: output}, data) + @impl GenServer + def handle_call({:complete, output}, _from, %{state: :started} = data) do + data = do_complete(data, output) - { - :next_state, - :completed, - data - } + {:reply, :ok, data, {:continue, :stop_server}} end - @impl GenStateMachine - def handle_event(:cast, :abandon, state, %__MODULE__{} = data) - when state not in [:abandoned, :completed] do - do_abanodn(data) - - {:ok, data} = update_workitem(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, - data - } + @impl GenServer + def handle_call({:complete, _output}, _from, data) do + {:reply, {:error, :workitem_not_available}, data, {:continue, :stop_server}} end - @impl GenStateMachine - def handle_event(:cast, _event_content, _state, %__MODULE__{}) do - :keep_state_and_data - end + @impl GenServer + def handle_cast(:abandon, %{state: state} = data) when state not in [:abandoned, :completed] do + data = do_abanodn(data) - @impl GenStateMachine - def handle_event(:state_timeout, :start_on_created, :created, %__MODULE__{}) do - {:keep_state_and_data, [{:next_event, :internal, :execute}]} + {:noreply, data, {:continue, :stop_server}} end - @impl GenStateMachine - def handle_event(:internal, :execute, :created, %__MODULE__{} = data) do - case do_execute(data) do - {:ok, :started, data} -> - {:ok, data} = update_workitem(%{state: :started}, data) - - {:next_state, :started, data} - - {:ok, {:completed, workitem_output}, data} -> - { - :next_state, - :started, - data, - {:next_event, :cast, {:complete, workitem_output}} - } - - {:ok, :abandoned, data} -> - {:ok, data} = update_workitem(%{state: :abandoned}, data) - - {:next_state, :abandoned, data} - end - end - - @impl GenStateMachine - def format_status(_reason, [_pdict, state, data]) do - {:state, %{current_state: state, data: data}} + @impl GenServer + def handle_cast(:abandon, data) do + {:noreply, data, {:continue, :stop_server}} end + @spec update_workitem(map(), t()) :: t() defp update_workitem(params, %__MODULE__{} = data) do %{ application: application, @@ -282,7 +229,13 @@ defmodule WorkflowMetal.Workitem.Workitem do params ) - {:ok, %{data | workitem_schema: workitem_schema}} + new_data = %{data | workitem_schema: workitem_schema, state: workitem_schema.state} + + if new_data.state != data.state do + update_workitem_in_task_server(new_data) + end + + new_data end defp do_execute(%__MODULE__{} = data) do @@ -318,6 +271,16 @@ defmodule WorkflowMetal.Workitem.Workitem do end end + @spec do_complete(t(), output :: term()) :: t() + defp do_complete(%__MODULE__{} = data, output) do + data = update_workitem(%{state: :completed, output: output}, data) + + Logger.debug("#{describe(data)} complete the execution.") + + data + end + + @spec do_abanodn(t()) :: t() defp do_abanodn(%__MODULE__{} = data) do %{ application: application, @@ -340,6 +303,12 @@ defmodule WorkflowMetal.Workitem.Workitem do executor_params: executor_params, application: application ) + + Logger.debug("#{describe(data)} has been abandoned from #{data.state}.") + + new_data = %{data | state: :abandoned} + + update_workitem(%{state: :abandoned}, new_data) end defp update_workitem_in_task_server(%__MODULE__{} = data) do diff --git a/mix.exs b/mix.exs index 6169e86..e3da2a8 100644 --- a/mix.exs +++ b/mix.exs @@ -22,15 +22,15 @@ defmodule WorkflowMetal.MixProject do def application do [ - extra_applications: [:logger, :gen_state_machine] + extra_applications: [:logger] ] end - defp description() do + defp description do "Workflow engine based on PetriNet" end - defp package() do + defp package do [ licenses: ["MIT"], links: %{"GitHub" => "https://github.com/Byzanteam/workflow_metal"} @@ -41,20 +41,15 @@ defmodule WorkflowMetal.MixProject do [ {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:doctor, "~> 0.17.0", only: [:dev]}, - {:gen_state_machine, "~> 3.0"}, {:typed_struct, "~> 0.3.0"}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, {:jet_credo, [github: "Byzanteam/jet_credo", only: [:dev, :test], runtime: false]}, + {:styler, "~> 0.7", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.0", only: [:dev], runtime: false} ] end - defp elixirc_paths(:test), - do: [ - "lib", - "test/support", - "test/helpers" - ] + defp elixirc_paths(:test), do: ["lib", "test/support", "test/helpers"] defp elixirc_paths(_), do: ["lib"] diff --git a/mix.lock b/mix.lock index 8db82bb..d23d257 100644 --- a/mix.lock +++ b/mix.lock @@ -8,12 +8,12 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.28.1", "34fab7e7201c5a1f275f3b2f837125c940c512e8543d181bd4dd7acb19c8dba0", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "10e564dd59101a5edc4de7009a54baed015a246dee01f7200aab24e8f57fc044"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, - "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"}, "jet_credo": {:git, "https://github.com/Byzanteam/jet_credo.git", "7e5855de2e8b41abfb0a1f5870bbc768a325f4e8", []}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.2", "b99ca56bbce410e9d5ee4f9155a212e942e224e259c7ebbf8f2c86ac21d4fa3c", [:mix], [], "hexpm", "98d51bd64d5f6a2a9c6bb7586ee8129e27dfaab1140b5a4753f24dac0ba27d2f"}, + "styler": {:hex, :styler, "0.7.12", "2ef9014ff6fa2d7fc728f86436ca2f8ff88078f76d4e099dae5a658a90d8e12a", [:mix], [], "hexpm", "9b309b7c02b302118a471b99a65e184f79b0a3b7803f970eef681d5f1bdbe472"}, "typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"}, } diff --git a/test/support/in_memory_storage_case.ex b/test/support/in_memory_storage_case.ex index 1967993..101c237 100644 --- a/test/support/in_memory_storage_case.ex +++ b/test/support/in_memory_storage_case.ex @@ -7,6 +7,8 @@ defmodule WorkflowMetal.Support.InMemoryStorageCase do using do quote location: :keep do + import unquote(__MODULE__) + alias WorkflowMetal.Storage.Schema defmodule DummyApplication do @@ -16,8 +18,6 @@ defmodule WorkflowMetal.Support.InMemoryStorageCase do storage: WorkflowMetal.Storage.Adapters.InMemory end - import unquote(__MODULE__) - setup_all do start_supervised!(DummyApplication) @@ -27,8 +27,7 @@ defmodule WorkflowMetal.Support.InMemoryStorageCase do end def generate_genesis_token(application, workflow_schema, case_schema) do - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(application, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(application, workflow_schema.id) genesis_token_schema = struct( diff --git a/test/support/workflows/sequential_routing.ex b/test/support/workflows/sequential_routing.ex index e3fd349..291648f 100644 --- a/test/support/workflows/sequential_routing.ex +++ b/test/support/workflows/sequential_routing.ex @@ -117,10 +117,7 @@ defmodule WorkflowMetal.Support.Workflows.SequentialRouting do end @doc false - def build_workflow_associations_params( - %Schema.Workflow{} = workflow_schema, - [first | rest] = nodes - ) do + def build_workflow_associations_params(%Schema.Workflow{} = workflow_schema, [first | rest] = nodes) do {_, arcs} = Enum.reduce(rest, {first, []}, fn %Schema.Place{} = to, {%Schema.Transition{} = from, arcs} -> diff --git a/test/workflow_metal/application/workflows_supervisor_test.exs b/test/workflow_metal/application/workflows_supervisor_test.exs index 156c73d..bcdf41f 100644 --- a/test/workflow_metal/application/workflows_supervisor_test.exs +++ b/test/workflow_metal/application/workflows_supervisor_test.exs @@ -1,14 +1,15 @@ defmodule WorkflowMetal.Application.WorkflowsSupervisorTest do use ExUnit.Case, async: true + alias WorkflowMetal.Application.WorkflowsSupervisor + alias WorkflowMetal.Support.Workflows.SequentialRouting + defmodule DummyApplication do + @moduledoc false use WorkflowMetal.Application, storage: WorkflowMetal.Storage.Adapters.InMemory end - alias WorkflowMetal.Application.WorkflowsSupervisor - alias WorkflowMetal.Support.Workflows.SequentialRouting - setup_all do start_supervised!(DummyApplication) @@ -17,8 +18,7 @@ defmodule WorkflowMetal.Application.WorkflowsSupervisorTest do describe ".open_workflow/2" do test "failed to open a non-existing workflow" do - assert {:error, :workflow_not_found} = - WorkflowsSupervisor.open_workflow(DummyApplication, 123) + assert {:error, :workflow_not_found} = WorkflowsSupervisor.open_workflow(DummyApplication, 123) end test "open a sequential_routing workflow successfully" do diff --git a/test/workflow_metal/application_test.exs b/test/workflow_metal/application_test.exs index 6a53699..d03ba7b 100644 --- a/test/workflow_metal/application_test.exs +++ b/test/workflow_metal/application_test.exs @@ -1,19 +1,18 @@ defmodule WorkflowMetal.ApplicationTest do use ExUnit.Case, async: true + alias WorkflowMetal.Application.Config + defmodule DummyApplication do + @moduledoc false use WorkflowMetal.Application, name: __MODULE__.TestApplication, storage: WorkflowMetal.Storage.Adapters.InMemory end - alias WorkflowMetal.Application.Config - - alias DummyApplication.TestApplication - test "build an application" do - start_supervised(DummyApplication) + start_supervised!(DummyApplication) - assert Config.get(TestApplication, :registry) + assert Config.get(DummyApplication.TestApplication, :registry) end end diff --git a/test/workflow_metal/case/case_test.exs b/test/workflow_metal/case/case_test.exs index 3cd97eb..44ef135 100644 --- a/test/workflow_metal/case/case_test.exs +++ b/test/workflow_metal/case/case_test.exs @@ -63,16 +63,12 @@ defmodule WorkflowMetal.Case.CaseTest do assert case_schema.state === :terminated end) - {:ok, {_start_place, end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {_start_place, end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [b_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, end_place.id, :in) + {:ok, [b_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, end_place.id, :in) {:ok, [task_schema]} = - WorkflowMetal.Storage.fetch_tasks(DummyApplication, case_schema.id, - transition_id: b_transition.id - ) + WorkflowMetal.Storage.fetch_tasks(DummyApplication, case_schema.id, transition_id: b_transition.id) assert task_schema.state === :abandoned end @@ -118,28 +114,13 @@ defmodule WorkflowMetal.Case.CaseTest do %{state: :active} ) - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [a_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) + {:ok, [a_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) [a_transition: a_transition, case_schema: case_schema] end - test "restore from created state", %{case_schema: case_schema} do - assert {:ok, _pid} = CaseSupervisor.open_case(DummyApplication, case_schema.id) - - until(fn -> assert_receive :a_completed end) - until(fn -> assert_receive :b_completed end) - - until(fn -> - {:ok, case_schema} = WorkflowMetal.Storage.fetch_case(DummyApplication, case_schema.id) - - assert case_schema.state === :active - end) - end - test "restore from active state", %{a_transition: a_transition, case_schema: case_schema} do {:ok, _task_schema} = WorkflowMetal.Storage.insert_task( @@ -174,8 +155,7 @@ defmodule WorkflowMetal.Case.CaseTest do %{state: :terminated} ) - assert {:error, :case_not_available} = - CaseSupervisor.open_case(DummyApplication, case_schema.id) + assert {:error, :case_not_available} = CaseSupervisor.open_case(DummyApplication, case_schema.id) end test "restore from finished state", %{case_schema: case_schema} do @@ -195,8 +175,7 @@ defmodule WorkflowMetal.Case.CaseTest do until(fn -> refute Process.alive?(pid) end) - assert {:error, :case_not_available} = - CaseSupervisor.open_case(DummyApplication, case_schema.id) + assert {:error, :case_not_available} = CaseSupervisor.open_case(DummyApplication, case_schema.id) end end diff --git a/test/workflow_metal/integrations/abandon_competitors_test.exs b/test/workflow_metal/integrations/abandon_competitors_test.exs index 943604b..aaf14b0 100644 --- a/test/workflow_metal/integrations/abandon_competitors_test.exs +++ b/test/workflow_metal/integrations/abandon_competitors_test.exs @@ -5,11 +5,8 @@ defmodule WorkflowMetal.Integrations.AbandonCompetitorsTest do import WorkflowMetal.Helpers.Wait alias WorkflowMetal.Case.Supervisor, as: CaseSupervisor - - alias WorkflowMetal.Support.Workflows.DeferredChoiceRouting.{ - ManualTransition, - SimpleTransition - } + alias WorkflowMetal.Support.Workflows.DeferredChoiceRouting.ManualTransition + alias WorkflowMetal.Support.Workflows.DeferredChoiceRouting.SimpleTransition setup do params = %{workflow_id: 1} @@ -90,7 +87,7 @@ defmodule WorkflowMetal.Integrations.AbandonCompetitorsTest do ^last -> :end - _ -> + _other -> :normal end diff --git a/test/workflow_metal/task/task_test.exs b/test/workflow_metal/task/task_test.exs index 6667177..3e73784 100644 --- a/test/workflow_metal/task/task_test.exs +++ b/test/workflow_metal/task/task_test.exs @@ -26,21 +26,20 @@ defmodule WorkflowMetal.Task.TaskTest do until(fn -> {:ok, tasks} = InMemoryStorage.list_tasks(DummyApplication, workflow_schema.id) - {:ok, workitem_schemas} = - InMemoryStorage.list_workitems(DummyApplication, workflow_schema.id) + {:ok, workitem_schemas} = InMemoryStorage.list_workitems(DummyApplication, workflow_schema.id) assert length(tasks) === 2 [a_task, b_task] = Enum.filter(tasks, &(&1.case_id === case_schema.id)) assert a_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == a_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == a_task.id)).id => %{ reply: :a_completed } } assert b_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == b_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == b_task.id)).id => %{ reply: :b_completed } } @@ -68,11 +67,9 @@ defmodule WorkflowMetal.Task.TaskTest do %{state: :active} ) - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [a_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) + {:ok, [a_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) {:ok, task_schema} = WorkflowMetal.Storage.insert_task( @@ -98,8 +95,7 @@ defmodule WorkflowMetal.Task.TaskTest do until(fn -> {:ok, tasks} = InMemoryStorage.list_tasks(DummyApplication, task_schema.workflow_id) - {:ok, workitem_schemas} = - InMemoryStorage.list_workitems(DummyApplication, task_schema.workflow_id) + {:ok, workitem_schemas} = InMemoryStorage.list_workitems(DummyApplication, task_schema.workflow_id) assert length(tasks) === 2 @@ -109,13 +105,13 @@ defmodule WorkflowMetal.Task.TaskTest do assert b_task.state === :completed assert a_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == a_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == a_task.id)).id => %{ reply: :a_completed } } assert b_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == b_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == b_task.id)).id => %{ reply: :b_completed } } @@ -160,8 +156,7 @@ defmodule WorkflowMetal.Task.TaskTest do until(fn -> {:ok, tasks} = InMemoryStorage.list_tasks(DummyApplication, task_schema.workflow_id) - {:ok, workitem_schemas} = - InMemoryStorage.list_workitems(DummyApplication, task_schema.workflow_id) + {:ok, workitem_schemas} = InMemoryStorage.list_workitems(DummyApplication, task_schema.workflow_id) assert length(tasks) === 2 @@ -171,13 +166,13 @@ defmodule WorkflowMetal.Task.TaskTest do assert b_task.state === :completed assert a_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == a_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == a_task.id)).id => %{ reply: :a_completed } } assert b_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == b_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == b_task.id)).id => %{ reply: :b_completed } } @@ -207,18 +202,15 @@ defmodule WorkflowMetal.Task.TaskTest do Enum.each(tasks, fn task -> assert task.state === :completed - assert {:error, :task_not_available} = - TaskSupervisor.open_task(DummyApplication, task.id) + assert {:error, :task_not_available} = TaskSupervisor.open_task(DummyApplication, task.id) end) end) end test "restore from abandoned state", %{task_schema: task_schema} do - {:ok, task_schema} = - WorkflowMetal.Storage.update_task(DummyApplication, task_schema.id, %{state: :abandoned}) + {:ok, task_schema} = WorkflowMetal.Storage.update_task(DummyApplication, task_schema.id, %{state: :abandoned}) - assert {:error, :task_not_available} = - TaskSupervisor.open_task(DummyApplication, task_schema.id) + assert {:error, :task_not_available} = TaskSupervisor.open_task(DummyApplication, task_schema.id) end end @@ -250,11 +242,9 @@ defmodule WorkflowMetal.Task.TaskTest do %{state: :active} ) - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [a_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) + {:ok, [a_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) {:ok, task_schema} = WorkflowMetal.Storage.insert_task( @@ -302,8 +292,7 @@ defmodule WorkflowMetal.Task.TaskTest do } ) - {:ok, _} = - WorkflowMetal.Storage.lock_tokens(DummyApplication, [genesis_token.id], task_schema.id) + {:ok, _} = WorkflowMetal.Storage.lock_tokens(DummyApplication, [genesis_token.id], task_schema.id) {:ok, task_schema} = WorkflowMetal.Storage.update_task( @@ -325,8 +314,7 @@ defmodule WorkflowMetal.Task.TaskTest do until(fn -> {:ok, tasks} = InMemoryStorage.list_tasks(DummyApplication, task_schema.workflow_id) - {:ok, workitem_schemas} = - InMemoryStorage.list_workitems(DummyApplication, task_schema.workflow_id) + {:ok, workitem_schemas} = InMemoryStorage.list_workitems(DummyApplication, task_schema.workflow_id) assert length(tasks) === 2 @@ -336,13 +324,13 @@ defmodule WorkflowMetal.Task.TaskTest do assert b_task.state === :completed assert a_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == a_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == a_task.id)).id => %{ reply: :a_completed } } assert b_task.token_payload === %{ - (workitem_schemas |> Enum.find(&(&1.task_id == b_task.id))).id => %{ + Enum.find(workitem_schemas, &(&1.task_id == b_task.id)).id => %{ reply: :b_completed } } diff --git a/test/workflow_metal/utils/ets_test.exs b/test/workflow_metal/utils/ets_test.exs index bfdcd63..a295830 100644 --- a/test/workflow_metal/utils/ets_test.exs +++ b/test/workflow_metal/utils/ets_test.exs @@ -1,4 +1,5 @@ defmodule WorkflowMetal.Utils.ETSTest do use ExUnit.Case, async: true + doctest WorkflowMetal.Utils.ETS end diff --git a/test/workflow_metal/workitem/workitem_test.exs b/test/workflow_metal/workitem/workitem_test.exs index bfbbd03..5fe31e7 100644 --- a/test/workflow_metal/workitem/workitem_test.exs +++ b/test/workflow_metal/workitem/workitem_test.exs @@ -23,6 +23,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do end defmodule AsynchronousTransition do + @moduledoc false use WorkflowMetal.Executor @impl true @@ -116,6 +117,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do end defmodule TwiceLockTransition do + @moduledoc false use WorkflowMetal.Executor @impl true @@ -191,11 +193,9 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do %{state: :active} ) - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [a_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) + {:ok, [a_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) {:ok, task_schema} = WorkflowMetal.Storage.insert_task( @@ -232,15 +232,6 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do [workitem_schema: workitem_schema] end - test "cannt complete a created workitem", %{workitem_schema: workitem_schema} do - {:error, :workitem_not_available} = - WorkflowMetal.Workitem.Supervisor.complete_workitem( - DummyApplication, - workitem_schema.id, - :a_completed - ) - end - test "complete a started workitem", %{workitem_schema: workitem_schema} do WorkflowMetal.Storage.update_workitem( DummyApplication, @@ -255,8 +246,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do :a_completed ) - {:ok, workitem_schema} = - WorkflowMetal.Storage.fetch_workitem(DummyApplication, workitem_schema.id) + {:ok, workitem_schema} = WorkflowMetal.Storage.fetch_workitem(DummyApplication, workitem_schema.id) assert workitem_schema.state === :completed end @@ -327,11 +317,9 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do %{state: :active} ) - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [a_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) + {:ok, [a_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) {:ok, task_schema} = WorkflowMetal.Storage.insert_task( @@ -383,8 +371,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do ) until(fn -> - {:ok, workitem_schema} = - WorkflowMetal.Storage.fetch_workitem(DummyApplication, workitem_schema.id) + {:ok, workitem_schema} = WorkflowMetal.Storage.fetch_workitem(DummyApplication, workitem_schema.id) assert workitem_schema.state === :abandoned end) @@ -415,8 +402,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do until(fn -> refute Process.alive?(pid) end) until(fn -> - {:ok, workitem_schema} = - WorkflowMetal.Storage.fetch_workitem(DummyApplication, workitem_schema.id) + {:ok, workitem_schema} = WorkflowMetal.Storage.fetch_workitem(DummyApplication, workitem_schema.id) assert workitem_schema.state === :abandoned end) @@ -475,11 +461,9 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do %{state: :active} ) - {:ok, {start_place, _end_place}} = - WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) + {:ok, {start_place, _end_place}} = WorkflowMetal.Storage.fetch_edge_places(DummyApplication, workflow_schema.id) - {:ok, [a_transition]} = - WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) + {:ok, [a_transition]} = WorkflowMetal.Storage.fetch_transitions(DummyApplication, start_place.id, :out) {:ok, task_schema} = WorkflowMetal.Storage.insert_task( @@ -517,8 +501,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do end test "from created", %{workitem_schema: workitem_schema} do - {:ok, _} = - WorkflowMetal.Workitem.Supervisor.open_workitem(DummyApplication, workitem_schema.id) + {:ok, _} = WorkflowMetal.Workitem.Supervisor.open_workitem(DummyApplication, workitem_schema.id) until(fn -> assert_receive :a_completed end) until(fn -> assert_receive :b_completed end) @@ -532,8 +515,7 @@ defmodule WorkflowMetal.Workitem.WorkitemTest do %{state: :started} ) - {:ok, _pid} = - WorkflowMetal.Workitem.Supervisor.open_workitem(DummyApplication, workitem_schema.id) + {:ok, _pid} = WorkflowMetal.Workitem.Supervisor.open_workitem(DummyApplication, workitem_schema.id) refute_receive :a_completed end