Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/aws_durable_execution_sdk_python/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from aws_durable_execution_sdk_python.lambda_service import OperationSubType
from aws_durable_execution_sdk_python.serdes import SerDes
from aws_durable_execution_sdk_python.state import ExecutionState
from aws_durable_execution_sdk_python.types import DurableContext
from aws_durable_execution_sdk_python.types import DurableContext, SummaryGenerator


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -566,13 +566,24 @@ def __init__(
sub_type_iteration: OperationSubType,
name_prefix: str,
serdes: SerDes | None,
summary_generator: SummaryGenerator | None = None,
):
"""Initialize ConcurrentExecutor.

Args:
summary_generator: Optional function to generate compact summaries for large results.
When the serialized result exceeds 256KB, this generator creates a JSON summary
instead of checkpointing the full result. Used by map/parallel operations to
handle large BatchResult payloads efficiently. Matches TypeScript behavior in
run-in-child-context-handler.ts.
"""
self.executables = executables
self.max_concurrency = max_concurrency
self.completion_config = completion_config
self.sub_type_top = sub_type_top
self.sub_type_iteration = sub_type_iteration
self.name_prefix = name_prefix
self.summary_generator = summary_generator

# Event-driven state tracking for when the executor is done
self._completion_event = threading.Event()
Expand Down Expand Up @@ -785,7 +796,11 @@ def execute_in_child_context(child_context: DurableContext) -> ResultType:
return run_in_child_context(
execute_in_child_context,
f"{self.name_prefix}{executable.index}",
ChildConfig(serdes=self.serdes, sub_type=self.sub_type_iteration),
ChildConfig(
serdes=self.serdes,
sub_type=self.sub_type_iteration,
summary_generator=self.summary_generator,
),
)


Expand Down
171 changes: 169 additions & 2 deletions src/aws_durable_execution_sdk_python/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from aws_durable_execution_sdk_python.lambda_service import OperationSubType
from aws_durable_execution_sdk_python.serdes import SerDes
from aws_durable_execution_sdk_python.types import SummaryGenerator


Numeric = int | float # deliberately leaving off complex
Expand All @@ -39,6 +40,38 @@ class TerminationMode(Enum):

@dataclass(frozen=True)
class CompletionConfig:
"""Configuration for determining when parallel/map operations complete.

This class defines the success/failure criteria for operations that process
multiple items or branches concurrently.

Args:
min_successful: Minimum number of successful completions required.
If None, no minimum is enforced. Use this to implement "at least N
must succeed" semantics.

tolerated_failure_count: Maximum number of failures allowed before
the operation is considered failed. If None, no limit on failure count.
Use this to implement "fail fast after N failures" semantics.

tolerated_failure_percentage: Maximum percentage of failures allowed
(0.0 to 100.0). If None, no percentage limit is enforced.
Use this to implement "fail if more than X% fail" semantics.

Note:
The operation completes when any of the completion criteria are met:
- Enough successes (min_successful reached)
- Too many failures (tolerated limits exceeded)
- All items/branches completed

Example:
# Succeed if at least 3 succeed, fail if more than 2 fail
config = CompletionConfig(
min_successful=3,
tolerated_failure_count=2
)
"""

min_successful: int | None = None
tolerated_failure_count: int | None = None
tolerated_failure_percentage: int | float | None = None
Expand Down Expand Up @@ -77,11 +110,47 @@ def all_successful():

@dataclass(frozen=True)
class ParallelConfig:
    """Options controlling how a set of parallel branches is executed.

    Attributes:
        max_concurrency: Upper bound on how many branches may run at the
            same time. ``None`` means unlimited — every branch is started
            concurrently. Use this to cap resource usage.
        completion_config: Success/failure criteria for the parallel
            operation as a whole. Defaults to
            ``CompletionConfig.all_successful()``, i.e. every branch must
            succeed; other factories include ``first_successful()`` and
            ``all_completed()``.
        serdes: Optional custom serializer/deserializer for branch results.
            ``None`` selects the default serializer.
        summary_generator: Optional callable ``(result) -> str`` that
            produces a compact JSON summary when the serialized result
            exceeds CHECKPOINT_SIZE_LIMIT (256KB). The summary is
            checkpointed instead of the full result and the operation is
            marked ReplayChildren=true so the full result is rebuilt during
            replay. Used internally by map/parallel operations for large
            BatchResult payloads.

    Example:
        # Run at most 3 branches concurrently, succeed if any one succeeds
        config = ParallelConfig(
            max_concurrency=3,
            completion_config=CompletionConfig.first_successful(),
        )
    """

    max_concurrency: int | None = None
    completion_config: CompletionConfig = field(
        default_factory=CompletionConfig.all_successful
    )
    serdes: SerDes | None = None
    summary_generator: SummaryGenerator | None = None


