From 0887b5c8d6c7f4d636f7f271fe6a5a9d6063a4a0 Mon Sep 17 00:00:00 2001 From: Phil Chen <06fahchen@gmail.com> Date: Fri, 16 Jun 2023 21:54:46 +0800 Subject: [PATCH] feat: add timeout to case, task and workitem --- lib/workflow_metal/application.ex | 17 ++++++++++- lib/workflow_metal/case/case.ex | 28 +++++++++++++----- lib/workflow_metal/task/task.ex | 38 +++++++++++++++++-------- lib/workflow_metal/workitem/workitem.ex | 21 ++++++++++++-- 4 files changed, 81 insertions(+), 23 deletions(-) diff --git a/lib/workflow_metal/application.ex b/lib/workflow_metal/application.ex index d01a29d..21e9719 100644 --- a/lib/workflow_metal/application.ex +++ b/lib/workflow_metal/application.ex @@ -1,5 +1,20 @@ defmodule WorkflowMetal.Application do - @moduledoc false + @moduledoc """ + A workflow_metal application is a collection of workflows, cases, tasks and workitems. + + In `config/config.exs` set default_lifespan_timeout for any `Case`, `Task` and `Workitem`: + + config :my_app, MyApp.Application, + case: [ + lifespan_timeout: 30000 + ], + task: [ + lifespan_timeout: 60000 + ], + workitem: [ + lifespan_timeout: 300000 + ] + """ alias WorkflowMetal.Application.Config diff --git a/lib/workflow_metal/case/case.ex b/lib/workflow_metal/case/case.ex index 5aecfc4..a8c9f7d 100644 --- a/lib/workflow_metal/case/case.ex +++ b/lib/workflow_metal/case/case.ex @@ -233,7 +233,7 @@ defmodule WorkflowMetal.Case.Case do {:noreply, data, {:continue, :stop_server}} _error -> - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end end @@ -241,7 +241,7 @@ defmodule WorkflowMetal.Case.Case do def handle_continue({:revoke_tokens, locked_token_schemas, except_task_id}, %__MODULE__{state: :active} = data) do data = do_revoke_tokens(locked_token_schemas, except_task_id, data) - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end @impl GenServer @@ -257,10 +257,10 @@ defmodule WorkflowMetal.Case.Case do "#{describe(data)}: tokens(#{Enum.join(token_ids, ", ")}) have been locked by the task(#{task_id})" end) - {:reply, {:ok, locked_token_schemas}, data} + {:reply, {:ok, locked_token_schemas}, data, get_timeout(data.application)} {:error, _reason} = error -> - {:reply, error, data} + {:reply, error, data, get_timeout(data.application)} end end @@ -290,19 +290,19 @@ defmodule WorkflowMetal.Case.Case do def handle_call({:offer_tokens_to_task, task_id}, _from, %{state: :active} = data) do {tokens, data} = do_offer_tokens_to_task(task_id, data) - {:reply, {:ok, tokens}, data} + {:reply, {:ok, tokens}, data, get_timeout(data.application)} end @impl GenServer def handle_call({:fetch_locked_tokens_from_task, task_id}, _from, %{state: :active} = data) do tokens = do_fetch_locked_tokens_from_task(task_id, data) - {:reply, {:ok, tokens}, data} + {:reply, {:ok, tokens}, data, get_timeout(data.application)} end @impl GenServer def handle_call(_msg, _from, %{} = data) do - {:reply, {:error, :case_not_available}, data} + {:reply, {:error, :case_not_available}, data, get_timeout(data.application)} end @impl GenServer @@ -332,6 +332,13 @@ defmodule WorkflowMetal.Case.Case do {:noreply, data, {:continue, :stop_server}} end + @impl GenServer + def handle_info(:timeout, %__MODULE__{} = data) do + Logger.debug(describe(data) <> " stopping due to inactivity timeout") + + {:stop, :normal, data} + end + @spec fetch_unconsumed_tokens(t()) :: t() defp fetch_unconsumed_tokens(%__MODULE__{} = data) do %{ @@ -829,6 +836,13 @@ defmodule WorkflowMetal.Case.Case do {:ok, data} end + defp get_timeout(application) do + application + |> WorkflowMetal.Application.Config.get(:case) + |> Kernel.||([]) + |> Keyword.get(:lifespan_timeout, :timer.minutes(5)) + end + defp describe(%__MODULE__{} = data) do %{ case_schema: %Schema.Case{ diff --git a/lib/workflow_metal/task/task.ex b/lib/workflow_metal/task/task.ex index 9456d09..6c10402 100644 --- a/lib/workflow_metal/task/task.ex +++ b/lib/workflow_metal/task/task.ex @@ -198,7 +198,7 @@ defmodule WorkflowMetal.Task.Task do |> request_tokens() |> start_created_workitems() - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end def handle_continue(:after_start, %{state: :executing} = data) do @@ -221,10 +221,10 @@ defmodule WorkflowMetal.Task.Task do |> allocate_workitem() |> update_task(%{state: :allocated}) - {:noreply, data} + {:noreply, data, get_timeout(data.application)} _other -> - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end end @@ -240,7 +240,7 @@ defmodule WorkflowMetal.Task.Task do {:noreply, data, {:continue, :stop_server}} _error -> - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end end @@ -254,7 +254,7 @@ defmodule WorkflowMetal.Task.Task do {:noreply, data, {:continue, :stop_server}} _error -> - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end end @@ -271,7 +271,7 @@ defmodule WorkflowMetal.Task.Task do {:noreply, data, {:continue, :stop_server}} _ -> - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end end @@ -295,7 +295,7 @@ defmodule WorkflowMetal.Task.Task do def handle_cast({:receive_tokens, token_schemas}, %{state: :allocated} = data) do data = Enum.reduce(token_schemas, data, &upsert_ets_token/2) - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end @impl GenServer @@ -325,7 +325,7 @@ defmodule WorkflowMetal.Task.Task do when state in [:allocated, :executing] do data = upsert_ets_workitem({workitem_id, :started}, data) - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end @impl GenServer @@ -345,7 +345,7 @@ defmodule WorkflowMetal.Task.Task do @impl GenServer def handle_cast(_msg, %{} = data) do - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end @impl GenServer @@ -357,7 +357,7 @@ defmodule WorkflowMetal.Task.Task do {:reply, {:ok, locked_token_schemas}, data} error -> - {:reply, error, data} + {:reply, error, data, get_timeout(data.application)} end end @@ -378,12 +378,19 @@ defmodule WorkflowMetal.Task.Task do acc end) - {:reply, {:ok, tokens}, data} + {:reply, {:ok, tokens}, data, get_timeout(data.application)} end @impl GenServer def handle_call(_msg, _from, %{} = data) do - {:reply, {:error, :task_not_available}, data} + {:reply, {:error, :task_not_available}, data, get_timeout(data.application)} + end + + @impl GenServer + def handle_info(:timeout, %__MODULE__{} = data) do + Logger.debug(describe(data) <> " stopping due to inactivity timeout") + + {:stop, :normal, data} end @spec fetch_workitems(t()) :: t() @@ -775,6 +782,13 @@ defmodule WorkflowMetal.Task.Task do data end + defp get_timeout(application) do + application + |> WorkflowMetal.Application.Config.get(:task) + |> Kernel.||([]) + |> Keyword.get(:lifespan_timeout, :timer.minutes(1)) + end + defp describe(%__MODULE__{} = data) do %{ task_schema: %Schema.Task{ diff --git a/lib/workflow_metal/workitem/workitem.ex b/lib/workflow_metal/workitem/workitem.ex index 84744ed..490f7cf 100644 --- a/lib/workflow_metal/workitem/workitem.ex +++ b/lib/workflow_metal/workitem/workitem.ex @@ -151,7 +151,8 @@ defmodule WorkflowMetal.Workitem.Workitem do application: application, state: :started, workitem_schema: workitem_schema - } + }, + get_timeout(application) } state when state in [:completed, :abandoned] -> @@ -167,7 +168,7 @@ defmodule WorkflowMetal.Workitem.Workitem do {:ok, :started, data} -> data = update_workitem(%{state: :started}, data) - {:noreply, data} + {:noreply, data, get_timeout(data.application)} {:ok, {:completed, workitem_output}, data} -> {:noreply, %{data | state: :started}, {:continue, {:complete, workitem_output}}} @@ -175,7 +176,7 @@ defmodule WorkflowMetal.Workitem.Workitem do {:ok, :abandoned, data} -> data = update_workitem(%{state: :abandoned}, data) - {:noreply, data} + {:noreply, data, get_timeout(data.application)} end end @@ -215,6 +216,13 @@ defmodule WorkflowMetal.Workitem.Workitem do {:noreply, data, {:continue, :stop_server}} end + @impl GenServer + def handle_info(:timeout, %__MODULE__{} = data) do + Logger.debug(describe(data) <> " stopping due to inactivity timeout") + + {:stop, :normal, data} + end + @spec update_workitem(map(), t()) :: t() defp update_workitem(params, %__MODULE__{} = data) do %{ @@ -330,6 +338,13 @@ defmodule WorkflowMetal.Workitem.Workitem do {:ok, data} end + defp get_timeout(application) do + application + |> WorkflowMetal.Application.Config.get(:case) + |> Kernel.||([]) + |> Keyword.get(:lifespan_timeout, :timer.seconds(30)) + end + defp describe(%__MODULE__{} = data) do %{ workitem_schema: %Schema.Workitem{