Skip to content

Commit

Permalink
Better composition with the topology abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 12, 2024
1 parent ae34a6a commit fb67609
Show file tree
Hide file tree
Showing 19 changed files with 5,406 additions and 5,050 deletions.
5 changes: 4 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
- restart strategy
- renamer can be just a simple module
- source and sink behaviours
- composite component
- rename from to topology
- Different words for flow as data-structure and Flow module (maybe Topology with the call function)
- tick source with timeout
- :__all__ for calling components on all streams in flow
12 changes: 7 additions & 5 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ defmodule Strom.Flow do
transformer = %Strom.Transformer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
%{transform | transformer: Strom.Transformer.start(transformer)}

%DSL.Rename{names: names} = ren ->
rename = Strom.Renamer.start(names)
%{ren | rename: rename}
%DSL.Rename{} = ren ->
%{ren | rename: Strom.Renamer.start()}
end
end)
end
Expand Down Expand Up @@ -133,8 +132,8 @@ defmodule Strom.Flow do
Strom.Transformer.call(flow, transformer, inputs, {function, acc})
end

%DSL.Rename{rename: rename, names: names} ->
Strom.Renamer.call(flow, rename, names)
%DSL.Rename{names: names} ->
Strom.Renamer.call(flow, names)
end
end)

Expand All @@ -159,6 +158,9 @@ defmodule Strom.Flow do

%DSL.Transform{transformer: transformer} ->
Strom.Transformer.stop(transformer)

%DSL.Rename{rename: rename} ->
Strom.Renamer.stop(rename)
end
end)

Expand Down
8 changes: 8 additions & 0 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ defmodule Strom.Mixer do

defstruct [:opts, :flow_pid, :sup_pid]

def new(inputs, output, opts \\ []) do
unless is_list(inputs) do
raise "Mixer sources must be a list, given: #{inspect(inputs)}"
end

%Strom.DSL.Mix{inputs: inputs, output: output, opts: opts}
end

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
Expand Down
27 changes: 5 additions & 22 deletions lib/renamer.ex
Original file line number Diff line number Diff line change
@@ -1,34 +1,17 @@
defmodule Strom.Renamer do
use GenServer
defstruct []

defstruct names: nil, pid: nil

def start(names) do
state = %__MODULE__{names: names}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
def start() do
%__MODULE__{}
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}

def call(flow, %__MODULE__{}, names) when is_map(names) do
def call(flow, names) when is_map(names) do
Enum.reduce(names, flow, fn {name, new_name}, acc ->
acc
|> Map.put(new_name, Map.fetch!(acc, name))
|> Map.delete(name)
end)
end

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

@impl true
def handle_call(:stop, _from, state) do
{:stop, :normal, :ok, state}
end

def handle_call(:__state__, _from, state), do: {:reply, state, state}
def stop(%__MODULE__{}), do: :ok
end
8 changes: 8 additions & 0 deletions lib/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ defmodule Strom.Sink do

defstruct [:origin, :pid, :flow_pid, :sup_pid]

def new(names, origin, sync \\ false) do
unless is_struct(origin) do
raise "Sink origin must be a struct, given: #{inspect(origin)}"
end

%Strom.DSL.Sink{origin: origin, names: names, sync: sync}
end

def start(%__MODULE__{origin: origin} = sink) when is_struct(origin) do
origin = apply(origin.__struct__, :start, [origin])
sink = %{sink | origin: origin}
Expand Down
14 changes: 14 additions & 0 deletions lib/sink/null.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Strom.Sink.Null do
@behaviour Strom.Sink

defstruct []

@impl true
def start(%__MODULE__{} = null), do: null

@impl true
def call(%__MODULE__{} = null, _data), do: {:ok, {[], null}}

@impl true
def stop(%__MODULE__{} = null), do: null
end
8 changes: 8 additions & 0 deletions lib/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ defmodule Strom.Source do

defstruct [:origin, :pid, :flow_pid, :sup_pid]

def new(names, origin) do
unless is_struct(origin) or is_list(origin) do
raise "Source origin must be a struct or just simple list, given: #{inspect(origin)}"
end

%Strom.DSL.Source{origin: origin, names: names}
end

def start(%__MODULE__{origin: list} = source) when is_list(list) do
start(%{source | origin: %Strom.Source.Events{events: list}})
end
Expand Down
12 changes: 12 additions & 0 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@ defmodule Strom.Splitter do

defstruct [:opts, :flow_pid, :sup_pid]

def new(input, partitions, opts \\ []) do
unless is_map(partitions) and map_size(partitions) > 0 do
raise "Branches in splitter must be a map, given: #{inspect(partitions)}"
end

%Strom.DSL.Split{
input: input,
partitions: partitions,
opts: opts
}
end

def start(args \\ [])

