diff --git a/README.md b/README.md index 3ec5798..5b650d9 100644 --- a/README.md +++ b/README.md @@ -1,658 +1,108 @@ # AWS Durable Execution SDK for Python +[![Build](https://github.com/aws/aws-durable-execution-sdk-python/actions/workflows/ci.yml/badge.svg)](https://github.com/aws/aws-durable-execution-sdk-python/actions/workflows/ci.yml) [![PyPI - Version](https://img.shields.io/pypi/v/aws-durable-execution-sdk-python.svg)](https://pypi.org/project/aws-durable-execution-sdk-python) [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/aws-durable-execution-sdk-python.svg)](https://pypi.org/project/aws-durable-execution-sdk-python) - [![OpenSSF Scorecard](https://api.scorecard.dev/projects/github.com/aws/aws-durable-execution-sdk-python/badge)](https://scorecard.dev/viewer/?uri=github.com/aws/aws-durable-execution-sdk-python) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) ----- -## Table of Contents - -- [Installation](#installation) -- [License](#license) - -## Installation - -```console -pip install aws-durable-execution-sdk-python -``` - -## Developers -Please see [CONTRIBUTING.md](CONTRIBUTING.md). It contains the testing guide, sample commands and instructions -for how to contribute to this package. - -tldr; use `hatch` and it will manage virtual envs and dependencies for you, so you don't have to do it manually. - -## Core Architecture -The entry-point that consumers of the SDK interact with is the DurableContext. 
- -### DurableContext Operations -- **Core Methods**: `set_logger`, `step`, `invoke`, `map`, `parallel`, `run_in_child_context`, `wait`, `create_callback`, `wait_for_callback`, `wait_for_condition` -- **Thread Safety**: Uses `OrderedCounter` for generating sequential step IDs -- **State Management**: Delegates to `ExecutionState` for checkpointing - -### Concurrency Implementation -- **Map/Parallel**: Both inherit from `ConcurrentExecutor` abstract base class -- **Thread Pool**: Uses `ThreadPoolExecutor` for concurrent execution -- **State Tracking**: `ExecutableWithState` manages individual task lifecycle -- **Completion Logic**: `ExecutionCounters` tracks success/failure criteria -- **Suspension**: `TimerScheduler` handles timed suspensions and resumptions - -### Configuration System -- **Modular Configs**: Separate config classes for each operation type -- **Completion Control**: `CompletionConfig` defines success/failure criteria -- **Serialization**: `SerDes` interface for custom serialization - -### Operation Handlers -- **Separation of Concerns**: Each operation has dedicated handler function -- **Checkpointing**: All operations integrate with execution state checkpointing -- **Error Handling**: Consistent error handling and retry logic across operations - - -```mermaid -classDiagram - class DurableContext { - -ExecutionState state - -Any lambda_context - -str _parent_id - -OrderedCounter _step_counter - -LogInfo _log_info - -Logger logger - - +set_logger(LoggerInterface new_logger) - +step(Callable func, str name, StepConfig config) T - +invoke(str function_name, P payload, str name, InvokeConfig config) R - +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult - +parallel(Sequence functions, str name, ParallelConfig config) BatchResult - +run_in_child_context(Callable func, str name, ChildConfig config) T - +wait(int seconds, str name) - +create_callback(str name, CallbackConfig config) Callback - +wait_for_callback(Callable 
submitter, str name, WaitForCallbackConfig config) Any - +wait_for_condition(Callable check, WaitForConditionConfig config, str name) T - } - - class DurableContextProtocol { - <> - +step(Callable func, str name, StepConfig config) T - +run_in_child_context(Callable func, str name, ChildConfig config) T - +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult - +parallel(Sequence functions, str name, ParallelConfig config) BatchResult - +wait(int seconds, str name) - +create_callback(str name, CallbackConfig config) Callback - } - - class OrderedCounter { - -OrderedLock _lock - -int _counter - +increment() int - +decrement() int - +get_current() int - } - - class ExecutionState { - +str durable_execution_arn - +get_checkpoint_result(str operation_id) CheckpointedResult - +create_checkpoint(OperationUpdate operation_update) - } - - class Logger { - +LoggerInterface logger - +LogInfo info - +with_log_info(LogInfo info) Logger - +from_log_info(LoggerInterface logger, LogInfo info) Logger - } - - DurableContext ..|> DurableContextProtocol : implements - DurableContext --> ExecutionState : uses - DurableContext --> OrderedCounter : contains - DurableContext --> Logger : contains -``` - -## Operation Handlers -The `DurableContext` calls operation handlers, which contain the execution logic for each operation. 
- -```mermaid -classDiagram - class DurableContext { - +step(Callable func, str name, StepConfig config) T - +invoke(str function_name, P payload, str name, InvokeConfig config) R - +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult - +parallel(Sequence functions, str name, ParallelConfig config) BatchResult - +run_in_child_context(Callable func, str name, ChildConfig config) T - +wait(int seconds, str name) - +create_callback(str name, CallbackConfig config) Callback - +wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any - +wait_for_condition(Callable check, WaitForConditionConfig config, str name) T - } - - class step_handler { - <> - +step_handler(Callable func, ExecutionState state, OperationIdentifier op_id, StepConfig config, Logger logger) T - } - - class invoke_handler { - <> - +invoke_handler(str function_name, P payload, ExecutionState state, OperationIdentifier op_id, InvokeConfig config) R - } - - class map_handler { - <> - +map_handler(Sequence items, Callable func, MapConfig config, ExecutionState state, Callable run_in_child_context) BatchResult - } - - class parallel_handler { - <> - +parallel_handler(Sequence callables, ParallelConfig config, ExecutionState state, Callable run_in_child_context) BatchResult - } - - class child_handler { - <> - +child_handler(Callable func, ExecutionState state, OperationIdentifier op_id, ChildConfig config) T - } - - class wait_handler { - <> - +wait_handler(int seconds, ExecutionState state, OperationIdentifier op_id) - } - - class create_callback_handler { - <> - +create_callback_handler(ExecutionState state, OperationIdentifier op_id, CallbackConfig config) str - } - - class wait_for_callback_handler { - <> - +wait_for_callback_handler(DurableContext context, Callable submitter, str name, WaitForCallbackConfig config) Any - } - - class wait_for_condition_handler { - <> - +wait_for_condition_handler(Callable check, WaitForConditionConfig config, 
ExecutionState state, OperationIdentifier op_id, Logger logger) T
-    }
-
-    DurableContext --> step_handler : calls
-    DurableContext --> invoke_handler : calls
-    DurableContext --> map_handler : calls
-    DurableContext --> parallel_handler : calls
-    DurableContext --> child_handler : calls
-    DurableContext --> wait_handler : calls
-    DurableContext --> create_callback_handler : calls
-    DurableContext --> wait_for_callback_handler : calls
-    DurableContext --> wait_for_condition_handler : calls
-```
-
-## Configuration Module Classes
-
-```mermaid
-classDiagram
-    class StepConfig {
-        +Callable retry_strategy
-        +StepSemantics step_semantics
-        +SerDes serdes
-    }
-
-    class InvokeConfig~P,R~ {
-        +int timeout_seconds
-        +SerDes~P~ serdes_payload
-        +SerDes~R~ serdes_result
-    }
-
-    class MapConfig {
-        +int max_concurrency
-        +ItemBatcher item_batcher
-        +CompletionConfig completion_config
-        +SerDes serdes
-    }
-
-    class ParallelConfig {
-        +int max_concurrency
-        +CompletionConfig completion_config
-        +SerDes serdes
-    }
-
-    class ChildConfig~T~ {
-        +SerDes serdes
-        +OperationSubType sub_type
-        +Callable~T,str~ summary_generator
-    }
-
-    class CallbackConfig {
-        +int timeout_seconds
-        +int heartbeat_timeout_seconds
-        +SerDes serdes
-    }
-
-    class WaitForCallbackConfig {
-        +Callable retry_strategy
-    }
-
-    class WaitForConditionConfig~T~ {
-        +Callable wait_strategy
-        +T initial_state
-        +SerDes serdes
-    }
-
-    class CompletionConfig {
-        +int min_successful
-        +int tolerated_failure_count
-        +float tolerated_failure_percentage
-        +first_successful()$ CompletionConfig
-        +all_completed()$ CompletionConfig
-        +all_successful()$ CompletionConfig
-    }
-
-    class ItemBatcher~T~ {
-        +int max_items_per_batch
-        +float max_item_bytes_per_batch
-        +T batch_input
-    }
-
-    WaitForCallbackConfig --|> CallbackConfig : extends
-    MapConfig --> CompletionConfig : contains
-    MapConfig --> ItemBatcher : contains
-    ParallelConfig --> CompletionConfig : contains
-```
-
-## Types and Protocols Module
+Build reliable, long-running AWS Lambda workflows with checkpointed steps, waits, callbacks, and parallel execution.
 
-```mermaid
-classDiagram
-    class DurableContextProtocol {
-        <>
-        +step(Callable func, str name, StepConfig config) T
-        +run_in_child_context(Callable func, str name, ChildConfig config) T
-        +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
-        +parallel(Sequence functions, str name, ParallelConfig config) BatchResult
-        +wait(int seconds, str name)
-        +create_callback(str name, CallbackConfig config) Callback
-    }
+## ✨ Key Features
 
