diff --git a/lib/workflow_metal/task/task.ex b/lib/workflow_metal/task/task.ex index 3218628..b7523cb 100644 --- a/lib/workflow_metal/task/task.ex +++ b/lib/workflow_metal/task/task.ex @@ -1,6 +1,6 @@ defmodule WorkflowMetal.Task.Task do @moduledoc """ - A `GenStateMachine` to lock tokens and generate `workitem`. + A `GenServer` to lock tokens and generate `workitem`. ## Storage The data of `:token_table` is stored in ETS in the following format: @@ -26,9 +26,7 @@ defmodule WorkflowMetal.Task.Task do require Logger - use GenStateMachine, - callback_mode: [:handle_event_function, :state_enter], - restart: :transient + use GenServer, restart: :transient use TypedStruct @@ -53,6 +51,7 @@ defmodule WorkflowMetal.Task.Task do typedstruct do field :application, application() field :task_schema, task_schema() + field :state, WorkflowMetal.Storage.Schema.Task.state() field :transition_schema, transition_schema() field :token_table, :ets.tid() field :workitem_table, :ets.tid() @@ -78,7 +77,7 @@ defmodule WorkflowMetal.Task.Task do name = Keyword.fetch!(options, :name) task_schema = Keyword.fetch!(options, :task_schema) - GenStateMachine.start_link( + GenServer.start_link( __MODULE__, {workflow_identifier, task_schema}, name: name @@ -124,7 +123,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec force_abandon(:gen_statem.server_ref()) :: :ok def force_abandon(task_server) do - GenStateMachine.cast(task_server, :force_abandon) + GenServer.cast(task_server, :force_abandon) end @doc """ @@ -132,7 +131,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec receive_tokens(:gen_statem.server_ref(), [token_schema]) :: :ok def receive_tokens(task_server, token_schemas) do - GenStateMachine.cast(task_server, {:receive_tokens, token_schemas}) + GenServer.cast(task_server, {:receive_tokens, token_schemas}) end @doc """ @@ -142,7 +141,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec discard_tokens(:gen_statem.server_ref(), [token_schema]) :: :ok def discard_tokens(task_server, token_schemas) do - GenStateMachine.cast(task_server, {:discard_tokens, token_schemas}) + GenServer.cast(task_server, {:discard_tokens, token_schemas}) end @doc """ @@ -157,7 +156,7 @@ defmodule WorkflowMetal.Task.Task do """ @spec preexecute(:gen_statem.server_ref()) :: on_preexecute def preexecute(task_server) do - GenStateMachine.call(task_server, :preexecute) + GenServer.call(task_server, :preexecute) end @doc """ @@ -165,339 +164,234 @@ defmodule WorkflowMetal.Task.Task do """ @spec update_workitem(:gen_statem.server_ref(), workitem_id, workitem_state) :: :ok def update_workitem(task_server, workitem_id, workitem_state) do - GenStateMachine.cast(task_server, {:update_workitem, workitem_id, workitem_state}) + GenServer.cast(task_server, {:update_workitem, workitem_id, workitem_state}) end # callbacks - @impl GenStateMachine + @impl GenServer def init({{application, _workflow_id}, task_schema}) do - %{ - state: state - } = task_schema + case task_schema.state do + state when state in [:abandoned, :completed] -> + {:stop, :task_not_available} - if state in [:abandoned, :completed] do - {:stop, :task_not_available} - else - { - :ok, - state, - %__MODULE__{ - application: application, - task_schema: task_schema, - token_table: :ets.new(:token_table, [:set, :private]), - workitem_table: :ets.new(:workitem_table, [:set, :private]) + state -> + { + :ok, + %__MODULE__{ + application: application, + task_schema: task_schema, + state: state, + token_table: :ets.new(:token_table, [:set, :private]), + workitem_table: :ets.new(:workitem_table, [:set, :private]) + }, + {:continue, :after_start} } - } end end - @impl GenStateMachine - # init - def handle_event(:enter, state, state, %__MODULE__{} = data) do - {:ok, data} = fetch_workitems(data) - {:ok, data} = fetch_transition(data) + @impl GenServer + def handle_continue(:after_start, %{state: state} = data) + when state in [:started, :allocated] do + data = + data + |> fetch_workitems() + |> fetch_transition() + |> request_tokens() + |> start_created_workitems() - case state do - :started -> - {:ok, data} = request_tokens(data) + {:noreply, data} + end - { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } + def handle_continue(:after_start, %{state: :executing} = data) do + data = + data + |> fetch_workitems() + |> fetch_transition() + |> fetch_locked_tokens() + |> start_created_workitems() - :allocated -> - {:ok, data} = request_tokens(data) + {:noreply, data, {:continue, :try_complete}} + end - { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } + @impl GenServer + def handle_continue(:try_allocate, %{state: :started} = data) do + case JoinController.task_enablement(data) do + :ok -> + data = + data + |> allocate_workitem() + |> update_task(%{state: :allocated}) - :executing -> - {:ok, data} = fetch_locked_tokens(data) + {:noreply, data} - { - :keep_state, - data, - {:state_timeout, 1, :after_start} - } + _other -> + {:noreply, data} end end - @impl GenStateMachine - def handle_event(:enter, old_state, state, %__MODULE__{} = data) do - case {old_state, state} do - {:started, :allocated} -> - Logger.debug(fn -> "#{describe(data)} allocate a workitem." end) - - {:keep_state, data} - - {:allocated, :executing} -> - Logger.debug(fn -> "#{describe(data)} start executing." end) + @impl GenServer + def handle_continue(:try_abandon_by_tokens, %{state: state} = data) + when state in [:started, :allocated, :executing] do + case tokens_abandonment(data) do + {:ok, data} -> + data = + data + |> do_abandon_workitems() + |> update_task(%{state: :abandoned}) - {:keep_state, data} + {:noreply, data, {:continue, :stop_server}} - {:executing, :completed} -> - Logger.debug(fn -> "#{describe(data)} complete the execution." end) + _error -> + {:noreply, data} + end + end - {:stop, :normal} + @impl GenServer + def handle_continue(:try_complete, %{state: :executing} = data) do + case task_completion(data) do + :ok -> + {_tokens, data} = do_consume_tokens(data) + data = do_complete_task(data) - {from, :abandoned} -> - Logger.debug(fn -> "#{describe(data)} has been abandoned from #{from}." end) + {:noreply, data, {:continue, :stop_server}} - {:stop, :normal} + _error -> + {:noreply, data} end end - @impl GenStateMachine - def handle_event(:state_timeout, :after_start, state, %__MODULE__{} = data) do - {:ok, data} = start_created_workitems(data) + @impl GenServer + def handle_continue(:try_abandon_by_workitems, %{state: state} = data) + when state in [:allocated, :executing] do + case workitems_abandonment(data) do + {:ok, data} -> + data = + data + |> unlock_tokens() + |> do_abandon_workitems() + |> update_task(%{state: :abandoned}) - case state do - :executing -> - { - :keep_state, - data, - {:next_event, :cast, :try_complete} - } + {:noreply, data, {:continue, :stop_server}} _ -> - :keep_state_and_data + {:noreply, data} end end - @impl GenStateMachine - def handle_event(:cast, {:receive_tokens, token_schemas}, :started, %__MODULE__{} = data) do - {:ok, data} = - Enum.reduce(token_schemas, {:ok, data}, fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end) + @impl GenServer + def handle_continue(:stop_server, data) do + {:stop, :normal, data} + end + + @impl GenServer + def handle_cast({:receive_tokens, token_schemas}, %{state: :started} = data) do + data = Enum.reduce(token_schemas, data, &upsert_ets_token/2) { - :keep_state, + :noreply, data, - {:next_event, :cast, :try_allocate} + {:continue, :try_allocate} } end - @impl GenStateMachine - def handle_event(:cast, {:receive_tokens, token_schemas}, :allocated, %__MODULE__{} = data) do - {:ok, data} = - Enum.reduce(token_schemas, {:ok, data}, fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end) + @impl GenServer + def handle_cast({:receive_tokens, token_schemas}, %{state: :allocated} = data) do + data = Enum.reduce(token_schemas, data, &upsert_ets_token/2) - { - :keep_state, - data - } + {:noreply, data} end - @impl GenStateMachine - def handle_event(:cast, {:discard_tokens, token_schemas}, state, %__MODULE__{} = data) + @impl GenServer + def handle_cast({:discard_tokens, token_schemas}, %{state: state} = data) when state in [:started, :allocated, :executing] do - {:ok, data} = - Enum.reduce(token_schemas, {:ok, data}, fn token_schema, {:ok, data} -> - remove_ets_token(token_schema, data) - end) + data = Enum.reduce(token_schemas, data, &remove_ets_token/2) { - :keep_state, + :noreply, data, - {:next_event, :cast, :try_abandon_by_tokens} + {:continue, :try_abandon_by_tokens} } end - @impl GenStateMachine - def handle_event(:cast, :try_allocate, :started, %__MODULE__{} = data) do - case JoinController.task_enablement(data) do - :ok -> - {:ok, data} = allocate_workitem(data) - {:ok, data} = update_task(%{state: :allocated}, data) - - { - :next_state, - :allocated, - data - } - - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :try_complete, :executing, %__MODULE__{} = data) do - with( - :ok <- task_completion(data), - {:ok, _tokens, data} <- do_consume_tokens(data), - {:ok, data} <- do_complete_task(data) - ) do - { - :next_state, - :completed, - data - } - else - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :try_abandon_by_workitems, state, %__MODULE__{} = data) - when state in [:allocated, :executing] do - case workitems_abandonment(data) do - {:ok, data} -> - {:ok, data} = unlock_tokens(data) - {:ok, data} = do_abandon_workitems(data) - {:ok, data} = update_task(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, - data - } - - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :try_abandon_by_tokens, state, %__MODULE__{} = data) + @impl GenServer + def handle_cast(:force_abandon, %{state: state} = data) when state in [:started, :allocated, :executing] do - case tokens_abandonment(data) do - {:ok, data} -> - {:ok, data} = do_abandon_workitems(data) - {:ok, data} = update_task(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, - data - } - - _ -> - :keep_state_and_data - end - end - - @impl GenStateMachine - def handle_event(:cast, :force_abandon, state, %__MODULE__{} = data) - when state in [:started, :allocated, :executing] do - {:ok, data} = do_abandon_workitems(data) - {:ok, data} = update_task(%{state: :abandoned}, data) - - { - :next_state, - :abandoned, + data = data - } + |> do_abandon_workitems() + |> update_task(%{state: :abandoned}) + + {:noreply, data, {:continue, :stop_server}} end - @impl GenStateMachine - def handle_event( - :cast, - {:update_workitem, workitem_id, :started}, - state, - %__MODULE__{} = data - ) + @impl GenServer + def handle_cast({:update_workitem, workitem_id, :started}, %{state: state} = data) when state in [:allocated, :executing] do - {:ok, data} = upsert_ets_workitem({workitem_id, :started}, data) + data = upsert_ets_workitem({workitem_id, :started}, data) - {:keep_state, data} + {:noreply, data} end - @impl GenStateMachine - def handle_event( - :cast, - {:update_workitem, workitem_id, :completed}, - :executing, - %__MODULE__{} = data - ) do - {:ok, data} = upsert_ets_workitem({workitem_id, :completed}, data) + @impl GenServer + def handle_cast({:update_workitem, workitem_id, :completed}, %{state: :executing} = data) do + data = upsert_ets_workitem({workitem_id, :completed}, data) - {:keep_state, data, {:next_event, :cast, :try_complete}} + {:noreply, data, {:continue, :try_complete}} end - @impl GenStateMachine - def handle_event( - :cast, - {:update_workitem, workitem_id, :abandoned}, - state, - %__MODULE__{} = data - ) + @impl GenServer + def handle_cast({:update_workitem, workitem_id, :abandoned}, %{state: state} = data) when state not in [:abandoned, :completed] do - {:ok, data} = upsert_ets_workitem({workitem_id, :abandoned}, data) + data = upsert_ets_workitem({workitem_id, :abandoned}, data) - {:keep_state, data, {:next_event, :cast, :try_abandon_by_workitems}} + {:noreply, data, {:continue, :try_abandon_by_workitems}} end - @impl GenStateMachine - def handle_event(:cast, _event_content, _state, %__MODULE__{}) do - :keep_state_and_data + @impl GenServer + def handle_cast(_msg, %{} = data) do + {:noreply, data} end - @impl GenStateMachine - def handle_event({:call, from}, :preexecute, :allocated, %__MODULE__{} = data) do + @impl GenServer + def handle_call(:preexecute, _from, %__MODULE__{state: :allocated} = data) do case do_lock_tokens(data) do {:ok, locked_token_schemas, data} -> - {:ok, data} = update_task(%{state: :executing}, data) + data = update_task(data, %{state: :executing}) - { - :next_state, - :executing, - data, - {:reply, from, {:ok, locked_token_schemas}} - } + {:reply, {:ok, locked_token_schemas}, data} error -> - { - :keep_state_and_data, - {:reply, from, error} - } + {:reply, error, data} end end - @impl GenStateMachine - def handle_event({:call, from}, :preexecute, :executing, %__MODULE__{} = data) do + @impl GenServer + def handle_call(:preexecute, _from, %{state: :executing} = data) do %{ token_table: token_table } = data - reply = + tokens = token_table |> :ets.tab2list() - |> Enum.reduce({:ok, []}, fn + |> Enum.reduce([], fn {_token_id, token_schema, :locked}, {:ok, tokens} -> - {:ok, [token_schema | tokens]} + [token_schema | tokens] _, acc -> acc end) - { - :keep_state, - data, - {:reply, from, reply} - } - end - - @impl GenStateMachine - def handle_event({:call, from}, _event_content, _state, %__MODULE__{}) do - {:keep_state_and_data, {:reply, from, {:error, :task_not_available}}} + {:reply, {:ok, tokens}, data} end - @impl GenStateMachine - def format_status(_reason, [_pdict, state, data]) do - {:state, %{current_state: state, data: data}} + @impl GenServer + def handle_call(_msg, _from, %{} = data) do + {:reply, {:error, :task_not_available}, data} end + @spec fetch_workitems(t()) :: t() defp fetch_workitems(%__MODULE__{} = data) do %{ application: application, @@ -508,16 +402,12 @@ defmodule WorkflowMetal.Task.Task do {:ok, workitems} = WorkflowMetal.Storage.fetch_workitems(application, task_id) - {:ok, _data} = - Enum.reduce( - workitems, - {:ok, data}, - fn workitem, {:ok, data} -> - upsert_ets_workitem(workitem, data) - end - ) + data = Enum.reduce(workitems, data, &upsert_ets_workitem/2) + + data end + @spec fetch_transition(t()) :: t() defp fetch_transition(%__MODULE__{} = data) do %{ application: application, @@ -528,9 +418,10 @@ defmodule WorkflowMetal.Task.Task do {:ok, transition_schema} = WorkflowMetal.Storage.fetch_transition(application, transition_id) - {:ok, %{data | transition_schema: transition_schema}} + %{data | transition_schema: transition_schema} end + @spec request_tokens(t()) :: t() defp request_tokens(%__MODULE__{} = data) do %{ application: application, @@ -546,22 +437,16 @@ defmodule WorkflowMetal.Task.Task do task_id ) do {:ok, tokens} -> - {:ok, data} = - Enum.reduce( - tokens, - {:ok, data}, - fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end - ) + data = Enum.reduce(tokens, data, &upsert_ets_token/2) - {:ok, data} + data _ -> - {:ok, data} + data end end + @spec fetch_locked_tokens(t()) :: t() defp fetch_locked_tokens(%__MODULE__{} = data) do %{ application: application, @@ -578,18 +463,12 @@ defmodule WorkflowMetal.Task.Task do task_id ) - {:ok, data} = - Enum.reduce( - locked_token_schemas, - {:ok, data}, - fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end - ) + data = Enum.reduce(locked_token_schemas, data, &upsert_ets_token/2) - {:ok, data} + data end + @spec start_created_workitems(t()) :: t() defp start_created_workitems(%__MODULE__{} = data) do %{ application: application, @@ -606,10 +485,11 @@ defmodule WorkflowMetal.Task.Task do ) end) - {:ok, data} + data end - defp update_task(params, %__MODULE__{} = data) do + @spec update_task(t(), map()) :: t() + defp update_task(%__MODULE__{} = data, params) do %{ application: application, task_schema: task_schema @@ -622,9 +502,33 @@ defmodule WorkflowMetal.Task.Task do params ) - {:ok, %{data | task_schema: task_schema}} + new_data = %{data | task_schema: task_schema, state: task_schema.state} + + log_state_change(new_data, data.state) + + new_data end + defp log_state_change(data, old_state) do + case {old_state, data.state} do + {:started, :allocated} -> + Logger.debug(fn -> "#{describe(data)} allocate a workitem." end) + + {:allocated, :executing} -> + Logger.debug(fn -> "#{describe(data)} start executing." end) + + {:executing, :completed} -> + Logger.debug(fn -> "#{describe(data)} complete the execution." end) + + {from, :abandoned} -> + Logger.debug(fn -> "#{describe(data)} has been abandoned from #{from}." end) + + _other -> + :ok + end + end + + @spec allocate_workitem(t()) :: t() defp allocate_workitem(%__MODULE__{} = data) do %{ application: application, @@ -650,7 +554,7 @@ defmodule WorkflowMetal.Task.Task do {:ok, workitem_schema} = WorkflowMetal.Storage.insert_workitem(application, workitem_schema) - {:ok, data} = upsert_ets_workitem(workitem_schema, data) + data = upsert_ets_workitem(workitem_schema, data) {:ok, _} = WorkflowMetal.Workitem.Supervisor.open_workitem( @@ -658,9 +562,10 @@ defmodule WorkflowMetal.Task.Task do workitem_schema.id ) - {:ok, data} + data end + @spec do_lock_tokens(t()) :: {:ok, [token_schema()], t()} | {:error, atom()} defp do_lock_tokens(%__MODULE__{} = data) do with( {:ok, token_ids} <- JoinController.preexecute(data), @@ -676,19 +581,13 @@ defmodule WorkflowMetal.Task.Task do task_id ) ) do - {:ok, data} = - Enum.reduce( - locked_token_schemas, - {:ok, data}, - fn token_schema, {:ok, data} -> - upsert_ets_token(token_schema, data) - end - ) + data = Enum.reduce(locked_token_schemas, data, &upsert_ets_token/2) {:ok, locked_token_schemas, data} end end + @spec task_completion(t()) :: :ok | {:error, :task_not_completed} defp task_completion(%__MODULE__{} = data) do data |> Map.fetch!(:workitem_table) @@ -704,6 +603,7 @@ defmodule WorkflowMetal.Task.Task do end end + @spec do_consume_tokens(t()) :: {consumed_tokens :: [token_schema()], t()} defp do_consume_tokens(%__MODULE__{} = data) do %{ application: application, @@ -720,14 +620,12 @@ defmodule WorkflowMetal.Task.Task do task_id ) - {:ok, data} = - Enum.reduce(tokens, {:ok, data}, fn token, {:ok, data} -> - upsert_ets_token(token, data) - end) + data = Enum.reduce(tokens, data, &upsert_ets_token/2) - {:ok, tokens, data} + {tokens, data} end + @spec do_complete_task(t()) :: t() defp do_complete_task(%__MODULE__{} = data) do %{ application: application, @@ -747,7 +645,7 @@ defmodule WorkflowMetal.Task.Task do token_schema_list ) - update_task(%{state: :completed, token_payload: token_payload}, data) + update_task(data, %{state: :completed, token_payload: token_payload}) end defp build_token_payload(%__MODULE__{} = data) do @@ -791,6 +689,7 @@ defmodule WorkflowMetal.Task.Task do end end + @spec tokens_abandonment(t()) :: {:ok, t()} | {:error, :task_not_abandoned} defp tokens_abandonment(%__MODULE__{} = data) do %{ token_table: token_table @@ -804,6 +703,7 @@ defmodule WorkflowMetal.Task.Task do end end + @spec unlock_tokens(t()) :: t() defp unlock_tokens(%__MODULE__{} = data) do %{ application: application, @@ -821,9 +721,10 @@ defmodule WorkflowMetal.Task.Task do true = :ets.delete_all_objects(token_table) - {:ok, data} + data end + @spec do_abandon_workitems(t()) :: t() defp do_abandon_workitems(%__MODULE__{} = data) do %{ application: application, @@ -835,15 +736,16 @@ defmodule WorkflowMetal.Task.Task do |> Enum.each(fn {workitem_id, state} when state in [:created, :started] -> :ok = WorkflowMetal.Workitem.Supervisor.abandon_workitem(application, workitem_id) - {:ok, _data} = upsert_ets_workitem({workitem_id, :abandoned}, data) + _data = upsert_ets_workitem({workitem_id, :abandoned}, data) _ -> :skip end) - {:ok, data} + data end + @spec upsert_ets_token(token_schema(), t()) :: t() defp upsert_ets_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do %{token_table: token_table} = data @@ -854,28 +756,28 @@ defmodule WorkflowMetal.Task.Task do token_schema.state }) - {:ok, data} + data end + @spec remove_ets_token(token_schema(), t()) :: t() defp remove_ets_token(%Schema.Token{} = token_schema, %__MODULE__{} = data) do %{token_table: token_table} = data true = :ets.delete(token_table, token_schema.id) - {:ok, data} + data end + @spec upsert_ets_workitem(workitem_schema(), t()) :: t() defp upsert_ets_workitem(%Schema.Workitem{} = workitem, %__MODULE__{} = data) do - %{workitem_table: workitem_table} = data - true = :ets.insert(workitem_table, {workitem.id, workitem.state}) - - {:ok, data} + upsert_ets_workitem({workitem.id, workitem.state}, data) end + @spec upsert_ets_workitem({workitem_id, workitem_state}, t()) :: t() defp upsert_ets_workitem({workitem_id, workitem_state}, %__MODULE__{} = data) do %{workitem_table: workitem_table} = data true = :ets.insert(workitem_table, {workitem_id, workitem_state}) - {:ok, data} + data end defp describe(%__MODULE__{} = data) do