Skip to content

Commit

Permalink
feat: add timeout to case, task and workitem
Browse files Browse the repository at this point in the history
  • Loading branch information
fahchen committed Jun 16, 2023
1 parent 961b889 commit 656563f
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 23 deletions.
17 changes: 16 additions & 1 deletion lib/workflow_metal/application.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
defmodule WorkflowMetal.Application do
@moduledoc false
@moduledoc """
A workflow_metal application is a collection of workflows, cases, tasks and workitems.
In `config/config.exs` set default_lifespan_timeout for any `Case`, `Task` and `Workitem`:
config :my_app, MyApp.Application,
case: [
lifespan_timeout: 30000
],
task: [
lifespan_timeout: 60000
],
workitem: [
lifespan_timeout: 300000
]
"""

alias WorkflowMetal.Application.Config

Expand Down
28 changes: 21 additions & 7 deletions lib/workflow_metal/case/case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,15 @@ defmodule WorkflowMetal.Case.Case do
{:noreply, data, {:continue, :stop_server}}

_error ->
{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end
end

@impl GenServer
def handle_continue({:revoke_tokens, locked_token_schemas, except_task_id}, %__MODULE__{state: :active} = data) do
data = do_revoke_tokens(locked_token_schemas, except_task_id, data)

{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end

@impl GenServer
Expand All @@ -257,10 +257,10 @@ defmodule WorkflowMetal.Case.Case do
"#{describe(data)}: tokens(#{Enum.join(token_ids, ", ")}) have been locked by the task(#{task_id})"
end)

{:reply, {:ok, locked_token_schemas}, data}
{:reply, {:ok, locked_token_schemas}, data, get_timeout(data.application)}

{:error, _reason} = error ->
{:reply, error, data}
{:reply, error, data, get_timeout(data.application)}
end
end

Expand Down Expand Up @@ -290,19 +290,19 @@ defmodule WorkflowMetal.Case.Case do
def handle_call({:offer_tokens_to_task, task_id}, _from, %{state: :active} = data) do
{tokens, data} = do_offer_tokens_to_task(task_id, data)

{:reply, {:ok, tokens}, data}
{:reply, {:ok, tokens}, data, get_timeout(data.application)}
end

@impl GenServer
def handle_call({:fetch_locked_tokens_from_task, task_id}, _from, %{state: :active} = data) do
tokens = do_fetch_locked_tokens_from_task(task_id, data)

{:reply, {:ok, tokens}, data}
{:reply, {:ok, tokens}, data, get_timeout(data.application)}
end

@impl GenServer
def handle_call(_msg, _from, %{} = data) do
{:reply, {:error, :case_not_available}, data}
{:reply, {:error, :case_not_available}, data, get_timeout(data.application)}
end

@impl GenServer
Expand Down Expand Up @@ -332,6 +332,13 @@ defmodule WorkflowMetal.Case.Case do
{:noreply, data, {:continue, :stop_server}}
end

@impl GenServer
def handle_info(:timeout, %__MODULE__{} = data) do
Logger.debug(describe(data) <> " stopping due to inactivity timeout")

{:stop, :normal, data}
end

@spec fetch_unconsumed_tokens(t()) :: t()
defp fetch_unconsumed_tokens(%__MODULE__{} = data) do
%{
Expand Down Expand Up @@ -829,6 +836,13 @@ defmodule WorkflowMetal.Case.Case do
{:ok, data}
end

defp get_timeout(application) do
application
|> WorkflowMetal.Application.Config.get(:case)
|> Kernel.||([])
|> Keyword.get(:lifespan_timeout, :timer.minutes(5))
end

defp describe(%__MODULE__{} = data) do
%{
case_schema: %Schema.Case{
Expand Down
38 changes: 26 additions & 12 deletions lib/workflow_metal/task/task.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ defmodule WorkflowMetal.Task.Task do
|> request_tokens()
|> start_created_workitems()

{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end

def handle_continue(:after_start, %{state: :executing} = data) do
Expand All @@ -221,10 +221,10 @@ defmodule WorkflowMetal.Task.Task do
|> allocate_workitem()
|> update_task(%{state: :allocated})

{:noreply, data}
{:noreply, data, get_timeout(data.application)}

_other ->
{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end
end

Expand All @@ -240,7 +240,7 @@ defmodule WorkflowMetal.Task.Task do
{:noreply, data, {:continue, :stop_server}}

_error ->
{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end
end

Expand All @@ -254,7 +254,7 @@ defmodule WorkflowMetal.Task.Task do
{:noreply, data, {:continue, :stop_server}}

_error ->
{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end
end

Expand All @@ -271,7 +271,7 @@ defmodule WorkflowMetal.Task.Task do
{:noreply, data, {:continue, :stop_server}}

_ ->
{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end
end

Expand All @@ -295,7 +295,7 @@ defmodule WorkflowMetal.Task.Task do
def handle_cast({:receive_tokens, token_schemas}, %{state: :allocated} = data) do
data = Enum.reduce(token_schemas, data, &upsert_ets_token/2)

{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end

@impl GenServer
Expand Down Expand Up @@ -325,7 +325,7 @@ defmodule WorkflowMetal.Task.Task do
when state in [:allocated, :executing] do
data = upsert_ets_workitem({workitem_id, :started}, data)

{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end

@impl GenServer
Expand All @@ -345,7 +345,7 @@ defmodule WorkflowMetal.Task.Task do

@impl GenServer
def handle_cast(_msg, %{} = data) do
{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end

@impl GenServer
Expand All @@ -357,7 +357,7 @@ defmodule WorkflowMetal.Task.Task do
{:reply, {:ok, locked_token_schemas}, data}

error ->
{:reply, error, data}
{:reply, error, data, get_timeout(data.application)}
end
end

Expand All @@ -378,12 +378,19 @@ defmodule WorkflowMetal.Task.Task do
acc
end)

{:reply, {:ok, tokens}, data}
{:reply, {:ok, tokens}, data, get_timeout(data.application)}
end

@impl GenServer
def handle_call(_msg, _from, %{} = data) do
{:reply, {:error, :task_not_available}, data}
{:reply, {:error, :task_not_available}, data, get_timeout(data.application)}
end

@impl GenServer
def handle_info(:timeout, %__MODULE__{} = data) do
Logger.debug(describe(data) <> " stopping due to inactivity timeout")

{:stop, :normal, data}
end

@spec fetch_workitems(t()) :: t()
Expand Down Expand Up @@ -775,6 +782,13 @@ defmodule WorkflowMetal.Task.Task do
data
end

defp get_timeout(application) do
application
|> WorkflowMetal.Application.Config.get(:task)
|> Kernel.||([])
|> Keyword.get(:lifespan_timeout, :timer.minutes(1))
end

defp describe(%__MODULE__{} = data) do
%{
task_schema: %Schema.Task{
Expand Down
21 changes: 18 additions & 3 deletions lib/workflow_metal/workitem/workitem.ex
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ defmodule WorkflowMetal.Workitem.Workitem do
application: application,
state: :started,
workitem_schema: workitem_schema
}
},
get_timeout(application)
}

state when state in [:completed, :abandoned] ->
Expand All @@ -167,15 +168,15 @@ defmodule WorkflowMetal.Workitem.Workitem do
{:ok, :started, data} ->
data = update_workitem(%{state: :started}, data)

{:noreply, data}
{:noreply, data, get_timeout(data.application)}

{:ok, {:completed, workitem_output}, data} ->
{:noreply, %{data | state: :started}, {:continue, {:complete, workitem_output}}}

{:ok, :abandoned, data} ->
data = update_workitem(%{state: :abandoned}, data)

{:noreply, data}
{:noreply, data, get_timeout(data.application)}
end
end

Expand Down Expand Up @@ -215,6 +216,13 @@ defmodule WorkflowMetal.Workitem.Workitem do
{:noreply, data, {:continue, :stop_server}}
end

@impl GenServer
def handle_info(:timeout, %__MODULE__{} = data) do
Logger.debug(describe(data) <> " stopping due to inactivity timeout")

{:stop, :normal, data}
end

@spec update_workitem(map(), t()) :: t()
defp update_workitem(params, %__MODULE__{} = data) do
%{
Expand Down Expand Up @@ -330,6 +338,13 @@ defmodule WorkflowMetal.Workitem.Workitem do
{:ok, data}
end

defp get_timeout(application) do
application
|> WorkflowMetal.Application.Config.get(:case)
|> Kernel.||([])
|> Keyword.get(:lifespan_timeout, :timer.seconds(30))
end

defp describe(%__MODULE__{} = data) do
%{
workitem_schema: %Schema.Workitem{
Expand Down
28 changes: 28 additions & 0 deletions test/workflow_metal/case/case_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ defmodule WorkflowMetal.Case.CaseTest do
end
end

describe "terminate by timeout" do
defmodule DummyTimeoutApplication do
@moduledoc false

use WorkflowMetal.Application,
storage: WorkflowMetal.Storage.Adapters.InMemory,
case: [lifespan_timeout: 10]
end

setup do
start_supervised!(DummyTimeoutApplication)

:ok
end

test "works" do
{:ok, workflow_schema} = SequentialRouting.create(DummyTimeoutApplication)

{:ok, case_schema} = insert_case(DummyTimeoutApplication, workflow_schema)

assert {:ok, pid} = CaseSupervisor.open_case(DummyTimeoutApplication, case_schema.id)

ref = Process.monitor(pid)

assert_receive {:DOWN, ^ref, :process, ^pid, :normal}
end
end

describe "restore_from_storage" do
setup do
{:ok, workflow_schema} = SequentialRouting.create(DummyApplication)
Expand Down

0 comments on commit 656563f

Please sign in to comment.