Skip to content

Commit

Permalink
Rename call to transformer in Transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Jan 10, 2024
1 parent b730d3e commit bfe6ec8
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions lib/transformer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Strom.Transformer do
{:ok, %{call | pid: self()}}
end

def call(flow, %__MODULE__{} = call, names, {function, acc})
def call(flow, %__MODULE__{} = transformer, names, {function, acc})
when is_map(flow) and is_function(function, 3) do
names = if is_list(names), do: names, else: [names]

Expand All @@ -54,7 +54,7 @@ defmodule Strom.Transformer do
Map.put(streams, {name, function, acc}, Map.fetch!(flow, name))
end)

:ok = GenServer.call(call.pid, {:run_inputs, input_streams})
:ok = GenServer.call(transformer.pid, {:run_inputs, input_streams})

sub_flow =
names
Expand All @@ -65,7 +65,7 @@ defmodule Strom.Transformer do
nil
end,
fn nil ->
case GenServer.call(call.pid, {:get_data, name}, :infinity) do
case GenServer.call(transformer.pid, {:get_data, name}, :infinity) do
{:ok, data} ->
if length(data) == 0 do
receive do
Expand All @@ -91,16 +91,16 @@ defmodule Strom.Transformer do
|> Map.merge(sub_flow)
end

def call(flow, %__MODULE__{} = call, names, {function, acc})
def call(flow, %__MODULE__{} = transformer, names, {function, acc})
when is_map(flow) and is_function(function, 2) do
fun = fn el, acc, nil -> function.(el, acc) end
call(flow, %__MODULE__{} = call, names, {fun, acc})
call(flow, transformer, names, {fun, acc})
end

def call(flow, %__MODULE__{} = call, names, function)
def call(flow, %__MODULE__{} = transformer, names, function)
when is_map(flow) and is_function(function, 1) do
fun = fn el, nil, nil -> {[function.(el)], nil} end
call(flow, %__MODULE__{} = call, names, {fun, nil})
call(flow, transformer, names, {fun, nil})
end

def stop(%__MODULE__{pid: pid, sup_pid: sup_pid}) do
Expand Down Expand Up @@ -155,57 +155,57 @@ defmodule Strom.Transformer do
end

@impl true
def handle_call({:run_inputs, streams_to_call}, _from, %__MODULE__{opts: opts} = call) do
tasks = run_inputs(streams_to_call, call.pid, call.buffer, opts)
def handle_call({:run_inputs, streams_to_call}, _from, %__MODULE__{opts: opts} = transformer) do
tasks = run_inputs(streams_to_call, transformer.pid, transformer.buffer, opts)

{:reply, :ok, %{call | running: true, tasks: tasks}}
{:reply, :ok, %{transformer | running: true, tasks: tasks}}
end

def handle_call({:get_data, name}, {pid, _ref}, call) do
def handle_call({:get_data, name}, {pid, _ref}, transformer) do
send(pid, :continue)

data = Map.get(call.data, name, [])
data = Map.get(transformer.data, name, [])

if length(data) == 0 and !call.running do
{:reply, {:error, :done}, call}
if length(data) == 0 and !transformer.running do
{:reply, {:error, :done}, transformer}
else
call = %{call | data: Map.put(call.data, name, [])}
{:reply, {:ok, data}, call}
transformer = %{transformer | data: Map.put(transformer.data, name, [])}
{:reply, {:ok, data}, transformer}
end
end

def handle_call(:stop, _from, %__MODULE__{} = call) do
{:stop, :normal, :ok, %{call | running: false}}
def handle_call(:stop, _from, %__MODULE__{} = transformer) do
{:stop, :normal, :ok, %{transformer | running: false}}
end

def handle_call(:__state__, _from, call), do: {:reply, call, call}
def handle_call(:__state__, _from, transformer), do: {:reply, transformer, transformer}

@impl true
def handle_cast({:new_data, name, chunk}, %__MODULE__{} = call) do
task = Map.fetch!(call.tasks, name)
def handle_cast({:new_data, name, chunk}, %__MODULE__{} = transformer) do
task = Map.fetch!(transformer.tasks, name)
send(task.pid, :continue)

prev_data = Map.get(call.data, name, [])
new_data = Map.put(call.data, name, prev_data ++ chunk)
call = %{call | data: new_data}
prev_data = Map.get(transformer.data, name, [])
new_data = Map.put(transformer.data, name, prev_data ++ chunk)
transformer = %{transformer | data: new_data}

{:noreply, call}
{:noreply, transformer}
end

def handle_cast({:done, name}, %__MODULE__{} = call) do
call = %{call | tasks: Map.delete(call.tasks, name)}
running = map_size(call.tasks) > 0
{:noreply, %{call | running: running}}
def handle_cast({:done, name}, %__MODULE__{} = transformer) do
transformer = %{transformer | tasks: Map.delete(transformer.tasks, name)}
running = map_size(transformer.tasks) > 0
{:noreply, %{transformer | running: running}}
end

@impl true
def handle_info({_task_ref, :ok}, call) do
def handle_info({_task_ref, :ok}, transformer) do
# do nothing for now
{:noreply, call}
{:noreply, transformer}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, call) do
def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, transformer) do
# do nothing for now
{:noreply, call}
{:noreply, transformer}
end
end

0 comments on commit bfe6ec8

Please sign in to comment.