def start(%__MODULE__{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}) do
Expand Down
183 changes: 183 additions & 0 deletions lib/supervised_topology.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# defmodule Strom.SupervisedTopology do
# defstruct pid: nil,
# name: nil,
# module: nil,
# sup_pid: nil,
# opts: [],
# topology: []
#
# use GenServer
# alias Strom.DSL
# alias Strom.FlowSupervisor
#
# @type t :: %__MODULE__{}
#
# def start(flow_module, opts \\ []) when is_atom(flow_module) do
# strom_sup_pid = Process.whereis(Strom.DynamicSupervisor)
#
# pid =
# case DynamicSupervisor.start_child(
# strom_sup_pid,
# %{
# id: __MODULE__,
# start:
# {__MODULE__, :start_link,
# [%__MODULE__{name: flow_module, module: flow_module, opts: opts}]},
# restart: :transient
# }
# ) do
# {:ok, pid} ->
# pid
#
# {:error, {:already_started, pid}} ->
# pid
# end
#
# __state__(pid)
# end
#
# def start_link(%__MODULE__{} = flow) do
# GenServer.start_link(__MODULE__, flow, name: flow.name)
# end
#
# @impl true
# def init(%__MODULE__{module: module} = flow) do
# sup_pid = start_flow_supervisor(flow.name)
#
# topology =
# flow.opts
# |> module.topology()
# |> List.flatten()
# |> build(self(), sup_pid)
#
# {:ok, %{flow | pid: self(), sup_pid: sup_pid, topology: topology}}
# end
#
# defp start_flow_supervisor(name) do
# sup_pid =
# case FlowSupervisor.start_link(%{name: :"#{name}_Supervisor"}) do
# {:ok, pid} -> pid
# {:error, {:already_started, pid}} -> pid
# end
#
# Process.unlink(sup_pid)
# Process.monitor(sup_pid)
# sup_pid
# end
#
# defp build(components, flow_pid, sup_pid) do
# components
# |> Enum.map(fn component ->
# case component do
# %DSL.Source{origin: origin} = source ->
# src = %Strom.Source{origin: origin, flow_pid: flow_pid, sup_pid: sup_pid}
# %{source | source: Strom.Source.start(src)}
#
# %DSL.Sink{origin: origin} = sink ->
# snk = %Strom.Sink{origin: origin, flow_pid: flow_pid, sup_pid: sup_pid}
# %{sink | sink: Strom.Sink.start(snk)}
#
# %DSL.Mix{opts: opts} = mix ->
# mixer = %Strom.Mixer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
# %{mix | mixer: Strom.Mixer.start(mixer)}
#
# %DSL.Split{opts: opts} = split ->
# splitter = %Strom.Splitter{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
# %{split | splitter: Strom.Splitter.start(splitter)}
#
# %DSL.Transform{opts: opts} = transform when is_list(opts) ->
# transformer = %Strom.Transformer{opts: opts, flow_pid: flow_pid, sup_pid: sup_pid}
# %{transform | transformer: Strom.Transformer.start(transformer)}
#
# %DSL.Rename{names: names} = ren ->
# rename = Strom.Renamer.start(names)
# %{ren | rename: rename}
# end
# end)
# end
#
# def info(flow_module), do: GenServer.call(flow_module, :info)
#
# def call(flow_module, flow), do: GenServer.call(flow_module, {:call, flow}, :infinity)
#
# def stop(flow_module) when is_atom(flow_module), do: GenServer.call(flow_module, :stop)
#
# def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)
#
# @impl true
# def handle_call(:info, _from, state), do: {:reply, state.topology, state}
#
# def handle_call(:__state__, _from, state), do: {:reply, state, state}
#
# def handle_call({:call, init_flow}, _from, %__MODULE__{} = state) do
# flow =
# state.topology
# |> Enum.reduce(init_flow, fn component, flow ->
# case component do
# %DSL.Source{source: source, names: names} ->
# Strom.Source.call(flow, source, names)
#
# %DSL.Sink{sink: sink, names: names, sync: sync} ->
# Strom.Sink.call(flow, sink, names, sync)
#
# %DSL.Mix{mixer: mixer, inputs: inputs, output: output} ->
# Strom.Mixer.call(flow, mixer, inputs, output)
#
# %DSL.Split{splitter: splitter, input: input, partitions: partitions} ->
# Strom.Splitter.call(flow, splitter, input, partitions)
#
# %DSL.Transform{transformer: transformer, function: function, acc: acc, inputs: inputs} ->
# if is_function(function, 1) do
# Strom.Transformer.call(flow, transformer, inputs, function)
# else
# Strom.Transformer.call(flow, transformer, inputs, {function, acc})
# end
#
# %DSL.Rename{rename: rename, names: names} ->
# Strom.Renamer.call(flow, rename, names)
# end
# end)
#
# {:reply, flow, state}
# end
#
# def handle_call(:stop, _from, %__MODULE__{} = flow) do
# flow.topology
# |> Enum.each(fn component ->
# case component do
# %DSL.Source{source: source} ->
# Strom.Source.stop(source)
#
# %DSL.Sink{sink: sink} ->
# Strom.Sink.stop(sink)
#
# %DSL.Mix{mixer: mixer} ->
# Strom.Mixer.stop(mixer)
#
# %DSL.Split{splitter: splitter} ->
# Strom.Splitter.stop(splitter)
#
# %DSL.Transform{transformer: transformer} ->
# Strom.Transformer.stop(transformer)
# end
# end)
#
# Supervisor.stop(flow.sup_pid)
# {:stop, :normal, :ok, flow}
# end
#
# @impl true
# def handle_info(:continue, flow) do
# {:noreply, flow}
# end
#
# def handle_info({_task_ref, :ok}, flow) do
# # do nothing for now
# {:noreply, flow}
# end
#
# def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, flow) do
# # do nothing for now
# {:noreply, flow}
# end
# end
Loading

0 comments on commit fb67609

Please sign in to comment.