Skip to content

Commit

Permalink
Fix splitter logic
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 12, 2023
1 parent 34f750b commit 1019de4
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions lib/splitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Strom.Splitter do

def start(opts \\ []) when is_list(opts) do
state = %__MODULE__{
running: MapSet.new(),
running: false,
partitions: %{},
chunk_every: Keyword.get(opts, :chunk_every, @chunk_every)
}
Expand All @@ -33,12 +33,14 @@ defmodule Strom.Splitter do
GenServer.call(splitter.pid, {:set_partitions, partitions})
stream_to_run = Map.fetch!(flow, name)

:ok = GenServer.call(splitter.pid, {:run_stream, stream_to_run})

sub_flow =
partitions
|> Enum.reduce(%{}, fn {name, fun}, flow ->
stream =
Stream.resource(
fn -> GenServer.call(splitter.pid, {:run_stream, stream_to_run, {name, fun}}) end,
fn -> splitter end,
fn splitter ->
case GenServer.call(splitter.pid, {:get_data, {name, fun}}) do
{:ok, data} ->
Expand All @@ -63,7 +65,7 @@ defmodule Strom.Splitter do

# Introspection helper: synchronously asks the process at `pid` for its
# current state by issuing the `:__state__` call.
def __state__(pid) when is_pid(pid) do
  GenServer.call(pid, :__state__)
end

defp async_run_stream(stream, {name, fun}, chunk_every, pid) do
defp async_run_stream(stream, chunk_every, pid) do
Task.async(fn ->
stream
|> Stream.chunk_every(chunk_every)
Expand All @@ -73,14 +75,15 @@ defmodule Strom.Splitter do
end)
|> Stream.run()

GenServer.call(pid, {:done, {name, fun}})
GenServer.call(pid, :done)
end)
end

# Back-pressure helper. When `data_size` exceeds ten chunks' worth of items
# (10 * chunk_every), sleeps for 2^(data_size div (10 * chunk_every))
# milliseconds, so the pause grows exponentially with the backlog.
# Returns `:ok` after sleeping, or `nil` when no wait is needed.
defp maybe_wait(data_size, chunk_every) do
  threshold = 10 * chunk_every

  if data_size > threshold do
    exponent = div(data_size, threshold)

    2
    |> :math.pow(exponent)
    |> trunc()
    |> Process.sleep()
  end
end
Expand All @@ -104,10 +107,9 @@ defmodule Strom.Splitter do
{:reply, data_size, %{splitter | partitions: new_partitions}}
end

# Starts consuming `stream` in a background task (see async_run_stream/3) and
# marks the splitter as running. The source stream is started once for the
# whole splitter rather than once per partition, so `running` is a plain
# boolean and the reply is `:ok`.
# NOTE(review): the task returned by async_run_stream/3 is discarded here and
# never awaited — confirm its reply message is handled or ignored elsewhere.
def handle_call({:run_stream, stream}, _from, %__MODULE__{} = splitter) do
  async_run_stream(stream, splitter.chunk_every, splitter.pid)
  {:reply, :ok, %{splitter | running: true}}
end

def handle_call({:set_partitions, partitions}, _from, %__MODULE__{} = splitter) do
Expand All @@ -118,9 +120,8 @@ defmodule Strom.Splitter do
{:reply, splitter, splitter}
end

# Called by the streaming task once the source stream has been fully consumed.
# Clears the `running` flag so that a partition whose buffer is empty can be
# reported as finished by the `:get_data` handler.
def handle_call(:done, _from, %__MODULE__{} = splitter) do
  {:reply, :ok, %{splitter | running: false}}
end

def handle_call(
Expand All @@ -130,15 +131,15 @@ defmodule Strom.Splitter do
) do
data = Map.get(partitions, partition_fun)

if length(data) == 0 && MapSet.size(running) == 0 do
if length(data) == 0 && !running do
{:reply, {:error, :done}, splitter}
else
{:reply, {:ok, data}, %{splitter | partitions: Map.put(partitions, partition_fun, [])}}
end
end

# Stops the splitter process normally, replying `:ok` to the caller. The state
# is reset (not running, no buffered partitions) before termination.
def handle_call(:stop, _from, %__MODULE__{} = splitter) do
  {:stop, :normal, :ok, %{splitter | running: false, partitions: %{}}}
end

# Introspection callback: replies with the current state and leaves it
# unchanged.
def handle_call(:__state__, _from, splitter) do
  {:reply, splitter, splitter}
end
Expand Down

0 comments on commit 1019de4

Please sign in to comment.