Skip to content

Commit

Permalink
EventStore: persistence for events (#524)
Browse files Browse the repository at this point in the history
* add migration and ecto schema for event_data
* add protocol for events
* save user_id in events
* add missing commands
  • Loading branch information
electronicbites authored May 14, 2024
1 parent a19f84d commit 13aeaa1
Show file tree
Hide file tree
Showing 17 changed files with 309 additions and 48 deletions.
60 changes: 59 additions & 1 deletion lib/radiator/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,66 @@ defmodule Radiator.EventStore do
EventStore persists events
"""

alias Radiator.EventStore.EventData
alias Radiator.Outline.Event.AbstractEvent
alias Radiator.Repo

def persist_event(event) do
# persist event
{:ok, _stored_event} =
create_event_data(%{
data: AbstractEvent.payload(event),
event_type: AbstractEvent.event_type(event),
uuid: event.event_id,
user_id: event.user_id
})

event
end

@doc """
Returns the list of foo_events.
## Examples
iex> list_events()
[%Event{}, ...]
"""
def list_event_data do
Repo.all(EventData)
end

@doc """
Gets a single event data.
Raises `Ecto.NoResultsError` if the EventData does not exist.
## Examples
iex> get_event_data!(123)
%Event{}
iex> get_event_data!(456)
** (Ecto.NoResultsError)
"""
def get_event_data!(id), do: Repo.get!(EventData, id)

@doc """
Creates a event.
## Examples
iex> create_event_data(%{field: value})
{:ok, %EventData{}}
iex> create_event_data(%{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
def create_event_data(attrs \\ %{}) do
%EventData{}
|> EventData.changeset(attrs)
|> Repo.insert()
end
end
25 changes: 25 additions & 0 deletions lib/radiator/event_store/event_data.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Radiator.EventStore.EventData do
@moduledoc """
EventData schema represents a persistend event.
"""
use Ecto.Schema
import Ecto.Changeset

alias Radiator.Accounts.User

@primary_key {:uuid, :binary_id, autogenerate: false}
schema "event_data" do
field :data, :map, default: %{}
field :event_type, :string

belongs_to :user, User
timestamps(type: :utc_datetime)
end

@doc false
def changeset(event, attrs) do
event
|> cast(attrs, [:uuid, :event_type, :data, :user_id])
|> validate_required([:uuid, :event_type, :user_id])
end
end
2 changes: 1 addition & 1 deletion lib/radiator/outline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule Radiator.Outline do
{:error, %Ecto.Changeset{}}
"""
def remove_node(%Node{} = node, _socket_id \\ nil) do
def remove_node(%Node{} = node) do
next_node =
Node
|> where([n], n.prev_id == ^node.uuid)
Expand Down
25 changes: 24 additions & 1 deletion lib/radiator/outline/command.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
defmodule Radiator.Outline.Command do
@moduledoc false

alias Radiator.Outline.Command.{ChangeNodeContentCommand, InsertNodeCommand}
alias Radiator.Outline.Command.{
ChangeNodeContentCommand,
DeleteNodeCommand,
InsertNodeCommand,
MoveNodeCommand
}

def build("insert_node", payload, user_id, event_id) do
%InsertNodeCommand{
Expand All @@ -11,6 +16,14 @@ defmodule Radiator.Outline.Command do
}
end

def build("delete_node", node_id, user_id, event_id) do
%DeleteNodeCommand{
event_id: event_id,
user_id: user_id,
node_id: node_id
}
end

def build("change_node_content", node_id, content, user_id, event_id) do
%ChangeNodeContentCommand{
event_id: event_id,
Expand All @@ -19,4 +32,14 @@ defmodule Radiator.Outline.Command do
content: content
}
end

def build("move_node", node_id, parent_node_id, prev_node_id, user_id, event_id) do
%MoveNodeCommand{
event_id: event_id,
user_id: user_id,
node_id: node_id,
parent_node_id: parent_node_id,
prev_node_id: prev_node_id
}
end
end
23 changes: 11 additions & 12 deletions lib/radiator/outline/dispatch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,22 @@ defmodule Radiator.Outline.Dispatch do
end

def change_node_content(node_id, content, user_id, event_id) do
# IO.inspect(node_id, label: "Dispatcher change_node_content")
"change_node_content"
|> Command.build(node_id, content, user_id, event_id)
|> EventProducer.enqueue()
end

# def move_node(attributes, user_id, event_id) do
# "move_node"
# |> Command.build(attributes, user_id, event_id)
# |> EventProducer.enqueue()
# end

# def delete_node(node_id, user_id, event_id) do
# "delete_node"
# |> Command.build(node_id, user_id, event_id)
# |> EventProducer.enqueue()
# end
def move_node(node_id, parent_node_id, prev_node_id, user_id, event_id) do
"move_node"
|> Command.build(node_id, parent_node_id, prev_node_id, user_id, event_id)
|> EventProducer.enqueue()
end

def delete_node(node_id, user_id, event_id) do
"delete_node"
|> Command.build(node_id, user_id, event_id)
|> EventProducer.enqueue()
end

def subscribe(_episode_id) do
Phoenix.PubSub.subscribe(Radiator.PubSub, "events")
Expand Down
38 changes: 38 additions & 0 deletions lib/radiator/outline/event/abstract_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defprotocol Radiator.Outline.Event.AbstractEvent do
def payload(event)
def event_type(event)
end

alias Radiator.Outline.Event.{NodeContentChangedEvent, NodeInsertedEvent}

defimpl Radiator.Outline.Event.AbstractEvent, for: NodeInsertedEvent do
def payload(event) do
event.node
end

def event_type(_event), do: "NodeInsertedEvent"
end

defimpl Radiator.Outline.Event.AbstractEvent, for: NodeContentChangedEvent do
def payload(event) do
%{node_id: event.node_id, content: event.content}
end

def event_type(_event), do: "NodeInsertedEvent"
end

defimpl Radiator.Outline.Event.AbstractEvent, for: NodeDeletedEvent do
def payload(event) do
event.node_id
end

def event_type(_event), do: "NodeDeletedEvent"
end

defimpl Radiator.Outline.Event.AbstractEvent, for: NodeMovedEvent do
def payload(event) do
%{node_id: event.node_id, parent_id: event.parent_id, prev_id: event.prev_id}
end

def event_type(_event), do: "NodeInsertedEvent"
end
2 changes: 1 addition & 1 deletion lib/radiator/outline/event/node_content_changed_event.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Radiator.Outline.Event.NodeContentChangedEvent do
@moduledoc false

defstruct [:event_id, :node]
defstruct [:event_id, :node_id, :content, :user_id]
end
1 change: 1 addition & 0 deletions lib/radiator/outline/event/node_deleted_event.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
defmodule Radiator.Outline.Event.NodeDeletedEvent do
@moduledoc false
defstruct [:event_id, :node_id, :user_id]
end
2 changes: 1 addition & 1 deletion lib/radiator/outline/event/node_inserted_event.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Radiator.Outline.Event.NodeInsertedEvent do
@moduledoc false

defstruct [:event_id, :node]
defstruct [:event_id, :node, :user_id]
end
1 change: 1 addition & 0 deletions lib/radiator/outline/event/node_moved_event.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
defmodule Radiator.Outline.Event.NodeMovedEvent do
@moduledoc false
defstruct [:event_id, :node_id, :parent_id, :prev_id, :user_id]
end
14 changes: 10 additions & 4 deletions lib/radiator/outline/event_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ defmodule Radiator.Outline.EventConsumer do
{:noreply, [], state}
end

defp process_command(%InsertNodeCommand{payload: payload} = command) do
defp process_command(%InsertNodeCommand{payload: payload, user_id: user_id} = command) do
payload
|> Map.merge(%{"user_id" => user_id})
|> Outline.insert_node()
|> handle_insert_node_result(command)
end
Expand All @@ -37,7 +38,7 @@ defmodule Radiator.Outline.EventConsumer do
end

defp handle_insert_node_result({:ok, node}, command) do
%NodeInsertedEvent{node: node, event_id: command.event_id}
%NodeInsertedEvent{node: node, event_id: command.event_id, user_id: command.user_id}
|> EventStore.persist_event()
|> Dispatch.broadcast()

Expand All @@ -49,8 +50,13 @@ defmodule Radiator.Outline.EventConsumer do
:error
end

def handle_change_node_content_result({:ok, node}, command) do
%NodeContentChangedEvent{node: node, event_id: command.event_id}
def handle_change_node_content_result({:ok, node}, %ChangeNodeContentCommand{} = command) do
%NodeContentChangedEvent{
node_id: node.id,
content: node.content,
user_id: command.user_id,
event_id: command.event_id
}
|> EventStore.persist_event()
|> Dispatch.broadcast()

Expand Down
3 changes: 1 addition & 2 deletions lib/radiator/outline/node.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Radiator.Outline.Node do
@moduledoc """
The node model which represents a single node in the outline.
Currenty there is no concept of a tree
The node model represents a single node in the outline.
"""
use Ecto.Schema
import Ecto.Changeset
Expand Down
18 changes: 10 additions & 8 deletions lib/radiator_web/live/episode_live/index.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule RadiatorWeb.EpisodeLive.Index do
use RadiatorWeb, :live_view

alias Radiator.Outline
alias Radiator.Outline.{Dispatch, NodeRepository}
alias Radiator.Outline.Event.{NodeContentChangedEvent, NodeInsertedEvent}
alias Radiator.Podcast
Expand Down Expand Up @@ -67,13 +66,13 @@ defmodule RadiatorWeb.EpisodeLive.Index do
|> reply(:noreply)
end

def handle_event("delete_node", %{"uuid" => uuid}, socket) do
def handle_event("delete_node", %{"uuid" => _uuid}, socket) do
_event_id = generate_event_id(socket.id)

case NodeRepository.get_node(uuid) do
nil -> nil
node -> Outline.remove_node(node, socket.id)
end
# case NodeRepository.get_node(uuid) do
# nil -> nil
# node -> Outline.remove_node(node, socket.id)
# end

socket
|> reply(:noreply)
Expand All @@ -95,9 +94,12 @@ defmodule RadiatorWeb.EpisodeLive.Index do
|> reply(:noreply)
end

def handle_info(%NodeContentChangedEvent{node: node}, socket) do
def handle_info(
%NodeContentChangedEvent{node_id: id, content: content},
socket
) do
socket
|> push_event("update", %{node: node})
|> push_event("update", %{node: %{id: id, content: content}})
|> reply(:noreply)
end

Expand Down
16 changes: 16 additions & 0 deletions priv/repo/migrations/20240501153929_create_event_data.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Radiator.Repo.Migrations.CreateEventData do
use Ecto.Migration

def change do
create table(:event_data, primary_key: false) do
add :uuid, :uuid, primary_key: true
add :event_type, :string
add :user_id, references(:users, on_delete: :nothing)
add :data, :map, default: %{}

timestamps(type: :utc_datetime)
end

create index(:event_data, [:user_id])
end
end
45 changes: 45 additions & 0 deletions test/radiator/event_store_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule Radiator.EventStoreTest do
use Radiator.DataCase

alias Radiator.EventStore

describe "event_data" do
alias Radiator.EventStore.EventData

alias Radiator.AccountsFixtures
import Radiator.EventStoreFixtures

@invalid_attrs %{data: nil, uuid: nil, event_type: nil}

test "list_event_data/0 returns all event_data" do
event = event_data_fixture()
assert EventStore.list_event_data() == [event]
end

test "get_event!/1 returns the event_data with given id" do
event = event_data_fixture()
assert EventStore.get_event_data!(event.uuid) == event
end

test "create_event/1 with valid data creates a event" do
user = AccountsFixtures.user_fixture()

valid_attrs = %{
data: %{},
uuid: Ecto.UUID.generate(),
event_type: "some event_type",
user_id: user.id
}

assert {:ok, %EventData{} = event} = EventStore.create_event_data(valid_attrs)
assert event.data == %{}
assert event.uuid == valid_attrs.uuid
assert event.event_type == valid_attrs.event_type
assert event.user_id == valid_attrs.user_id
end

test "create_event/1 with invalid data returns error changeset" do
assert {:error, %Ecto.Changeset{}} = EventStore.create_event_data(@invalid_attrs)
end
end
end
Loading

0 comments on commit 13aeaa1

Please sign in to comment.