diff --git a/examples/order-fulfillment-compensation/README.md b/examples/order-fulfillment-compensation/README.md new file mode 100644 index 000000000..1ea1cdf13 --- /dev/null +++ b/examples/order-fulfillment-compensation/README.md @@ -0,0 +1,286 @@ +# Order Fulfillment with Compensation (Quarkus Flow + Saga Pattern) + +A complete example demonstrating the **Compensation/Saga Pattern** for distributed transactions using: + +- **Quarkus** (hot-reload dev mode) +- **[Quarkus Flow](https://docs.quarkiverse.io/quarkus-flow/dev/)** (function-first fluent DSL for Serverless Workflow) +- **Try-Catch Error Handling** with automatic compensation/rollback +- **WebSocket** for real-time UI updates +- A **Single Page Application (SPA)** UI with live workflow visualization + +> Run locally with hot reload and _no manual infra setup_. Perfect for learning how to handle failures and rollbacks in distributed workflows. + +--- + +## πŸš€ Quick Start + +### Prerequisites + +- Java 17+ and Maven + +### 1) Run the app + +```bash +# from this example directory +mvn quarkus:dev +``` + +Open: **http://localhost:8080** + +> Quarkus dev mode gives you live reload for Java, resources, and the static UI. + +--- + +## 🧭 What you'll see + +The UI is a clean, responsive Single Page Application that visualizes the order fulfillment workflow in real-time. You can: + +1. **Submit Orders**: Enter one of four test order IDs to see different failure scenarios +2. **Watch Live Progress**: See each step execute in real-time via WebSocket updates +3. **Observe Compensation**: When a step fails, watch the automatic rollback of previous operations + +### Test Scenarios + +The example includes three pre-configured test orders that demonstrate different failure points: + +- **ORDER#001**: Fails at **Stock Reservation** (out of stock) + - Compensation: Cancels stock reservation + +- **ORDER#002**: Fails at **Payment Processing** (service unavailable) + - Compensation: Cancels payment β†’ Cancels stock reservation + +- **ORDER#003**: Fails at **Shipping** (carrier unavailable) + - Compensation: Cancels shipping β†’ Cancels payment β†’ Cancels stock reservation + +- **ORDER#004**: Succeeds through all steps with no failures + +Any other order ID will complete successfully through all three steps. + +--- + +## 🧩 Architecture: The Saga Pattern + +This example implements the **Saga Pattern** for managing distributed transactions. When a step fails, compensation functions automatically rollback all previously completed operations, ensuring data consistency. + +```text +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Order Fulfillment Workflow β”‚ +β”‚ β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Reserve │─────▢│ Process │─────▢│ Schedule β”‚ β”‚ +β”‚ β”‚ Stock β”‚ β”‚ Payment β”‚ β”‚ Shipping β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β”‚ β”‚ β”‚ β”‚ +β”‚ β”‚ βœ— Fails β”‚ βœ— Fails β”‚ βœ— Fails β”‚ +β”‚ β–Ό β–Ό β–Ό β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Cancel β”‚ β”‚ Cancel β”‚ β”‚ Cancel β”‚ β”‚ +β”‚ β”‚ Reservation β”‚ β”‚ Payment β”‚ β”‚ Shipping β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β”‚ β”‚ β”‚ +β”‚ β–Ό β–Ό β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Cancel β”‚ β”‚ Cancel β”‚ β”‚ +β”‚ β”‚ Reservation β”‚ β”‚ Payment β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β–Ό β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Cancel β”‚ β”‚ +β”‚ β”‚ Reservation β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Key Concepts + +1. **Forward Flow**: Each step (stock β†’ payment β†’ shipping) executes sequentially +2. **Compensation Flow**: On failure, compensation functions execute in reverse order +3. **Atomic Operations**: Each step is either fully completed or fully compensated +4. **Error Propagation**: Errors trigger immediate compensation and workflow termination + +--- + +## πŸ”§ The Workflow Definition + +The workflow uses Quarkus Flow's fluent DSL with nested `tryCatch` blocks. Each business operation has a corresponding compensation function: + +```java +public Workflow descriptor() { + return FuncWorkflowBuilder.workflow("order-fulfillment", "examples") + .tasks( + // Step 1: Try to reserve stock + tryCatch("tryStockReservation", + t -> t.tryCatch(function("stockReservation", this::reserveStock)) + .catchError( + err -> err.type(STOCK_ORDER_ERROR), + function("cancelStockReservation", this::cancelReservation) + .then(FlowDirectiveEnum.END) + )), + + // Step 2: Try to process payment + tryCatch("tryPaymentProcessing", + t -> t.tryCatch(function("paymentProcessing", this::processPayment)) + .catchWhen( + "${ .status == 503 }", + function("cancelPayment", this::cancelPayment) + .then(FlowDirectiveEnum.END))), + + // Step 3: Try to schedule shipping + tryCatch("tryShipping", + t -> t.tryCatch(function("scheduleShipping", this::scheduleShipping)) + .catchType( + SHIPPING_ERROR, + function("cancelShipping", this::cancelShipping) + .then(FlowDirectiveEnum.END))), + + // Final step: Mark workflow complete + function("endFlow", this::endFlow) + ) + .build(); +} +``` + +### Error Handling Strategies + +The example demonstrates three different error-catching approaches: + +1. **`catchError`**: Match by error type (e.g., `STOCK_ORDER_ERROR`) +2. **`catchWhen`**: Match by condition expression (e.g., `${ .status == 503 }`) +3. **`catchType`**: Match by error type constant (e.g., `SHIPPING_ERROR`) + +--- + +## 🎯 Business Logic + +Each operation is implemented as a simple Java method that can succeed or fail: + +```java +private OrderStep reserveStock(String order) { + log.info("Reserving stock for order: {}", order); + broadcastStep(order, "stockReservation", "processing", "Reserving stock..."); + + if (order.equals(ORDER_001)) { + broadcastStep(order, "stockReservation", "failed", "Out of stock"); + throw new WorkflowException(WorkflowError.error(STOCK_ORDER_ERROR, 409).build()); + } + + broadcastStep(order, "stockReservation", "completed", "Stock reserved"); + return new OrderStep(order, "reserved"); +} + +private OrderStep cancelReservation(String order) { + log.info("Cancelling reservation for order: {}", order); + broadcastStep(order, "compensation", "processing", "Cancelling stock reservation..."); + broadcastStep(order, "compensation", "completed", "Stock reservation cancelled"); + return new OrderStep(order, "error"); +} +``` + +### Compensation Chain + +When a later step fails, its compensation function calls the compensation of previous steps: + +```java +private OrderStep cancelShipping(OrderStep order) { + log.info("Cancel shipping for order: {}", order); + broadcastStep(order.orderId(), "compensation", "processing", + "Cancelling shipping and previous operations..."); + + // Cascade: Cancel shipping β†’ Cancel payment β†’ Cancel stock + cancelPayment(order); + + broadcastStep(order.orderId(), "compensation", "completed", + "All operations cancelled"); + return new OrderStep(order.orderId(), "error"); +} +``` + +--- + +## 🌐 REST API + +### Start Order Processing + +```bash +curl -X POST http://localhost:8080/api/orders \ + -H "Content-Type: application/json" \ + -d '{"orderId": "ORDER#001"}' +``` + +**Response:** +``` +HTTP/1.1 202 Accepted +Location: /orders/ORDER#001 +``` + +The workflow executes asynchronously. Use the WebSocket UI or check logs to monitor progress. + +--- + +## πŸ“‘ Real-Time Updates + +The example uses WebSocket to push workflow state changes to the browser in real-time: + +```java +private void broadcastStep(String orderId, String step, String status, String message) { + String json = String.format( + "{\"orderId\":\"%s\",\"step\":\"%s\",\"status\":\"%s\",\"message\":\"%s\"}", + orderId, step, status, message + ); + webSocket.broadcast(json); +} +``` + +The UI connects to `ws://localhost:8080/ws/orders` and displays each step as it executes. + +--- + +## πŸ›  Troubleshooting + +- **WebSocket not connecting**: Check browser console for errors. Ensure you're accessing via `http://localhost:8080` (not a different host/port). + +- **Workflow not starting**: Check Quarkus logs for exceptions. Verify the order ID is being sent correctly in the POST request. + +- **No compensation on failure**: Ensure you're using one of the test order IDs (ORDER#001, ORDER#002, ORDER#003). Other IDs will succeed. + +- **Port already in use**: Change the port in `application.properties`: `quarkus.http.port=8081` + +--- + +## πŸ“š Learn More + +This example demonstrates core concepts for building resilient distributed systems: + +- **Saga Pattern**: Coordinating distributed transactions without 2PC +- **Compensation Logic**: Undoing completed operations when later steps fail +- **Error Handling**: Different strategies for catching and handling errors +- **Real-Time Monitoring**: Using WebSocket for live workflow visualization + +### Key Takeaways + +1. **Explicit Compensation**: Each forward operation has a corresponding compensation function +2. **Cascading Rollback**: Later compensations call earlier ones to maintain consistency +3. **Type-Safe Workflows**: Pure Java records and enums ensure compile-time safety +4. **Declarative Error Handling**: Try-catch blocks in the workflow DSL, not scattered in business logic + +--- + +## πŸŽ“ Extend This Example + +Ideas for learning and experimentation: + +- Add a fourth step (e.g., "Send Confirmation Email") with its own compensation +- Implement retry logic before compensation (e.g., retry payment 3 times) +- Add a database to persist order state across restarts +- Create a dashboard showing all orders and their current status +- Implement partial compensation (e.g., partial refund instead of full cancellation) + +--- + +## πŸ“– Documentation + +- **Quarkus Flow**: [https://docs.quarkiverse.io/quarkus-flow/dev/](https://docs.quarkiverse.io/quarkus-flow/dev/) +- **CNCF Serverless Workflow**: [https://serverlessworkflow.io/](https://serverlessworkflow.io/) +- **Saga Pattern**: [https://microservices.io/patterns/data/saga.html](https://microservices.io/patterns/data/saga.html) + +Have fun building resilient workflows with Quarkus Flow! πŸŽ‰ \ No newline at end of file diff --git a/examples/order-fulfillment-compensation/pom.xml b/examples/order-fulfillment-compensation/pom.xml new file mode 100644 index 000000000..05f7bfef3 --- /dev/null +++ b/examples/order-fulfillment-compensation/pom.xml @@ -0,0 +1,153 @@ + + + 4.0.0 + org.acme + order-fulfillment-compensation + 1.0.0-SNAPSHOT + Quarkus Flow :: Examples :: Order Fulfillment Compensation + + + 3.15.0 + 17 + UTF-8 + UTF-8 + quarkus-bom + io.quarkus.platform + 3.34.6 + true + 3.5.5 + 1.0.0-SNAPSHOT + + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + + + + + io.quarkus + quarkus-arc + + + io.quarkus + quarkus-rest-jackson + + + io.quarkus + quarkus-junit + test + + + io.rest-assured + rest-assured + test + + + io.quarkiverse.flow + quarkus-flow + ${quarkus.flow.version} + + + io.quarkus + quarkus-websockets + + + + + + io.quarkiverse.flow + quarkus-flow-deployment + ${quarkus.flow.version} + test + + + io.quarkiverse.flow + quarkus-flow-mvstore-deployment + ${quarkus.flow.version} + test + + + + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + true + + + + build + generate-code + generate-code-tests + native-image-agent + + + + + + maven-compiler-plugin + ${compiler-plugin.version} + + true + + + + maven-surefire-plugin + ${surefire-plugin.version} + + --add-opens java.base/java.lang=ALL-UNNAMED + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + maven-failsafe-plugin + ${surefire-plugin.version} + + + + integration-test + verify + + + + + --add-opens java.base/java.lang=ALL-UNNAMED + + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + + + + native + + + native + + + + false + false + true + + + + diff --git a/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/OrderFulfillmentWorkflow.java b/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/OrderFulfillmentWorkflow.java new file mode 100644 index 000000000..8ebe7c814 --- /dev/null +++ b/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/OrderFulfillmentWorkflow.java @@ -0,0 +1,151 @@ +package org.acme.flow; + +import io.quarkiverse.flow.Flow; +import io.quarkus.runtime.annotations.RegisterForReflection; +import io.serverlessworkflow.api.types.FlowDirectiveEnum; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.fluent.func.dsl.FuncCallStep; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.tasks; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.tryCatch; + +@ApplicationScoped +public class OrderFulfillmentWorkflow extends Flow { + + private static final Logger log = LoggerFactory.getLogger(OrderFulfillmentWorkflow.class.getName()); + + private static final String STOCK_ORDER_ERROR = "ERR_001"; + private static final String PAYMENT_PROCESSING_ERROR = "ERR_002"; + private static final String SHIPPING_ERROR = "ERR_003"; + + private static final String ORDER_001 = "ORDER#001"; + private static final String ORDER_002 = "ORDER#002"; + private static final String ORDER_003 = "ORDER#003"; + private static final String END_FLOW = "endFlow"; + + @Inject + WorkflowWebSocket webSocket; + + @Override + public Workflow descriptor() { + + FuncCallStep cancelStock = function("cancelStock", + (OrderStep o) -> cancelReservation(o.orderId())); + FuncCallStep cancelPayment = function("cancelPayment", + (OrderStep o) -> cancelPayment(o.orderId())); + + return FuncWorkflowBuilder.workflow("order-fulfillment", "examples") + .tasks( + tryCatch( + "tryStockReservation", + t -> t.tryCatch(function("stockReservation", this::reserveStock)) + .catchError( + err -> err.type(STOCK_ORDER_ERROR), + function("notifyStockFailure", this::notifyStockFailure) + .then(FlowDirectiveEnum.END))), + tryCatch( + "tryPaymentProcessing", + t -> t.tryCatch(function("paymentProcessing", this::processPayment)) + .catchWhen( + "${ .status == 503 }", + cancelStock.then(FlowDirectiveEnum.END))), + tryCatch( + "tryShipping", + t -> t.tryCatch(function("scheduleShipping", this::scheduleShipping)) + .catchType( + SHIPPING_ERROR, + tasks( + cancelPayment, + cancelStock)))) + .build(); + } + + private OrderStep reserveStock(String order) { + log.info("Reserving stock for order: {}", order); + broadcastStep(order, "stockReservation", "processing", "Reserving stock..."); + waitMs(500); + if (order.equals(ORDER_001)) { + broadcastStep(order, "stockReservation", "failed", "Stock reservation failed - Out of stock"); + throw new WorkflowException(WorkflowError.error(STOCK_ORDER_ERROR, 409).build()); + } + broadcastStep(order, "stockReservation", "completed", "Stock reserved successfully"); + return new OrderStep(order, "reserved"); + } + + private OrderStep processPayment(OrderStep order) { + log.info("Processing payment for order: {}", order); + broadcastStep(order.orderId(), "paymentProcessing", "processing", "Processing payment..."); + waitMs(800); + if (ORDER_002.equals(order.orderId())) { + broadcastStep(order.orderId(), "paymentProcessing", "failed", "Payment processing failed - Service unavailable"); + throw new WorkflowException(WorkflowError.error(PAYMENT_PROCESSING_ERROR, 503).build()); + } + broadcastStep(order.orderId(), "paymentProcessing", "completed", "Payment processed successfully"); + return new OrderStep(order.orderId(), "paid"); + } + + private OrderStep scheduleShipping(OrderStep order) { + log.info("Scheduling shipping for order: {}", order); + broadcastStep(order.orderId(), "shipping", "processing", "Scheduling shipping..."); + waitMs(800); + if (ORDER_003.equals(order.orderId())) { + broadcastStep(order.orderId(), "shipping", "failed", "Shipping failed - Carrier unavailable"); + throw new WorkflowException(WorkflowError.error(SHIPPING_ERROR, 500).build()); + } + broadcastStep(order.orderId(), "shipping", "completed", "Shipping scheduled successfully"); + return new OrderStep(order.orderId(), "shipping"); + } + + private WorkflowOutput notifyStockFailure(String order) { + log.info("Stock reservation failed for order: {}, there is nothing to compensate", order); + broadcastStep(order, "compensation", "failed", "Stock unavailable β€” no reservation to cancel"); + return new WorkflowOutput("error"); + } + + private WorkflowOutput cancelReservation(String order) { + log.info("Cancelling stock reservation for order: {}", order); + broadcastStep(order, "compensation", "processing", "Cancelling stock reservation..."); + waitMs(400); + broadcastStep(order, "compensation", "completed", "Stock reservation cancelled"); + return new WorkflowOutput("error"); + } + + private OrderStep cancelPayment(String order) { + log.info("Cancel payment for order: {}", order); + broadcastStep(order, "compensation", "processing", "Cancelling payment..."); + waitMs(400); + broadcastStep(order, "compensation", "completed", "Payment cancelled"); + return new OrderStep(order, "error"); + } + + private void broadcastStep(String orderId, String step, String status, String message) { + String json = String.format( + "{\"orderId\":\"%s\",\"step\":\"%s\",\"status\":\"%s\",\"message\":\"%s\"}", + orderId, step, status, message); + webSocket.broadcast(json); + } + + private static void waitMs(int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @RegisterForReflection + public record OrderStep(String orderId, String status) { + } + + @RegisterForReflection + public record WorkflowOutput(String status) { + } +} diff --git a/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/OrderResource.java b/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/OrderResource.java new file mode 100644 index 000000000..37d30636b --- /dev/null +++ b/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/OrderResource.java @@ -0,0 +1,78 @@ +package org.acme.flow; + +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +@Path("/api") +@ApplicationScoped +public class OrderResource { + + private static final Logger log = LoggerFactory.getLogger(OrderResource.class); + + @Inject + OrderFulfillmentWorkflow workflow; + + @Inject + WorkflowWebSocket webSocket; + + @POST + @Path("/orders") + public Response order(OrderRequest request) { + + String orderId = request.orderId(); + + // Broadcast order started + broadcast("{\"orderId\":\"%s\",\"status\":\"started\",\"message\":\"Order processing started\"}", orderId); + + try { + WorkflowInstance instance = workflow.instance(orderId); + + CompletableFuture future = instance.start(); + + future.thenAccept(model -> { + var output = model.as(OrderFulfillmentWorkflow.WorkflowOutput.class).orElseThrow(); + if ("error".equals(output.status())) { + broadcast( + "{\"orderId\":\"%s\",\"status\":\"failed\",\"message\":\"Compensation completed: Order cancelled\"}", + orderId); + } else { + broadcast("{\"orderId\":\"%s\",\"status\":\"completed\",\"message\":\"Order fulfilled successfully\"}", + orderId); + } + + }).exceptionally(throwable -> { + log.error("Order failed: {}", orderId, throwable); + broadcast("{\"orderId\":\"%s\",\"status\":\"failed\",\"message\":\"Compensation completed: Order cancelled\"}", + orderId); + return null; + }); + + return Response.accepted() + .header("Location", "/orders/" + orderId) + .build(); + } catch (Exception e) { + log.error("Failed to start workflow for order: {}", orderId, e); + broadcast("{\"orderId\":\"%s\",\"status\":\"failed\",\"message\":\"Failed to start order processing\"}", orderId); + return Response.serverError().build(); + } + } + + private void broadcast(String format, String orderId) { + webSocket.broadcast(String.format( + format, + orderId)); + } + + public record OrderRequest(String orderId) { + } + +} \ No newline at end of file diff --git a/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/WorkflowWebSocket.java b/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/WorkflowWebSocket.java new file mode 100644 index 000000000..c827badf8 --- /dev/null +++ b/examples/order-fulfillment-compensation/src/main/java/org/acme/flow/WorkflowWebSocket.java @@ -0,0 +1,46 @@ +package org.acme.flow; + +import io.quarkus.logging.Log; +import jakarta.inject.Singleton; +import jakarta.websocket.OnClose; +import jakarta.websocket.OnError; +import jakarta.websocket.OnOpen; +import jakarta.websocket.Session; +import jakarta.websocket.server.ServerEndpoint; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@ServerEndpoint("/order-events") +@Singleton +public class WorkflowWebSocket { + + private final Map sessions = new ConcurrentHashMap<>(); + + @OnOpen + public void onOpen(Session session) { + sessions.put(session.getId(), session); + Log.info("WebSocket connection opened: " + session.getId()); + } + + @OnClose + public void onClose(Session session) { + sessions.remove(session.getId()); + Log.info("WebSocket connection closed: " + session.getId()); + } + + @OnError + public void onError(Session session, Throwable throwable) { + Log.error("WebSocket error for session " + session.getId(), throwable); + sessions.remove(session.getId()); + } + + public void broadcast(String message) { + sessions.values().forEach(session -> { + session.getAsyncRemote().sendText(message, result -> { + if (result.getException() != null) { + Log.error("Unable to send message to session " + session.getId(), result.getException()); + } + }); + }); + } +} diff --git a/examples/order-fulfillment-compensation/src/main/resources/META-INF/resources/index.html b/examples/order-fulfillment-compensation/src/main/resources/META-INF/resources/index.html new file mode 100644 index 000000000..81c8b1b7b --- /dev/null +++ b/examples/order-fulfillment-compensation/src/main/resources/META-INF/resources/index.html @@ -0,0 +1,681 @@ + + + + + + Order Fulfillment Workflow - Quarkus Flow + + + + + +
+ +
+

