-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathmap.py
More file actions
124 lines (107 loc) · 4.24 KB
/
map.py
File metadata and controls
124 lines (107 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Implementation for Durable Map operation."""
from __future__ import annotations
import json
import logging
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Generic, TypeVar
from aws_durable_execution_sdk_python.concurrency import (
BatchResult,
ConcurrentExecutor,
Executable,
)
from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.lambda_service import OperationSubType
if TYPE_CHECKING:
from aws_durable_execution_sdk_python.config import ChildConfig
from aws_durable_execution_sdk_python.serdes import SerDes
from aws_durable_execution_sdk_python.state import ExecutionState
from aws_durable_execution_sdk_python.types import DurableContext, SummaryGenerator
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Type of a single input item in the sequence being mapped over.
T = TypeVar("T")
# Type of the value produced for each item.
R = TypeVar("R")
class MapExecutor(Generic[T, R], ConcurrentExecutor[Callable, R]):  # noqa: PYI059
    """Concurrent executor that applies one callable to each item of a sequence.

    Every executable carries the same callable and an index; the index is
    used to look up the matching entry in ``items`` when the executable runs.
    """

    def __init__(
        self,
        executables: list[Executable[Callable]],
        items: Sequence[T],
        max_concurrency: int | None,
        completion_config,
        top_level_sub_type: OperationSubType,
        iteration_sub_type: OperationSubType,
        name_prefix: str,
        serdes: SerDes | None,
        summary_generator: SummaryGenerator | None = None,
    ):
        """Forward the executor settings to the base class and keep ``items``.

        Args:
            executables: One executable per item, indexed by item position.
            items: The sequence being mapped over.
            max_concurrency: Passed through to the base executor unchanged.
            completion_config: Passed through to the base executor unchanged.
            top_level_sub_type: Forwarded to the base as ``sub_type_top``.
            iteration_sub_type: Forwarded to the base as ``sub_type_iteration``.
            name_prefix: Passed through to the base executor unchanged.
            serdes: Passed through to the base executor unchanged.
            summary_generator: Passed through to the base executor unchanged.
        """
        # Note the keyword renames: the base class calls the two sub-types
        # ``sub_type_top`` and ``sub_type_iteration``.
        base_kwargs = {
            "executables": executables,
            "max_concurrency": max_concurrency,
            "completion_config": completion_config,
            "sub_type_top": top_level_sub_type,
            "sub_type_iteration": iteration_sub_type,
            "name_prefix": name_prefix,
            "serdes": serdes,
            "summary_generator": summary_generator,
        }
        super().__init__(**base_kwargs)
        self.items = items

    @classmethod
    def from_items(
        cls,
        items: Sequence[T],
        func: Callable,
        config: MapConfig,
    ) -> MapExecutor[T, R]:
        """Build a MapExecutor with one executable per entry in ``items``.

        Args:
            items: The sequence to map over.
            func: The callable attached to every executable.
            config: Supplies ``max_concurrency``, ``completion_config``,
                ``serdes`` and ``summary_generator``.

        Returns:
            A new executor using the MAP / MAP_ITERATION sub-types and the
            ``"map-item-"`` name prefix.
        """
        item_count = len(items)
        per_item: list[Executable[Callable]] = [
            Executable(index=position, func=func) for position in range(item_count)
        ]
        return cls(
            executables=per_item,
            items=items,
            max_concurrency=config.max_concurrency,
            completion_config=config.completion_config,
            top_level_sub_type=OperationSubType.MAP,
            iteration_sub_type=OperationSubType.MAP_ITERATION,
            name_prefix="map-item-",
            serdes=config.serdes,
            summary_generator=config.summary_generator,
        )

    def execute_item(self, child_context, executable: Executable[Callable]) -> R:
        """Invoke the executable's callable for the item at its index.

        The callable receives ``(child_context, item, index, items)`` and its
        return value is returned unchanged.
        """
        position = executable.index
        logger.debug("🗺️ Processing map item: %s", position)
        current_item = self.items[position]
        outcome: R = executable.func(child_context, current_item, position, self.items)
        logger.debug("✅ Processed map item: %s", position)
        return outcome
def map_handler(
    items: Sequence[T],
    func: Callable,
    config: MapConfig | None,
    execution_state: ExecutionState,
    run_in_child_context: Callable[
        [Callable[[DurableContext], R], str | None, ChildConfig | None], R
    ],
) -> BatchResult[R]:
    """Run ``func`` for every entry in ``items`` through a MapExecutor.

    Args:
        items: The sequence to map over.
        func: The callable applied to each item.
        config: Map settings. When falsy (for example ``None``), a default
            ``MapConfig`` with a ``MapSummaryGenerator`` is used instead. A
            supplied config is used as-is, including its own
            ``summary_generator`` setting.
        execution_state: Forwarded to the executor's ``execute``.
        run_in_child_context: Forwarded to the executor's ``execute``.

    Returns:
        Whatever the executor's ``execute`` call returns.
    """
    # The summary generator is chosen here, at the handler level, so the
    # handler owns the operation-specific configuration. This mirrors the
    # TypeScript SDK:
    # aws-durable-execution-sdk-js/src/handlers/map-handler/map-handler.ts (~line 79)
    if config:
        effective_config = config
    else:
        effective_config = MapConfig(summary_generator=MapSummaryGenerator())

    executor: MapExecutor[T, R] = MapExecutor.from_items(
        items=items,
        func=func,
        config=effective_config,
    )
    return executor.execute(execution_state, run_in_child_context)
class MapSummaryGenerator:
    """Callable that renders a batch result as a compact JSON summary string."""

    def __call__(self, result: BatchResult) -> str:
        """Return a JSON object string summarizing ``result``.

        The object holds the total, success and failure counts, the ``value``
        of the completion reason and of the status, and a fixed ``"type"`` of
        ``"MapResult"``.
        """
        reason = result.completion_reason.value
        state = result.status.value
        # Insertion order determines key order in the emitted JSON.
        summary = {
            "totalCount": result.total_count,
            "successCount": result.success_count,
            "failureCount": result.failure_count,
        }
        summary["completionReason"] = reason
        summary["status"] = state
        summary["type"] = "MapResult"
        return json.dumps(summary)