Skip to content

Commit

Permalink
Merge pull request #2 from antonmi/supervisors
Browse files Browse the repository at this point in the history
Add FlowSupervisor
  • Loading branch information
antonmi authored Jan 10, 2024
2 parents fc8cd00 + 9e4898b commit 3fb7430
Show file tree
Hide file tree
Showing 22 changed files with 5,360 additions and 264 deletions.
5 changes: 1 addition & 4 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
- Proper supervision tree!!!
- Module behaviour
- restart strategy
- tick source with timeout
- remove chunk_by, introduce buffer
- explicit "map" and "reduce" with function/1,2, and "transform" as module
22 changes: 1 addition & 21 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Strom.DSL do
end
end

defmacro transform(inputs, function, acc, opts) do
defmacro transform(inputs, function, acc \\ nil, opts \\ []) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
Expand All @@ -78,26 +78,6 @@ defmodule Strom.DSL do
end
end

defmacro transform(inputs, function, acc) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
acc: unquote(acc),
inputs: unquote(inputs)
}
end
end

defmacro transform(inputs, function) do
quote do
%Strom.DSL.Transform{
function: unquote(function),
acc: nil,
inputs: unquote(inputs)
}
end
end

defmacro from(module, opts \\ []) do
quote do
unless is_atom(unquote(module)) do
Expand Down
78 changes: 58 additions & 20 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,92 @@ defmodule Strom.Flow do
defstruct pid: nil,
name: nil,
module: nil,
sup_pid: nil,
opts: [],
topology: []

use GenServer
alias Strom.DSL
alias Strom.FlowSupervisor

@type t :: %__MODULE__{}

# TODO Supervisor
def start(flow_module, opts \\ []) when is_atom(flow_module) do
state = %__MODULE__{name: flow_module, module: flow_module, opts: opts}

{:ok, pid} = GenServer.start_link(__MODULE__, state, name: flow_module)
strom_sup_pid = Process.whereis(Strom.DynamicSupervisor)

pid =
case DynamicSupervisor.start_child(
strom_sup_pid,
%{
id: __MODULE__,
start:
{__MODULE__, :start_link,
[%__MODULE__{name: flow_module, module: flow_module, opts: opts}]},
restart: :transient
}
) do
{:ok, pid} ->
pid

{:error, {:already_started, pid}} ->
pid
end

__state__(pid)
end

def start_link(%__MODULE__{} = flow) do
GenServer.start_link(__MODULE__, flow, name: flow.name)
end

@impl true
def init(%__MODULE__{module: module} = state) do
def init(%__MODULE__{module: module} = flow) do
sup_pid = start_flow_supervisor(flow.name)

topology =
state.opts
flow.opts
|> module.topology()
|> List.flatten()
|> build()
|> build(self(), sup_pid)

{:ok, %{state | pid: self(), topology: topology}}
{:ok, %{flow | pid: self(), sup_pid: sup_pid, topology: topology}}
end

defp build(components) do
defp start_flow_supervisor(name) do
sup_pid =
case FlowSupervisor.start_link(%{name: :"#{name}_Supervisor"}) do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end

Process.unlink(sup_pid)
Process.monitor(sup_pid)
sup_pid
end

defp build(components, flow_pid, sup_pid) do
components
|> Enum.map(fn component ->
case component do
%DSL.Source{origin: origin} = source ->
%{source | source: Strom.Source.start(origin)}
src = %Strom.Source{origin: origin, flow_pid: flow_pid, sup_pid: sup_pid}
%{source | source: Strom.Source.start(src)}

%DSL.Sink{origin: origin} = sink ->
%{sink | sink: Strom.Sink.start(origin)}
snk = %Strom.Sink{origin: origin, flow_pid: flow_pid, sup_pid: sup_pid}
%{sink | sink: Strom.Sink.start(snk)}

%DSL.Mix{opts: opts} = mix ->
%{mix | mixer: Strom.Mixer.start(opts)}
mixer = %Strom.Mixer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{mix | mixer: Strom.Mixer.start(mixer)}

%DSL.Split{opts: opts} = split ->
%{split | splitter: Strom.Splitter.start(opts)}

%DSL.Transform{opts: nil} = transform ->
%{transform | transformer: Strom.Transformer.start()}
splitter = %Strom.Splitter{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{split | splitter: Strom.Splitter.start(splitter)}

%DSL.Transform{opts: opts} = transform when is_list(opts) ->
%{transform | transformer: Strom.Transformer.start(opts)}
transformer = %Strom.Transformer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{transform | transformer: Strom.Transformer.start(transformer)}

%DSL.Rename{names: names} = ren ->
rename = Strom.Renamer.start(names)
Expand Down Expand Up @@ -104,8 +141,8 @@ defmodule Strom.Flow do
{:reply, flow, state}
end

def handle_call(:stop, _from, %__MODULE__{} = state) do
state.topology
def handle_call(:stop, _from, %__MODULE__{} = flow) do
flow.topology
|> Enum.each(fn component ->
case component do
%DSL.Source{source: source} ->
Expand All @@ -125,7 +162,8 @@ defmodule Strom.Flow do
end
end)

{:stop, :normal, :ok, state}
Supervisor.stop(flow.sup_pid)
{:stop, :normal, :ok, flow}
end

@impl true
Expand Down
12 changes: 12 additions & 0 deletions lib/flow_supervsor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Strom.FlowSupervisor do
use DynamicSupervisor

def start_link(state) do
DynamicSupervisor.start_link(__MODULE__, state, name: state[:name])
end

@impl true
def init(_state) do
DynamicSupervisor.init(strategy: :one_for_one, max_restarts: 3, max_seconds: 5)
end
end
30 changes: 27 additions & 3 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,29 @@ defmodule Strom.GenMix do
@buffer 1000

defstruct pid: nil,
opts: [],
flow_pid: nil,
sup_pid: nil,
running: false,
buffer: @buffer,
producers: %{},
consumers: %{}

alias Strom.GenMix.Consumer

# TODO supervisor
def start(opts \\ []) when is_list(opts) do
def start(opts \\ [])

def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
gen_mix = %{
gen_mix
| buffer: Keyword.get(opts, :buffer, @buffer)
}

{:ok, pid} = DynamicSupervisor.start_child(gen_mix.sup_pid, {__MODULE__, gen_mix})
__state__(pid)
end

def start(opts) when is_list(opts) do
state = %__MODULE__{
buffer: Keyword.get(opts, :buffer, @buffer)
}
Expand All @@ -21,6 +35,10 @@ defmodule Strom.GenMix do
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
end

@impl true
def init(%__MODULE__{} = mix) do
{:ok, %{mix | pid: self()}}
Expand Down Expand Up @@ -49,7 +67,13 @@ defmodule Strom.GenMix do
|> Map.merge(sub_flow)
end

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}) do
if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