+ Order Fulfillment Workflow +

+

Compensation Pattern Demo - Live Workflow Visualization

+

Watch each step execute in real-time with automatic compensation on failures

+
+ + +
+
+
+ + + +
+
+ +
    +
  • β€’ ORDER#001 - Fails at stock reservation (triggers compensation)
  • +
  • β€’ ORDER#002 - Fails at payment processing (triggers compensation)
  • +
  • β€’ ORDER#003 - Fails at shipping (triggers compensation)
  • +
  • β€’ ORDER#004 - Completes successfully (no errors)
  • +
+
+
+
+ + +
+ +
+
+

ORDER#001

+
+
+

Stock Reservation Failure

+ + + + +
+ Ready to process... +
+ +
+ + +
+
+

ORDER#002

+
+
+

Payment Processing Failure

+ + + + +
+ Ready to process... +
+ +
+ + +
+
+

ORDER#003

+
+
+

Shipping Failure

+ + + + +
+ Ready to process... +
+ +
+ + +
+
+

ORDER#004

+
+
+

Successful Order Completion

+ + + + +
+ Ready to process... +
+ +
+
+ + +
+
+ Connecting... +
+ + +
+

Powered by Quarkus and Quarkus Flow

+
+
+ + + + \ No newline at end of file diff --git a/examples/order-fulfillment-compensation/src/main/resources/META-INF/resources/workflow-diagram.html b/examples/order-fulfillment-compensation/src/main/resources/META-INF/resources/workflow-diagram.html new file mode 100644 index 000000000..120ab7fee --- /dev/null +++ b/examples/order-fulfillment-compensation/src/main/resources/META-INF/resources/workflow-diagram.html @@ -0,0 +1,266 @@ + + + + + + Workflow Diagram - Order Fulfillment + + + + + +
+ +
+

