Merged
118 changes: 118 additions & 0 deletions README.md
@@ -533,6 +533,124 @@ classDiagram
OrderedLock --> Event : manages queue of
```

## Checkpointing System

The SDK invokes the AWS Lambda checkpoint API to persist execution state. Checkpoints are batched for efficiency and can be either
synchronous (blocking) or asynchronous (non-blocking). Critical checkpoints are blocking,
meaning that execution will not proceed until the checkpoint call has successfully completed.

### Checkpoint Types

Checkpoints are categorized by their action (START, SUCCEED, FAIL) and whether they are critical to execution correctness:

| Operation Type | Action | Is Sync? | Rationale |
|---------------|--------|----------|-----------|
| Step (AtMostOncePerRetry) | START | Yes | Prevents duplicate execution - must wait for confirmation |
| Step (AtLeastOncePerRetry) | START | No | Performance optimization - idempotent operations can retry |
| Step | SUCCEED/FAIL | Yes | Ensures result persisted before returning to caller |
| Callback | START | Yes | Must wait for API to generate callback ID |
| Callback | SUCCEED/FAIL | Yes | Ensures callback result persisted |
| Invoke | START | Yes | Ensures chained invoke recorded before proceeding |
| Invoke | SUCCEED/FAIL | Yes | Ensures invoke result persisted |
| Context (Child) | START | No | Fire-and-forget for performance - parent tracks completion |
| Context (Child) | SUCCEED/FAIL | Yes | Ensures child result available to parent |
| Wait | START | No | Observability only - no blocking needed |
| Wait | SUCCEED | Yes | Ensures wait completion recorded |
| Wait for Condition | START | No | Observability only - condition check is idempotent |
| Wait for Condition | SUCCEED/FAIL | Yes | Ensures condition result persisted |
| Empty Checkpoint | N/A | Yes (default) | Refreshes checkpoint token and operations list |

### Synchronous vs Asynchronous Checkpoints

**Synchronous Checkpoints (`is_sync=True`, default)**:
- Block the caller until the checkpoint is processed by the background thread
- Ensure the checkpoint is persisted before continuing execution
- Safe default for correctness
- Used for critical operations where confirmation is required

**Asynchronous Checkpoints (`is_sync=False`, opt-in)**:
- Return immediately without waiting for the checkpoint to complete
- Performance optimization for specific use cases
- Used for observability checkpoints and fire-and-forget operations
- Only safe when the operation is idempotent or non-critical
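
The two modes can be sketched with a queue of (operation, completion event) pairs. This `CheckpointQueue` is an illustrative stand-in for the SDK's internals, not its actual API:

```python
import queue
import threading

class CheckpointQueue:
    """Sketch: sync checkpoints block on a completion event; async ones return at once."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()

    def create_checkpoint(self, operation: str, is_sync: bool = True) -> None:
        done = threading.Event() if is_sync else None
        self._queue.put((operation, done))
        if done is not None:
            done.wait()  # block until the background thread signals persistence

    def process_one(self) -> None:
        """Background-thread side: persist one operation, then signal any waiter."""
        operation, done = self._queue.get()
        # ... call the checkpoint API with `operation` here ...
        if done is not None:
            done.set()
```

A synchronous call returns only after the background thread has processed the operation and set the event; an asynchronous call returns immediately after enqueueing.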

### Checkpoint Batching

The SDK uses a background thread to batch multiple checkpoint operations into a single API call for efficiency. This reduces API overhead and
improves throughput.

```mermaid
sequenceDiagram
participant MT as Main Thread
participant Q as Checkpoint Queue
participant BT as Background Thread
participant API as Durable Functions API

Note over MT,API: Synchronous Checkpoint Flow
MT->>Q: Enqueue operation + completion event
MT->>MT: Block on completion event
BT->>Q: Collect batch (up to 1 second or 750KB)
BT->>API: POST /checkpoint (batched operations)
API-->>BT: New checkpoint token + operations
BT->>BT: Update execution state
BT->>MT: Signal completion event
MT->>MT: Resume execution

Note over MT,API: Asynchronous Checkpoint Flow
MT->>Q: Enqueue operation (no event)
MT->>MT: Continue immediately
BT->>Q: Collect batch (up to 1 second or 750KB)
BT->>API: POST /checkpoint (batched operations)
API-->>BT: New checkpoint token + operations
BT->>BT: Update execution state
```

### Batching Configuration

Checkpoint batching is controlled by `CheckpointBatcherConfig`:

```python
@dataclass(frozen=True)
class CheckpointBatcherConfig:
max_batch_size_bytes: int = 750 * 1024 # 750KB
max_batch_time_seconds: float = 1.0 # 1 second
max_batch_operations: int | float = float("inf") # No limit
```

The background thread collects operations until one of these limits is reached:
1. Batch size exceeds 750KB
2. 1 second has elapsed since the first operation
3. Maximum operation count is reached (unlimited by default)

### Concurrency Management

The checkpointing system handles concurrent operations (map/parallel) by tracking parent-child relationships:

1. When a CONTEXT operation completes (SUCCEED/FAIL), all descendant operations are marked as orphaned
2. Orphaned operations are rejected if they attempt to checkpoint
3. This prevents child operations from checkpointing after their parent has already completed
4. Uses a single lock (`_parent_done_lock`) to coordinate completion and checkpoint validation
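
The orphaning rules above can be sketched as follows; `_parent_done_lock` matches the name used in the README, while the set/dict bookkeeping is an illustrative assumption:

```python
import threading

class OrphanTracker:
    """Sketch: once a parent context completes, its descendants may no longer checkpoint."""

    def __init__(self) -> None:
        self._parent_done_lock = threading.Lock()
        self._orphaned: set[str] = set()
        self._children: dict[str, set[str]] = {}

    def register_child(self, parent_id: str, child_id: str) -> None:
        with self._parent_done_lock:
            self._children.setdefault(parent_id, set()).add(child_id)

    def complete_context(self, parent_id: str) -> None:
        # SUCCEED/FAIL of a CONTEXT operation marks every descendant as orphaned
        with self._parent_done_lock:
            stack = list(self._children.get(parent_id, ()))
            while stack:
                child = stack.pop()
                self._orphaned.add(child)
                stack.extend(self._children.get(child, ()))

    def validate_checkpoint(self, operation_id: str) -> bool:
        # Orphaned operations are rejected if they attempt to checkpoint
        with self._parent_done_lock:
            return operation_id not in self._orphaned
```

Taking the same lock for both completion and validation is what prevents the race where a child checkpoints concurrently with its parent finishing.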

### Error Handling

When a checkpoint fails in the background thread:

1. **Error Signaling**: The background thread creates a `BackgroundThreadError` wrapping the original exception
2. **Event Notification**: All completion events (both in the current batch and queued operations) are signaled with this error
3. **Immediate Propagation**: Synchronous callers waiting on `create_checkpoint(is_sync=True)` immediately receive the `BackgroundThreadError`
4. **Future Prevention**: A failure event (`_checkpointing_failed`) is set to prevent any future checkpoint attempts
5. **Clean Termination**: The background thread exits cleanly after signaling all waiting operations

For **synchronous operations** (default `is_sync=True`):
- The main thread receives `BackgroundThreadError` immediately when calling `create_checkpoint()`
- This prevents further execution with corrupted state

For **asynchronous operations** (`is_sync=False`):
- The error is detected on the next synchronous checkpoint attempt
- The `_checkpointing_failed` event causes immediate failure before queuing

This ensures no code continues executing after a checkpoint failure, maintaining execution state integrity.
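
A sketch of that propagation path, assuming a completion event that can carry an error; the `CompletionEvent` shape here is inferred from the description above, not taken from the SDK:

```python
import threading

class BackgroundThreadError(BaseException):
    """Wraps a fatal error from the background checkpoint thread."""
    def __init__(self, message: str, source_exception: Exception):
        super().__init__(message)
        self.source_exception = source_exception

class CompletionEvent:
    """An event that either completes normally or re-raises a background error."""
    def __init__(self) -> None:
        self._event = threading.Event()
        self._error = None

    def set(self, error=None) -> None:
        self._error = error
        self._event.set()

    def wait(self) -> None:
        self._event.wait()
        if self._error is not None:
            raise self._error  # synchronous callers see the failure immediately

def fail_all(pending_events, cause: Exception) -> None:
    # On failure, every waiter (current batch and queued operations) is signaled with the error
    error = BackgroundThreadError("checkpointing failed", cause)
    for event in pending_events:
        event.set(error)
```

Because `BackgroundThreadError` derives from `BaseException`, it bypasses ordinary `except Exception` handlers in user code and terminates execution.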

## License

This project is licensed under the [Apache-2.0 License](LICENSE).
21 changes: 21 additions & 0 deletions src/aws_durable_execution_sdk_python/exceptions.py
@@ -133,6 +133,27 @@ def __init__(self, message: str, step_id: str | None = None):
self.step_id = step_id


class BackgroundThreadError(BaseException):
"""Critical error from background checkpoint thread.

Derives from BaseException to bypass normal exception handlers.
Similar to KeyboardInterrupt or SystemExit - this is a system-level
error that should terminate execution immediately without attempting
to checkpoint or process the error.

This exception is raised in the user thread when the background
checkpoint processing thread encounters a fatal error. It propagates
through CompletionEvent.wait() to interrupt blocked user code.

Attributes:
source_exception: The original exception from the background thread
"""

def __init__(self, message: str, source_exception: Exception):
super().__init__(message)
self.source_exception = source_exception


class SuspendExecution(BaseException):
"""Raise this exception to suspend the current execution by returning PENDING to DAR.

152 changes: 98 additions & 54 deletions src/aws_durable_execution_sdk_python/execution.py
@@ -2,12 +2,14 @@

import json
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any

from aws_durable_execution_sdk_python.context import DurableContext, ExecutionState
from aws_durable_execution_sdk_python.exceptions import (
BackgroundThreadError,
CheckpointError,
DurableExecutionsError,
ExecutionError,
@@ -246,72 +248,114 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
state=execution_state, lambda_context=context
)

try:
# TODO: logger adapter to inject arn/correlated id for all log entries
# Use ThreadPoolExecutor for concurrent execution of user code and background checkpoint processing
with ThreadPoolExecutor(
max_workers=2, thread_name_prefix="dex-handler"
) as executor:
# Thread 1: Run background checkpoint processing
executor.submit(execution_state.checkpoint_batches_forever)

# Thread 2: Execute user function
logger.debug(
"%s entering user-space...", invocation_input.durable_execution_arn
)
result = func(input_event, durable_context)
user_future = executor.submit(func, input_event, durable_context)

logger.debug(
"%s exiting user-space...", invocation_input.durable_execution_arn
"%s waiting for user code completion...",
invocation_input.durable_execution_arn,
)

# done with userland
serialized_result = json.dumps(result)
try:
# Background checkpointing errors will propagate through CompletionEvent.wait() as BackgroundThreadError
result = user_future.result()

# Large response handling: if checkpointing the result to complete, do NOT include
# the payload in the response.
if (
serialized_result
and len(serialized_result) > LAMBDA_RESPONSE_SIZE_LIMIT
):
# done with userland
logger.debug(
"Response size (%s bytes) exceeds Lambda limit (%s bytes). Checkpointing result.",
len(serialized_result),
LAMBDA_RESPONSE_SIZE_LIMIT,
"%s exiting user-space...",
invocation_input.durable_execution_arn,
)
success_operation = OperationUpdate.create_execution_succeed(
payload=serialized_result
)
execution_state.create_checkpoint(success_operation)
serialized_result = json.dumps(result)

# Large response handling: if checkpointing the result to complete, do NOT include
# the payload in the response.
if (
serialized_result
and len(serialized_result) > LAMBDA_RESPONSE_SIZE_LIMIT
):
logger.debug(
"Response size (%s bytes) exceeds Lambda limit (%s bytes). Checkpointing result.",
len(serialized_result),
LAMBDA_RESPONSE_SIZE_LIMIT,
)
success_operation = OperationUpdate.create_execution_succeed(
payload=serialized_result
)
# Checkpoint large result with blocking (is_sync=True, default).
# Must ensure the result is persisted before returning to Lambda.
# Large results exceed Lambda response limits and must be stored durably
# before the execution completes.
execution_state.create_checkpoint_sync(success_operation)

# Stop background checkpointing thread
execution_state.stop_checkpointing()

return DurableExecutionInvocationOutput.create_succeeded(
result=""
).to_dict()

# Stop background checkpointing thread
execution_state.stop_checkpointing()

return DurableExecutionInvocationOutput.create_succeeded(
result=""
result=serialized_result
).to_dict()

return DurableExecutionInvocationOutput.create_succeeded(
result=serialized_result
).to_dict()
except SuspendExecution:
logger.debug("Suspending execution...")
return DurableExecutionInvocationOutput(
status=InvocationStatus.PENDING
).to_dict()
except CheckpointError:
logger.exception("Failed to checkpoint")
# Throw the error to terminate the lambda
raise

except InvocationError:
logger.exception("Invocation error. Must terminate.")
# Throw the error to trigger Lambda retry
raise
except ExecutionError as e:
logger.exception("Execution error. Must terminate without retry.")
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED,
error=ErrorObject.from_exception(e),
).to_dict()
except Exception as e:
# all user-space errors go here
logger.exception("Execution failed")
failed_operation = OperationUpdate.create_execution_fail(
error=ErrorObject.from_exception(e)
)
# TODO: can optimize, if not too large can just return response rather than checkpoint
execution_state.create_checkpoint(failed_operation)
except BackgroundThreadError as bg_error:
# Background checkpoint system failed - propagated through CompletionEvent
# Do not attempt to checkpoint anything, just terminate immediately
logger.exception("Checkpoint processing failed")
execution_state.stop_checkpointing()
# Raise the original exception
raise bg_error.source_exception from bg_error

except SuspendExecution:
# User code suspended - stop background checkpointing thread
logger.debug("Suspending execution...")
execution_state.stop_checkpointing()
return DurableExecutionInvocationOutput(
status=InvocationStatus.PENDING
).to_dict()

except CheckpointError:
# Checkpoint system is broken - stop background thread and exit immediately
execution_state.stop_checkpointing()
logger.exception("Checkpoint system failed")
raise # Terminate Lambda immediately
except InvocationError:
execution_state.stop_checkpointing()
logger.exception("Invocation error. Must terminate.")
# Throw the error to trigger Lambda retry
raise
except ExecutionError as e:
execution_state.stop_checkpointing()
logger.exception("Execution error. Must terminate without retry.")
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED,
error=ErrorObject.from_exception(e),
).to_dict()
except Exception as e:
# all user-space errors go here
logger.exception("Execution failed")
failed_operation = OperationUpdate.create_execution_fail(
error=ErrorObject.from_exception(e)
)
# TODO: can optimize, if not too large can just return response rather than checkpoint
execution_state.create_checkpoint_sync(failed_operation)

return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED
).to_dict()
execution_state.stop_checkpointing()
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED
).to_dict()

return wrapper
3 changes: 3 additions & 0 deletions src/aws_durable_execution_sdk_python/operation/callback.py
@@ -67,6 +67,9 @@ def create_callback_handler(
identifier=operation_identifier,
callback_options=callback_options,
)
# Checkpoint callback START with blocking (is_sync=True, default).
# Must wait for the API to generate and return the callback ID before proceeding.
# The callback ID is needed immediately by the caller to pass to external systems.
state.create_checkpoint(operation_update=create_callback_operation)

result: CheckpointedResult = state.get_checkpoint_result(
13 changes: 12 additions & 1 deletion src/aws_durable_execution_sdk_python/operation/child.py
@@ -74,7 +74,11 @@ def child_handler(
identifier=operation_identifier,
sub_type=sub_type,
)
state.create_checkpoint(operation_update=start_operation)
# Checkpoint child context START with non-blocking (is_sync=False).
# This is a fire-and-forget operation for performance - we don't need to wait for
# persistence before executing the child context. The START checkpoint is purely
# for observability and tracking the operation hierarchy.
state.create_checkpoint(operation_update=start_operation, is_sync=False)

try:
raw_result: T = func()
@@ -123,6 +127,10 @@ def child_handler(
sub_type=sub_type,
context_options=ContextOptions(replay_children=replay_children),
)
# Checkpoint child context SUCCEED with blocking (is_sync=True, default).
# Must ensure the child context result is persisted before returning to the parent.
# This guarantees the result is durable and child operations won't be re-executed on replay
# (unless replay_children=True for large payloads).
state.create_checkpoint(operation_update=success_operation)

logger.debug(
@@ -139,6 +147,9 @@
fail_operation = OperationUpdate.create_context_fail(
identifier=operation_identifier, error=error_object, sub_type=sub_type
)
# Checkpoint child context FAIL with blocking (is_sync=True, default).
# Must ensure the failure state is persisted before raising the exception.
# This guarantees the error is durable and child operations won't be re-executed on replay.
state.create_checkpoint(operation_update=fail_operation)

# InvocationError and its derivatives can be retried
3 changes: 3 additions & 0 deletions src/aws_durable_execution_sdk_python/operation/invoke.py
@@ -92,6 +92,9 @@ def invoke_handler(
),
)

# Checkpoint invoke START with blocking (is_sync=True, default).
# Must ensure the chained invocation is recorded before suspending execution.
# This guarantees the invoke operation is durable and will be tracked by the backend.
state.create_checkpoint(operation_update=start_operation)

logger.debug(