Skip to content

Commit

Permalink
Sleep 1ms in mixer when no data
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 12, 2023
1 parent ba7e2e1 commit 34f750b
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 4 deletions.
3 changes: 2 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
- test chunk_every behaviour in mixer and splitter (chunk_every: 1)
- event referencing and component storage for restarting
- smart mixer and smart splitter examples
- flow and cree in dsl
- flow and rename in dsl
- tick source with timeout
2 changes: 2 additions & 0 deletions lib/mixer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ defmodule Strom.Mixer do
fn mixer ->
case GenServer.call(mixer.pid, :get_data) do
{:ok, data} ->
# sleep a bit
if length(data) == 0, do: Process.sleep(1)
{data, mixer}

{:error, :done} ->
Expand Down
6 changes: 3 additions & 3 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ defmodule Strom.Splitter do
end)
end

defp maybe_wait(data_length, chunk_every) do
if data_length > 10 * chunk_every do
div = div(data_length, 10 * chunk_every)
defp maybe_wait(data_size, chunk_every) do
if data_size > 10 * chunk_every do
div = div(data_size, 10 * chunk_every)
to_sleep = trunc(:math.pow(2, div))
Process.sleep(to_sleep)
end
Expand Down
78 changes: 78 additions & 0 deletions test/examples/words_count_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule Strom.Examples.WordsCountTest do
use ExUnit.Case

defmodule WordsFlow do
use Strom.DSL

alias Strom.Source.{ReadLines, Events}

defmodule DoCount do
def start(_opts), do: %{}

def call(:done, acc, _), do: {[acc], %{}}

def call(string, acc, _) do
acc =
string
|> String.downcase()
|> String.split(~r/[\W]/)
|> Enum.reduce(acc, fn word, acc ->
prev = Map.get(acc, word, 0)
Map.put(acc, word, prev + 1)
end)

{[], acc}
end

def stop(_acc, _opts), do: :ok
end

defmodule SumAll do
def start(_opts), do: %{}

def call(:done, acc, _), do: {[acc], %{}}

def call(sums, acc, _) do
acc =
sums
|> Enum.reduce(acc, fn {word, count}, acc ->
prev = Map.get(acc, word, 0)
Map.put(acc, word, prev + count)
end)

{[], acc}
end

def stop(_acc, _opts), do: :ok
end

def topology({file_name, count}) do
all_names = Enum.map(1..count, &:"lines-#{&1}")

dones =
Enum.map(all_names, fn name ->
source(name, %Events{events: [:done]})
end)

[
source(all_names, %ReadLines{path: file_name})
] ++
dones ++
[
module(all_names, DoCount),
mixer(all_names, :mixed),
source(:mixed, %Events{events: [:done]}),
module(:mixed, SumAll)
]
end
end

test "count" do
WordsFlow.start({"test/data/orders.csv", 10})

%{mixed: counts} = WordsFlow.call(%{})
[counts] = Enum.to_list(counts)
assert counts["00"] == 214
assert counts["order_created"] == 107
end
end

0 comments on commit 34f750b

Please sign in to comment.