Expand Down
2 changes: 1 addition & 1 deletion lib/gen_mix/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Strom.GenMix.Consumer do
GenServer.call(cons.pid, :register_client)
end,
fn cons ->
case GenServer.call(cons.pid, :get_data) do
case GenServer.call(cons.pid, :get_data, :infinity) do
{:ok, data} ->
if length(data) == 0 do
receive do
Expand Down
11 changes: 10 additions & 1 deletion lib/mixer.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
defmodule Strom.Mixer do
alias Strom.GenMix

def start(opts \\ []) when is_list(opts) do
defstruct [:opts, :flow_pid, :sup_pid]

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
gen_mix = %GenMix{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
GenMix.start(gen_mix)
end

def start(opts) when is_list(opts) do
GenMix.start(opts)
end

Expand Down
23 changes: 21 additions & 2 deletions lib/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ defmodule Strom.Sink do

use GenServer

defstruct [:origin, :pid]
defstruct [:origin, :pid, :flow_pid, :sup_pid]

def start(%__MODULE__{origin: origin} = sink) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
sink = %{sink | origin: origin}
{:ok, pid} = DynamicSupervisor.start_child(sink.sup_pid, {__MODULE__, sink})
__state__(pid)
end

def start(origin) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
Expand All @@ -15,12 +22,24 @@ defmodule Strom.Sink do
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}

def call(%__MODULE__{pid: pid}, data), do: GenServer.call(pid, {:call, data})

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
def stop(%__MODULE__{origin: origin, pid: pid, sup_pid: sup_pid}) do
apply(origin.__struct__, :stop, [origin])

if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def call(flow, sink, names, sync \\ false)

Expand Down
2 changes: 1 addition & 1 deletion lib/sink/io_puts.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Strom.Sink.IOPuts do
@behaviour Strom.Sink

defstruct line_sep: "\n", prefix: ""
defstruct line_sep: "", prefix: ""

@impl true
def start(%__MODULE__{} = io_puts), do: io_puts
Expand Down
27 changes: 25 additions & 2 deletions lib/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@ defmodule Strom.Source do

use GenServer

defstruct [:origin, :pid]
defstruct [:origin, :pid, :flow_pid, :sup_pid]

def start(%__MODULE__{origin: list} = source) when is_list(list) do
start(%{source | origin: %Strom.Source.Events{events: list}})
end

def start(%__MODULE__{origin: origin} = source) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
source = %{source | origin: origin}
{:ok, pid} = DynamicSupervisor.start_child(source.sup_pid, {__MODULE__, source})
__state__(pid)
end

def start(list) when is_list(list) do
start(%Strom.Source.Events{events: list})
Expand All @@ -20,14 +31,26 @@ defmodule Strom.Source do
__state__(pid)
end

def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state)
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}

def call(%__MODULE__{pid: pid}), do: GenServer.call(pid, :call, :infinity)

def infinite?(%__MODULE__{pid: pid}), do: GenServer.call(pid, :infinite)

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
def stop(%__MODULE__{origin: origin, pid: pid, sup_pid: sup_pid}) do
apply(origin.__struct__, :stop, [origin])

if sup_pid do
:ok
else
GenServer.call(pid, :stop)
end
end

def call(flow, %__MODULE__{} = source, names) when is_map(flow) and is_list(names) do
sub_flow =
Expand Down
11 changes: 10 additions & 1 deletion lib/splitter.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
defmodule Strom.Splitter do
alias Strom.GenMix

def start(opts \\ []) when is_list(opts) do
defstruct [:opts, :flow_pid, :sup_pid]

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
gen_mix = %GenMix{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
GenMix.start(gen_mix)
end

def start(opts) when is_list(opts) do
GenMix.start(opts)
end

Expand Down
Loading

0 comments on commit 3fb7430

Please sign in to comment.