class StepSemantics(Enum):
Expand All @@ -106,12 +175,41 @@ class CheckpointMode(Enum):

@dataclass(frozen=True)
class ChildConfig(Generic[T]):
    """Configuration options for child context operations.

    This class configures how child contexts are executed and checkpointed,
    matching the TypeScript ChildConfig interface behavior.

    Args:
        serdes: Custom serialization/deserialization configuration for the
            child context data. If None, uses the default serializer. This
            allows different serialization strategies for child operations
            vs parent operations.
        sub_type: Operation subtype identifier used for tracking and
            debugging (e.g. OperationSubType.MAP_ITERATION,
            OperationSubType.PARALLEL_BRANCH). Used internally by the
            execution engine for operation classification.
        summary_generator: Function to generate compact summaries for large
            results (>256KB). When the serialized result exceeds
            CHECKPOINT_SIZE_LIMIT, this generator creates a JSON summary
            instead of checkpointing the full result, and the operation is
            marked with ReplayChildren=true to reconstruct the full result
            during replay. Signature: (result: T) -> str

    Note:
        checkpoint_mode is commented out because it is not currently
        implemented. When implemented, it will control when checkpoints are
        created: at both start and finish (default), only at finish, or not
        at all.

        See TypeScript reference: aws-durable-execution-sdk-js/src/types/index.ts
    """

    # checkpoint_mode: CheckpointMode = CheckpointMode.CHECKPOINT_AT_START_AND_FINISH
    serdes: SerDes | None = None
    sub_type: OperationSubType | None = None
    summary_generator: SummaryGenerator | None = None


class ItemsPerBatchUnit(Enum):
Expand All @@ -121,17 +219,86 @@ class ItemsPerBatchUnit(Enum):

@dataclass(frozen=True)
class ItemBatcher(Generic[T]):
    """Controls how items are grouped into batches for map operations.

    Attributes:
        max_items_per_batch: Hard cap on the number of items in one batch.
            0 (the default) disables the count limit. Useful when processing
            many small items.
        max_item_bytes_per_batch: Hard cap on the combined byte size of the
            items in one batch. 0 (the default) disables the size limit.
            Useful when item sizes vary or memory must be bounded.
        batch_input: Extra payload passed to the processing function along
            with every batch — shared context or configuration that applies
            to all items in the batch.

    Example:
        # Batch up to 100 items or 1MB, whichever comes first
        batcher = ItemBatcher(
            max_items_per_batch=100,
            max_item_bytes_per_batch=1024 * 1024,
            batch_input={"processing_mode": "fast"},
        )
    """

    max_items_per_batch: int = 0
    max_item_bytes_per_batch: int | float = 0
    batch_input: T | None = None


@dataclass(frozen=True)
class MapConfig:
    """Options controlling how a map operation processes a collection.

    Attributes:
        max_concurrency: Upper bound on how many items are processed at the
            same time. ``None`` means all items may run concurrently. Use
            this to bound resource usage on large collections.
        item_batcher: Rules for grouping items into batches (by count and/or
            byte size). The default performs no batching — each item is
            processed individually.
        completion_config: Success/failure criteria for the map operation as
            a whole. The default ``CompletionConfig()`` enforces no limits;
            use ``CompletionConfig.all_successful()`` to require every item
            to succeed.
        serdes: Optional custom serializer/deserializer for map results.
            ``None`` selects the default serializer.
        summary_generator: Optional callable ``(result) -> str`` that
            produces a compact JSON summary when the serialized result
            exceeds CHECKPOINT_SIZE_LIMIT (256KB). The summary is
            checkpointed instead of the full result and the operation is
            marked ReplayChildren=true so the full result is rebuilt during
            replay. Used internally for large BatchResult payloads.

    Example:
        # Process 5 items at a time, batch by count, require all to succeed
        config = MapConfig(
            max_concurrency=5,
            item_batcher=ItemBatcher(max_items_per_batch=10),
            completion_config=CompletionConfig.all_successful(),
        )
    """

    max_concurrency: int | None = None
    item_batcher: ItemBatcher = field(default_factory=ItemBatcher)
    completion_config: CompletionConfig = field(default_factory=CompletionConfig)
    serdes: SerDes | None = None
    summary_generator: SummaryGenerator | None = None


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion src/aws_durable_execution_sdk_python/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def parallel(
) -> BatchResult[T]:
"""Execute multiple callables in parallel."""

