Skip to content

Commit

Permalink
README updated
Browse files Browse the repository at this point in the history
  • Loading branch information
idugalic committed Jun 27, 2024
1 parent 82a9c6f commit 7619738
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod order_materialized_view;
pub mod order_restaurant_aggregate;
pub mod restaurant_materialized_view;
8 changes: 8 additions & 0 deletions src/application/order_materialized_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::domain::api::OrderEvent;
use crate::domain::order_view::{OrderView, OrderViewState};
use crate::framework::application::materialized_view::MaterializedView;
use crate::infrastructure::order_view_state_repository::OrderViewStateRepository;

/// A convenient type alias for the order materialized view.
pub type OrderMeterializedView<'a> =
MaterializedView<Option<OrderViewState>, OrderEvent, OrderViewStateRepository, OrderView<'a>>;
6 changes: 6 additions & 0 deletions src/domain/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pub struct RestaurantName(pub String);

#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct OrderId(pub Uuid);
impl fmt::Display for OrderId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Delegate the formatting to the inner Uuid
write!(f, "{}", self.0)
}
}

#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct Reason(pub String);
Expand Down
10 changes: 10 additions & 0 deletions src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,13 @@ pub fn event_to_restaurant_event(event: &Event) -> Option<RestaurantEvent> {
Event::OrderPrepared(_e) => None,
}
}

pub fn event_to_order_event(event: &Event) -> Option<OrderEvent> {
match event {
Event::RestaurantCreated(_e) => None,
Event::RestaurantMenuChanged(_e) => None,
Event::OrderPlaced(_e) => None,
Event::OrderCreated(e) => Some(OrderEvent::Created(e.to_owned())),
Event::OrderPrepared(e) => Some(OrderEvent::Prepared(e.to_owned())),
}
}
2 changes: 1 addition & 1 deletion src/domain/order_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct OrderViewState {
}

/// A convenient type alias for the Order view
type OrderView<'a> = View<'a, Option<OrderViewState>, OrderEvent>;
pub type OrderView<'a> = View<'a, Option<OrderViewState>, OrderEvent>;