-    class LoggerInterface {
-        <>
-        +debug(object msg, *args, Mapping extra)
-        +info(object msg, *args, Mapping extra)
-        +warning(object msg, *args, Mapping extra)
-        +error(object msg, *args, Mapping extra)
-        +exception(object msg, *args, Mapping extra)
-    }
+- **Automatic checkpointing** - Resume execution after Lambda pauses or restarts
+- **Durable steps** - Run work with retry strategies and deterministic replay
+- **Waits and callbacks** - Pause for time or external signals without blocking Lambda
+- **Parallel and map operations** - Fan out work with configurable completion criteria
+- **Child contexts** - Structure complex workflows into isolated subflows
+- **Replay-safe logging** - Use `context.logger` for structured, de-duplicated logs
+- **Local and cloud testing** - Validate workflows with the testing SDK
 
-    class CallbackProtocol~C_co~ {
-        <>
-        +str callback_id
-        +result() C_co
-    }
+## 📦 Packages
 
-    class BatchResultProtocol~T~ {
-        <>
-        +get_results() list~T~
-    }
+| Package | Description | Version |
+| --- | --- | --- |
+| `aws-durable-execution-sdk-python` | Execution SDK for Lambda durable functions | [![PyPI - Version](https://img.shields.io/pypi/v/aws-durable-execution-sdk-python.svg)](https://pypi.org/project/aws-durable-execution-sdk-python) |
+| `aws-durable-execution-sdk-python-testing` | Local/cloud test runner and pytest helpers | [![PyPI - Version](https://img.shields.io/pypi/v/aws-durable-execution-sdk-python-testing.svg)](https://pypi.org/project/aws-durable-execution-sdk-python-testing) |
 
-    class StepContext {
-        +LoggerInterface logger
-    }
+## 🚀 Quick Start
 
-    class WaitForConditionCheckContext {
-        +LoggerInterface logger
-    }
+Install the execution SDK:
 
-    class OperationContext {
-        +LoggerInterface logger
-    }
-
-    StepContext --|> OperationContext : extends
-    WaitForConditionCheckContext --|> OperationContext : extends
-```
-
-## SerDes Module Classes
-
-```mermaid
-classDiagram
-    class SerDes~T~ {
-        <>
-        +serialize(T value, SerDesContext context) str
-        +deserialize(str data, SerDesContext context) T
-    }
-
-    class JsonSerDes~T~ {
-        +serialize(T value, SerDesContext context) str
-        +deserialize(str data, SerDesContext context) T
-    }
-
-    class SerDesContext {
-        +str operation_id
-        +str durable_execution_arn
-    }
-
-    class serialize {
-        <>
-        +serialize(SerDes serdes, T value, str operation_id, str durable_execution_arn) str
-    }
-
-    class deserialize {
-        <>
-        +deserialize(SerDes serdes, str data, str operation_id, str durable_execution_arn) T
-    }
-
-    JsonSerDes ..|> SerDes : implements
-    serialize --> SerDes : uses
-    deserialize --> SerDes : uses
-    SerDes --> SerDesContext : uses
-```
-
-## Concurrency Architecture - Map and Parallel Operations
-
-```mermaid
-classDiagram
-    class ConcurrentExecutor~CallableType,ResultType~ {
-        <>
-        +list~Executable~ executables
-        +int max_concurrency
-        +CompletionConfig completion_config
-        +ExecutionCounters counters
-        +list~ExecutableWithState~ executables_with_state
-        +Event _completion_event
-        +SuspendExecution _suspend_exception
-
-        +execute(ExecutionState state, Callable run_in_child_context) BatchResult~ResultType~
-        +execute_item(DurableContext child_context, Executable executable)* ResultType
-        +should_execution_suspend() SuspendResult
-        -_on_task_complete(ExecutableWithState exe_state, Future future, TimerScheduler scheduler)
-        -_create_result()
BatchResult~ResultType~ - } - - class MapExecutor~T,R~ { - +Sequence~T~ items - +execute_item(DurableContext child_context, Executable executable) R - +from_items(Sequence items, Callable func, MapConfig config)$ MapExecutor - } - - class ParallelExecutor { - +execute_item(DurableContext child_context, Executable executable) R - +from_callables(Sequence callables, ParallelConfig config)$ ParallelExecutor - } - - class Executable~CallableType~ { - +int index - +CallableType func - } - - class ExecutableWithState~CallableType,ResultType~ { - +Executable~CallableType~ executable - -BranchStatus _status - -Future _future - -float _suspend_until - -ResultType _result - -Exception _error - - +run(Future future) - +suspend() - +suspend_with_timeout(float timestamp) - +complete(ResultType result) - +fail(Exception error) - +reset_to_pending() - +can_resume() bool - +is_running() bool - } - - class ExecutionCounters { - +int total_tasks - +int min_successful - +int success_count - +int failure_count - -Lock _lock - - +complete_task() - +fail_task() - +should_complete() bool - +is_all_completed() bool - +is_min_successful_reached() bool - +is_failure_tolerance_exceeded() bool - } - - class TimerScheduler { - +Callable resubmit_callback - -list _pending_resumes - -Lock _lock - -Event _shutdown - -Thread _timer_thread - - +schedule_resume(ExecutableWithState exe_state, float resume_time) - +shutdown() - -_timer_loop() - } - - class BatchResult~R~ { - +list~BatchItem~R~~ all - +CompletionReason completion_reason - +succeeded() list~BatchItem~R~~ - +failed() list~BatchItem~R~~ - +get_results() list~R~ - +throw_if_error() - } - - class BatchItem~R~ { - +int index - +BatchItemStatus status - +R result - +ErrorObject error - } - - MapExecutor --|> ConcurrentExecutor : extends - ParallelExecutor --|> ConcurrentExecutor : extends - ConcurrentExecutor --> ExecutableWithState : manages - ConcurrentExecutor --> ExecutionCounters : uses - ConcurrentExecutor --> TimerScheduler : uses - 
ConcurrentExecutor --> BatchResult : creates - ExecutableWithState --> Executable : contains - BatchResult --> BatchItem : contains -``` - -## Concurrency Flow - -```mermaid -sequenceDiagram - participant DC as DurableContext - participant MH as map_handler - participant ME as MapExecutor - participant CE as ConcurrentExecutor - participant TP as ThreadPoolExecutor - participant TS as TimerScheduler - participant EC as ExecutionCounters - - DC->>MH: map(inputs, func, config) - MH->>ME: MapExecutor.from_items() - ME->>CE: execute(state, run_in_child_context) - - CE->>TP: ThreadPoolExecutor(max_workers) - CE->>TS: TimerScheduler(resubmitter) - CE->>EC: ExecutionCounters(total, min_successful) - - loop For each executable - CE->>TP: submit_task(executable_with_state) - TP->>CE: execute_item_in_child_context() - CE->>DC: run_in_child_context(child_func) - DC->>ME: execute_item(child_context, executable) - end - - par Task Completion Handling - TP->>CE: on_task_complete(future) - CE->>EC: complete_task() / fail_task() - CE->>CE: should_execution_suspend() - alt Should Complete - CE->>CE: _completion_event.set() - else Should Suspend - CE->>TS: schedule_resume(exe_state, timestamp) - end - end - - CE->>CE: _completion_event.wait() - CE->>CE: _create_result() - CE->>DC: BatchResult -``` - -## Threading and Locking - -```mermaid -classDiagram - class OrderedLock { - -Lock _lock - -deque~Event~ _waiters - -bool _is_broken - -Exception _exception - - +acquire() bool - +release() - +reset() - +is_broken() bool - +__enter__() OrderedLock - +__exit__(exc_type, exc_val, exc_tb) - } - - class OrderedCounter { - -OrderedLock _lock - -int _counter - - +increment() int - +decrement() int - +get_current() int - } - - class Event { - <> - +set() - +wait() - } - - class Lock { - <> - +acquire() - +release() - } - - OrderedCounter --> OrderedLock : uses - OrderedLock --> Lock : contains - OrderedLock --> Event : manages queue of -``` - -## Checkpointing System - -The SDK invokes the AWS 
Lambda checkpoint API to persist execution state. Checkpoints are batched for efficiency and can be either -synchronous (blocking) or asynchronous (non-blocking). Critical checkpoints are blocking, -meaning that execution will not proceed until the checkpoint call has successfully completed. - -### Checkpoint Types - -Checkpoints are categorized by their action (START, SUCCEED, FAIL) and whether they are critical to execution correctness: - -| Operation Type | Action | Is Sync? | Rationale | -|---------------|--------|----------|-----------| -| Step (AtMostOncePerRetry) | START | Yes | Prevents duplicate execution - must wait for confirmation | -| Step (AtLeastOncePerRetry) | START | No | Performance optimization - idempotent operations can retry | -| Step | SUCCEED/FAIL | Yes | Ensures result persisted before returning to caller | -| Callback | START | Yes | Must wait for API to generate callback ID | -| Callback | SUCCEED/FAIL | Yes | Ensures callback result persisted | -| Invoke | START | Yes | Ensures chained invoke recorded before proceeding | -| Invoke | SUCCEED/FAIL | Yes | Ensures invoke result persisted | -| Context (Child) | START | No | Fire-and-forget for performance - parent tracks completion | -| Context (Child) | SUCCEED/FAIL | Yes | Ensures child result available to parent | -| Wait | START | No | Observability only - no blocking needed | -| Wait | SUCCEED | Yes | Ensures wait completion recorded | -| Wait for Condition | START | No | Observability only - condition check is idempotent | -| Wait for Condition | SUCCEED/FAIL | Yes | Ensures condition result persisted | -| Empty Checkpoint | N/A | Yes (default) | Refreshes checkpoint token and operations list | - -### Synchronous vs Asynchronous Checkpoints - -**Synchronous Checkpoints (is_sync=True, default)**: -- Block the caller until the checkpoint is processed by the background thread -- Ensure the checkpoint is persisted before continuing execution -- Safe default for correctness -- Used for 
critical operations where confirmation is required
-
-**Asynchronous Checkpoints (is_sync=False, opt-in)**:
-- Return immediately without waiting for the checkpoint to complete
-- Performance optimization for specific use cases
-- Used for observability checkpoints and fire-and-forget operations
-- Only safe when the operation is idempotent or non-critical
-
-### Checkpoint Batching
-
-The SDK uses a background thread to batch multiple checkpoint operations into a single API call for efficiency. This reduces API overhead and
-improves throughput.
-
-```mermaid
-sequenceDiagram
-    participant MT as Main Thread
-    participant Q as Checkpoint Queue
-    participant BT as Background Thread
-    participant API as Durable Functions API
-
-    Note over MT,API: Synchronous Checkpoint Flow
-    MT->>Q: Enqueue operation + completion event
-    MT->>MT: Block on completion event
-    BT->>Q: Collect batch (up to 1 second or 750KB)
-    BT->>API: POST /checkpoint (batched operations)
-    API-->>BT: New checkpoint token + operations
-    BT->>BT: Update execution state
-    BT->>MT: Signal completion event
-    MT->>MT: Resume execution
-
-    Note over MT,API: Asynchronous Checkpoint Flow
-    MT->>Q: Enqueue operation (no event)
-    MT->>MT: Continue immediately
-    BT->>Q: Collect batch (up to 1 second or 750KB)
-    BT->>API: POST /checkpoint (batched operations)
-    API-->>BT: New checkpoint token + operations
-    BT->>BT: Update execution state
+```console
+pip install aws-durable-execution-sdk-python
 ```
 
-### Batching Configuration
-
-Checkpoint batching is controlled by `CheckpointBatcherConfig`:
+Create a durable Lambda handler:
 
 ```python
-@dataclass(frozen=True)
-class CheckpointBatcherConfig:
-    max_batch_size_bytes: int = 750 * 1024  # 750KB
-    max_batch_time_seconds: float = 1.0  # 1 second
-    max_batch_operations: int | float = float("inf")  # No limit
+from aws_durable_execution_sdk_python import (
+    DurableContext,
+    StepContext,
+    durable_execution,
+    durable_step,
+)
+from aws_durable_execution_sdk_python.config import Duration
+
+@durable_step
+def validate_order(step_ctx: StepContext, order_id: str) -> dict:
+    step_ctx.logger.info("Validating order", extra={"order_id": order_id})
+    return {"order_id": order_id, "valid": True}
+
+@durable_execution
+def handler(event: dict, context: DurableContext) -> dict:
+    order_id = event["order_id"]
+    context.logger.info("Starting workflow", extra={"order_id": order_id})
+
+    validation = context.step(validate_order(order_id), name="validate_order")
+    if not validation["valid"]:
+        return {"status": "rejected", "order_id": order_id}
+
+    # simulate approval (real world: use wait_for_callback)
+    context.wait(duration=Duration.from_seconds(5), name="await_confirmation")
+
+    return {"status": "approved", "order_id": order_id}
 ```
 
-The background thread collects operations until one of these limits is reached:
-1. Batch size exceeds 750KB
-2. 1 second has elapsed since the first operation
-3. Maximum operation count is reached (unlimited by default)
-
-### Concurrency Management
+## 📚 Documentation
 
-The checkpointing system handles concurrent operations (map/parallel) by tracking parent-child relationships:
+- **[AWS Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html)** - Official AWS Lambda durable functions guide
+- **[Documentation index](docs/index.md)** - SDK Overview and navigation
 
-1. When a CONTEXT operation completes (SUCCEED/FAIL), all descendant operations are marked as orphaned
-2. Orphaned operations are rejected if they attempt to checkpoint
-3. This prevents child operations from checkpointing after their parent has already completed
-4. Uses a single lock (`_parent_done_lock`) to coordinate completion and checkpoint validation
+**New to durable functions?**
+- [Getting started guide](docs/getting-started.md) - Build your first durable function
 
-### Error Handling
+**Core operations:**
+- [Steps](docs/core/steps.md) - Execute code with automatic checkpointing and retry support
+- [Wait operations](docs/core/wait.md) - Pause execution without blocking Lambda resources
+- [Callbacks](docs/core/callbacks.md) - Wait for external systems to respond
+- [Invoke operations](docs/core/invoke.md) - Call other durable functions and compose workflows
+- [Child contexts](docs/core/child-contexts.md) - Organize complex workflows into isolated units
+- [Parallel operations](docs/core/parallel.md) - Run multiple operations concurrently
+- [Map operations](docs/core/map.md) - Process collections in parallel with batching
+- [Logger integration](docs/core/logger.md) - Add structured logging to track execution
 
-When a checkpoint fails in the background thread:
+**Advanced topics:**
+- [Error handling](docs/advanced/error-handling.md) - Handle failures and implement retry strategies
+- [Testing modes](docs/advanced/testing-modes.md) - Run tests locally or against deployed Lambda functions
+- [Testing patterns](docs/testing-patterns/basic-tests.md) - Practical testing examples
+- [Serialization](docs/advanced/serialization.md) - Customize how data is serialized in checkpoints
 
-1. **Error Signaling**: The background thread creates a `BackgroundThreadError` wrapping the original exception
-2. **Event Notification**: All completion events (both in the current batch and queued operations) are signaled with this error
-3. **Immediate Propagation**: Synchronous callers waiting on `create_checkpoint(is_sync=True)` immediately receive the `BackgroundThreadError`
-4. **Future Prevention**: A failure event (`_checkpointing_failed`) is set to prevent any future checkpoint attempts
-5. **Clean Termination**: The background thread exits cleanly after signaling all waiting operations
+**Architecture:**
+- [Architecture diagrams](docs/architecture.md) - Class diagrams and concurrency flows
 
-For **synchronous operations** (default `is_sync=True`):
-- The main thread receives `BackgroundThreadError` immediately when calling `create_checkpoint()`
-- This prevents further execution with corrupted state
+**API reference:**
+- API reference docs are in progress. Use the core operation docs above for now.
 
-For **asynchronous operations** (`is_sync=False`):
-- The error is detected on the next synchronous checkpoint attempt
-- The `_checkpointing_failed` event causes immediate failure before queuing
+## 💬 Feedback & Support
 
-This ensures no code continues executing after a checkpoint failure, maintaining execution state integrity.
+- [Bug report](https://github.com/aws/aws-durable-execution-sdk-python/issues/new?template=bug_report.yml)
+- [Feature request](https://github.com/aws/aws-durable-execution-sdk-python/issues/new?template=feature_request.yml)
+- [Documentation feedback](https://github.com/aws/aws-durable-execution-sdk-python/issues/new?template=documentation.yml)
+- [Contributing guide](CONTRIBUTING.md)
 
-## License
+## 📄 License
 
-This project is licensed under the [Apache-2.0 License](LICENSE).
+See the [LICENSE](LICENSE) file for our project's licensing.
diff --git a/docs/advanced/testing-modes.md b/docs/advanced/testing-modes.md
index 5d05d35..4a74920 100644
--- a/docs/advanced/testing-modes.md
+++ b/docs/advanced/testing-modes.md
@@ -488,9 +488,8 @@ Fix the error in your function code and redeploy.
 
 ## See also
 
-- [Test Runner](../core/test-runner.md) - Learn about the test runner interface
 - [Getting Started](../getting-started.md) - Set up your development environment
-- [Pytest Integration](pytest-integration.md) - Advanced pytest configuration
+- [Testing patterns](../testing-patterns/basic-tests.md) - Practical pytest examples
 - [Examples README](../../examples/test/README.md) - More examples and configuration details
 
 [↑ Back to top](#table-of-contents)
diff --git a/docs/architecture.md b/docs/architecture.md
new file mode 100644
index 0000000..d6a3180
--- /dev/null
+++ b/docs/architecture.md
@@ -0,0 +1,505 @@
+# Architecture diagrams
+
+[Back to main index](index.md)
+
+## Core architecture
+
+The entry point that SDK consumers interact with is the `DurableContext`.
+
+### DurableContext operations
+
+- **Core Methods**: `set_logger`, `step`, `invoke`, `map`, `parallel`, `run_in_child_context`, `wait`, `create_callback`, `wait_for_callback`, `wait_for_condition`
+- **Thread Safety**: Uses `OrderedCounter` for generating sequential step IDs
+- **State Management**: Delegates to `ExecutionState` for checkpointing
+
+### Concurrency implementation
+
+- **Map/Parallel**: Both inherit from the `ConcurrentExecutor` abstract base class
+- **Thread Pool**: Uses `ThreadPoolExecutor` for concurrent execution
+- **State Tracking**: `ExecutableWithState` manages individual task lifecycle
+- **Completion Logic**: `ExecutionCounters` tracks success/failure criteria
+- **Suspension**: `TimerScheduler` handles timed suspensions and resumptions
+
+### Configuration system
+
+- **Modular Configs**: Separate config classes for each operation type
+- **Completion Control**: `CompletionConfig` defines success/failure criteria
+- **Serialization**: `SerDes` interface for custom serialization
+
+### Operation handlers
+
+- **Separation of Concerns**: Each operation has a dedicated handler function
+- **Checkpointing**: All operations integrate with execution state checkpointing
+- **Error Handling**: Consistent error handling and retry logic across operations + +```mermaid +classDiagram + class DurableContext { + -ExecutionState state + -Any lambda_context + -str _parent_id + -OrderedCounter _step_counter + -LogInfo _log_info + -Logger logger + + +set_logger(LoggerInterface new_logger) + +step(Callable func, str name, StepConfig config) T + +invoke(str function_name, P payload, str name, InvokeConfig config) R + +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult + +parallel(Sequence functions, str name, ParallelConfig config) BatchResult + +run_in_child_context(Callable func, str name, ChildConfig config) T + +wait(int seconds, str name) + +create_callback(str name, CallbackConfig config) Callback + +wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any + +wait_for_condition(Callable check, WaitForConditionConfig config, str name) T + } + + class DurableContextProtocol { + <> + +step(Callable func, str name, StepConfig config) T + +run_in_child_context(Callable func, str name, ChildConfig config) T + +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult + +parallel(Sequence functions, str name, ParallelConfig config) BatchResult + +wait(int seconds, str name) + +create_callback(str name, CallbackConfig config) Callback + } + + class OrderedCounter { + -OrderedLock _lock + -int _counter + +increment() int + +decrement() int + +get_current() int + } + + class ExecutionState { + +str durable_execution_arn + +get_checkpoint_result(str operation_id) CheckpointedResult + +create_checkpoint(OperationUpdate operation_update) + } + + class Logger { + +LoggerInterface logger + +LogInfo info + +with_log_info(LogInfo info) Logger + +from_log_info(LoggerInterface logger, LogInfo info) Logger + } + + DurableContext ..|> DurableContextProtocol : implements + DurableContext --> ExecutionState : uses + DurableContext --> OrderedCounter : contains + DurableContext --> Logger : 
contains +``` + +## Operation handlers + +The `DurableContext` calls operation handlers, which contain the execution logic for each operation. + +```mermaid +classDiagram + class DurableContext { + +step(Callable func, str name, StepConfig config) T + +invoke(str function_name, P payload, str name, InvokeConfig config) R + +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult + +parallel(Sequence functions, str name, ParallelConfig config) BatchResult + +run_in_child_context(Callable func, str name, ChildConfig config) T + +wait(int seconds, str name) + +create_callback(str name, CallbackConfig config) Callback + +wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any + +wait_for_condition(Callable check, WaitForConditionConfig config, str name) T + } + + class step_handler { + <> + +step_handler(Callable func, ExecutionState state, OperationIdentifier op_id, StepConfig config, Logger logger) T + } + + class invoke_handler { + <> + +invoke_handler(str function_name, P payload, ExecutionState state, OperationIdentifier op_id, InvokeConfig config) R + } + + class map_handler { + <> + +map_handler(Sequence items, Callable func, MapConfig config, ExecutionState state, Callable run_in_child_context) BatchResult + } + + class parallel_handler { + <> + +parallel_handler(Sequence callables, ParallelConfig config, ExecutionState state, Callable run_in_child_context) BatchResult + } + + class child_handler { + <> + +child_handler(Callable func, ExecutionState state, OperationIdentifier op_id, ChildConfig config) T + } + + class wait_handler { + <> + +wait_handler(int seconds, ExecutionState state, OperationIdentifier op_id) + } + + class create_callback_handler { + <> + +create_callback_handler(ExecutionState state, OperationIdentifier op_id, CallbackConfig config) str + } + + class wait_for_callback_handler { + <> + +wait_for_callback_handler(DurableContext context, Callable submitter, str name, WaitForCallbackConfig config) 
Any + } + + class wait_for_condition_handler { + <> + +wait_for_condition_handler(Callable check, WaitForConditionConfig config, ExecutionState state, OperationIdentifier op_id, Logger logger) T + } + + DurableContext --> step_handler : calls + DurableContext --> invoke_handler : calls + DurableContext --> map_handler : calls + DurableContext --> parallel_handler : calls + DurableContext --> child_handler : calls + DurableContext --> wait_handler : calls + DurableContext --> create_callback_handler : calls + DurableContext --> wait_for_callback_handler : calls + DurableContext --> wait_for_condition_handler : calls +``` + +## Configuration module classes + +```mermaid +classDiagram + class StepConfig { + +Callable retry_strategy + +StepSemantics step_semantics + +SerDes serdes + } + + class InvokeConfig~P,R~ { + +int timeout_seconds + +SerDes~P~ serdes_payload + +SerDes~R~ serdes_result + } + + class MapConfig { + +int max_concurrency + +ItemBatcher item_batcher + +CompletionConfig completion_config + +SerDes serdes + } + + class ParallelConfig { + +int max_concurrency + +CompletionConfig completion_config + +SerDes serdes + } + + class ChildConfig~T~ { + +SerDes serdes + +OperationSubType sub_type + +Callable~T,str~ summary_generator + } + + class CallbackConfig { + +int timeout_seconds + +int heartbeat_timeout_seconds + +SerDes serdes + } + + class WaitForCallbackConfig { + +Callable retry_strategy + } + + class WaitForConditionConfig~T~ { + +Callable wait_strategy + +T initial_state + +SerDes serdes + } + + class CompletionConfig { + +int min_successful + +int tolerated_failure_count + +float tolerated_failure_percentage + +first_successful()$ CompletionConfig + +all_completed()$ CompletionConfig + +all_successful()$ CompletionConfig + } + + class ItemBatcher~T~ { + +int max_items_per_batch + +float max_item_bytes_per_batch + +T batch_input + } + + WaitForCallbackConfig --|> CallbackConfig : extends + MapConfig --> CompletionConfig : contains + MapConfig --> 
ItemBatcher : contains + ParallelConfig --> CompletionConfig : contains +``` + +## Types and protocols module + +```mermaid +classDiagram + class DurableContextProtocol { + <> + +step(Callable func, str name, StepConfig config) T + +run_in_child_context(Callable func, str name, ChildConfig config) T + +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult + +parallel(Sequence functions, str name, ParallelConfig config) BatchResult + +wait(int seconds, str name) + +create_callback(str name, CallbackConfig config) Callback + } + + class LoggerInterface { + <> + +debug(object msg, *args, Mapping extra) + +info(object msg, *args, Mapping extra) + +warning(object msg, *args, Mapping extra) + +error(object msg, *args, Mapping extra) + +exception(object msg, *args, Mapping extra) + } + + class CallbackProtocol~C_co~ { + <> + +str callback_id + +result() C_co + } + + class BatchResultProtocol~T~ { + <> + +get_results() list~T~ + } + + class StepContext { + +LoggerInterface logger + } + + class WaitForConditionCheckContext { + +LoggerInterface logger + } + + class OperationContext { + +LoggerInterface logger + } + + StepContext --|> OperationContext : extends + WaitForConditionCheckContext --|> OperationContext : extends +``` + +## SerDes module classes + +```mermaid +classDiagram + class SerDes~T~ { + <> + +serialize(T value, SerDesContext context) str + +deserialize(str data, SerDesContext context) T + } + + class JsonSerDes~T~ { + +serialize(T value, SerDesContext context) str + +deserialize(str data, SerDesContext context) T + } + + class SerDesContext { + +str operation_id + +str durable_execution_arn + } + + class serialize { + <> + +serialize(SerDes serdes, T value, str operation_id, str durable_execution_arn) str + } + + class deserialize { + <> + +deserialize(SerDes serdes, str data, str operation_id, str durable_execution_arn) T + } + + JsonSerDes ..|> SerDes : implements + serialize --> SerDes : uses + deserialize --> SerDes : uses + SerDes 
--> SerDesContext : uses +``` + +## Concurrency architecture - map and parallel operations + +```mermaid +classDiagram + class ConcurrentExecutor~CallableType,ResultType~ { + <> + +list~Executable~ executables + +int max_concurrency + +CompletionConfig completion_config + +ExecutionCounters counters + +list~ExecutableWithState~ executables_with_state + +Event _completion_event + +SuspendExecution _suspend_exception + + +execute(ExecutionState state, Callable run_in_child_context) BatchResult~ResultType~ + +execute_item(DurableContext child_context, Executable executable)* ResultType + +should_execution_suspend() SuspendResult + -_on_task_complete(ExecutableWithState exe_state, Future future, TimerScheduler scheduler) + -_create_result() BatchResult~ResultType~ + } + + class MapExecutor~T,R~ { + +Sequence~T~ items + +execute_item(DurableContext child_context, Executable executable) R + +from_items(Sequence items, Callable func, MapConfig config)$ MapExecutor + } + + class ParallelExecutor { + +execute_item(DurableContext child_context, Executable executable) R + +from_callables(Sequence callables, ParallelConfig config)$ ParallelExecutor + } + + class Executable~CallableType~ { + +int index + +CallableType func + } + + class ExecutableWithState~CallableType,ResultType~ { + +Executable~CallableType~ executable + -BranchStatus _status + -Future _future + -float _suspend_until + -ResultType _result + -Exception _error + + +run(Future future) + +suspend() + +suspend_with_timeout(float timestamp) + +complete(ResultType result) + +fail(Exception error) + +reset_to_pending() + +can_resume() bool + +is_running() bool + } + + class ExecutionCounters { + +int total_tasks + +int min_successful + +int success_count + +int failure_count + -Lock _lock + + +complete_task() + +fail_task() + +should_complete() bool + +is_all_completed() bool + +is_min_successful_reached() bool + +is_failure_tolerance_exceeded() bool + } + + class TimerScheduler { + +Callable resubmit_callback + 
-list _pending_resumes + -Lock _lock + -Event _shutdown + -Thread _timer_thread + + +schedule_resume(ExecutableWithState exe_state, float resume_time) + +shutdown() + -_timer_loop() + } + + class BatchResult~R~ { + +list~BatchItem~R~~ all + +CompletionReason completion_reason + +succeeded() list~BatchItem~R~~ + +failed() list~BatchItem~R~~ + +get_results() list~R~ + +throw_if_error() + } + + class BatchItem~R~ { + +int index + +BatchItemStatus status + +R result + +ErrorObject error + } + + MapExecutor --|> ConcurrentExecutor : extends + ParallelExecutor --|> ConcurrentExecutor : extends + ConcurrentExecutor --> ExecutableWithState : manages + ConcurrentExecutor --> ExecutionCounters : uses + ConcurrentExecutor --> TimerScheduler : uses + ConcurrentExecutor --> BatchResult : creates + ExecutableWithState --> Executable : contains + BatchResult --> BatchItem : contains +``` + +## Concurrency flow + +```mermaid +sequenceDiagram + participant DC as DurableContext + participant MH as map_handler + participant ME as MapExecutor + participant CE as ConcurrentExecutor + participant TP as ThreadPoolExecutor + participant TS as TimerScheduler + participant EC as ExecutionCounters + + DC->>MH: map(inputs, func, config) + MH->>ME: MapExecutor.from_items() + ME->>CE: execute(state, run_in_child_context) + + CE->>TP: ThreadPoolExecutor(max_workers) + CE->>TS: TimerScheduler(resubmitter) + CE->>EC: ExecutionCounters(total, min_successful) + + loop For each executable + CE->>TP: submit_task(executable_with_state) + TP->>CE: execute_item_in_child_context() + CE->>DC: run_in_child_context(child_func) + DC->>ME: execute_item(child_context, executable) + end + + par Task Completion Handling + TP->>CE: on_task_complete(future) + CE->>EC: complete_task() / fail_task() + CE->>CE: should_execution_suspend() + alt Should Complete + CE->>CE: _completion_event.set() + else Should Suspend + CE->>TS: schedule_resume(exe_state, timestamp) + end + end + + CE->>CE: _completion_event.wait() + 
CE->>CE: _create_result() + CE->>DC: BatchResult +``` + +## Threading and locking + +```mermaid +classDiagram + class OrderedLock { + -Lock _lock + -deque~Event~ _waiters + -bool _is_broken + -Exception _exception + + +acquire() bool + +release() + +reset() + +is_broken() bool + +__enter__() OrderedLock + +__exit__(exc_type, exc_val, exc_tb) + } + + class OrderedCounter { + -OrderedLock _lock + -int _counter + + +increment() int + +decrement() int + +get_current() int + } +``` + +[Back to top](#architecture-diagrams) diff --git a/docs/core/wait.md b/docs/core/wait.md index 904e9fa..e7d40a1 100644 --- a/docs/core/wait.md +++ b/docs/core/wait.md @@ -48,7 +48,7 @@ Use `context.wait()` when you need a simple time-based delay. **Choose a different method if you need:** - **Wait for external system response** → Use [`context.wait_for_callback()`](callbacks.md) -- **Wait until a condition is met** → Use [`context.wait_for_condition()`](../advanced/wait-for-condition.md) +- **Wait until a condition is met** → Use `context.wait_for_condition()` - **Wait for a step to complete** → Use [`context.step()`](steps.md) [↑ Back to top](#table-of-contents) @@ -293,8 +293,6 @@ def handler(event: dict, context: DurableContext) -> dict: return result ``` -See [Wait for Condition](../advanced/wait-for-condition.md) for more details. - [↑ Back to top](#table-of-contents) ## Alternatives to wait operations @@ -355,8 +353,6 @@ def handler(event: dict, context: DurableContext) -> dict: return result ``` -See [Wait for Condition](../advanced/wait-for-condition.md) for more details. 
- [↑ Back to top](#table-of-contents) ## Testing @@ -438,7 +434,6 @@ def test_named_wait(durable_runner): - [Steps](steps.md) - Execute business logic with automatic checkpointing - [Callbacks](callbacks.md) - Wait for external system responses -- [Wait for Condition](../advanced/wait-for-condition.md) - Poll until a condition is met - [Getting Started](../getting-started.md) - Learn the basics of durable functions [↑ Back to top](#table-of-contents) diff --git a/docs/getting-started.md b/docs/getting-started.md index 6c7e077..3854293 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -278,8 +278,9 @@ Now that you've built your first durable function, explore the core features: ## See also -- [DurableContext API](api-reference/context.md) - Complete reference for the context object -- [Decorators](api-reference/decorators.md) - All available decorators +- [Documentation index](index.md) - Browse all guides and examples +- [Architecture diagrams](architecture.md) - Class diagrams and concurrency flows +- [Logger integration](core/logger.md) - Replay-safe structured logging - [Examples directory](https://github.com/awslabs/aws-durable-execution-sdk-python/tree/main/examples) - More working examples [↑ Back to top](#table-of-contents) diff --git a/docs/index.md b/docs/index.md index 443f988..603d02d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -6,7 +6,6 @@ - [What is the Durable Execution SDK?](#what-is-the-durable-execution-sdk) - [Key features](#key-features) -- [Quick navigation](#quick-navigation) - [Installation](#installation) - [Quick example](#quick-example) - [Core concepts](#core-concepts) @@ -36,37 +35,6 @@ The SDK provides a `DurableContext` that gives you operations like steps, waits, [↑ Back to top](#table-of-contents) -## Quick navigation - -**New to durable functions?** -- [Getting started guide](getting-started.md) - Build your first durable function - -**Core operations:** -- [Steps](core/steps.md) - Execute code with 
automatic checkpointing and retry support -- [Wait operations](core/wait.md) - Pause execution without blocking Lambda resources -- [Callbacks](core/callbacks.md) - Wait for external systems to respond -- [Invoke operations](core/invoke.md) - Call other durable functions and compose workflows -- [Child contexts](core/child-contexts.md) - Organize complex workflows into isolated units -- [Parallel operations](core/parallel.md) - Run multiple operations concurrently -- [Map operations](core/map.md) - Process collections in parallel with batching -- [Logger integration](core/logger.md) - Add structured logging to track execution - -**Advanced topics:** -- [Error handling](advanced/error-handling.md) - Handle failures and implement retry strategies -- [Testing modes](advanced/testing-modes.md) - Run tests locally or against deployed Lambda functions -- [Serialization](advanced/serialization.md) - Customize how data is serialized in checkpoints -- [Configuration](advanced/configuration.md) - Fine-tune operation behavior -- [Performance optimization](advanced/performance.md) - Best practices for efficient workflows - -**API reference:** -- [DurableContext](api-reference/context.md) - Main context class and methods -- [Configuration classes](api-reference/config.md) - StepConfig, CallbackConfig, and more -- [Decorators](api-reference/decorators.md) - @durable_execution, @durable_step, etc. -- [Types and protocols](api-reference/types.md) - Type definitions and interfaces -- [Exceptions](api-reference/exceptions.md) - DurableExecutionsError, InvocationError, and more - -[↑ Back to top](#table-of-contents) - ## Installation Install the SDK using pip: @@ -195,6 +163,8 @@ The SDK integrates with AWS Lambda's durable execution service to provide reliab The SDK uses a background thread to batch checkpoints for efficiency. Critical operations (like step starts with at-most-once semantics) block until the checkpoint is confirmed. 
Non-critical operations (like observability checkpoints) are asynchronous for better performance +[**See architecture diagrams**](architecture.md) for class diagrams and concurrency flows. + [↑ Back to top](#table-of-contents) ## Use cases diff --git a/docs/testing-patterns/basic-tests.md b/docs/testing-patterns/basic-tests.md index 7f6cd66..ceccddc 100644 --- a/docs/testing-patterns/basic-tests.md +++ b/docs/testing-patterns/basic-tests.md @@ -685,9 +685,8 @@ A: Set environment variables in your test setup or use pytest fixtures to manage ## See also - [Complex workflows](complex-workflows.md) - Testing multi-step workflows -- [Best practices](best-practices.md) - Testing recommendations -- [Pytest integration](../advanced/pytest-integration.md) - Pytest fixtures and markers -- [Custom assertions](../advanced/custom-assertions.md) - Advanced result inspection +- [Best practices](../best-practices.md) - Testing recommendations +- [Testing modes](../advanced/testing-modes.md) - Local and cloud test execution - [Steps](../core/steps.md) - Testing step operations - [Wait operations](../core/wait.md) - Testing wait operations - [Callbacks](../core/callbacks.md) - Testing callback operations diff --git a/docs/testing-patterns/complex-workflows.md b/docs/testing-patterns/complex-workflows.md index cac74a1..9cb50b6 100644 --- a/docs/testing-patterns/complex-workflows.md +++ b/docs/testing-patterns/complex-workflows.md @@ -659,7 +659,7 @@ A: Test both the failure case (verify the error is raised) and the recovery case ## See also - [Basic test patterns](basic-tests.md) - Simple testing patterns -- [Best practices](best-practices.md) - Testing recommendations +- [Best practices](../best-practices.md) - Testing recommendations - [Steps](../core/steps.md) - Step operations - [Wait operations](../core/wait.md) - Wait operations - [Callbacks](../core/callbacks.md) - Callback operations diff --git a/docs/testing-patterns/stores.md b/docs/testing-patterns/stores.md index 
dce1cfa..4b8d284 100644 --- a/docs/testing-patterns/stores.md +++ b/docs/testing-patterns/stores.md @@ -250,9 +250,8 @@ A: Each execution typically uses a few KB to a few MB depending on the number of ## See also - [Basic tests](basic-tests.md) - Simple test patterns -- [Cloud testing](../advanced/cloud-testing.md) - Testing with deployed functions -- [Test runner](../core/test-runner.md) - Test runner configuration -- [Best practices](best-practices.md) - Testing recommendations +- [Testing modes](../advanced/testing-modes.md) - Local and cloud test execution +- [Best practices](../best-practices.md) - Testing recommendations [↑ Back to top](#table-of-contents)