Skip to content

Commit

Permalink
Add from and rename macros
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 14, 2023
1 parent 433d731 commit e441f8c
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 24 deletions.
4 changes: 2 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
- Proper supervision tree!!!
- Module behaviour
- flow and rename in dsl
- tick source with timeout
- tick source with timeout
- remove chunk_by, introduce buffer
59 changes: 46 additions & 13 deletions lib/dsl.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
defmodule Strom.DSL do
defmodule Module do
defstruct module: nil, opts: [], inputs: [], state: nil
end

defmodule Function do
defstruct function: nil, opts: [], inputs: []
end

defmodule Source do
defstruct source: nil, origin: nil, names: []
end
Expand All @@ -23,6 +15,18 @@ defmodule Strom.DSL do
defstruct splitter: nil, opts: [], input: nil, partitions: %{}
end

defmodule Function do
defstruct function: nil, opts: [], inputs: []
end

defmodule Module do
defstruct module: nil, opts: [], inputs: [], state: nil
end

defmodule Rename do
defstruct names: nil, rename: nil
end

defmacro source(names, origin) do
quote do
unless is_struct(unquote(origin)) or is_list(unquote(origin)) do
Expand Down Expand Up @@ -67,22 +71,46 @@ defmodule Strom.DSL do
end
end

defmacro module(inputs, module, opts \\ []) do
defmacro function(inputs, function, opts \\ []) do
quote do
%Strom.DSL.Module{module: unquote(module), opts: unquote(opts), inputs: unquote(inputs)}
%Strom.DSL.Function{
function: unquote(function),
opts: unquote(opts),
inputs: unquote(inputs)
}
end
end

defmacro function(inputs, function, opts \\ []) do
defmacro module(inputs, module, opts \\ []) do
quote do
%Strom.DSL.Function{
function: unquote(function),
%Strom.DSL.Module{
module: unquote(module),
opts: unquote(opts),
inputs: unquote(inputs)
}
end
end

defmacro from(module, opts \\ []) do
quote do
unless is_atom(unquote(module)) do
raise "Flow must be a module, given: #{inspect(unquote(module))}"
end

apply(unquote(module), :topology, [unquote(opts)])
end
end

defmacro rename(names) do
quote do
unless is_map(unquote(names)) do
raise "Names must be a map, given: #{inspect(unquote(names))}"
end

%Strom.DSL.Rename{names: unquote(names)}
end
end

defmacro __using__(_opts) do
quote do
import Strom.DSL
Expand All @@ -101,6 +129,11 @@ defmodule Strom.DSL do
def stop do
Strom.Flow.stop(__MODULE__)
end

@spec info() :: list()
def info do
Strom.Flow.info(__MODULE__)
end
end
end
end
14 changes: 13 additions & 1 deletion lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ defmodule Strom.Flow do

@impl true
def init(%__MODULE__{module: module} = state) do
topology = build(module.topology(state.opts))
topology =
state.opts
|> module.topology()
|> List.flatten()
|> build()

{:ok, %{state | pid: self(), topology: topology}}
end

Expand All @@ -47,6 +52,10 @@ defmodule Strom.Flow do
%DSL.Module{module: module, opts: opts} = mod ->
module = Strom.Module.start(module, opts)
%{mod | module: module}

%DSL.Rename{names: names} = ren ->
rename = Strom.Rename.start(names)
%{ren | rename: rename}
end
end)
end
Expand Down Expand Up @@ -86,6 +95,9 @@ defmodule Strom.Flow do

%DSL.Module{module: module, inputs: inputs} ->
Strom.Module.call(flow, module, inputs)

%DSL.Rename{rename: rename, names: names} ->
Strom.Rename.call(flow, rename, names)
end
end)

Expand Down
8 changes: 5 additions & 3 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ defmodule Strom.Mixer do
to_sleep = trunc(:math.pow(2, no_data_counter))
Process.sleep(to_sleep)
end

{data, mixer}

{:error, :done} ->
Expand Down Expand Up @@ -126,10 +127,11 @@ defmodule Strom.Mixer do
no_data_counter = if length(all_data) == 0, do: mixer.no_data_counter + 1, else: 0

mixer = %{
mixer |
data: data,
no_data_counter: no_data_counter
mixer
| data: data,
no_data_counter: no_data_counter
}

{:reply, {:ok, {all_data, no_data_counter}}, mixer}
end
end
Expand Down
34 changes: 34 additions & 0 deletions lib/rename.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Strom.Rename do
use GenServer

