diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 708b856..d4da86d 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -25,6 +25,102 @@
 hatch run types:check
 hatch fmt
 ```
+There is a convenience script for the above that you can run from the root of the repo as you prepare your PR:
+```
+ops/ci-checks.sh
+```
+
+## Coding Standards
+Consistency is important for maintainability. Please adhere to the house style of the repo unless there's a really
+good reason to break the pattern.
+
+### General style
+1. Follow the [Google Python Style Guide](https://google.github.io/styleguide/pyguide.html) in general.
+2. Standardize on [ruff](https://docs.astral.sh/ruff/) formatting and linting rules. CI checks enforce these too.
+3. Avoid pulling in extra runtime dependencies. The only runtime dependency is [boto3](https://boto3.amazonaws.com/). This
+   SDK adds size to the consumer's AWS Lambda function, so we should keep it as light as possible.
+4. Never use `RLock` when `Lock` would do. A plain `Lock` immediately highlights recursive calls that have the potential to
+   deadlock, so that `RLock` is a deliberate decision made after weighing those deadlock concerns, rather than just the
+   default.
+
+### Organization
+1. Do not allow circular references, even if you can get away with it by using `if TYPE_CHECKING`. Circular references are a
+   sign that the structure of the code is not clear enough. They make for inefficient memory management, and they make the
+   code harder to understand and follow. Do use `config` and `types` as the lowest-level imports if you run into circular
+   reference issues.
+2. Do not use `__init__` files for any meaningful code or even just type declarations. Why? Because the purpose of `__init__`
+   is not to serve as a grab-bag of code that doesn't otherwise have a home.
+3. Do not introduce `utils` or `helper` style modules as a grab-bag of ad hoc functions. Introduce domain-specific classes to
+   encapsulate and model logic.
+
+### Data Structures & Typing
+1. Model data structures with immutable classes and precise type hints. (In other words, use frozen dataclasses with exact,
+   narrow type hints.) Do not rely on unstructured dicts. Why immutable? Immutable values are inherently thread-safe, and
+   immutability forces you to think carefully about when and where you need to mutate values.
+
+2. A rare exception to the general rule of preferring immutable classes wherever possible is `state.ExecutionState`, which
+   maintains the state of the ongoing Durable Execution and encapsulates thread-safe state mutations as the execution
+   progresses.
+
+3. Rely on exact and explicit type declarations rather than duck typing. Why? Yes, duck typing is very pythonic. However, this
+   is a complex code-base, and exact, explicit type declarations signal intent clearly so that the type checker can help you
+   catch errors more quickly. LLMs have an easier time understanding the intent of the code with type hints, and hints make it
+   easier for you to spot mistaken assumptions that an LLM might make about the code. They also make the developer experience
+   much better, with intelligent, context-aware autocomplete in an IDE.
+
+4. Declare a type wherever you declare a variable, even within a function scope and even where it's implied. For example,
+   even though the `str` might be _implied_ because of the `call` return type, make it explicit:
+
+```python
+def my_function() -> str:
+    my_var: str = arb.call(1, 2, 3)
+    return f"arb result: {my_var}"
+```
+
+5. To update a field in a frozen dataclass, prefer a `clone` or `with_field` classmethod constructor, or reinitialization,
+   rather than dataclasses `replace`. There is no big technical reason for this; it's more of a soft pattern. The philosophy
+   of an update should be more about thoughtfully and purposefully creating a _new_ instance than "in-place editing" an
+   existing one.
+
+
+### Initialization and conversion
+1. Class constructors must be light and do no more than initialize the class. In a dataclass you shouldn't even need an
+   `__init__`. Use a `@classmethod` factory method instead to encapsulate more advanced logic. For example, if a class
+   depends on logic that might fail, encapsulate this in a `create` classmethod:
+
+```python
+@dataclass(frozen=True)
+class MyClass:
+    id: str
+    name: str
+    timeout: int
+
+    @classmethod
+    def create(cls, name: str, timeout: int = 30) -> "MyClass":
+        """Factory that contains validation and ID-generation logic."""
+        if timeout <= 0:
+            raise ValueError("timeout must be positive")
+
+        # Generate unique ID
+        config_id: str = f"cfg_{uuid.uuid4().hex[:8]}"
+
+        return cls(id=config_id, name=name, timeout=timeout)
+```
+
+2. Encapsulate conversion logic in a `from_x` factory and a `to_x` method on a class:
+
+```python
+@dataclass(frozen=True)
+class WaitOptions:
+    wait_seconds: int = 0
+
+    @classmethod
+    def from_dict(cls, data: MutableMapping[str, Any]) -> "WaitOptions":
+        return cls(wait_seconds=data.get("WaitSeconds", 0))
+
+    def to_dict(self) -> MutableMapping[str, Any]:
+        return {"WaitSeconds": self.wait_seconds}
+```
+
 ## Set up your IDE
 Point your IDE at the hatch virtual environment to have it recognize dependencies and imports.
diff --git a/README.md b/README.md
index 2b74024..518f9e5 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,517 @@
 for how to contribute to this package. tldr; use `hatch` and it will manage virtual envs and dependencies for you, so you don't have to do it manually.
+## Core Architecture
+The entry-point that consumers of the SDK interact with is the `DurableContext`.
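The notes below mention an `OrderedCounter` that hands out sequential step IDs under a lock. As a rough, self-contained sketch of that idea (simplified names, and a plain `threading.Lock` standing in for the SDK's `OrderedLock` — not the actual implementation), such a counter can be as small as:

```python
import threading


class SequentialCounter:
    """Simplified stand-in for a lock-protected sequential counter."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counter: int = 0

    def increment(self) -> int:
        # Each caller observes a unique, strictly increasing value.
        with self._lock:
            self._counter += 1
            return self._counter

    def get_current(self) -> int:
        with self._lock:
            return self._counter


counter = SequentialCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.get_current())  # 100
```

Deterministic, sequential IDs matter here because (presumably) checkpointed results must be matched back to the same operation when an execution is replayed.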
+
+### DurableContext Operations
+- **Core Methods**: `set_logger`, `step`, `invoke`, `map`, `parallel`, `run_in_child_context`, `wait`, `create_callback`, `wait_for_callback`, `wait_for_condition`
+- **Thread Safety**: Uses `OrderedCounter` for generating sequential step IDs
+- **State Management**: Delegates to `ExecutionState` for checkpointing
+
+### Concurrency Implementation
+- **Map/Parallel**: Both inherit from the `ConcurrentExecutor` abstract base class
+- **Thread Pool**: Uses `ThreadPoolExecutor` for concurrent execution
+- **State Tracking**: `ExecutableWithState` manages the individual task lifecycle
+- **Completion Logic**: `ExecutionCounters` tracks success/failure criteria
+- **Suspension**: `TimerScheduler` handles timed suspensions and resumptions
+
+### Configuration System
+- **Modular Configs**: Separate config classes for each operation type
+- **Completion Control**: `CompletionConfig` defines success/failure criteria
+- **Serialization**: `SerDes` interface for custom serialization
+
+### Operation Handlers
+- **Separation of Concerns**: Each operation has a dedicated handler function
+- **Checkpointing**: All operations integrate with execution state checkpointing
+- **Error Handling**: Consistent error handling and retry logic across operations
+
+
+```mermaid
+classDiagram
+    class DurableContext {
+        -ExecutionState state
+        -Any lambda_context
+        -str _parent_id
+        -OrderedCounter _step_counter
+        -LogInfo _log_info
+        -Logger logger
+
+        +set_logger(LoggerInterface new_logger)
+        +step(Callable func, str name, StepConfig config) T
+        +invoke(str function_name, P payload, str name, InvokeConfig config) R
+        +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+        +parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+        +run_in_child_context(Callable func, str name, ChildConfig config) T
+        +wait(int seconds, str name)
+        +create_callback(str name, CallbackConfig config) Callback
+        +wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any
+        +wait_for_condition(Callable check, WaitForConditionConfig config, str name) T
+    }
+
+    class DurableContextProtocol {
+        <<interface>>
+        +step(Callable func, str name, StepConfig config) T
+        +run_in_child_context(Callable func, str name, ChildConfig config) T
+        +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+        +parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+        +wait(int seconds, str name)
+        +create_callback(str name, CallbackConfig config) Callback
+    }
+
+    class OrderedCounter {
+        -OrderedLock _lock
+        -int _counter
+        +increment() int
+        +decrement() int
+        +get_current() int
+    }
+
+    class ExecutionState {
+        +str durable_execution_arn
+        +get_checkpoint_result(str operation_id) CheckpointedResult
+        +create_checkpoint(OperationUpdate operation_update)
+    }
+
+    class Logger {
+        +LoggerInterface logger
+        +LogInfo info
+        +with_log_info(LogInfo info) Logger
+        +from_log_info(LoggerInterface logger, LogInfo info) Logger
+    }
+
+    DurableContext ..|> DurableContextProtocol : implements
+    DurableContext --> ExecutionState : uses
+    DurableContext --> OrderedCounter : contains
+    DurableContext --> Logger : contains
+```
+
+## Operation Handlers
+The `DurableContext` calls operation handlers, which contain the execution logic for each operation.
+
+```mermaid
+classDiagram
+    class DurableContext {
+        +step(Callable func, str name, StepConfig config) T
+        +invoke(str function_name, P payload, str name, InvokeConfig config) R
+        +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+        +parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+        +run_in_child_context(Callable func, str name, ChildConfig config) T
+        +wait(int seconds, str name)
+        +create_callback(str name, CallbackConfig config) Callback
+        +wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any
+        +wait_for_condition(Callable check, WaitForConditionConfig config, str name) T
+    }
+
+    class step_handler {
+        <<function>>
+        +step_handler(Callable func, ExecutionState state, OperationIdentifier op_id, StepConfig config, Logger logger) T
+    }
+
+    class invoke_handler {
+        <<function>>
+        +invoke_handler(str function_name, P payload, ExecutionState state, OperationIdentifier op_id, InvokeConfig config) R
+    }
+
+    class map_handler {
+        <<function>>
+        +map_handler(Sequence items, Callable func, MapConfig config, ExecutionState state, Callable run_in_child_context) BatchResult
+    }
+
+    class parallel_handler {
+        <<function>>
+        +parallel_handler(Sequence callables, ParallelConfig config, ExecutionState state, Callable run_in_child_context) BatchResult
+    }
+
+    class child_handler {
+        <<function>>
+        +child_handler(Callable func, ExecutionState state, OperationIdentifier op_id, ChildConfig config) T
+    }
+
+    class wait_handler {
+        <<function>>
+        +wait_handler(int seconds, ExecutionState state, OperationIdentifier op_id)
+    }
+
+    class create_callback_handler {
+        <<function>>
+        +create_callback_handler(ExecutionState state, OperationIdentifier op_id, CallbackConfig config) str
+    }
+
+    class wait_for_callback_handler {
+        <<function>>
+        +wait_for_callback_handler(DurableContext context, Callable submitter, str name, WaitForCallbackConfig config) Any
+    }
+
+    class wait_for_condition_handler {
+        <<function>>
+        +wait_for_condition_handler(Callable check, WaitForConditionConfig config, ExecutionState state, OperationIdentifier op_id, Logger logger) T
+    }
+
+    DurableContext --> step_handler : calls
+    DurableContext --> invoke_handler : calls
+    DurableContext --> map_handler : calls
+    DurableContext --> parallel_handler : calls
+    DurableContext --> child_handler : calls
+    DurableContext --> wait_handler : calls
+    DurableContext --> create_callback_handler : calls
+    DurableContext --> wait_for_callback_handler : calls
+    DurableContext --> wait_for_condition_handler : calls
+```
+
+## Configuration Module Classes
+
+```mermaid
+classDiagram
+    class StepConfig {
+        +Callable retry_strategy
+        +StepSemantics step_semantics
+        +SerDes serdes
+    }
+
+    class InvokeConfig~P,R~ {
+        +int timeout_seconds
+        +SerDes~P~ serdes_payload
+        +SerDes~R~ serdes_result
+    }
+
+    class MapConfig {
+        +int max_concurrency
+        +ItemBatcher item_batcher
+        +CompletionConfig completion_config
+        +SerDes serdes
+    }
+
+    class ParallelConfig {
+        +int max_concurrency
+        +CompletionConfig completion_config
+        +SerDes serdes
+    }
+
+    class ChildConfig~T~ {
+        +SerDes serdes
+        +OperationSubType sub_type
+        +Callable~T,str~ summary_generator
+    }
+
+    class CallbackConfig {
+        +int timeout_seconds
+        +int heartbeat_timeout_seconds
+        +SerDes serdes
+    }
+
+    class WaitForCallbackConfig {
+        +Callable retry_strategy
+    }
+
+    class WaitForConditionConfig~T~ {
+        +Callable wait_strategy
+        +T initial_state
+        +SerDes serdes
+    }
+
+    class CompletionConfig {
+        +int min_successful
+        +int tolerated_failure_count
+        +float tolerated_failure_percentage
+        +first_successful()$ CompletionConfig
+        +all_completed()$ CompletionConfig
+        +all_successful()$ CompletionConfig
+    }
+
+    class ItemBatcher~T~ {
+        +int max_items_per_batch
+        +float max_item_bytes_per_batch
+        +T batch_input
+    }
+
+    WaitForCallbackConfig --|> CallbackConfig : extends
+    MapConfig --> CompletionConfig : contains
+    MapConfig --> ItemBatcher : contains
+    ParallelConfig --> CompletionConfig : contains
+```
+
+## Types and Protocols Module
+
+```mermaid
+classDiagram
+    class DurableContextProtocol {
+        <<interface>>
+        +step(Callable func, str name, StepConfig config) T
+        +run_in_child_context(Callable func, str name, ChildConfig config) T
+        +map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+        +parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+        +wait(int seconds, str name)
+        +create_callback(str name, CallbackConfig config) Callback
+    }
+
+    class LoggerInterface {
+        <<interface>>
+        +debug(object msg, *args, Mapping extra)
+        +info(object msg, *args, Mapping extra)
+        +warning(object msg, *args, Mapping extra)
+        +error(object msg, *args, Mapping extra)
+        +exception(object msg, *args, Mapping extra)
+    }
+
+    class CallbackProtocol~C_co~ {
+        <<interface>>
+        +str callback_id
+        +result() C_co
+    }
+
+    class BatchResultProtocol~T~ {
+        <<interface>>
+        +get_results() list~T~
+    }
+
+    class StepContext {
+        +LoggerInterface logger
+    }
+
+    class WaitForConditionCheckContext {
+        +LoggerInterface logger
+    }
+
+    class OperationContext {
+        +LoggerInterface logger
+    }
+
+    StepContext --|> OperationContext : extends
+    WaitForConditionCheckContext --|> OperationContext : extends
+```
+
+## SerDes Module Classes
+
+```mermaid
+classDiagram
+    class SerDes~T~ {
+        <<interface>>
+        +serialize(T value, SerDesContext context) str
+        +deserialize(str data, SerDesContext context) T
+    }
+
+    class JsonSerDes~T~ {
+        +serialize(T value, SerDesContext context) str
+        +deserialize(str data, SerDesContext context) T
+    }
+
+    class SerDesContext {
+        +str operation_id
+        +str durable_execution_arn
+    }
+
+    class serialize {
+        <<function>>
+        +serialize(SerDes serdes, T value, str operation_id, str durable_execution_arn) str
+    }
+
+    class deserialize {
+        <<function>>
+        +deserialize(SerDes serdes, str data, str operation_id, str durable_execution_arn) T
+    }
+
+    JsonSerDes ..|> SerDes : implements
+    serialize --> SerDes : uses
+    deserialize --> SerDes : uses
+    SerDes --> SerDesContext : uses
+```
+
+## Concurrency Architecture - Map and Parallel Operations
+
+```mermaid
+classDiagram
+    class ConcurrentExecutor~CallableType,ResultType~ {
+        <<abstract>>
+        +list~Executable~ executables
+        +int max_concurrency
+        +CompletionConfig completion_config
+        +ExecutionCounters counters
+        +list~ExecutableWithState~ executables_with_state
+        +Event _completion_event
+        +SuspendExecution _suspend_exception
+
+        +execute(ExecutionState state, Callable run_in_child_context) BatchResult~ResultType~
+        +execute_item(DurableContext child_context, Executable executable)* ResultType
+        +should_execution_suspend() SuspendResult
+        -_on_task_complete(ExecutableWithState exe_state, Future future, TimerScheduler scheduler)
+        -_create_result() BatchResult~ResultType~
+    }
+
+    class MapExecutor~T,R~ {
+        +Sequence~T~ items
+        +execute_item(DurableContext child_context, Executable executable) R
+        +from_items(Sequence items, Callable func, MapConfig config)$ MapExecutor
+    }
+
+    class ParallelExecutor {
+        +execute_item(DurableContext child_context, Executable executable) R
+        +from_callables(Sequence callables, ParallelConfig config)$ ParallelExecutor
+    }
+
+    class Executable~CallableType~ {
+        +int index
+        +CallableType func
+    }
+
+    class ExecutableWithState~CallableType,ResultType~ {
+        +Executable~CallableType~ executable
+        -BranchStatus _status
+        -Future _future
+        -float _suspend_until
+        -ResultType _result
+        -Exception _error
+
+        +run(Future future)
+        +suspend()
+        +suspend_with_timeout(float timestamp)
+        +complete(ResultType result)
+        +fail(Exception error)
+        +reset_to_pending()
+        +can_resume() bool
+        +is_running() bool
+    }
+
+    class ExecutionCounters {
+        +int total_tasks
+        +int min_successful
+        +int success_count
+        +int failure_count
+        -Lock _lock
+
+        +complete_task()
+        +fail_task()
+        +should_complete() bool
+        +is_all_completed() bool
+        +is_min_successful_reached() bool
+        +is_failure_tolerance_exceeded() bool
+    }
+
+    class TimerScheduler {
+        +Callable resubmit_callback
+        -list _pending_resumes
+        -Lock _lock
+        -Event _shutdown
+        -Thread _timer_thread
+
+        +schedule_resume(ExecutableWithState exe_state, float resume_time)
+        +shutdown()
+        -_timer_loop()
+    }
+
+    class BatchResult~R~ {
+        +list~BatchItem~R~~ all
+        +CompletionReason completion_reason
+        +succeeded() list~BatchItem~R~~
+        +failed() list~BatchItem~R~~
+        +get_results() list~R~
+        +throw_if_error()
+    }
+
+    class BatchItem~R~ {
+        +int index
+        +BatchItemStatus status
+        +R result
+        +ErrorObject error
+    }
+
+    MapExecutor --|> ConcurrentExecutor : extends
+    ParallelExecutor --|> ConcurrentExecutor : extends
+    ConcurrentExecutor --> ExecutableWithState : manages
+    ConcurrentExecutor --> ExecutionCounters : uses
+    ConcurrentExecutor --> TimerScheduler : uses
+    ConcurrentExecutor --> BatchResult : creates
+    ExecutableWithState --> Executable : contains
+    BatchResult --> BatchItem : contains
+```
+
+## Concurrency Flow
+
+```mermaid
+sequenceDiagram
+    participant DC as DurableContext
+    participant MH as map_handler
+    participant ME as MapExecutor
+    participant CE as ConcurrentExecutor
+    participant TP as ThreadPoolExecutor
+    participant TS as TimerScheduler
+    participant EC as ExecutionCounters
+
+    DC->>MH: map(inputs, func, config)
+    MH->>ME: MapExecutor.from_items()
+    ME->>CE: execute(state, run_in_child_context)
+
+    CE->>TP: ThreadPoolExecutor(max_workers)
+    CE->>TS: TimerScheduler(resubmitter)
+    CE->>EC: ExecutionCounters(total, min_successful)
+
+    loop For each executable
+        CE->>TP: submit_task(executable_with_state)
+        TP->>CE: execute_item_in_child_context()
+        CE->>DC: run_in_child_context(child_func)
+        DC->>ME: execute_item(child_context, executable)
+    end
+
+    par Task Completion Handling
+        TP->>CE: on_task_complete(future)
+        CE->>EC: complete_task() / fail_task()
+        CE->>CE: should_execution_suspend()
+        alt Should Complete
+            CE->>CE: _completion_event.set()
+        else Should Suspend
+            CE->>TS: schedule_resume(exe_state, timestamp)
+        end
+    end
+
+    CE->>CE: _completion_event.wait()
+    CE->>CE: _create_result()
+    CE->>DC: BatchResult
+```
+
+## Threading and Locking
+
+```mermaid
+classDiagram
+    class OrderedLock {
+        -Lock _lock
+        -deque~Event~ _waiters
+        -bool _is_broken
+        -Exception _exception
+
+        +acquire() bool
+        +release()
+        +reset()
+        +is_broken() bool
+        +__enter__() OrderedLock
+        +__exit__(exc_type, exc_val, exc_tb)
+    }
+
+    class OrderedCounter {
+        -OrderedLock _lock
+        -int _counter
+
+        +increment() int
+        +decrement() int
+        +get_current() int
+    }
+
+    class Event {
+        <<threading>>
+        +set()
+        +wait()
+    }
+
+    class Lock {
+        <<threading>>
+        +acquire()
+        +release()
+    }
+
+    OrderedCounter --> OrderedLock : uses
+    OrderedLock --> Lock : contains
+    OrderedLock --> Event : manages queue of
+```
+
 ## License
 This project is licensed under the [Apache-2.0 License](LICENSE).
diff --git a/ops/ci-checks.sh b/ops/ci-checks.sh
new file mode 100755
index 0000000..c2ad664
--- /dev/null
+++ b/ops/ci-checks.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+set -e
+
+hatch run test:cov
+echo SUCCESS: tests + coverage
+
+# type checks
+hatch run types:check
+echo SUCCESS: typings
+
+# static analysis
+hatch fmt
+echo SUCCESS: linting/fmt