+ Order Fulfillment Workflow +

+

Behind the Scenes - Compensation Pattern

+ + ← Back to Demo + +
+ + +
+

Legend

+
+
+ + + + Normal Flow +
+
+ + + + Error Path +
+
+ + + + Compensation +
+
+
+ + +
+

Main Workflow Flow

+ + + + + + + + + + + + + + + + + + START + + + + Stock + Reservation + + + + + Payment + Processing + + + + + Schedule + Shipping + + + + + END + + + + + Cancel Stock + + + Cancel Payment + + + Cancel Shipping + + + + + + + + + + + + + + ORDER#001 + ORDER#002 + ORDER#003 + +
+ + +
+ +
+

ORDER#001 Scenario

+
+
+ 1. + Workflow starts +
+
+ 2. + Stock Reservation FAILS (ERR_001, status 409) +
+
+
+ throw WorkflowException(ERR_001, 409) +
+
+ + +
+

ORDER#002 Scenario

+
+
+ 1. + Workflow starts +
+
+ 2. + Stock Reservation succeeds +
+
+ 3. + Payment Processing FAILS (ERR_002, status 503) +
+
+ 4. + Compensation: Cancel Stock +
+
+ 5. + Workflow ends (compensated) +
+
+
+ throw WorkflowException(ERR_002, 503) +
+
+ + +
+