/// View represents the event handling algorithm. It belongs to the Domain layer.
pub fn order_view<'a>() -> OrderView<'a> {
Expand Down
1 change: 1 addition & 0 deletions src/infrastructure/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod order_restaurant_event_repository;
pub mod order_view_state_repository;
pub mod restaurant_view_state_repository;
1 change: 1 addition & 0 deletions src/infrastructure/order_restaurant_event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::framework::infrastructure::event_repository::EventOrchestratingReposi
pub struct OrderAndRestaurantEventRepository {}

/// Implementation of the event orchestrating repository for the restaurant and order domain(s).
/// We use default implementation from the trait. How cool is that?
impl EventOrchestratingRepository<Command, Event> for OrderAndRestaurantEventRepository {}

impl OrderAndRestaurantEventRepository {
Expand Down
89 changes: 89 additions & 0 deletions src/infrastructure/order_view_state_repository.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::domain::api::OrderEvent;
use crate::domain::order_view::OrderViewState;
use crate::framework::domain::api::Identifier;
use crate::framework::infrastructure::errors::ErrorMessage;
use crate::framework::infrastructure::to_payload;
use crate::framework::infrastructure::view_state_repository::ViewStateRepository;
use pgrx::{IntoDatum, JsonB, PgBuiltInOids, Spi};

/// OrderViewStateRepository struct
/// View state repository is always very specific to the domain. There is no default implementation in the `ViewStateRepository` trait.
pub struct OrderViewStateRepository {}

/// OrderViewStateRepository - struct implementation
impl OrderViewStateRepository {
/// Create a new OrderViewStateRepository
pub fn new() -> Self {
OrderViewStateRepository {}
}
}

/// Implementation of the view state repository for the order `view` state.
impl ViewStateRepository<OrderEvent, Option<OrderViewState>> for OrderViewStateRepository {
/// Fetches current state, based on the event.
fn fetch_state(
&self,
event: &OrderEvent,
) -> Result<Option<Option<OrderViewState>>, ErrorMessage> {
let query = "SELECT data FROM orders WHERE id = $1";
Spi::connect(|client| {
let mut results = Vec::new();
let tup_table = client
.select(
query,
None,
Some(vec![(
PgBuiltInOids::UUIDOID.oid(),
event.identifier().to_string().into_datum(),
)]),
)
.map_err(|err| ErrorMessage {
message: "Failed to fetch the order: ".to_string() + &err.to_string(),
})?;
for row in tup_table {
let data = row["data"].value::<JsonB>().map_err(|err| ErrorMessage {
message: "Failed to fetch the order/payload (map `data` to `JsonB`): ".to_string() + &err.to_string(),
})?.ok_or(ErrorMessage {
message: "Failed to fetch order data/payload (map `data` to `JsonB`): No data/payload found".to_string(),
})?;

results.push(to_payload::<OrderViewState>(data)?);
}
Ok(Some(results.into_iter().last()))
})
}
/// Saves the new state.
fn save(&self, state: &Option<OrderViewState>) -> Result<Option<OrderViewState>, ErrorMessage> {
let state = state.as_ref().ok_or(ErrorMessage {
message: "Failed to save the order: state is empty".to_string(),
})?;
let data = serde_json::to_value(state).map_err(|err| ErrorMessage {
message: "Failed to serialize the order: ".to_string() + &err.to_string(),
})?;

Spi::connect(|mut client| {
client
.update(
"INSERT INTO orders (id, data) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET data = $2 RETURNING data",
None,
Some(vec![
(
PgBuiltInOids::UUIDOID.oid(),
state.identifier.to_string().into_datum(),
),
(
PgBuiltInOids::JSONBOID.oid(),
JsonB(data).into_datum(),
),
]),
)?
.first()
.get_one::<JsonB>().map(|o|{ o.map( |it| to_payload(it).unwrap() )})
})
.map(Some)
.map_err(|err| ErrorMessage {
message: "Failed to save the order: ".to_string() + &err.to_string(),
})
.map(|state| state.unwrap())
}
}
5 changes: 3 additions & 2 deletions src/infrastructure/restaurant_view_state_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::framework::infrastructure::view_state_repository::ViewStateRepository
use pgrx::{IntoDatum, JsonB, PgBuiltInOids, Spi};

/// RestaurantViewStateRepository struct
/// View state repository is always very specific to the domain. There is no default implementation in the `ViewStateRepository` trait.
pub struct RestaurantViewStateRepository {}

/// RestaurantViewStateRepository - struct implementation
Expand Down Expand Up @@ -45,7 +46,7 @@ impl ViewStateRepository<RestaurantEvent, Option<RestaurantViewState>>
let data = row["data"].value::<JsonB>().map_err(|err| ErrorMessage {
message: "Failed to fetch the restaurant/payload (map `data` to `JsonB`): ".to_string() + &err.to_string(),
})?.ok_or(ErrorMessage {
message: "Failed to fetch event data/payload (map `data` to `JsonB`): No data/payload found".to_string(),
message: "Failed to fetch restaurant data/payload (map `data` to `JsonB`): No data/payload found".to_string(),
})?;

results.push(to_payload::<RestaurantViewState>(data)?);
Expand Down Expand Up @@ -84,7 +85,7 @@ impl ViewStateRepository<RestaurantEvent, Option<RestaurantViewState>>
.first()
.get_one::<JsonB>().map(|o|{ o.map( |it| to_payload(it).unwrap() )})
})
.map(|state| Some(state))
.map(Some)
.map_err(|err| ErrorMessage {
message: "Failed to save the restaurant: ".to_string() + &err.to_string(),
})
Expand Down
59 changes: 56 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::application::order_materialized_view::OrderMeterializedView;
use crate::application::order_restaurant_aggregate::OrderAndRestaurantAggregate;
use crate::application::restaurant_materialized_view::RestaurantMeterializedView;
use crate::domain::order_view::order_view;
use crate::domain::restaurant_view::restaurant_view;
use crate::domain::{
event_to_restaurant_event, order_restaurant_decider, order_restaurant_saga, Command, Event,
event_to_order_event, event_to_restaurant_event, order_restaurant_decider,
order_restaurant_saga, Command, Event,
};
use crate::framework::infrastructure::errors::{ErrorMessage, TriggerError};
use crate::framework::infrastructure::to_payload;
use crate::infrastructure::order_restaurant_event_repository::OrderAndRestaurantEventRepository;
use crate::infrastructure::order_view_state_repository::OrderViewStateRepository;
use crate::infrastructure::restaurant_view_state_repository::RestaurantViewStateRepository;
use pgrx::prelude::*;
use pgrx::JsonB;
Expand Down Expand Up @@ -59,7 +63,7 @@ fn handle_all(commands: Vec<Command>) -> Result<Vec<Event>, ErrorMessage> {
.map(|res| res.into_iter().map(|(e, _)| e.clone()).collect())
}

