-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathparallel.py
More file actions
122 lines (105 loc) · 4.6 KB
/
parallel.py
File metadata and controls
122 lines (105 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Implementation for Durable Parallel operation."""
from __future__ import annotations
import json
import logging
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, TypeVar
from aws_durable_execution_sdk_python.concurrency.executor import ConcurrentExecutor
from aws_durable_execution_sdk_python.concurrency.models import Executable
from aws_durable_execution_sdk_python.config import ParallelConfig
from aws_durable_execution_sdk_python.lambda_service import OperationSubType
if TYPE_CHECKING:
from aws_durable_execution_sdk_python.concurrency.models import BatchResult
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.identifier import OperationIdentifier
from aws_durable_execution_sdk_python.serdes import SerDes
from aws_durable_execution_sdk_python.state import ExecutionState
from aws_durable_execution_sdk_python.types import SummaryGenerator
logger = logging.getLogger(__name__)
# Result type
R = TypeVar("R")
class ParallelExecutor(ConcurrentExecutor[Callable, R]):
    """Concurrent executor whose work items are plain callables (branches).

    Each branch is wrapped in an ``Executable`` and is invoked with the child
    context handed to :meth:`execute_item`.
    """

    def __init__(
        self,
        executables: list[Executable[Callable]],
        max_concurrency: int | None,
        completion_config,
        top_level_sub_type: OperationSubType,
        iteration_sub_type: OperationSubType,
        name_prefix: str,
        serdes: SerDes | None,
        summary_generator: SummaryGenerator | None = None,
        item_serdes: SerDes | None = None,
    ):
        """Forward every argument to ``ConcurrentExecutor.__init__``.

        The only translation done here is naming: ``top_level_sub_type`` is
        passed on as ``sub_type_top`` and ``iteration_sub_type`` as
        ``sub_type_iteration``. All other arguments keep their names.
        """
        super().__init__(
            executables=executables,
            max_concurrency=max_concurrency,
            completion_config=completion_config,
            sub_type_top=top_level_sub_type,
            sub_type_iteration=iteration_sub_type,
            name_prefix=name_prefix,
            serdes=serdes,
            summary_generator=summary_generator,
            item_serdes=item_serdes,
        )

    @classmethod
    def from_callables(
        cls,
        callables: Sequence[Callable],
        config: ParallelConfig,
    ) -> ParallelExecutor:
        """Build an executor with one ``Executable`` per callable.

        Args:
            callables: Branch functions; each one's position in the sequence
                becomes the ``index`` of its ``Executable``.
            config: Source of ``max_concurrency``, ``completion_config``,
                ``serdes``, ``summary_generator`` and ``item_serdes``.

        Returns:
            An instance of ``cls`` configured with the PARALLEL /
            PARALLEL_BRANCH operation sub-types and the
            ``"parallel-branch-"`` name prefix.
        """
        branches: list[Executable[Callable]] = [
            Executable(index=position, func=branch)
            for position, branch in enumerate(callables)
        ]
        return cls(
            executables=branches,
            max_concurrency=config.max_concurrency,
            completion_config=config.completion_config,
            top_level_sub_type=OperationSubType.PARALLEL,
            iteration_sub_type=OperationSubType.PARALLEL_BRANCH,
            name_prefix="parallel-branch-",
            serdes=config.serdes,
            summary_generator=config.summary_generator,
            item_serdes=config.item_serdes,
        )

    def execute_item(self, child_context, executable: Executable[Callable]) -> R:  # noqa: PLR6301
        """Run a single branch by calling its function with ``child_context``.

        Debug log lines are emitted before and after the call; whatever the
        branch function returns is returned unchanged.
        """
        branch_index = executable.index
        logger.debug("🔀 Processing parallel branch: %s", branch_index)
        outcome: R = executable.func(child_context)
        # Re-read the index here so the attribute is fetched at the same
        # points as before (once per log call).
        logger.debug("✅ Processed parallel branch: %s", executable.index)
        return outcome
def parallel_handler(
    callables: Sequence[Callable],
    config: ParallelConfig | None,
    execution_state: ExecutionState,
    parallel_context: DurableContext,
    operation_identifier: OperationIdentifier,
) -> BatchResult[R]:
    """Run a set of callables as parallel branches, or replay a finished run.

    Args:
        callables: Branch functions to run.
        config: Parallel settings. When falsy (e.g. ``None``), a default
            ``ParallelConfig`` carrying a ``ParallelSummaryGenerator`` is used
            instead; a supplied config is passed through as-is.
        execution_state: Used to look up the checkpoint for this operation and
            handed to the executor.
        parallel_context: Context handed to the executor for replay/execution.
        operation_identifier: Its ``operation_id`` selects the checkpoint.

    Returns:
        The executor's ``replay`` result when the checkpoint reports success,
        otherwise the executor's ``execute`` result.
    """
    # The summary generator is built here, at the handler level, mirroring the
    # TypeScript SDK where handlers configure operation-specific behavior.
    #
    # TypeScript reference:
    # aws-durable-execution-sdk-js/src/handlers/parallel-handler/parallel-handler.ts (~line 112)
    effective_config = config or ParallelConfig(
        summary_generator=ParallelSummaryGenerator()
    )
    executor = ParallelExecutor.from_callables(callables, effective_config)

    operation_id = operation_identifier.operation_id
    checkpoint = execution_state.get_checkpoint_result(operation_id)

    if not checkpoint.is_succeeded():
        return executor.execute(execution_state, executor_context=parallel_context)

    # The checkpoint reports success: take the replay path instead.
    return executor.replay(execution_state, parallel_context)
class ParallelSummaryGenerator:
    """Callable that renders a batch result as a JSON summary string."""

    def __call__(self, result: BatchResult) -> str:
        """Return a JSON object describing the counters and outcome of *result*.

        The object contains ``totalCount``, ``successCount``, ``failureCount``,
        ``completionReason``, ``status``, ``startedCount`` and a constant
        ``type`` of ``"ParallelResult"``, in that key order.
        ``completionReason`` and ``status`` are taken from the ``.value`` of
        the corresponding attributes on *result*.
        """
        counters = {
            "totalCount": result.total_count,
            "successCount": result.success_count,
            "failureCount": result.failure_count,
        }
        outcome = {
            "completionReason": result.completion_reason.value,
            "status": result.status.value,
        }
        # Merge in the original key order so the emitted JSON text is unchanged.
        return json.dumps(
            {
                **counters,
                **outcome,
                "startedCount": result.started_count,
                "type": "ParallelResult",
            }
        )