ORDER#003 Scenario

+
+
+ 1. + Workflow starts +
+
+ 2. + Stock Reservation succeeds +
+
+ 3. + Payment Processing succeeds +
+
+ 4. + Shipping FAILS (ERR_003, status 500) +
+
+ 5. + Compensation: Cancel Payment β†’ Cancel Stock +
+
+ 6. + Workflow ends (compensated) +
+
+
+ throw WorkflowException(ERR_003, 500) +
+
+
+ + +
+

Technical Implementation

+
+
+

Workflow Pattern

+
    +
  • β€’ Try-Catch Blocks: Each step wrapped in error handling
  • +
  • β€’ Compensation Logic: Automatic rollback on failures
  • +
  • β€’ Error Types: Specific error codes (ERR_001, ERR_002, ERR_003)
  • +
  • β€’ Status Codes: HTTP-like status codes (409, 503, 500)
  • +
+
+
+

Real-time Updates

+
    +
  • β€’ WebSocket: Bidirectional communication
  • +
  • β€’ Events: started, completed, failed
  • +
  • β€’ Broadcasting: All connected clients notified
  • +
  • β€’ Auto-reconnect: Resilient connection handling
  • +
+
+
+
+
+ + diff --git a/examples/order-fulfillment-compensation/src/main/resources/application.properties b/examples/order-fulfillment-compensation/src/main/resources/application.properties new file mode 100644 index 000000000..e69de29bb diff --git a/examples/pom.xml b/examples/pom.xml index 3ffb04bec..165e5631e 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -24,6 +24,7 @@ micrometer-prometheus durable-workflows-k8s suspend-resume-abort + order-fulfillment-compensation resilient-task-orchestrator