From 36395cedce00844dcd5a262a8b662cc745a699f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20W=C3=B6ginger?= Date: Thu, 14 Mar 2024 22:14:31 +0100 Subject: [PATCH 1/3] WIP: event consumer --- lib/radiator/application.ex | 4 ++- lib/radiator/outline/event_consumer.ex | 43 ++++++++++++++++++++++++++ lib/radiator/outline/event_producer.ex | 6 ++++ 3 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 lib/radiator/outline/event_consumer.ex diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index b5e5bde0..7920efbb 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -5,6 +5,7 @@ defmodule Radiator.Application do use Application + alias Radiator.Outline.EventConsumer alias Radiator.Outline.EventProducer @impl true @@ -20,7 +21,8 @@ defmodule Radiator.Application do # {Radiator.Worker, arg}, # Start to serve requests, typically the last entry RadiatorWeb.Endpoint, - {EventProducer, name: EventProducer} + {EventProducer, name: EventProducer}, + {EventConsumer, name: EventConsumer} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/outline/event_consumer.ex b/lib/radiator/outline/event_consumer.ex new file mode 100644 index 00000000..8676870e --- /dev/null +++ b/lib/radiator/outline/event_consumer.ex @@ -0,0 +1,43 @@ +defmodule Radiator.Outline.EventConsumer do + use GenStage + alias Radiator.Outline.EventProducer + + def start_link(opts \\ []) do + GenStage.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_opts) do + options = [] + {:consumer, :event_producer, subscribe_to: [{EventProducer, options}]} + end + + def handle_events(events, _from, state) do + IO.inspect(events, label: "EventConsumer handle_events") + + Enum.each(events, fn event -> + process_event(event, state) + IO.inspect(event, label: "EventConsumer handle_events event") + end) + + {:noreply, [], state} + end + + defp process_event(%InsertNodeEvent{} = event) do + # validate + # true-> + # database action: insert node() + # create && persist event (event contains all attributes, user, event_id, timestamps) + # broadcast event (topic: episode_id) + # false-> + # log error and return error (audit log) + end + + defp handle_result(:ok, event) do + persist_event(event) + broadcast_success(event) + end + + defp handle_result(:error, event) do + broadcast_error(event) + end +end diff --git a/lib/radiator/outline/event_producer.ex b/lib/radiator/outline/event_producer.ex index 54021c59..e21cfd32 100644 --- a/lib/radiator/outline/event_producer.ex +++ b/lib/radiator/outline/event_producer.ex @@ -15,6 +15,12 @@ defmodule Radiator.Outline.EventProducer do end def handle_cast({:enqueue, event}, state) do + IO.inspect(state, label: "EventProducer. handle_cast") {:noreply, [event], state} end + + def handle_demand(demand, state) do + IO.inspect(demand, label: "EventConsumer") + {:noreply, [], state} + end end From 2d1afca9e81a023be348c006851efc6e686a6e9a Mon Sep 17 00:00:00 2001 From: sorax Date: Tue, 19 Mar 2024 22:17:23 +0100 Subject: [PATCH 2/3] first draft insert_node_events --- lib/radiator/outline/dispatch.ex | 19 ++++++++++ lib/radiator/outline/event.ex | 7 ++-- .../outline/event/insert_node_event.ex | 3 ++ lib/radiator/outline/event_consumer.ex | 37 ++++++++++--------- lib/radiator/outline/event_producer.ex | 24 ++++++++---- lib/radiator/outline/server.ex | 31 ---------------- test/radiator/outline/dispatch_test.exs | 34 +++++++++++++++++ 7 files changed, 97 insertions(+), 58 deletions(-) create mode 100644 lib/radiator/outline/dispatch.ex create mode 100644 lib/radiator/outline/event/insert_node_event.ex delete mode 100644 lib/radiator/outline/server.ex create mode 100644 test/radiator/outline/dispatch_test.exs diff --git a/lib/radiator/outline/dispatch.ex b/lib/radiator/outline/dispatch.ex new file mode 100644 index 00000000..b6dc0404 --- /dev/null +++ b/lib/radiator/outline/dispatch.ex @@ -0,0 +1,19 @@ +defmodule Radiator.Outline.Dispatch do + @moduledoc false + + alias Radiator.Outline.Event + alias Radiator.Outline.EventProducer + + def insert_node(attributes, user_id, event_id) do + "insert_node" + |> Event.build(attributes, user_id, event_id) + |> EventProducer.enqueue() + end + + # TODO + # update_node + # delete_node + # move_node + + # list_node different case, sync call +end diff --git a/lib/radiator/outline/event.ex b/lib/radiator/outline/event.ex index c9a3fb79..9201896a 100644 --- a/lib/radiator/outline/event.ex +++ b/lib/radiator/outline/event.ex @@ -1,10 +1,11 @@ defmodule Radiator.Outline.Event do @moduledoc false - def build(event_id, event_type, user_id, payload) do - %{ + alias Radiator.Outline.Event.InsertNodeEvent + + def build("insert_node", payload, user_id, event_id) do + %InsertNodeEvent{ event_id: event_id, - event_type: event_type, user_id: user_id, payload: payload } diff --git a/lib/radiator/outline/event/insert_node_event.ex b/lib/radiator/outline/event/insert_node_event.ex new file mode 100644 index 00000000..7519f4fe --- /dev/null +++ b/lib/radiator/outline/event/insert_node_event.ex @@ -0,0 +1,3 @@ +defmodule Radiator.Outline.Event.InsertNodeEvent do + defstruct [:event_id, :user_id, :payload] +end diff --git a/lib/radiator/outline/event_consumer.ex b/lib/radiator/outline/event_consumer.ex index 8676870e..4bbc921d 100644 --- a/lib/radiator/outline/event_consumer.ex +++ b/lib/radiator/outline/event_consumer.ex @@ -1,43 +1,46 @@ defmodule Radiator.Outline.EventConsumer do use GenStage + + alias Radiator.Outline alias Radiator.Outline.EventProducer + alias Radiator.Outline.Event.InsertNodeEvent def start_link(opts \\ []) do GenStage.start_link(__MODULE__, opts, name: __MODULE__) end - def init(_opts) do - options = [] - {:consumer, :event_producer, subscribe_to: [{EventProducer, options}]} + def init(opts \\ [max_demand: 1]) do + {:consumer, :event_producer, subscribe_to: [{EventProducer, opts}]} end - def handle_events(events, _from, state) do - IO.inspect(events, label: "EventConsumer handle_events") - - Enum.each(events, fn event -> - process_event(event, state) - IO.inspect(event, label: "EventConsumer handle_events event") - end) + def handle_events([event], _from, state) do + process_event(event) {:noreply, [], state} end - defp process_event(%InsertNodeEvent{} = event) do - # validate + defp process_event(%InsertNodeEvent{payload: payload} = _event) do + payload + |> Outline.create_node() + |> handle_insert_result() + + # validate # true-> # database action: insert node() # create && persist event (event contains all attributes, user, event_id, timestamps) # broadcast event (topic: episode_id) + # broadcast node (topic: episode_id) # false-> # log error and return error (audit log) end - defp handle_result(:ok, event) do - persist_event(event) - broadcast_success(event) + defp handle_insert_result({:ok, node}) do + {:ok, node} end - defp handle_result(:error, event) do - broadcast_error(event) + defp handle_insert_result({:error, _error}) do + # log_error_please :-) + + :error end end diff --git a/lib/radiator/outline/event_producer.ex b/lib/radiator/outline/event_producer.ex index 6c6cfc2c..b322c8ca 100644 --- a/lib/radiator/outline/event_producer.ex +++ b/lib/radiator/outline/event_producer.ex @@ -8,7 +8,7 @@ defmodule Radiator.Outline.EventProducer do end def init(_opts) do - {:producer, []} + {:producer, {:queue.new(), 0}} end def enqueue(event) do @@ -16,13 +16,23 @@ defmodule Radiator.Outline.EventProducer do :ok end - def handle_cast({:enqueue, event}, state) do - IO.inspect(state, label: "EventProducer. handle_cast") - {:noreply, [event], state} + def handle_cast({:enqueue, event}, {queue, 0}) do + queue = :queue.in(event, queue) + {:noreply, [], {queue, 0}} end - def handle_demand(demand, state) do - IO.inspect(demand, label: "EventConsumer") - {:noreply, [], state} + def handle_cast({:enqueue, event}, {queue, demand}) do + queue = :queue.in(event, queue) + {{:value, event}, queue} = :queue.out(queue) + {:noreply, [event], {queue, demand - 1}} + end + + def handle_demand(_incoming, {queue, demand}) do + with {item, queue} <- :queue.out(queue), + {:value, event} <- item do + {:noreply, [event], {queue, demand}} + else + _ -> {:noreply, [], {queue, demand + 1}} + end end end diff --git a/lib/radiator/outline/server.ex b/lib/radiator/outline/server.ex deleted file mode 100644 index 46df4477..00000000 --- a/lib/radiator/outline/server.ex +++ /dev/null @@ -1,31 +0,0 @@ -defmodule Radiator.Outline.Server do - @moduledoc false - - alias Radiator.Outline.Event - alias Radiator.Outline.EventProducer - - def insert_node(attributes, user_id, event_id) do - "insert_node" - |> Event.build(attributes, user_id, event_id) - |> EventProducer.enqueue() - - # generate event - # send to Eventserver - # validate - # true-> - # database action: insert node() - # create && persist event (event contains all attributes, user, event_id, timestamps) - # broadcast event (topic: episode_id) - # broadcast node (topic: episode_id) - # false-> - # log error and return error (audit log) - :ok - end - - # TODO - # update_node - # delete_node - # move_node - - # list_node different case, sync call -end diff --git a/test/radiator/outline/dispatch_test.exs b/test/radiator/outline/dispatch_test.exs new file mode 100644 index 00000000..550a088c --- /dev/null +++ b/test/radiator/outline/dispatch_test.exs @@ -0,0 +1,34 @@ +defmodule Radiator.Outline.DispatchTest do + use Radiator.DataCase + + import Radiator.AccountsFixtures + import Radiator.PodcastFixtures + + # alias Radiator.Outline + alias Radiator.Outline.Dispatch + alias Radiator.Outline.Node + + describe "outline dispatch" do + setup do + %{episode: episode_fixture()} + end + + test "insert_node does WHAT?", %{episode: episode} do + user = user_fixture() + + node = %Node{episode_id: episode.id, content: "something very special 1!1"} + attributes = Map.from_struct(node) + + event_id = Ecto.UUID.generate() + + Dispatch.insert_node(attributes, user.id, event_id) + + # _inserted_node = + # Outline.list_nodes() + # |> Enum.find(&(&1.content == "something very special 1!1")) + + # assert inserted_node.episode_id == node.episode_id + # assert inserted_node.content == node.content + end + end +end From 4d3c22c13b45e395f7ed894489b740b754caf8ae Mon Sep 17 00:00:00 2001 From: sorax Date: Tue, 19 Mar 2024 22:21:10 +0100 Subject: [PATCH 3/3] add moduledoc false --- lib/radiator/outline/event/insert_node_event.ex | 2 ++ lib/radiator/outline/event_consumer.ex | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/radiator/outline/event/insert_node_event.ex b/lib/radiator/outline/event/insert_node_event.ex index 7519f4fe..21f08394 100644 --- a/lib/radiator/outline/event/insert_node_event.ex +++ b/lib/radiator/outline/event/insert_node_event.ex @@ -1,3 +1,5 @@ defmodule Radiator.Outline.Event.InsertNodeEvent do + @moduledoc false + defstruct [:event_id, :user_id, :payload] end diff --git a/lib/radiator/outline/event_consumer.ex b/lib/radiator/outline/event_consumer.ex index 4bbc921d..4d7d71a7 100644 --- a/lib/radiator/outline/event_consumer.ex +++ b/lib/radiator/outline/event_consumer.ex @@ -1,9 +1,11 @@ defmodule Radiator.Outline.EventConsumer do + @moduledoc false + use GenStage alias Radiator.Outline - alias Radiator.Outline.EventProducer alias Radiator.Outline.Event.InsertNodeEvent + alias Radiator.Outline.EventProducer def start_link(opts \\ []) do GenStage.start_link(__MODULE__, opts, name: __MODULE__)