def parallel_in_child_context(child_context):
def parallel_in_child_context(child_context) -> BatchResult[T]:
return parallel_handler(
callables=functions,
config=config,
Expand Down
16 changes: 15 additions & 1 deletion src/aws_durable_execution_sdk_python/operation/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,28 @@ def child_handler(
operation_id=operation_identifier.operation_id,
durable_execution_arn=state.durable_execution_arn,
)
# Summary Generator Logic:
# When the serialized result exceeds 256KB, we use ReplayChildren mode to avoid
# checkpointing large payloads. Instead, we checkpoint a compact summary and mark
# the operation for replay. This matches the TypeScript implementation behavior.
#
# See TypeScript reference:
# - aws-durable-execution-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts (lines ~200-220)
#
# The summary generator creates a JSON summary with metadata (type, counts, status)
# instead of the full BatchResult. During replay, the child context is re-executed
# to reconstruct the full result rather than deserializing from the checkpoint.
replay_children: bool = False
if len(serialized_result) > CHECKPOINT_SIZE_LIMIT:
logger.debug(
"Large payload detected, using ReplayChildren mode: id: %s, name: %s",
"Large payload detected, using ReplayChildren mode: id: %s, name: %s, payload_size: %d, limit: %d",
operation_identifier.operation_id,
operation_identifier.name,
len(serialized_result),
CHECKPOINT_SIZE_LIMIT,
)
replay_children = True
# Use summary generator if provided, otherwise use empty string (matches TypeScript)
serialized_result = (
config.summary_generator(raw_result) if config.summary_generator else ""
)
Expand Down
29 changes: 27 additions & 2 deletions src/aws_durable_execution_sdk_python/operation/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import json
import logging
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Generic, TypeVar
Expand All @@ -18,7 +19,7 @@
from aws_durable_execution_sdk_python.config import ChildConfig
from aws_durable_execution_sdk_python.serdes import SerDes
from aws_durable_execution_sdk_python.state import ExecutionState
from aws_durable_execution_sdk_python.types import DurableContext
from aws_durable_execution_sdk_python.types import DurableContext, SummaryGenerator


logger = logging.getLogger(__name__)
Expand All @@ -40,6 +41,7 @@ def __init__(
iteration_sub_type: OperationSubType,
name_prefix: str,
serdes: SerDes | None,
summary_generator: SummaryGenerator | None = None,
):
super().__init__(
executables=executables,
Expand All @@ -49,6 +51,7 @@ def __init__(
sub_type_iteration=iteration_sub_type,
name_prefix=name_prefix,
serdes=serdes,
summary_generator=summary_generator,
)
self.items = items

Expand All @@ -73,6 +76,7 @@ def from_items(
iteration_sub_type=OperationSubType.MAP_ITERATION,
name_prefix="map-item-",
serdes=config.serdes,
summary_generator=config.summary_generator,
)

def execute_item(self, child_context, executable: Executable[Callable]) -> R:
Expand All @@ -93,7 +97,28 @@ def map_handler(
],
) -> BatchResult[R]:
"""Execute a callable for each item in parallel."""
# Summary Generator Construction (matches TypeScript implementation):
# Construct the summary generator at the handler level, just like TypeScript does in map-handler.ts.
# This matches the pattern where handlers are responsible for configuring operation-specific behavior.
#
# See TypeScript reference: aws-durable-execution-sdk-js/src/handlers/map-handler/map-handler.ts (~line 79)

executor: MapExecutor[T, R] = MapExecutor.from_items(
items=items, func=func, config=config or MapConfig()
items=items,
func=func,
config=config or MapConfig(summary_generator=MapSummaryGenerator()),
)
return executor.execute(execution_state, run_in_child_context)


class MapSummaryGenerator:
    """Produce a compact JSON summary of a map operation's BatchResult.

    Used in place of the full result when the serialized payload is too
    large to checkpoint: only counts and status metadata are recorded, and
    the full result is reconstructed by replaying the child operations.
    """

    def __call__(self, result: BatchResult) -> str:
        # Key names are camelCase to match the TypeScript summary format.
        return json.dumps(
            {
                "totalCount": result.total_count,
                "successCount": result.success_count,
                "failureCount": result.failure_count,
                "completionReason": result.completion_reason.value,
                "status": result.status.value,
                "type": "MapResult",
            }
        )
Loading