Skip to content

Commit 413b5f8

Browse files
committed
refactor(map): Fix null results, abstract completion logic
1 parent 3dc0c2a commit 413b5f8

14 files changed

Lines changed: 206 additions & 179 deletions

File tree

docs/core/map.md

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ MapResult<OrderResult> result = future.get();
4141
| Method | Description |
4242
|--------|-------------|
4343
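| `getResult(i)` | Result at index `i`, or `null` if that item failed or was not started. A succeeded item whose function returned `null` also yields `null`; use `getItem(i).status()` to tell the cases apart |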
44-
| `getError(i)` | `ErrorObject` at index `i`, or `null` if that item succeeded |
44+
| `getError(i)` | `MapError` at index `i`, or `null` if that item succeeded |
4545
| `getItem(i)` | The `MapResultItem` at index `i` with status, result, and error |
4646
| `allSucceeded()` | `true` if every item succeeded |
4747
| `size()` | Number of items in the result |
4848
| `items()` | All result items as an unmodifiable list |
4949
| `results()` | All results as an unmodifiable list (nulls for failed items) |
5050
| `succeeded()` | Only the non-null (successful) results |
51-
| `failed()` | Only the non-null `ErrorObject`s |
51+
| `failed()` | Only the non-null `MapError`s |
5252
| `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) |
5353

5454
### MapResultItem
@@ -59,7 +59,17 @@ Each `MapResultItem<T>` contains:
5959
|-------|-------------|
6060
| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` |
6161
| `result()` | The result value, or `null` if failed/not started (may also be `null` for a succeeded item whose function returned `null`) |
62-
| `error()` | The error details as `ErrorObject`, or `null` if succeeded/not started |
62+
| `error()` | The error details as `MapError`, or `null` if succeeded/not started |
63+
64+
### MapError
65+
66+
Failed items store error details as `MapError`, a serializable record that survives checkpoint-and-replay cycles:
67+
68+
| Field | Description |
69+
|-------|-------------|
70+
| `errorType()` | Fully qualified exception class name (e.g., `java.lang.RuntimeException`) |
71+
| `errorMessage()` | The exception message |
72+
| `stackTrace()` | Stack trace frames as a list of strings, or `null` |
6373

6474
### Error Isolation
6575

@@ -87,9 +97,11 @@ var config = MapConfig.builder()
8797
.build();
8898

8999
var result = ctx.map("process-orders", items, OrderResult.class,
90-
(orderId, index, childCtx) -> process(childCtx, orderId), config);
100+
(orderId, index, childCtx) -> process(orderId, childCtx), config);
91101
```
92102

103+
`MapConfig` also supports a custom `serDes` for serialization via `.serDes(customSerDes)`. By default, the context's serializer is used. `maxConcurrency` must be at least 1 if set.
104+
93105
#### Concurrency Limiting
94106

95107
`maxConcurrency` controls how many items execute concurrently. When set, items beyond the limit are queued and started as earlier items complete. Default is `null` (unlimited).
@@ -158,7 +170,7 @@ The function passed to `map()` is a `MapFunction<I, O>`:
158170
```java
159171
@FunctionalInterface
160172
public interface MapFunction<I, O> {
161-
O apply(I item, int index, DurableContext context) throws Exception;
173+
O apply(I item, int index, DurableContext context);
162174
}
163175
```
164176

docs/design.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,12 @@ context.step("name", Type.class, supplier,
199199
│ - StepOperation<T> │ │ - Queues requests │
200200
│ - WaitOperation │ │ - Batches API calls (750KB) │
201201
│ - WaitForConditionOperation │ │ │
202-
│ - execute() / get() │ │ - Notifies via callback │
203-
└──────────────────────────────┘ └──────────────────────────────┘
202+
│ - ConcurrencyOperation<T> │ │ - Notifies via callback │
203+
│ - MapOperation<I,O> │ └──────────────────────────────┘
204+
│ - ParallelOperation │
205+
│ - ChildContextOperation<T> │
206+
│ - execute() / get() │
207+
└──────────────────────────────┘
204208
205209
206210
┌──────────────────────────────┐
@@ -235,7 +239,11 @@ software.amazon.lambda.durable
235239
│ ├── InvokeOperation<T> # Invoke logic
236240
│ ├── CallbackOperation<T> # Callback logic
237241
│ ├── WaitOperation # Wait logic
238-
│ └── WaitForConditionOperation<T> # Polling condition logic
242+
│ ├── WaitForConditionOperation<T> # Polling condition logic
243+
│ ├── ConcurrencyOperation<T> # Shared base for map/parallel
244+
│ ├── MapOperation<I,O> # Map operation logic
245+
│ ├── ParallelOperation # Parallel operation logic
246+
│ └── ChildContextOperation<T> # Per-item child context execution
239247
240248
├── logging/
241249
│ ├── DurableLogger # Context-aware logger wrapper (MDC-based)

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapInputValidationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.util.HashSet;
88
import java.util.List;
99
import org.junit.jupiter.api.Test;
10-
import software.amazon.lambda.durable.model.CompletionReason;
10+
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
1111
import software.amazon.lambda.durable.model.ExecutionStatus;
1212
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
1313

@@ -54,7 +54,7 @@ void mapWithEmptyCollection_returnsEmptyMapResult() {
5454

5555
assertEquals(0, result.size());
5656
assertTrue(result.allSucceeded());
57-
assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason());
57+
assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason());
5858

5959
return "done";
6060
});

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import java.util.List;
1010
import java.util.concurrent.atomic.AtomicInteger;
1111
import org.junit.jupiter.api.Test;
12-
import software.amazon.lambda.durable.model.CompletionReason;
12+
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
1313
import software.amazon.lambda.durable.model.ExecutionStatus;
1414
import software.amazon.lambda.durable.model.MapResultItem;
1515
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
@@ -81,7 +81,7 @@ void testMapPartialFailure_failedItemDoesNotPreventOthers() {
8181
assertNull(result.getError(0));
8282
assertNull(result.getError(2));
8383

84-
assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason());
84+
assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason());
8585

8686
return "done";
8787
});
@@ -252,7 +252,7 @@ void testMapWithToleratedFailureCount_earlyTermination() {
252252
},
253253
config);
254254

255-
assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
255+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
256256
assertFalse(result.allSucceeded());
257257
assertEquals(5, result.size());
258258
assertEquals("OK", result.getResult(0));
@@ -279,7 +279,7 @@ void testMapWithMinSuccessful_earlyTermination() {
279279
var result = context.map(
280280
"min-successful", items, String.class, (item, index, ctx) -> item.toUpperCase(), config);
281281

282-
assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
282+
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionReason());
283283
assertEquals(5, result.size());
284284
assertEquals("A", result.getResult(0));
285285
assertEquals("B", result.getResult(1));
@@ -419,7 +419,7 @@ void testMapUnlimitedConcurrencyWithToleratedFailureCount() {
419419
},
420420
config);
421421

422-
assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
422+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
423423
assertFalse(result.allSucceeded());
424424
return "done";
425425
});
@@ -442,7 +442,7 @@ void testMapReplayWithFailedBranches() {
442442
return item.toUpperCase();
443443
});
444444

445-
// Errors survive replay since they are stored as ErrorObject (not raw Throwable)
445+
// Errors survive replay since they are stored as MapError (not raw Throwable)
446446
assertEquals("OK", result.getResult(0));
447447
assertEquals("OK2", result.getResult(2));
448448
return "done";
@@ -531,7 +531,7 @@ void testMapWithAllSuccessfulCompletionConfig_stopsOnFirstFailure() {
531531
},
532532
config);
533533

534-
assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
534+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
535535
assertEquals("OK1", result.getResult(0));
536536
assertNotNull(result.getError(1));
537537
// Items after the failure should be NOT_STARTED
@@ -622,14 +622,51 @@ void testMapWithToleratedFailurePercentage() {
622622
},
623623
config);
624624

625-
assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
625+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
626626
return "done";
627627
});
628628

629629
var result = runner.runUntilComplete("test");
630630
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
631631
}
632632

633+
@Test
634+
void testMapWithToleratedFailurePercentage_replay() {
635+
var executionCount = new AtomicInteger(0);
636+
637+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
638+
var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3", "FAIL3", "ok4");
639+
var config = MapConfig.builder()
640+
.completionConfig(CompletionConfig.toleratedFailurePercentage(0.3))
641+
.build();
642+
var result = context.map(
643+
"pct-fail-replay",
644+
items,
645+
String.class,
646+
(item, index, ctx) -> {
647+
executionCount.incrementAndGet();
648+
if (item.startsWith("FAIL")) {
649+
throw new RuntimeException("failed: " + item);
650+
}
651+
return item.toUpperCase();
652+
},
653+
config);
654+
655+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
656+
return "done";
657+
});
658+
659+
var result1 = runner.runUntilComplete("test");
660+
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
661+
var firstRunCount = executionCount.get();
662+
663+
// Replay — with unlimited concurrency, children replay simultaneously.
664+
// Verify completionReason is consistent and no re-execution occurs.
665+
var result2 = runner.run("test");
666+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
667+
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
668+
}
669+
633670
@Test
634671
void testMapAsyncWithWaitInsideBranches() {
635672
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
@@ -747,7 +784,7 @@ void testMapWithMinSuccessful_replay() {
747784
},
748785
config);
749786

750-
assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
787+
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionReason());
751788
assertEquals("A", result.getResult(0));
752789
assertEquals("B", result.getResult(1));
753790
return "done";
@@ -826,4 +863,24 @@ void testMapWithLargeResult_replayChildren() {
826863
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
827864
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
828865
}
866+
867+
@Test
868+
void testMapWithNullResults() {
869+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
870+
var items = List.of("a", "b", "c");
871+
var result = context.map("null-map", items, String.class, (item, index, ctx) -> null);
872+
873+
assertTrue(result.allSucceeded());
874+
assertEquals(3, result.size());
875+
for (int i = 0; i < result.size(); i++) {
876+
assertEquals(MapResultItem.Status.SUCCEEDED, result.getItem(i).status());
877+
assertNull(result.getResult(i));
878+
assertNull(result.getError(i));
879+
}
880+
return "done";
881+
});
882+
883+
var result = runner.runUntilComplete("test");
884+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
885+
}
829886
}

sdk/src/main/java/software/amazon/lambda/durable/model/CompletionReason.java

Lines changed: 0 additions & 10 deletions
This file was deleted.

sdk/src/main/java/software/amazon/lambda/durable/model/ConcurrencyCompletionStatus.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
public enum ConcurrencyCompletionStatus {
66
ALL_COMPLETED,
77
MIN_SUCCESSFUL_REACHED,
8-
MIN_SUCCESSFUL_NOT_REACHED,
98
FAILURE_TOLERANCE_EXCEEDED;
109

1110
@Override

sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
*
1111
* <p>Holds ordered results from a map operation. Each index corresponds to the input item at the same position. Each
1212
* item is represented as a {@link MapResultItem} containing its status, result, and error. Includes the
13-
* {@link CompletionReason} indicating why the operation completed.
13+
* {@link ConcurrencyCompletionStatus} indicating why the operation completed.
1414
*
1515
* <p>Errors are stored as {@link MapError} rather than raw Throwable, so they survive serialization across
1616
* checkpoint-and-replay cycles without requiring AWS SDK-specific Jackson modules.
@@ -19,17 +19,17 @@
1919
* @param completionReason why the operation completed
2020
* @param <T> the result type of each item
2121
*/
22-
public record MapResult<T>(List<MapResultItem<T>> items, CompletionReason completionReason) {
22+
public record MapResult<T>(List<MapResultItem<T>> items, ConcurrencyCompletionStatus completionReason) {
2323

2424
/** Compact constructor that applies defensive copy and defaults. */
2525
public MapResult {
2626
items = items != null ? List.copyOf(items) : Collections.emptyList();
27-
completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED;
27+
completionReason = completionReason != null ? completionReason : ConcurrencyCompletionStatus.ALL_COMPLETED;
2828
}
2929

3030
/** Returns an empty MapResult with no items. */
3131
public static <T> MapResult<T> empty() {
32-
return new MapResult<>(Collections.emptyList(), CompletionReason.ALL_COMPLETED);
32+
return new MapResult<>(Collections.emptyList(), ConcurrencyCompletionStatus.ALL_COMPLETED);
3333
}
3434

3535
/** Returns the result item at the given index. */

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,8 @@ private void checkpointSuccess(T result) {
175175
}
176176

177177
var serialized = serializeResult(result);
178-
var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);
179178

180-
if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
179+
if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
181180
sendOperationUpdate(
182181
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));
183182
} else {

0 commit comments

Comments
 (0)