diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..13c456b --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[target.'cfg(target_os="macos")'] +# Postgres symbols won't be available until runtime +rustflags = ["-Clink-arg=-Wl,-undefined,dynamic_lookup"] diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml new file mode 100644 index 0000000..ed8e457 --- /dev/null +++ b/.github/workflows/lint-and-test.yml @@ -0,0 +1,42 @@ +name: šŸ§Ŗ Lint and Test + +on: + push: + branches-ignore: [wip/**] + +jobs: + test: + runs-on: ubuntu-latest + container: pgxn/pgxn-tools + strategy: + matrix: + pg: [11, 12, 13, 14, 15, 16] + name: šŸ˜ Postgres ${{ matrix.pg }} + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Start PostgreSQL ${{ matrix.pg }} + run: pg-start ${{ matrix.pg }} + - name: Setup Rust Cache + uses: Swatinem/rust-cache@v2 + - name: Test on PostgreSQL ${{ matrix.pg }} + run: pgrx-build-test + + lint: + name: āœ… Lint and Cover + runs-on: ubuntu-latest + container: pgxn/pgxn-tools + env: { PGVERSION: 16 } + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Start PostgreSQL ${{ env.PGVERSION }} + run: pg-start ${{ env.PGVERSION }} libxml2-utils + - name: Setup Rust Cache + uses: Swatinem/rust-cache@v2 + - name: Install pgrx + run: make install-pgrx + - name: Initialize pgrx + run: make pgrx-init + - name: Format and Lint + run: make lint \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3906c33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.DS_Store +.idea/ +/target +*.iml +**/*.rs.bk +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f386875 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "fmodel_rust_postgres" +version = "1.0.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[features] +default = ["pg15"] +pg11 = ["pgrx/pg11", "pgrx-tests/pg11" ] +pg12 = ["pgrx/pg12", "pgrx-tests/pg12" ] +pg13 = ["pgrx/pg13", "pgrx-tests/pg13" ] +pg14 = ["pgrx/pg14", "pgrx-tests/pg14" ] +pg15 = ["pgrx/pg15", "pgrx-tests/pg15" ] +pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ] +pg_test = [] + +[dependencies] +pgrx = "=0.11.4" +serde = { version = "1.0.203", features = ["derive"] } +fmodel-rust = "0.7.0" +serde_json = "1.0.117" +uuid = { version = "1.8.0", features = ["serde", "v4"] } + +[dev-dependencies] +pgrx-tests = "=0.11.4" + +[profile.dev] +panic = "unwind" + +[profile.release] +panic = "unwind" +opt-level = 3 +lto = "fat" +codegen-units = 1 diff --git a/README.md b/README.md new file mode 100644 index 0000000..97957a3 --- /dev/null +++ b/README.md @@ -0,0 +1,67 @@ +# `fmodel-rust-postgres` + +Effortlessly transform your domain models into powerful PostgreSQL extensions using our GitHub repository template. +With pre-implemented infrastructure and application layers in the `framework` module, you can focus entirely on your core domain logic while running your models directly within your PostgreSQL database for seamless integration and enhanced performance. + +The template includes a demo domain model of a `restaurant/order management system`, showcasing practical implementation and providing a solid foundation for your own projects. + +![event model](restaurant-model.jpg) + +>Actually, the domain model is copied from the traditional application [fmodel-rust-demo](https://github.com/fraktalio/fmodel-rust-demo), demonstrating how to run your unique and single domain model directly within your PostgreSQL database/`as extension`; or connect the application to the database/`traditionally`. + +## Event Sourcing + +With event sourcing, we delve deeper by capturing every decision or alteration as an event. +Each new transfer or modification to the account state is meticulously documented, providing a comprehensive audit trail +of all activities. +This affords you a 100% accurate historical record of your domain, enabling you to effortlessly traverse back +in time and review the state at any given moment. + +**History is always on!** + +## Technology +This project is using: + +- [`rust` programming language](https://www.rust-lang.org/) to build a high-performance, reliable, and efficient system. +- [fmodel-rust library](https://github.com/fraktalio/fmodel-rust) to implement tactical Domain-Driven Design patterns, optimised for Event Sourcing. +- [pgrx](https://github.com/pgcentralfoundation/pgrx) to simplify the creation of custom Postgres extensions and bring `logic` closer to your data(base). + +## Requirements +- [Rust](https://www.rust-lang.org/tools/install) +- [PGRX subcommand](https://github.com/pgcentralfoundation/pgrx?tab=readme-ov-file#getting-started): `cargo install --locked cargo-pgrx` +- Run `cargo pgrx init` once, to properly configure the pgrx development environment. It downloads the latest releases of supported Postgres versions, configures them for debugging, compiles them with assertions, and installs them to `"${PGRX_HOME}"`. These include all contrib extensions and tools included with Postgres. Other cargo pgrx commands such as `run` and `test` will manage and use these installations on your behalf. + +> No manual Postgres database installation is required. + +## Test it / Run it +Run tests: + +```shell +cargo pgrx test +``` + +Compile/install extension to a pgrx-managed Postgres instance and start psql: +```shell +cargo pgrx run +``` + +Confused? Run `cargo pgrx help` + +## The structure of the project + +The project is structured as follows: +- `lib.rs` file contains the entry point of the package/crate. +- `framework` module contains the generalized and parametrized implementation of infrastructure and application layers. +- `domain` module contains the domain model. It is the core and pure domain logic of the application!!! +- `application` module contains the application layer. It is the orchestration of the domain model and the infrastructure layer (empty, as it is implemented in the `framework` module). +- `infrastructure` module contains the infrastructure layer / fetching and storing data (empty, as it is implemented in the `framework` module). + +The framework module offers a generic implementation of the infrastructure and application layers, which can be reused across multiple domain models. +Your focus should be on the `domain` module, where you can implement your unique domain model. We have provided a demo domain model of a `restaurant/order management system` to get you started. + +## References and further reading +- [pgrx](https://github.com/pgcentralfoundation/pgrx) +- [fmodel-rust](https://github.com/fraktalio/fmodel-rust) + +--- +Created with :heart: by [Fraktalio](https://fraktalio.com/) diff --git a/fmodel_rust_postgres.control b/fmodel_rust_postgres.control new file mode 100644 index 0000000..1077cd2 --- /dev/null +++ b/fmodel_rust_postgres.control @@ -0,0 +1,5 @@ +comment = 'fmodel_rust_postgres: Created by pgrx' +default_version = '@CARGO_VERSION@' +module_pathname = '$libdir/fmodel_rust_postgres' +relocatable = false +superuser = true diff --git a/restaurant-model.jpg b/restaurant-model.jpg new file mode 100644 index 0000000..02de40c Binary files /dev/null and b/restaurant-model.jpg differ diff --git a/sql/event_sourcing.sql b/sql/event_sourcing.sql new file mode 100644 index 0000000..ccf95a8 --- /dev/null +++ b/sql/event_sourcing.sql @@ -0,0 +1,157 @@ +-- ######################## +-- ######## TABLES ######## +-- ######################## + +-- Registered deciders and the respectful events that these deciders can publish (decider can publish and/or source its own state from these event types only) +CREATE TABLE IF NOT EXISTS deciders +( + -- decider name/type + "decider" TEXT NOT NULL, + -- event name/type that this decider can publish + "event" TEXT NOT NULL, + PRIMARY KEY ("decider", "event") +); + +INSERT INTO deciders ("decider", "event") VALUES ('Restaurant', 'RestaurantCreated'); +INSERT INTO deciders ("decider", "event") VALUES ('Restaurant', 'RestaurantNotCreated'); +INSERT INTO deciders ("decider", "event") VALUES ('Restaurant', 'RestaurantMenuChanged'); +INSERT INTO deciders ("decider", "event") VALUES ('Restaurant', 'RestaurantMenuNotChanged'); +INSERT INTO deciders ("decider", "event") VALUES ('Restaurant', 'OrderPlaced'); +INSERT INTO deciders ("decider", "event") VALUES ('Restaurant', 'OrderNotPlaced'); +INSERT INTO deciders ("decider", "event") VALUES ('Order', 'OrderCreated'); +INSERT INTO deciders ("decider", "event") VALUES ('Order', 'OrderPrepared'); +INSERT INTO deciders ("decider", "event") VALUES ('Order', 'OrderNotCreated'); +INSERT INTO deciders ("decider", "event") VALUES ('Order', 'OrderNotPrepared'); + + +-- Events +CREATE TABLE IF NOT EXISTS events +( + -- event name/type. Part of a composite foreign key to `deciders` + "event" TEXT NOT NULL, + -- event ID. This value is used by the next event as it's `previous_id` value to guard against a Lost-EventModel problem / optimistic locking. + "event_id" UUID NOT NULL UNIQUE, + -- decider name/type. Part of a composite foreign key to `deciders` + "decider" TEXT NOT NULL, + -- business identifier for the decider + "decider_id" TEXT NOT NULL, + -- event data in JSON format + "data" JSONB NOT NULL, + -- command ID causing this event + "command_id" UUID NULL, + -- previous event uuid; null for first event; null does not trigger UNIQUE constraint; we defined a function `check_first_event_for_decider` + "previous_id" UUID UNIQUE, + -- indicator if the event stream for the `decider_id` is final + "final" BOOLEAN NOT NULL DEFAULT FALSE, + -- The timestamp of the event insertion. AUTOPOPULATESā€”DO NOT INSERT + "created_at" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, + -- ordering sequence/offset for all events in all deciders. AUTOPOPULATESā€”DO NOT INSERT + "offset" BIGSERIAL PRIMARY KEY, + FOREIGN KEY ("decider", "event") REFERENCES deciders ("decider", "event") +); + + +CREATE INDEX IF NOT EXISTS decider_index ON events ("decider_id", "offset"); + +-- ######################## +-- ##### SIDE EFFECTS ##### +-- ######################## + +-- Many things that can be done using triggers can also be implemented using the Postgres rule system. +-- What currently cannot be implemented by rules are some kinds of constraints. +-- It is possible, to place a qualified rule that rewrites a query to NOTHING if the value of a column does not appear in another table. +-- But then the data is silently thrown away, and that's not a good idea. +-- If checks for valid values are required, and in the case of an invalid value an error message should be generated, it must be done by a trigger for now. + +-- SIDE EFFECT (rule): immutable decider - ignore deleting already registered events +--CREATE OR REPLACE RULE ignore_delete_decider_events AS ON DELETE TO deciders +-- DO INSTEAD NOTHING; + +-- SIDE EFFECT (rule): immutable decider - ignore updating already registered events +--CREATE OR REPLACE RULE ignore_update_decider_events AS ON UPDATE TO deciders +-- DO INSTEAD NOTHING; + +-- SIDE EFFECT (rule): immutable events - ignore delete +CREATE OR REPLACE RULE ignore_delete_events AS ON DELETE TO events + DO INSTEAD NOTHING; + +-- SIDE EFFECT (rule): immutable events - ignore update +CREATE OR REPLACE RULE ignore_update_events AS ON UPDATE TO events + DO INSTEAD NOTHING; + + +-- SIDE EFFECT (trigger): Can only use null previousId for first event in an decider +CREATE OR REPLACE FUNCTION check_first_event_for_decider() RETURNS trigger AS +' + BEGIN + IF (NEW.previous_id IS NULL + AND EXISTS(SELECT 1 + FROM events + WHERE NEW.decider_id = decider_id + AND NEW.decider = decider)) + THEN + RAISE EXCEPTION ''previous_id can only be null for first decider event''; + END IF; + RETURN NEW; + END; +' + LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS t_check_first_event_for_decider ON events; +CREATE TRIGGER t_check_first_event_for_decider + BEFORE INSERT + ON events + FOR EACH ROW +EXECUTE FUNCTION check_first_event_for_decider(); + + +-- SIDE EFFECT (trigger): can only append events if the decider_id stream is not finalized already +CREATE OR REPLACE FUNCTION check_final_event_for_decider() RETURNS trigger AS +' + BEGIN + IF EXISTS(SELECT 1 + FROM events + WHERE NEW.decider_id = decider_id + AND "final" = TRUE + AND NEW.decider = decider) + THEN + RAISE EXCEPTION ''last event for this decider stream is already final. the stream is closed, you can not append events to it.''; + END IF; + RETURN NEW; + END; +' + LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS t_check_final_event_for_decider ON events; +CREATE TRIGGER t_check_final_event_for_decider + BEFORE INSERT + ON events + FOR EACH ROW +EXECUTE FUNCTION check_final_event_for_decider(); + + +-- SIDE EFFECT (trigger): previousId must be in the same decider as the event +CREATE OR REPLACE FUNCTION check_previous_id_in_same_decider() RETURNS trigger AS +' + BEGIN + IF (NEW.previous_id IS NOT NULL + AND NOT EXISTS(SELECT 1 + FROM events + WHERE NEW.previous_id = event_id + AND NEW.decider_id = decider_id + AND NEW.decider = decider)) + THEN + RAISE EXCEPTION ''previous_id must be in the same decider''; + END IF; + RETURN NEW; + END; +' + LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS t_check_previous_id_in_same_decider ON events; +CREATE TRIGGER t_check_previous_id_in_same_decider + BEFORE INSERT + ON events + FOR EACH ROW +EXECUTE FUNCTION check_previous_id_in_same_decider(); + diff --git a/src/application/mod.rs b/src/application/mod.rs new file mode 100644 index 0000000..712a76c --- /dev/null +++ b/src/application/mod.rs @@ -0,0 +1 @@ +pub mod order_restaurant_aggregate; diff --git a/src/application/order_restaurant_aggregate.rs b/src/application/order_restaurant_aggregate.rs new file mode 100644 index 0000000..5ce8888 --- /dev/null +++ b/src/application/order_restaurant_aggregate.rs @@ -0,0 +1,15 @@ +use crate::domain::order_decider::Order; +use crate::framework::application::event_sourced_aggregate::EventSourcedOrchestratingAggregate; + +use crate::domain::restaurant_decider::Restaurant; +use crate::domain::{Command, Event}; +use crate::infrastructure::order_restaurant_event_repository::OrderAndRestaurantEventRepository; + +/// A convenient type alias for the order and restaurant aggregate. +pub type OrderAndRestaurantAggregate<'a> = EventSourcedOrchestratingAggregate< + 'a, + Command, + (Option, Option), + Event, + OrderAndRestaurantEventRepository, +>; diff --git a/src/domain/api.rs b/src/domain/api.rs new file mode 100644 index 0000000..ed12d4b --- /dev/null +++ b/src/domain/api.rs @@ -0,0 +1,277 @@ +use pgrx::{PostgresEnum, PostgresType}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +// ######################################################## +// #################### Value Objects ##################### +// ######################################################## + +// The 'newtype' pattern is typical in functional programming. In Haskell, this pattern is supported via the 'newtype' declaration, which allows the programmer to define a new type identical to an existing one except for its name. This is useful for creating type-safe abstractions, enabling the programmer to enforce stronger type constraints on using specific values. +// Similarly, in Rust, the 'newtype' idiom brings compile-time guarantees that the correct value type is supplied. The 'newtype' is a struct that wraps a single value and provides a new type for that value. A 'newtype' is the same as the underlying type at runtime, so it will not introduce any performance overhead. +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct RestaurantId(pub Uuid); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct RestaurantName(pub String); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct OrderId(pub Uuid); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct Reason(pub String); + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct Money(pub u64); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct MenuId(pub Uuid); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct MenuItemId(pub Uuid); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct MenuItemName(pub String); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct OrderLineItemId(pub Uuid); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct OrderLineItemQuantity(pub u32); + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct MenuItem { + pub id: MenuItemId, + pub name: MenuItemName, + pub price: Money, +} + +#[derive(PostgresEnum, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub enum RestaurantMenuCuisine { + Italian, + Indian, + Chinese, + Japanese, + American, + Mexican, + French, + Thai, + Vietnamese, + Greek, + Korean, + Spanish, + Lebanese, + Turkish, + Ethiopian, + Moroccan, + Egyptian, + Brazilian, + Polish, + German, + British, + Irish, + Other, +} + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct RestaurantMenu { + pub menu_id: MenuId, + pub items: Vec, + pub cuisine: RestaurantMenuCuisine, +} + +#[derive(PostgresType, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub struct OrderLineItem { + pub id: OrderLineItemId, + pub quantity: OrderLineItemQuantity, + pub menu_item_id: MenuItemId, + pub name: MenuItemName, +} + +#[derive(PostgresEnum, Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub enum OrderStatus { + Created, + Prepared, + Cancelled, + Rejected, +} + +// ######################################################## +// ####################### COMMANDS ####################### +// ######################################################## + +// #### RESTAURANT #### +/// All possible command variants that could be sent to a restaurant +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +#[serde(tag = "type")] +pub enum RestaurantCommand { + CreateRestaurant(CreateRestaurant), + ChangeMenu(ChangeRestaurantMenu), + PlaceOrder(PlaceOrder), +} +/// Intent/Command to create a new restaurant +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct CreateRestaurant { + pub identifier: RestaurantId, + pub name: RestaurantName, + pub menu: RestaurantMenu, +} + +/// Intent/Command to change the menu of a restaurant +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct ChangeRestaurantMenu { + pub identifier: RestaurantId, + pub menu: RestaurantMenu, +} + +/// Intent/Command to place an order at a restaurant +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct PlaceOrder { + pub identifier: RestaurantId, + pub order_identifier: OrderId, + pub line_items: Vec, +} + +// #### ORDER #### + +/// All possible command variants that could be sent to an order +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +#[serde(tag = "type")] +pub enum OrderCommand { + Create(CreateOrder), + MarkAsPrepared(MarkOrderAsPrepared), +} + +/// Intent/Command to create a new order +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct CreateOrder { + pub identifier: OrderId, + pub restaurant_identifier: RestaurantId, + pub line_items: Vec, +} + +/// Intent/Command to mark an order as prepared +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct MarkOrderAsPrepared { + pub identifier: OrderId, +} + +// ######################################################## +// ######################## EVENTS ######################## +// ######################################################## + +// #### RESTAURANT #### + +/// All possible event variants that could be used to update a restaurant +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(tag = "type")] +pub enum RestaurantEvent { + Created(RestaurantCreated), + NotCreated(RestaurantNotCreated), + MenuChanged(RestaurantMenuChanged), + MenuNotChanged(RestaurantMenuNotChanged), + OrderPlaced(OrderPlaced), + OrderNotPlaced(OrderNotPlaced), +} + +/// Fact/Event that a restaurant was created +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct RestaurantCreated { + pub identifier: RestaurantId, + pub name: RestaurantName, + pub menu: RestaurantMenu, + pub r#final: bool, +} + +/// Fact/Event that a restaurant was not created (with reason) +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct RestaurantNotCreated { + pub identifier: RestaurantId, + pub name: RestaurantName, + pub menu: RestaurantMenu, + pub reason: Reason, + pub r#final: bool, +} + +/// Fact/Event that a restaurant's menu was changed +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct RestaurantMenuChanged { + pub identifier: RestaurantId, + pub menu: RestaurantMenu, + pub r#final: bool, +} + +/// Fact/Event that a restaurant's menu was not changed (with reason) +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct RestaurantMenuNotChanged { + pub identifier: RestaurantId, + pub menu: RestaurantMenu, + pub reason: Reason, + pub r#final: bool, +} + +/// Fact/Event that an order was placed +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct OrderPlaced { + pub identifier: RestaurantId, + pub order_identifier: OrderId, + pub line_items: Vec, + pub r#final: bool, +} + +/// Fact/Event that an order was not placed (with reason) +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct OrderNotPlaced { + pub identifier: RestaurantId, + pub order_identifier: OrderId, + pub line_items: Vec, + pub reason: Reason, + pub r#final: bool, +} + +// #### ORDER #### + +/// All possible event variants that could be used to update an order +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(tag = "type")] +pub enum OrderEvent { + Created(OrderCreated), + NotCreated(OrderNotCreated), + Prepared(OrderPrepared), + NotPrepared(OrderNotPrepared), +} + +/// Fact/Event that an order was created +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct OrderCreated { + pub identifier: OrderId, + pub restaurant_identifier: RestaurantId, + pub status: OrderStatus, + pub line_items: Vec, + pub r#final: bool, +} + +/// Fact/Event that an order was not created (with reason) +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct OrderNotCreated { + pub identifier: OrderId, + pub restaurant_identifier: RestaurantId, + pub line_items: Vec, + pub reason: Reason, + pub r#final: bool, +} + +/// Fact/Event that an order was prepared +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct OrderPrepared { + pub identifier: OrderId, + pub status: OrderStatus, + pub r#final: bool, +} + +/// Fact/Event that an order was not prepared (with reason) +#[derive(PostgresType, Serialize, Deserialize, Debug, PartialEq, Clone, Eq)] +pub struct OrderNotPrepared { + pub identifier: OrderId, + pub reason: Reason, + pub r#final: bool, +} diff --git a/src/domain/mod.rs b/src/domain/mod.rs new file mode 100644 index 0000000..f3fc54d --- /dev/null +++ b/src/domain/mod.rs @@ -0,0 +1,247 @@ +use crate::domain::api::{ + ChangeRestaurantMenu, CreateOrder, CreateRestaurant, MarkOrderAsPrepared, OrderCommand, + OrderNotCreated, OrderNotPlaced, OrderNotPrepared, PlaceOrder, RestaurantCommand, + RestaurantMenuNotChanged, RestaurantNotCreated, +}; +use crate::domain::order_decider::{order_decider, Order}; +use crate::domain::order_saga::order_saga; +use crate::domain::restaurant_decider::{restaurant_decider, Restaurant}; +use crate::domain::restaurant_saga::restaurant_saga; +use crate::framework::domain::api::{DeciderType, EventType, Identifier, IsFinal}; +use api::{ + OrderCreated, OrderEvent, OrderPlaced, OrderPrepared, RestaurantCreated, RestaurantEvent, + RestaurantMenuChanged, +}; +use fmodel_rust::decider::Decider; +use fmodel_rust::saga::Saga; +use fmodel_rust::Sum; +use pgrx::PostgresType; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +pub mod api; +pub mod order_decider; +pub mod order_saga; +pub mod order_view; +pub mod restaurant_decider; +pub mod restaurant_saga; +pub mod restaurant_view; + +/// A convenient type alias for the combined Decider +/// This decider is used to combine the Restaurant and Order deciders into a single decider that can handle both Restaurant and Order commands. +pub type OrderAndRestaurantDecider<'a> = + Decider<'a, Command, (Option, Option), Event>; + +/// A convenient type alias for the combined Saga +/// This saga is used to combine the Restaurant and Order choreography sagas into a single orchestrating saga that can handle both Restaurant and Order events, and produce Restaurant and Order commands as a result. +pub type OrderAndRestaurantSaga<'a> = Saga<'a, Event, Command>; + +/// Combined Decider, combining the Restaurant and Order deciders into a single decider that can handle both Restaurant and Order commands. +pub fn order_restaurant_decider<'a>() -> OrderAndRestaurantDecider<'a> { + restaurant_decider() + .combine(order_decider()) + .map_command(&command_to_sum) + .map_event(&event_to_sum, &sum_to_event) +} + +/// Combined Saga, combining the Restaurant and Order choreography sagas into a single orchestrating saga that can handle both Restaurant and Order events, and produce Restaurant and Order commands as a result. +pub fn order_restaurant_saga<'a>() -> OrderAndRestaurantSaga<'a> { + restaurant_saga() + .combine(order_saga()) + .map_action_result(&event_to_sum2) + .map_action(&sum_to_command) +} + +/// All possible commands in the order&restaurant domains +#[derive(PostgresType, Serialize, Deserialize, Debug, Eq, PartialEq)] +#[serde(tag = "type")] +pub enum Command { + CreateRestaurant(CreateRestaurant), + ChangeRestaurantMenu(ChangeRestaurantMenu), + PlaceOrder(PlaceOrder), + CreateOrder(CreateOrder), + MarkOrderAsPrepared(MarkOrderAsPrepared), +} + +/// Implement the Identifier trait for the Command enum +impl Identifier for Command { + fn identifier(&self) -> Uuid { + match self { + Command::CreateRestaurant(cmd) => cmd.identifier.0, + Command::ChangeRestaurantMenu(cmd) => cmd.identifier.0, + Command::PlaceOrder(cmd) => cmd.identifier.0, + Command::CreateOrder(cmd) => cmd.identifier.0, + Command::MarkOrderAsPrepared(cmd) => cmd.identifier.0, + } + } +} + +/// All possible events in the order&restaurant domains +#[derive(PostgresType, Serialize, Deserialize, Debug, Eq, PartialEq, Clone)] +#[serde(tag = "type")] +pub enum Event { + RestaurantCreated(RestaurantCreated), + RestaurantNotCreated(RestaurantNotCreated), + RestaurantMenuChanged(RestaurantMenuChanged), + RestaurantMenuNotChanged(RestaurantMenuNotChanged), + OrderPlaced(OrderPlaced), + OrderNotPlaced(OrderNotPlaced), + OrderCreated(OrderCreated), + OrderNotCreated(OrderNotCreated), + OrderPrepared(OrderPrepared), + OrderNotPrepared(OrderNotPrepared), +} + +/// Implement the Identifier trait for the Event enum +impl Identifier for Event { + fn identifier(&self) -> Uuid { + match self { + Event::RestaurantCreated(evt) => evt.identifier.0, + Event::RestaurantMenuChanged(evt) => evt.identifier.0, + Event::OrderPlaced(evt) => evt.identifier.0, + Event::OrderCreated(evt) => evt.identifier.0, + Event::OrderPrepared(evt) => evt.identifier.0, + Event::RestaurantNotCreated(evt) => evt.identifier.0, + Event::RestaurantMenuNotChanged(evt) => evt.identifier.0, + Event::OrderNotPlaced(evt) => evt.identifier.0, + Event::OrderNotCreated(evt) => evt.identifier.0, + Event::OrderNotPrepared(evt) => evt.identifier.0, + } + } +} + +/// Implement the EventType trait for the Event enum +impl EventType for Event { + fn event_type(&self) -> String { + match self { + Event::RestaurantCreated(_) => "RestaurantCreated".to_string(), + Event::RestaurantMenuChanged(_) => "RestaurantMenuChanged".to_string(), + Event::OrderPlaced(_) => "OrderPlaced".to_string(), + Event::OrderCreated(_) => "OrderCreated".to_string(), + Event::OrderPrepared(_) => "OrderPrepared".to_string(), + Event::RestaurantNotCreated(_) => "RestaurantNotCreated".to_string(), + Event::RestaurantMenuNotChanged(_) => "RestaurantMenuNotChanged".to_string(), + Event::OrderNotPlaced(_) => "OrderNotPlaced".to_string(), + Event::OrderNotCreated(_) => "OrderNotCreated".to_string(), + Event::OrderNotPrepared(_) => "OrderNotPrepared".to_string(), + } + } +} + +/// Implement the IsFinal trait for the Event enum +impl IsFinal for Event { + fn is_final(&self) -> bool { + match self { + Event::RestaurantCreated(evt) => evt.r#final, + Event::RestaurantMenuChanged(evt) => evt.r#final, + Event::OrderPlaced(evt) => evt.r#final, + Event::OrderCreated(evt) => evt.r#final, + Event::OrderPrepared(evt) => evt.r#final, + Event::RestaurantNotCreated(evt) => evt.r#final, + Event::RestaurantMenuNotChanged(evt) => evt.r#final, + Event::OrderNotPlaced(evt) => evt.r#final, + Event::OrderNotCreated(evt) => evt.r#final, + Event::OrderNotPrepared(evt) => evt.r#final, + } + } +} + +/// Implement the DeciderType trait for the Event enum +impl DeciderType for Event { + fn decider_type(&self) -> String { + match self { + Event::RestaurantCreated(_) => "Restaurant".to_string(), + Event::RestaurantMenuChanged(_) => "Restaurant".to_string(), + Event::OrderPlaced(_) => "Restaurant".to_string(), + Event::RestaurantNotCreated(_) => "Restaurant".to_string(), + Event::RestaurantMenuNotChanged(_) => "Restaurant".to_string(), + Event::OrderNotPlaced(_) => "Restaurant".to_string(), + Event::OrderCreated(_) => "Order".to_string(), + Event::OrderPrepared(_) => "Order".to_string(), + Event::OrderNotCreated(_) => "Order".to_string(), + Event::OrderNotPrepared(_) => "Order".to_string(), + } + } +} + +/// Mapper functions to convert between the `FModel` Sum type and the more appropriate domain specific Command/API type +/// This is necessary because the `FModel` Sum type is used to combine the Restaurant and Order deciders into a single decider that can handle both Restaurant and Order commands. +/// We don't want to expose the `FModel` Sum type to the API, so we need to convert between the `FModel` Sum type and the more appropriate Command/API type. +pub fn command_to_sum(command: &Command) -> Sum { + match command { + Command::CreateRestaurant(c) => { + Sum::First(RestaurantCommand::CreateRestaurant(c.to_owned())) + } + Command::ChangeRestaurantMenu(c) => Sum::First(RestaurantCommand::ChangeMenu(c.to_owned())), + Command::PlaceOrder(c) => Sum::First(RestaurantCommand::PlaceOrder(c.to_owned())), + Command::CreateOrder(c) => Sum::Second(OrderCommand::Create(c.to_owned())), + Command::MarkOrderAsPrepared(c) => Sum::Second(OrderCommand::MarkAsPrepared(c.to_owned())), + } +} + +pub fn event_to_sum(event: &Event) -> Sum { + match event { + Event::RestaurantCreated(e) => Sum::First(RestaurantEvent::Created(e.to_owned())), + Event::RestaurantNotCreated(e) => Sum::First(RestaurantEvent::NotCreated(e.to_owned())), + Event::RestaurantMenuChanged(e) => Sum::First(RestaurantEvent::MenuChanged(e.to_owned())), + Event::RestaurantMenuNotChanged(e) => { + Sum::First(RestaurantEvent::MenuNotChanged(e.to_owned())) + } + Event::OrderPlaced(e) => Sum::First(RestaurantEvent::OrderPlaced(e.to_owned())), + Event::OrderNotPlaced(e) => Sum::First(RestaurantEvent::OrderNotPlaced(e.to_owned())), + Event::OrderCreated(e) => Sum::Second(OrderEvent::Created(e.to_owned())), + Event::OrderNotCreated(e) => Sum::Second(OrderEvent::NotCreated(e.to_owned())), + Event::OrderPrepared(e) => Sum::Second(OrderEvent::Prepared(e.to_owned())), + Event::OrderNotPrepared(e) => Sum::Second(OrderEvent::NotPrepared(e.to_owned())), + } +} + +pub fn event_to_sum2(event: &Event) -> Sum { + match event { + Event::RestaurantCreated(e) => Sum::Second(RestaurantEvent::Created(e.to_owned())), + Event::RestaurantNotCreated(e) => Sum::Second(RestaurantEvent::NotCreated(e.to_owned())), + Event::RestaurantMenuChanged(e) => Sum::Second(RestaurantEvent::MenuChanged(e.to_owned())), + Event::RestaurantMenuNotChanged(e) => { + Sum::Second(RestaurantEvent::MenuNotChanged(e.to_owned())) + } + Event::OrderPlaced(e) => Sum::Second(RestaurantEvent::OrderPlaced(e.to_owned())), + Event::OrderNotPlaced(e) => Sum::Second(RestaurantEvent::OrderNotPlaced(e.to_owned())), + Event::OrderCreated(e) => Sum::First(OrderEvent::Created(e.to_owned())), + Event::OrderNotCreated(e) => Sum::First(OrderEvent::NotCreated(e.to_owned())), + Event::OrderPrepared(e) => Sum::First(OrderEvent::Prepared(e.to_owned())), + Event::OrderNotPrepared(e) => Sum::First(OrderEvent::NotPrepared(e.to_owned())), + } +} + +pub fn sum_to_command(command: &Sum) -> Command { + match command { + Sum::Second(c) => match c { + RestaurantCommand::CreateRestaurant(c) => Command::CreateRestaurant(c.to_owned()), + RestaurantCommand::ChangeMenu(c) => Command::ChangeRestaurantMenu(c.to_owned()), + RestaurantCommand::PlaceOrder(c) => Command::PlaceOrder(c.to_owned()), + }, + Sum::First(c) => match c { + OrderCommand::Create(c) => Command::CreateOrder(c.to_owned()), + OrderCommand::MarkAsPrepared(c) => Command::MarkOrderAsPrepared(c.to_owned()), + }, + } +} + +pub fn sum_to_event(event: &Sum) -> Event { + match event { + Sum::First(e) => match e { + RestaurantEvent::Created(e) => Event::RestaurantCreated(e.to_owned()), + RestaurantEvent::NotCreated(e) => Event::RestaurantNotCreated(e.to_owned()), + RestaurantEvent::MenuChanged(e) => Event::RestaurantMenuChanged(e.to_owned()), + RestaurantEvent::MenuNotChanged(e) => Event::RestaurantMenuNotChanged(e.to_owned()), + RestaurantEvent::OrderPlaced(e) => Event::OrderPlaced(e.to_owned()), + RestaurantEvent::OrderNotPlaced(e) => Event::OrderNotPlaced(e.to_owned()), + }, + Sum::Second(e) => match e { + OrderEvent::Created(e) => Event::OrderCreated(e.to_owned()), + OrderEvent::NotCreated(e) => Event::OrderNotCreated(e.to_owned()), + OrderEvent::Prepared(e) => Event::OrderPrepared(e.to_owned()), + OrderEvent::NotPrepared(e) => Event::OrderNotPrepared(e.to_owned()), + }, + } +} diff --git a/src/domain/order_decider.rs b/src/domain/order_decider.rs new file mode 100644 index 0000000..b3c96d0 --- /dev/null +++ b/src/domain/order_decider.rs @@ -0,0 +1,88 @@ +use fmodel_rust::decider::Decider; + +use crate::domain::api::{ + OrderCommand, OrderCreated, OrderEvent, OrderId, OrderLineItem, OrderNotCreated, + OrderNotPrepared, OrderPrepared, OrderStatus, Reason, RestaurantId, +}; + +/// The state of the Order is represented by this struct. It belongs to the Domain layer. +#[derive(Clone, PartialEq, Debug)] +pub struct Order { + pub identifier: OrderId, + pub restaurant_identifier: RestaurantId, + pub status: OrderStatus, + pub line_items: Vec, +} + +/// A convenient type alias for the Order decider +pub type OrderDecider<'a> = Decider<'a, OrderCommand, Option, OrderEvent>; + +/// Decider is a datatype/struct that represents the main decision-making algorithm. It belongs to the Domain layer. +pub fn order_decider<'a>() -> OrderDecider<'a> { + Decider { + // Decide new events based on the current state and the command + // Exhaustive pattern matching on the command + decide: Box::new(|command, state| match command { + OrderCommand::Create(command) => { + if state.is_some() { + vec![OrderEvent::NotCreated(OrderNotCreated { + identifier: command.identifier.to_owned(), + restaurant_identifier: command.restaurant_identifier.to_owned(), + line_items: command.line_items.to_owned(), + reason: Reason("Order already exists".to_string()), + r#final: false, + })] + } else { + vec![OrderEvent::Created(OrderCreated { + identifier: command.identifier.to_owned(), + restaurant_identifier: command.restaurant_identifier.to_owned(), + status: OrderStatus::Created, + line_items: command.line_items.to_owned(), + r#final: false, + })] + } + } + OrderCommand::MarkAsPrepared(command) => { + if state + .clone() + .is_some_and(|s| OrderStatus::Created == s.status) + { + vec![OrderEvent::Prepared(OrderPrepared { + identifier: command.identifier.to_owned(), + status: OrderStatus::Prepared, + r#final: true, + })] + } else { + vec![OrderEvent::NotPrepared(OrderNotPrepared { + identifier: command.identifier.to_owned(), + reason: Reason("Order in the wrong status previously".to_string()), + r#final: false, + })] + } + } + }), + // Evolve the state based on the current state and the event + // Exhaustive pattern matching on the event + evolve: Box::new(|state, event| match event { + OrderEvent::Created(event) => Some(Order { + identifier: event.identifier.to_owned(), + restaurant_identifier: event.restaurant_identifier.to_owned(), + status: event.status.to_owned(), + line_items: event.line_items.to_owned(), + }), + // On error event we choose NOT TO change the state of the Order, for example. + OrderEvent::NotCreated(..) => state.clone(), + OrderEvent::Prepared(event) => state.clone().map(|s| Order { + identifier: event.identifier.to_owned(), + restaurant_identifier: s.restaurant_identifier, + status: event.status.to_owned(), + line_items: s.line_items, + }), + // On error event we choose NOT TO change the state of the Order, for example. + OrderEvent::NotPrepared(..) => state.clone(), + }), + + // The initial state of the decider + initial_state: Box::new(|| None), + } +} diff --git a/src/domain/order_saga.rs b/src/domain/order_saga.rs new file mode 100644 index 0000000..3bacc2e --- /dev/null +++ b/src/domain/order_saga.rs @@ -0,0 +1,37 @@ +use fmodel_rust::saga::Saga; + +use crate::domain::api::{CreateOrder, OrderCommand, RestaurantEvent}; + +/// A convenient type alias for the Order choreography saga +type OrderSaga<'a> = Saga<'a, RestaurantEvent, OrderCommand>; + +/// The Order choreography saga - represents the central point of control deciding what to execute next. +/// It is a function that takes an event and returns a list of commands. +pub fn order_saga<'a>() -> OrderSaga<'a> { + Saga { + react: Box::new(|event| match event { + RestaurantEvent::OrderPlaced(event) => { + vec![OrderCommand::Create(CreateOrder { + identifier: event.order_identifier.to_owned(), + restaurant_identifier: event.identifier.to_owned(), + line_items: event.line_items.to_owned(), + })] + } + RestaurantEvent::OrderNotPlaced(..) => { + vec![] + } + RestaurantEvent::NotCreated(..) => { + vec![] + } + RestaurantEvent::MenuNotChanged(..) => { + vec![] + } + RestaurantEvent::Created(..) => { + vec![] + } + RestaurantEvent::MenuChanged(..) => { + vec![] + } + }), + } +} diff --git a/src/domain/order_view.rs b/src/domain/order_view.rs new file mode 100644 index 0000000..02d1726 --- /dev/null +++ b/src/domain/order_view.rs @@ -0,0 +1,46 @@ +use fmodel_rust::view::View; +use serde::{Deserialize, Serialize}; + +use crate::domain::api::{OrderEvent, OrderId, OrderLineItem, OrderStatus, RestaurantId}; + +/// The state of the Order is represented by this struct. It belongs to the Domain layer. +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct OrderViewState { + pub identifier: OrderId, + pub restaurant_identifier: RestaurantId, + pub status: OrderStatus, + pub line_items: Vec, +} + +/// A convenient type alias for the Order view +type OrderView<'a> = View<'a, Option, OrderEvent>; + +/// View represents the event handling algorithm. It belongs to the Domain layer. +pub fn order_view<'a>() -> OrderView<'a> { + View { + // Evolve the state based on the current state and the event + // Exhaustive pattern matching on the event + evolve: Box::new(|state, event| match event { + OrderEvent::Created(event) => Some(OrderViewState { + identifier: event.identifier.to_owned(), + restaurant_identifier: event.restaurant_identifier.to_owned(), + status: event.status.to_owned(), + line_items: event.line_items.to_owned(), + }), + // On error event we choose NOT TO change the state of the Order, for example. + OrderEvent::NotCreated(..) => state.clone(), + + OrderEvent::Prepared(event) => state.clone().map(|s| OrderViewState { + identifier: event.identifier.to_owned(), + restaurant_identifier: s.restaurant_identifier, + status: event.status.to_owned(), + line_items: s.line_items, + }), + // On error event we choose NOT TO change the state of the Order, for example. + OrderEvent::NotPrepared(..) => state.clone(), + }), + + // The initial state of the decider + initial_state: Box::new(|| None), + } +} diff --git a/src/domain/restaurant_decider.rs b/src/domain/restaurant_decider.rs new file mode 100644 index 0000000..391fe13 --- /dev/null +++ b/src/domain/restaurant_decider.rs @@ -0,0 +1,111 @@ +use fmodel_rust::decider::Decider; + +use crate::domain::api::{ + OrderNotPlaced, OrderPlaced, Reason, RestaurantCommand, RestaurantCreated, RestaurantEvent, + RestaurantId, RestaurantMenu, RestaurantMenuChanged, RestaurantMenuNotChanged, RestaurantName, + RestaurantNotCreated, +}; + +/// The state of the Restaurant is represented by this struct. It belongs to the Domain layer. +#[derive(Clone, PartialEq, Debug)] +pub struct Restaurant { + identifier: RestaurantId, + name: RestaurantName, + menu: RestaurantMenu, +} + +/// A convenient type alias for the Restaurant decider +pub type RestaurantDecider<'a> = + Decider<'a, RestaurantCommand, Option, RestaurantEvent>; + +/// Decider is a datatype/struct that represents the main decision-making algorithm. It belongs to the Domain layer. +pub fn restaurant_decider<'a>() -> RestaurantDecider<'a> { + Decider { + // Decide new events based on the current state and the command + // Exhaustive pattern matching on the command + decide: Box::new(|command, state| match command { + RestaurantCommand::CreateRestaurant(command) => { + if state.is_some() { + vec![RestaurantEvent::NotCreated(RestaurantNotCreated { + identifier: command.identifier.to_owned(), + name: command.name.to_owned(), + menu: command.menu.to_owned(), + reason: Reason("Restaurant already exists".to_string()), + r#final: false, + })] + } else { + vec![RestaurantEvent::Created(RestaurantCreated { + identifier: command.identifier.to_owned(), + name: command.name.to_owned(), + menu: command.menu.to_owned(), + r#final: false, + })] + } + } + RestaurantCommand::ChangeMenu(command) => { + if state.is_some() { + vec![RestaurantEvent::MenuChanged(RestaurantMenuChanged { + identifier: command.identifier.to_owned(), + menu: command.menu.to_owned(), + r#final: false, + })] + } else { + vec![RestaurantEvent::MenuNotChanged(RestaurantMenuNotChanged { + identifier: command.identifier.to_owned(), + menu: command.menu.to_owned(), + reason: Reason("Restaurant does not exist".to_string()), + r#final: false, + })] + } + } + RestaurantCommand::PlaceOrder(command) => { + if state.is_some() { + vec![RestaurantEvent::OrderPlaced(OrderPlaced { + identifier: command.identifier.to_owned(), + order_identifier: command.order_identifier.to_owned(), + line_items: command.line_items.to_owned(), + r#final: false, + })] + } else { + vec![RestaurantEvent::OrderNotPlaced(OrderNotPlaced { + identifier: command.identifier.to_owned(), + order_identifier: command.order_identifier.to_owned(), + line_items: command.line_items.to_owned(), + reason: Reason("Restaurant does not exist".to_string()), + r#final: false, + })] + } + } + }), + // Evolve the state based on the current state and the event + // Exhaustive pattern matching on the event + evolve: Box::new(|state, event| match event { + RestaurantEvent::Created(event) => Some(Restaurant { + identifier: event.identifier.to_owned(), + name: event.name.to_owned(), + menu: event.menu.to_owned(), + }), + // On error event we choose NOT TO change the state of the Restaurant, for example. + RestaurantEvent::NotCreated(..) => state.clone(), + + RestaurantEvent::MenuChanged(event) => state.clone().map(|s| Restaurant { + identifier: event.identifier.to_owned(), + name: s.name, + menu: event.menu.to_owned(), + }), + // On error event we choose NOT TO change the state of the Restaurant, for example. + RestaurantEvent::MenuNotChanged(..) => state.clone(), + + RestaurantEvent::OrderPlaced(event) => state.clone().map(|s| Restaurant { + identifier: event.identifier.to_owned(), + name: s.name, + menu: s.menu, + }), + // On error event we choose NOT TO change the state of the Restaurant, for example. + RestaurantEvent::OrderNotPlaced(..) => state.clone(), + }), + + // The initial state of the decider + initial_state: Box::new(|| None), + } +} diff --git a/src/domain/restaurant_saga.rs b/src/domain/restaurant_saga.rs new file mode 100644 index 0000000..e1c42db --- /dev/null +++ b/src/domain/restaurant_saga.rs @@ -0,0 +1,28 @@ +use fmodel_rust::saga::Saga; + +use crate::domain::api::{OrderEvent, RestaurantCommand}; + +/// A convenient type alias for the Restaurant choreography saga +type RestaurantSaga<'a> = Saga<'a, OrderEvent, RestaurantCommand>; + +/// The Restaurant choreography saga - represents the central point of control deciding what to execute next. +/// It is a function that takes an event and returns a list of commands. +/// This Saga is not doing much ;) +pub fn restaurant_saga<'a>() -> RestaurantSaga<'a> { + Saga { + react: Box::new(|_event| match _event { + OrderEvent::Created(..) => { + vec![] + } + OrderEvent::NotCreated(..) => { + vec![] + } + OrderEvent::Prepared(..) => { + vec![] + } + OrderEvent::NotPrepared(..) => { + vec![] + } + }), + } +} diff --git a/src/domain/restaurant_view.rs b/src/domain/restaurant_view.rs new file mode 100644 index 0000000..b43682c --- /dev/null +++ b/src/domain/restaurant_view.rs @@ -0,0 +1,51 @@ +use fmodel_rust::view::View; +use serde::{Deserialize, Serialize}; + +use crate::domain::api::{RestaurantEvent, RestaurantId, RestaurantMenu, RestaurantName}; + +/// The state of the Restaurant View is represented by this struct. It belongs to the Domain layer. +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct RestaurantViewState { + pub identifier: RestaurantId, + pub name: RestaurantName, + pub menu: RestaurantMenu, +} + +/// A convenient type alias for the Restaurant view +type RestaurantView<'a> = View<'a, Option, RestaurantEvent>; + +/// View represents the event handling algorithm. It belongs to the Domain layer. +pub fn restaurant_view<'a>() -> RestaurantView<'a> { + View { + // Evolve the state based on the current state and the event + // Exhaustive pattern matching on the event + evolve: Box::new(|state, event| match event { + RestaurantEvent::Created(event) => Some(RestaurantViewState { + identifier: event.identifier.to_owned(), + name: event.name.to_owned(), + menu: event.menu.to_owned(), + }), + // On error event we choose NOT TO change the state of the RestaurantView, for example. + RestaurantEvent::NotCreated(..) => state.clone(), + + RestaurantEvent::MenuChanged(event) => state.clone().map(|s| RestaurantViewState { + identifier: event.identifier.to_owned(), + name: s.name, + menu: event.menu.to_owned(), + }), + // On error event we choose NOT TO change the state of the RestaurantView, for example. + RestaurantEvent::MenuNotChanged(..) => state.clone(), + + RestaurantEvent::OrderPlaced(event) => state.clone().map(|s| RestaurantViewState { + identifier: event.identifier.to_owned(), + name: s.name, + menu: s.menu, + }), + // On error event we choose NOT TO change the state of the RestaurantView, for example. + RestaurantEvent::OrderNotPlaced(..) => state.clone(), + }), + + // The initial state of the decider + initial_state: Box::new(|| None), + } +} diff --git a/src/framework/application/event_sourced_aggregate.rs b/src/framework/application/event_sourced_aggregate.rs new file mode 100644 index 0000000..bddf95f --- /dev/null +++ b/src/framework/application/event_sourced_aggregate.rs @@ -0,0 +1,228 @@ +// ################################################################### +// ###################### Regular Aggregate ########################## +// ################################################################### + +use crate::framework::domain::api::{DeciderType, EventType, Identifier, IsFinal}; +use crate::framework::infrastructure::errors::ErrorMessage; +use crate::framework::infrastructure::event_repository::{ + EventOrchestratingRepository, EventRepository, +}; +use fmodel_rust::decider::{Decider, EventComputation}; +use fmodel_rust::saga::Saga; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::marker::PhantomData; +use uuid::Uuid; + +/// Event sourced aggregate is composed of a repository and a decider. +/// The repository is responsible for fetching and saving events, and it is `sync`, not `async`. +#[allow(dead_code)] +pub struct EventSourcedAggregate +where + Repository: EventRepository, + Decider: EventComputation, + C: Identifier, + E: EventType + Identifier + IsFinal + DeciderType + DeserializeOwned + Serialize, +{ + repository: Repository, + decider: Decider, + _marker: PhantomData<(C, S, E)>, +} + +/// Implementation of the event computation for the event sourced aggregate. +impl EventComputation + for EventSourcedAggregate +where + Repository: EventRepository, + Decider: EventComputation, + C: Identifier, + E: EventType + Identifier + IsFinal + DeciderType + DeserializeOwned + Serialize, +{ + /// Computes new events based on the current events and the command. + fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec { + self.decider.compute_new_events(current_events, command) + } +} + +impl EventSourcedAggregate +where + Repository: EventRepository, + Decider: EventComputation, + C: Identifier, + E: EventType + Identifier + IsFinal + DeciderType + DeserializeOwned + Serialize, +{ + /// Creates a new event sourced aggregate. + #[allow(dead_code)] + pub fn new(repository: Repository, decider: Decider) -> Self { + EventSourcedAggregate { + repository, + decider, + _marker: PhantomData, + } + } + /// Handles the command and returns the new events. + #[allow(dead_code)] + pub fn handle(&self, command: &C) -> Result, ErrorMessage> { + let events: Vec<(E, Uuid)> = self.repository.fetch_events(command)?; + let mut version: Option = None; + let mut current_events: Vec = vec![]; + for (event, ver) in events { + version = Some(ver); + current_events.push(event); + } + let new_events = self.decider.compute_new_events(¤t_events, command); + self.repository.save(&new_events, &version) + } +} + +// ################################################################### +// ################### Orchestrating Aggregate ####################### +// ################################################################### + +/// Event sourced orchestrating aggregate is composed of a repository, a decider, and a saga. +/// The repository is responsible for fetching and saving events, and it is `sync`, not `async`. +pub struct EventSourcedOrchestratingAggregate<'a, C, S, E, Repository> +where + Repository: EventOrchestratingRepository, + C: Identifier, + E: Clone + + EventType + + Identifier + + IsFinal + + DeciderType + + DeserializeOwned + + Serialize + + Debug, +{ + repository: Repository, + decider: Decider<'a, C, S, E>, + saga: Saga<'a, E, C>, + _marker: PhantomData<(C, S, E)>, +} + +/// Implementation of the event computation for the event sourced orchestrating aggregate. +impl<'a, C, S, E, Repository> EventComputation + for EventSourcedOrchestratingAggregate<'a, C, S, E, Repository> +where + Repository: EventOrchestratingRepository, + C: Identifier, + E: Clone + + EventType + + Identifier + + IsFinal + + DeciderType + + DeserializeOwned + + Serialize + + Debug, +{ + fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec { + let current_state: S = current_events + .iter() + .fold((self.decider.initial_state)(), |state, event| { + (self.decider.evolve)(&state, event) + }); + + // Initial resulting events from the decider's decision. + let initial_events = (self.decider.decide)(command, ¤t_state); + + // Commands to process derived from initial resulting events. + let commands_to_process: Vec = initial_events + .iter() + .flat_map(|event| (self.saga.react)(event)) + .collect(); + + // Collect all events including recursively computed new events. + let mut all_events = initial_events.clone(); // Start with initial events. + + for command in commands_to_process.iter() { + let previous_events = [ + self.repository + .fetch_events(command) + .unwrap_or_default() + .iter() + .map(|(e, _)| e.clone()) + .collect::>(), + initial_events.clone(), + ] + .concat(); + + // Recursively compute new events and extend the accumulated events list. + let new_events = self.compute_new_events(&previous_events, command); + all_events.extend(new_events); + } + + all_events + } +} + +impl<'a, C, S, E, Repository> EventSourcedOrchestratingAggregate<'a, C, S, E, Repository> +where + Repository: EventOrchestratingRepository, + C: Identifier, + E: Clone + + EventType + + Identifier + + IsFinal + + DeciderType + + DeserializeOwned + + Serialize + + Debug, +{ + /// Creates a new event sourced orchestrating aggregate. + pub fn new( + repository: Repository, + decider: Decider<'a, C, S, E>, + saga: Saga<'a, E, C>, + ) -> Self { + EventSourcedOrchestratingAggregate { + repository, + decider, + saga, + _marker: PhantomData, + } + } + /// Handles the command and returns the new events that are persisted. + pub fn handle(&self, command: &C) -> Result, ErrorMessage> { + let events: Vec = self + .repository + .fetch_events(command)? + .into_iter() + .map(|(e, _)| e) + .collect(); + let new_events = self.compute_new_events(&events, command); + self.repository.save(&new_events) + } + + /// Handles the list of commands and returns the new events that are persisted. + /// This method is useful for processing multiple commands in a single transaction. + /// Effects/Events of the previous commands are visible to the subsequent commands. + pub fn handle_all(&self, commands: &[C]) -> Result, ErrorMessage> { + let mut all_new_events: Vec = Vec::new(); + + for command in commands { + // Fetch events for the current command + let fetched_events: Vec = self + .repository + .fetch_events(command)? + .into_iter() + .map(|(e, _)| e) + .collect(); + + // Combine all previous new events with fetched events for the current command + let combined_events: Vec = fetched_events + .into_iter() + .chain(all_new_events.iter().cloned()) + .collect(); + + // Compute new events based on the combined events and the current command + let new_events = self.compute_new_events(&combined_events, command); + + // Accumulate all new events + all_new_events.extend(new_events); + } + + // Save all new events at the end + self.repository.save(&all_new_events) + } +} diff --git a/src/framework/application/mod.rs b/src/framework/application/mod.rs new file mode 100644 index 0000000..aec0f7b --- /dev/null +++ b/src/framework/application/mod.rs @@ -0,0 +1 @@ +pub mod event_sourced_aggregate; diff --git a/src/framework/domain/api.rs b/src/framework/domain/api.rs new file mode 100644 index 0000000..2ca0915 --- /dev/null +++ b/src/framework/domain/api.rs @@ -0,0 +1,21 @@ +use uuid::Uuid; + +/// A trait for identifying messages/events/commands +pub trait Identifier { + fn identifier(&self) -> Uuid; +} + +/// A trait for identifying the type/name of an event +pub trait EventType { + fn event_type(&self) -> String; +} + +/// A trait for identifying if an event is final +pub trait IsFinal { + fn is_final(&self) -> bool; +} + +/// A trait for identifying the type/name of a decider in the event. +pub trait DeciderType { + fn decider_type(&self) -> String; +} diff --git a/src/framework/domain/mod.rs b/src/framework/domain/mod.rs new file mode 100644 index 0000000..e5fdf85 --- /dev/null +++ b/src/framework/domain/mod.rs @@ -0,0 +1 @@ +pub mod api; diff --git a/src/framework/infrastructure/errors.rs b/src/framework/infrastructure/errors.rs new file mode 100644 index 0000000..e4a1d0f --- /dev/null +++ b/src/framework/infrastructure/errors.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; +use std::error::Error; +use std::fmt; + +/// Error message to be returned to the client +#[derive(Serialize, Deserialize)] +pub struct ErrorMessage { + pub message: String, +} + +/// Implement Display for ErrorMessage +impl fmt::Display for ErrorMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +/// Implement Debug for ErrorMessage +impl fmt::Debug for ErrorMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ErrorMessage: {}", self.message) + } +} + +/// Implement Error for ErrorMessage +impl Error for ErrorMessage {} diff --git a/src/framework/infrastructure/event_repository.rs b/src/framework/infrastructure/event_repository.rs new file mode 100644 index 0000000..89bd828 --- /dev/null +++ b/src/framework/infrastructure/event_repository.rs @@ -0,0 +1,322 @@ +use crate::framework::domain::api::{DeciderType, EventType, Identifier, IsFinal}; +use crate::framework::infrastructure::errors::ErrorMessage; +use pgrx::{IntoDatum, JsonB, PgBuiltInOids, Spi, Uuid}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use uuid::Uuid as UUID; + +/// Converts a `JsonB` to an event payload type. +fn to_event(jsonb: JsonB) -> Result { + let value = jsonb.0.clone(); + serde_json::from_value(value).map_err(|err| ErrorMessage { + message: "Failed to deserialize event: ".to_string() + &err.to_string(), + }) +} +/// A trait for event repositories. +pub trait EventRepository +where + C: Identifier, + E: Identifier + EventType + IsFinal + DeciderType + DeserializeOwned + Serialize, +{ + /// Fetches current events, based on the command. + fn fetch_events(&self, command: &C) -> Result, ErrorMessage> { + let query = "SELECT * FROM events WHERE decider_id = $1 ORDER BY events.offset"; + Spi::connect(|client| { + let mut results = Vec::new(); + let tup_table = client + .select( + query, + None, + Some(vec![( + PgBuiltInOids::TEXTOID.oid(), + command.identifier().to_string().into_datum(), + )]), + ) + .map_err(|err| ErrorMessage { + message: "Failed to fetch events: ".to_string() + &err.to_string(), + })?; + for row in tup_table { + let data = row["data"].value::().map_err(|err| ErrorMessage { + message: "Failed to fetch event data/payload (map `data` to `JsonB`): ".to_string() + &err.to_string(), + })?.ok_or(ErrorMessage { + message: "Failed to fetch event data/payload (map `data` to `JsonB`): No data/payload found".to_string(), + })?; + let event_id = row["event_id"] + .value::() + .map_err(|err| ErrorMessage { + message: "Failed to fetch event id (map `event_id` to `Uuid`): " + .to_string() + + &err.to_string(), + })? + .ok_or(ErrorMessage { + message: + "Failed to fetch event id (map `data` to `JsonB`): No event id found" + .to_string(), + })?; + + results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes()))); + } + Ok(results) + }) + } + /// Saves events. + fn save( + &self, + events: &[E], + latest_version: &Option, + ) -> Result, ErrorMessage> { + let query = " + INSERT INTO events (event, event_id, decider, decider_id, data, command_id, previous_id, final) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING *"; + + Spi::connect(|mut client| { + let mut results = Vec::new(); + let mut version = latest_version.to_owned(); + for event in events { + let data = serde_json::to_value(event).map_err(|err| ErrorMessage { + message: "Failed to save event! Failed to serialize event data/payload: " + .to_string() + + &err.to_string(), + })?; + let event_id: UUID = UUID::new_v4(); + let tup_table = client + .update( + query, + None, + Some(vec![ + ( + PgBuiltInOids::TEXTOID.oid(), + event.event_type().into_datum(), + ), + ( + PgBuiltInOids::UUIDOID.oid(), + event_id.to_string().into_datum(), + ), + ( + PgBuiltInOids::TEXTOID.oid(), + event.decider_type().into_datum(), + ), + ( + PgBuiltInOids::UUIDOID.oid(), + event.identifier().to_string().into_datum(), + ), + (PgBuiltInOids::JSONBOID.oid(), JsonB(data).into_datum()), + ( + PgBuiltInOids::UUIDOID.oid(), + event_id.to_string().into_datum(), + ), + ( + PgBuiltInOids::UUIDOID.oid(), + version + .map(|v| Uuid::from_bytes(v.into_bytes())) + .into_datum(), + ), + (PgBuiltInOids::BOOLOID.oid(), event.is_final().into_datum()), + ]), + ) + .map_err(|err| ErrorMessage { + message: "Failed to save event: ".to_string() + &err.to_string(), + })?; + + for row in tup_table { + let data = row["data"].value::().map_err(|err| ErrorMessage { + message: "Failed to save event data/payload (map `data` to `JsonB`): ".to_string() + &err.to_string(), + })?.ok_or(ErrorMessage { + message: "Failed to save event data/payload (map `data` to `JsonB`): No data/payload found".to_string(), + })?; + let event_id = row["event_id"] + .value::() + .map_err(|err| ErrorMessage { + message: "Failed to save event id (map `event_id` to `Uuid`): " + .to_string() + + &err.to_string(), + })? + .ok_or(ErrorMessage { + message: + "Failed to save event id (map `data` to `JsonB`): No event id found" + .to_string(), + })?; + + results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes()))); + } + version = Some(event_id); + } + Ok(results) + }) + } +} + +/// A trait for event orchestrating repositories. +pub trait EventOrchestratingRepository +where + C: Identifier, + E: Clone + + Identifier + + EventType + + IsFinal + + DeciderType + + DeserializeOwned + + Serialize + + Debug, +{ + /// Fetches current events, based on the command. + fn fetch_events(&self, command: &C) -> Result, ErrorMessage> { + let query = "SELECT * FROM events WHERE decider_id = $1 ORDER BY events.offset"; + Spi::connect(|client| { + let mut results = Vec::new(); + let tup_table = client + .select( + query, + None, + Some(vec![( + PgBuiltInOids::TEXTOID.oid(), + command.identifier().to_string().into_datum(), + )]), + ) + .map_err(|err| ErrorMessage { + message: "Failed to fetch events: ".to_string() + &err.to_string(), + })?; + for row in tup_table { + let data = row["data"].value::().map_err(|err| ErrorMessage { + message: "Failed to fetch event data/payload (map `data` to `JsonB`): ".to_string() + &err.to_string(), + })?.ok_or(ErrorMessage { + message: "Failed to fetch event data/payload (map `data` to `JsonB`): No data/payload found".to_string(), + })?; + let event_id = row["event_id"] + .value::() + .map_err(|err| ErrorMessage { + message: "Failed to fetch event id (map `event_id` to `Uuid`): " + .to_string() + + &err.to_string(), + })? + .ok_or(ErrorMessage { + message: + "Failed to fetch event id (map `data` to `JsonB`): No event id found" + .to_string(), + })?; + results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes()))); + } + Ok(results) + }) + } + + /// Fetches the latest version of the event stream to which the event belongs. + fn fetch_latest_version(&self, event: &E) -> Result, ErrorMessage> { + let query = + "SELECT * FROM events WHERE decider_id = $1 ORDER BY events.offset DESC LIMIT 1"; + Spi::connect(|client| { + let mut results = Vec::new(); + let tup_table = client + .select( + query, + None, + Some(vec![( + PgBuiltInOids::TEXTOID.oid(), + event.identifier().to_string().into_datum(), + )]), + ) + .map_err(|err| ErrorMessage { + message: "Failed to fetch latest event / version: ".to_string() + + &err.to_string(), + })?; + for row in tup_table { + let event_id = row["event_id"] + .value::() + .map_err(|err| ErrorMessage { + message: "Failed to fetch latest event id (map `event_id` to `Uuid`): " + .to_string() + + &err.to_string(), + })? + .ok_or(ErrorMessage { + message: + "Failed to fetch latest event id (map `data` to `JsonB`): No event id found" + .to_string(), + })?; + results.push(UUID::from_bytes(*event_id.as_bytes())); + } + Ok(results.first().cloned()) + }) + } + /// Saves events. + fn save(&self, events: &[E]) -> Result, ErrorMessage> { + let query = " + INSERT INTO events (event, event_id, decider, decider_id, data, command_id, previous_id, final) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING *"; + + Spi::connect(|mut client| { + let mut results = Vec::new(); + for event in events { + let data = serde_json::to_value(event).map_err(|err| ErrorMessage { + message: "Failed to save event! Failed to serialize event data/payload: " + .to_string() + + &err.to_string(), + })?; + let version = self.fetch_latest_version(event)?; + let event_id: UUID = UUID::new_v4(); + let tup_table = client + .update( + query, + None, + Some(vec![ + ( + PgBuiltInOids::TEXTOID.oid(), + event.event_type().into_datum(), + ), + ( + PgBuiltInOids::UUIDOID.oid(), + event_id.to_string().into_datum(), + ), + ( + PgBuiltInOids::TEXTOID.oid(), + event.decider_type().into_datum(), + ), + ( + PgBuiltInOids::TEXTOID.oid(), + event.identifier().to_string().into_datum(), + ), + (PgBuiltInOids::JSONBOID.oid(), JsonB(data).into_datum()), + ( + PgBuiltInOids::UUIDOID.oid(), + event_id.to_string().into_datum(), + ), + ( + PgBuiltInOids::UUIDOID.oid(), + version + .map(|v| Uuid::from_bytes(v.into_bytes())) + .into_datum(), + ), + (PgBuiltInOids::BOOLOID.oid(), event.is_final().into_datum()), + ]), + ) + .map_err(|err| ErrorMessage { + message: "Failed to save event: ".to_string() + &err.to_string(), + })?; + + for row in tup_table { + let data = row["data"].value::().map_err(|err| ErrorMessage { + message: "Failed to save event data/payload (map `data` to `JsonB`): ".to_string() + &err.to_string(), + })?.ok_or(ErrorMessage { + message: "Failed to save event data/payload (map `data` to `JsonB`): No data/payload found".to_string(), + })?; + let event_id = row["event_id"] + .value::() + .map_err(|err| ErrorMessage { + message: "Failed to save event id (map `event_id` to `Uuid`): " + .to_string() + + &err.to_string(), + })? + .ok_or(ErrorMessage { + message: + "Failed to save event id (map `data` to `JsonB`): No event id found" + .to_string(), + })?; + results.push((to_event(data)?, UUID::from_bytes(*event_id.as_bytes()))); + } + } + Ok(results) + }) + } +} diff --git a/src/framework/infrastructure/mod.rs b/src/framework/infrastructure/mod.rs new file mode 100644 index 0000000..b8ff3c6 --- /dev/null +++ b/src/framework/infrastructure/mod.rs @@ -0,0 +1,2 @@ +pub mod errors; +pub mod event_repository; diff --git a/src/framework/mod.rs b/src/framework/mod.rs new file mode 100644 index 0000000..50d0795 --- /dev/null +++ b/src/framework/mod.rs @@ -0,0 +1,3 @@ +pub mod application; +pub mod domain; +pub mod infrastructure; diff --git a/src/infrastructure/mod.rs b/src/infrastructure/mod.rs new file mode 100644 index 0000000..2f9ca1d --- /dev/null +++ b/src/infrastructure/mod.rs @@ -0,0 +1 @@ +pub mod order_restaurant_event_repository; diff --git a/src/infrastructure/order_restaurant_event_repository.rs b/src/infrastructure/order_restaurant_event_repository.rs new file mode 100644 index 0000000..11f4581 --- /dev/null +++ b/src/infrastructure/order_restaurant_event_repository.rs @@ -0,0 +1,15 @@ +use crate::domain::{Command, Event}; +use crate::framework::infrastructure::event_repository::EventOrchestratingRepository; + +/// An event repository for the restaurant and order domain(s). +pub struct OrderAndRestaurantEventRepository {} + +/// Implementation of the event orchestrating repository for the restaurant and order domain(s). +impl EventOrchestratingRepository for OrderAndRestaurantEventRepository {} + +impl OrderAndRestaurantEventRepository { + /// Creates a new restaurant and order event repository. + pub fn new() -> Self { + OrderAndRestaurantEventRepository {} + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..62af7d8 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,121 @@ +use crate::application::order_restaurant_aggregate::OrderAndRestaurantAggregate; +use crate::domain::{order_restaurant_decider, order_restaurant_saga, Command, Event}; +use crate::framework::infrastructure::errors::ErrorMessage; +use crate::infrastructure::order_restaurant_event_repository::OrderAndRestaurantEventRepository; +use pgrx::prelude::*; + +mod application; +mod domain; +mod framework; +mod infrastructure; + +pg_module_magic!(); + +extension_sql_file!( + "../sql/event_sourcing.sql", + name = "event_sourcing", + bootstrap +); + +/// Command handler for the whole domain / both, orders and restaurants included. +/// It handles a single command and returns a list of events that were generated and persisted. +#[pg_extern] +fn handle(command: Command) -> Result, ErrorMessage> { + let repository = OrderAndRestaurantEventRepository::new(); + let aggregate = OrderAndRestaurantAggregate::new( + repository, + order_restaurant_decider(), + order_restaurant_saga(), + ); + aggregate + .handle(&command) + .map(|res| res.into_iter().map(|(e, _)| e.clone()).collect()) +} + +/// Compound command handler for the domain / both, orders and restaurants included +/// It handles a list of commands and returns a list of events that were generated and persisted. +/// All commands are executed in a single transaction, and the effects/events of the previous commands are visible to the subsequent commands. +/// If any of the commands fail, the transaction is rolled back, and no events are persisted. +/// This is useful when you need to ensure that all commands are executed or none. +#[pg_extern] +fn handle_all(commands: Vec) -> Result, ErrorMessage> { + let repository = OrderAndRestaurantEventRepository::new(); + let aggregate = OrderAndRestaurantAggregate::new( + repository, + order_restaurant_decider(), + order_restaurant_saga(), + ); + aggregate + .handle_all(&commands) + .map(|res| res.into_iter().map(|(e, _)| e.clone()).collect()) +} + +#[cfg(any(test, feature = "pg_test"))] +#[pg_schema] +mod tests { + use crate::domain::api::{CreateRestaurant, RestaurantCreated}; + use crate::domain::api::{ + MenuId, MenuItem, MenuItemId, MenuItemName, Money, RestaurantId, RestaurantMenu, + RestaurantMenuCuisine, RestaurantName, + }; + use crate::domain::{Command, Event}; + use pgrx::prelude::*; + use uuid::Uuid; + + #[pg_test] + fn create_restaurant_test() { + let restaurant_identifier = + RestaurantId(Uuid::parse_str("02f09a3f-1624-3b1d-8409-44eff7708208").unwrap()); + let restaurant_name = RestaurantName("Test Restaurant".to_string()); + let menu_item_id = + MenuItemId(Uuid::parse_str("02f09a3f-1624-3b1d-8409-44eff7708210").unwrap()); + let menu_id = MenuId(Uuid::parse_str("02f09a3f-1624-3b1d-8409-44eff7708210").unwrap()); + let menu_items = vec![MenuItem { + id: menu_item_id, + name: MenuItemName("Item 1".to_string()), + price: Money(100u64), + }]; + + let create_restaurant_command = Command::CreateRestaurant(CreateRestaurant { + identifier: restaurant_identifier.clone(), + name: restaurant_name.clone(), + menu: RestaurantMenu { + menu_id: menu_id.clone(), + items: menu_items.clone(), + cuisine: RestaurantMenuCuisine::Vietnamese, + }, + }); + let restaurant_created_event = Event::RestaurantCreated(RestaurantCreated { + identifier: restaurant_identifier.clone(), + name: restaurant_name.clone(), + menu: RestaurantMenu { + menu_id: menu_id.clone(), + items: menu_items.clone(), + cuisine: RestaurantMenuCuisine::Vietnamese, + }, + r#final: false, + }); + + assert_eq!( + Some(restaurant_created_event.clone()), + crate::handle(create_restaurant_command) + .unwrap() + .into_iter() + .next() + ); + } +} + +/// This module is required by `cargo pgrx test` invocations. +/// It must be visible at the root of your extension crate. +#[cfg(test)] +pub mod pg_test { + pub fn setup(_options: Vec<&str>) { + // perform one-off initialization when the pg_test framework starts + } + + pub fn postgresql_conf_options() -> Vec<&'static str> { + // return any postgresql.conf settings that are required for your tests + vec![] + } +}