Skip to content

Commit

Permalink
Merge pull request #519 from podlove/draft/event_consumer
Browse files Browse the repository at this point in the history
Draft/event consumer
  • Loading branch information
sorax committed Mar 19, 2024
2 parents 433f834 + 4d3c22c commit e148580
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 38 deletions.
4 changes: 3 additions & 1 deletion lib/radiator/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Radiator.Application do

use Application

alias Radiator.Outline.EventConsumer
alias Radiator.Outline.EventProducer

@impl true
Expand All @@ -20,7 +21,8 @@ defmodule Radiator.Application do
# {Radiator.Worker, arg},
# Start to serve requests, typically the last entry
RadiatorWeb.Endpoint,
{EventProducer, name: EventProducer}
{EventProducer, name: EventProducer},
{EventConsumer, name: EventConsumer}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
19 changes: 19 additions & 0 deletions lib/radiator/outline/dispatch.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Radiator.Outline.Dispatch do
@moduledoc false

alias Radiator.Outline.Event
alias Radiator.Outline.EventProducer

def insert_node(attributes, user_id, event_id) do
"insert_node"
|> Event.build(attributes, user_id, event_id)
|> EventProducer.enqueue()
end

# TODO
# update_node
# delete_node
# move_node

# list_node different case, sync call
end
7 changes: 4 additions & 3 deletions lib/radiator/outline/event.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Radiator.Outline.Event do
@moduledoc false

def build(event_id, event_type, user_id, payload) do
%{
alias Radiator.Outline.Event.InsertNodeEvent

def build("insert_node", payload, user_id, event_id) do
%InsertNodeEvent{
event_id: event_id,
event_type: event_type,
user_id: user_id,
payload: payload
}
Expand Down
5 changes: 5 additions & 0 deletions lib/radiator/outline/event/insert_node_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Radiator.Outline.Event.InsertNodeEvent do
@moduledoc false

defstruct [:event_id, :user_id, :payload]
end
48 changes: 48 additions & 0 deletions lib/radiator/outline/event_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule Radiator.Outline.EventConsumer do
@moduledoc false

use GenStage

alias Radiator.Outline
alias Radiator.Outline.Event.InsertNodeEvent
alias Radiator.Outline.EventProducer

def start_link(opts \\ []) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(opts \\ [max_demand: 1]) do
{:consumer, :event_producer, subscribe_to: [{EventProducer, opts}]}
end

def handle_events([event], _from, state) do
process_event(event)

{:noreply, [], state}
end

defp process_event(%InsertNodeEvent{payload: payload} = _event) do
payload
|> Outline.create_node()
|> handle_insert_result()

# validate
# true->
# database action: insert node()
# create && persist event (event contains all attributes, user, event_id, timestamps)
# broadcast event (topic: episode_id)
# broadcast node (topic: episode_id)
# false->
# log error and return error (audit log)
end

defp handle_insert_result({:ok, node}) do
{:ok, node}
end

defp handle_insert_result({:error, _error}) do
# log_error_please :-)

:error
end
end
22 changes: 19 additions & 3 deletions lib/radiator/outline/event_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,31 @@ defmodule Radiator.Outline.EventProducer do
end

def init(_opts) do
{:producer, []}
{:producer, {:queue.new(), 0}}
end

def enqueue(event) do
GenStage.cast(__MODULE__, {:enqueue, event})
:ok
end

def handle_cast({:enqueue, event}, state) do
{:noreply, [event], state}
def handle_cast({:enqueue, event}, {queue, 0}) do
queue = :queue.in(event, queue)
{:noreply, [], {queue, 0}}
end

def handle_cast({:enqueue, event}, {queue, demand}) do
queue = :queue.in(event, queue)
{{:value, event}, queue} = :queue.out(queue)
{:noreply, [event], {queue, demand - 1}}
end

def handle_demand(_incoming, {queue, demand}) do
with {item, queue} <- :queue.out(queue),
{:value, event} <- item do
{:noreply, [event], {queue, demand}}
else
_ -> {:noreply, [], {queue, demand + 1}}
end
end
end
31 changes: 0 additions & 31 deletions lib/radiator/outline/server.ex

This file was deleted.

34 changes: 34 additions & 0 deletions test/radiator/outline/dispatch_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Radiator.Outline.DispatchTest do
use Radiator.DataCase

import Radiator.AccountsFixtures
import Radiator.PodcastFixtures

# alias Radiator.Outline
alias Radiator.Outline.Dispatch
alias Radiator.Outline.Node

describe "outline dispatch" do
setup do
%{episode: episode_fixture()}
end

test "insert_node does WHAT?", %{episode: episode} do
user = user_fixture()

node = %Node{episode_id: episode.id, content: "something very special 1!1"}
attributes = Map.from_struct(node)

event_id = Ecto.UUID.generate()

Dispatch.insert_node(attributes, user.id, event_id)

# _inserted_node =
# Outline.list_nodes()
# |> Enum.find(&(&1.content == "something very special 1!1"))

# assert inserted_node.episode_id == node.episode_id
# assert inserted_node.content == node.content
end
end
end

0 comments on commit e148580

Please sign in to comment.