/// Event handler for Restaurant events / Trigger function that handles events and updates the materialized view.
/// Event handler for Restaurant events / Trigger function that handles restaurant related events and updates the materialized view/table.
#[pg_trigger]
fn handle_restaurant_events<'a>(
trigger: &'a PgTrigger<'a>,
Expand Down Expand Up @@ -105,6 +109,52 @@ extension_sql!(
requires = [handle_restaurant_events]
);

/// Event handler for Order events / Trigger function that handles order related events and updates the materialized view/table.
#[pg_trigger]
fn handle_order_events<'a>(
trigger: &'a PgTrigger<'a>,
) -> Result<Option<PgHeapTuple<'a, impl WhoAllocated>>, TriggerError> {
let new = trigger
.new()
.ok_or(TriggerError::NullTriggerTuple)?
.into_owned();
let event: JsonB = new
.get_by_name::<JsonB>("data")?
.ok_or(TriggerError::NullTriggerTuple)?;
let materialized_view =
OrderMeterializedView::new(OrderViewStateRepository::new(), order_view());

match event_to_order_event(
&to_payload::<Event>(event)
.map_err(|err| TriggerError::EventHandlingError(err.to_string()))?,
) {
// If the event is not a Restaurant event, we do nothing
None => return Ok(Some(new)),
// If the event is a Restaurant event, we handle it
Some(e) => {
materialized_view
.handle(&e)
.map_err(|err| TriggerError::EventHandlingError(err.message))?;
}
}
Ok(Some(new))
}

// Materialized view / Table for the Order query side model
// This table is updated by the trigger function / event handler `handle_order_events`
extension_sql!(
r#"
CREATE TABLE IF NOT EXISTS orders (
id UUID PRIMARY KEY,
data JSONB
);
CREATE TRIGGER order_event_handler_trigger AFTER INSERT ON events FOR EACH ROW EXECUTE PROCEDURE handle_order_events();
"#,
name = "order_event_handler_trigger",
requires = [handle_order_events]
);

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
Expand All @@ -115,7 +165,10 @@ mod tests {
VALUES ('RestaurantCreated', '5f8bdf95-c95b-4e4b-8535-d2ac4663bea9', 'Restaurant', 'e48d4d9e-403e-453f-b1ba-328e0ce23737', '{"type": "RestaurantCreated","identifier": "e48d4d9e-403e-453f-b1ba-328e0ce23737", "name": "Pljeska", "menu": {"menu_id": "02f09a3f-1624-3b1d-8409-44eff7708210", "items": [{"id": "02f09a3f-1624-3b1d-8409-44eff7708210","name": "supa","price": 10},{"id": "02f09a3f-1624-3b1d-8409-44eff7708210","name": "sarma","price": 20 }],"cuisine": "Vietnamese"}, "final": false }', 'e48d4d9e-403e-453f-b1ba-328e0ce23737', NULL, FALSE);
"#,
name = "data_insert",
requires = ["restaurant_event_handler_trigger"]
requires = [
"restaurant_event_handler_trigger",
"order_event_handler_trigger"
]
);
use crate::domain::api::{
ChangeRestaurantMenu, CreateRestaurant, OrderCreated, OrderLineItem, OrderPlaced,
Expand Down

0 comments on commit 7619738

Please sign in to comment.