defstruct names: nil, pid: nil

def start(names) do
state = %__MODULE__{names: names}

{:ok, pid} = GenServer.start_link(__MODULE__, state)
__state__(pid)
end

@impl true
def init(%__MODULE__{} = state), do: {:ok, %{state | pid: self()}}

def call(flow, %__MODULE__{}, names) when is_map(names) do
Enum.reduce(names, flow, fn {name, new_name}, acc ->
acc
|> Map.put(new_name, Map.fetch!(acc, name))
|> Map.delete(name)
end)
end

def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

@impl true
def handle_call(:stop, _from, state) do
{:stop, :normal, :ok, state}
end

def handle_call(:__state__, _from, state), do: {:reply, state, state}
end
9 changes: 5 additions & 4 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ defmodule Strom.Splitter do
chunk_every: 100,
no_data_counter: 0


def start(opts \\ []) when is_list(opts) do
state = %__MODULE__{
chunk_every: Keyword.get(opts, :chunk_every, @chunk_every)
Expand Down Expand Up @@ -52,6 +51,7 @@ defmodule Strom.Splitter do
to_sleep = trunc(:math.pow(2, no_data_counter))
Process.sleep(to_sleep)
end

{data, splitter}

{:error, :done} ->
Expand Down Expand Up @@ -143,10 +143,11 @@ defmodule Strom.Splitter do
{:reply, {:error, :done}, splitter}
else
no_data_counter = if length(data) == 0, do: splitter.no_data_counter + 1, else: 0

splitter = %{
splitter |
partitions: Map.put(partitions, partition_fun, []),
no_data_counter: no_data_counter
splitter
| partitions: Map.put(partitions, partition_fun, []),
no_data_counter: no_data_counter
}

{:reply, {:ok, {data, no_data_counter}}, splitter}
Expand Down
56 changes: 55 additions & 1 deletion test/dsl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,63 @@ defmodule Strom.DSLTest do
defmodule Flow1 do
use Strom.DSL

def topology(mixed_name) do
[
source(:s1, [1, 2, 3]),
source(:s2, [10, 20, 30]),
source(:s3, [100, 200, 300]),
mixer([:s1, :s2], mixed_name)
]
end
end

defmodule Flow2 do
use Strom.DSL

def add_one(el), do: el + 1
def to_string(el), do: "#{el}"

def topology(_) do
source(:s1, [1, 2, 3])
[
function(:stream1, &__MODULE__.add_one/1),
function(:stream2, &__MODULE__.add_one/1)
]
end
end

defmodule Flow3 do
use Strom.DSL

def add_one(el), do: el + 1
def to_string(el), do: "#{el}"

def topology(name) do
[
mixer([:str1, :str2], name)
]
end
end

defmodule ComposedFlow do
use Strom.DSL

def topology(name) do
[
from(Flow1, :s12),
rename(%{s12: :stream1, s3: :stream2}),
from(Flow2),
rename(%{stream1: :str1, stream2: :str2}),
from(Flow3, name)
]
end
end

test "ComposedFlow" do
ComposedFlow.start(:mixed)
%{mixed: stream} = ComposedFlow.call(%{})
results = Enum.to_list(stream)
assert length(results) == 9
assert Enum.sort(results) == [2, 3, 4, 11, 21, 31, 101, 201, 301]
end
end
end
38 changes: 38 additions & 0 deletions test/rename_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Strom.RenameTest do
use ExUnit.Case, async: true

alias Strom.Rename

test "start and stop" do
rename = Rename.start(%{s1: :s2})
assert Process.alive?(rename.pid)
:ok = Rename.stop(rename)
refute Process.alive?(rename.pid)
end

test "rename" do
names = %{s1: :foo1, s2: :foo2}
rename = Rename.start(names)

flow = %{s1: [1], s2: [2], s3: [3]}

new_flow = Rename.call(flow, rename, names)

refute new_flow[:s1]
refute new_flow[:s2]

assert Enum.to_list(new_flow[:foo1]) == [1]
assert Enum.to_list(new_flow[:foo2]) == [2]
assert Enum.to_list(new_flow[:s3]) == [3]
end

test "raise when there is no such name" do
names = %{s2: :foo2}
rename = Rename.start(names)
flow = %{s1: [1]}

assert_raise KeyError, fn ->
Rename.call(flow, rename, names)
end
end
end

0 comments on commit e441f8c

Please sign in to comment.