Skip to content

Commit b625cb8

Browse files
feat(map): Fix map replay, wait-inside-map, and concurrency race conditions (#229)
* feat(map): Fix map replay, wait-inside-map, and concurrency race conditions
1 parent 67fedbc commit b625cb8

29 files changed

Lines changed: 1048 additions & 970 deletions

File tree

AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ void testAgainstRealLambda() {
208208
| `StepOperation` | Executes steps with retry logic |
209209
| `WaitOperation` | Handles wait checkpointing |
210210
| `MapOperation` | Applies a function across items concurrently via child contexts |
211-
| `BaseConcurrentOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation |
211+
| `ConcurrencyOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation |
212212

213213
## Common Tasks
214214

docs/core/map.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ MapResult<OrderResult> result = future.get();
4141
| Method | Description |
4242
|--------|-------------|
4343
| `getResult(i)` | Result at index `i`, or `null` if that item failed |
44-
| `getError(i)` | Error at index `i`, or `null` if that item succeeded |
44+
| `getError(i)` | `ErrorObject` at index `i`, or `null` if that item succeeded |
4545
| `getItem(i)` | The `MapResultItem` at index `i` with status, result, and error |
4646
| `allSucceeded()` | `true` if every item succeeded |
4747
| `size()` | Number of items in the result |
4848
| `items()` | All result items as an unmodifiable list |
4949
| `results()` | All results as an unmodifiable list (nulls for failed items) |
5050
| `succeeded()` | Only the non-null (successful) results |
51-
| `failed()` | Only the non-null errors |
51+
| `failed()` | Only the non-null `ErrorObject`s |
5252
| `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) |
5353

5454
### MapResultItem
@@ -57,9 +57,9 @@ Each `MapResultItem<T>` contains:
5757

5858
| Field | Description |
5959
|-------|-------------|
60-
| `status()` | `SUCCEEDED`, `FAILED`, or `null` (not started) |
60+
| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` |
6161
| `result()` | The result value, or `null` if failed/not started |
62-
| `error()` | The error, or `null` if succeeded/not started |
62+
| `error()` | The error details as `ErrorObject`, or `null` if succeeded/not started |
6363

6464
### Error Isolation
6565

@@ -126,7 +126,7 @@ var result = ctx.map("find-two", items, String.class, fn, config);
126126
assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
127127
```
128128

129-
When early termination triggers, items that were never started have `null` for both result and error in the `MapResult`.
129+
When early termination triggers, items that were never started have `NOT_STARTED` status with `null` for both result and error in the `MapResult`.
130130

131131
### Checkpoint-and-Replay
132132

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples;
4+
5+
import java.time.Duration;
6+
import java.util.List;
7+
import java.util.stream.Collectors;
8+
import software.amazon.lambda.durable.CompletionConfig;
9+
import software.amazon.lambda.durable.DurableContext;
10+
import software.amazon.lambda.durable.DurableHandler;
11+
import software.amazon.lambda.durable.MapConfig;
12+
13+
/**
14+
* Example demonstrating advanced map features: wait operations inside branches, error handling, and early termination.
15+
*
16+
* <ol>
17+
* <li>Concurrent map with step + wait + step inside each branch — simulates multi-stage order processing with a
18+
* cooldown between stages
19+
* <li>Early termination with {@code minSuccessful(2)} — finds 2 healthy servers then stops
20+
* </ol>
21+
*/
22+
public class ComplexMapExample extends DurableHandler<GreetingRequest, String> {
23+
24+
@Override
25+
public String handleRequest(GreetingRequest input, DurableContext context) {
26+
var name = input.getName();
27+
context.getLogger().info("Starting complex map example for {}", name);
28+
29+
// Part 1: Concurrent map with step + wait inside each branch
30+
var orderIds = List.of("order-1", "order-2", "order-3");
31+
32+
var orderResult = context.map("process-orders", orderIds, String.class, (orderId, index, ctx) -> {
33+
// Step 1: validate the order
34+
var validated = ctx.step("validate-" + index, String.class, stepCtx -> "validated:" + orderId + ":" + name);
35+
36+
// Wait between stages (simulates a cooldown or external dependency)
37+
ctx.wait("cooldown-" + index, Duration.ofSeconds(1));
38+
39+
// Step 2: finalize the order
40+
return ctx.step("finalize-" + index, String.class, stepCtx -> "done:" + validated);
41+
});
42+
43+
var orderSummary = String.join(", ", orderResult.results());
44+
45+
// Part 2: Early termination — find 2 healthy servers then stop
46+
var servers = List.of("server-1", "server-2", "server-3", "server-4", "server-5");
47+
var earlyTermConfig = MapConfig.builder()
48+
.completionConfig(CompletionConfig.minSuccessful(2))
49+
.build();
50+
51+
var serverResult = context.map(
52+
"find-healthy-servers",
53+
servers,
54+
String.class,
55+
(server, index, ctx) -> ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"),
56+
earlyTermConfig);
57+
58+
var healthyServers = serverResult.succeeded().stream().collect(Collectors.joining(", "));
59+
60+
return String.format(
61+
"orders=[%s] | servers=[%s] reason=%s", orderSummary, healthyServers, serverResult.completionReason());
62+
}
63+
}

examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java

Lines changed: 0 additions & 78 deletions
This file was deleted.

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -571,39 +571,21 @@ void testSimpleMapExample() {
571571
}
572572

573573
@Test
574-
void testMapErrorHandlingExample() {
575-
var runner = CloudDurableTestRunner.create(
576-
arn("map-error-handling-example"), GreetingRequest.class, String.class, lambdaClient);
577-
var result = runner.run(new GreetingRequest("Alice"));
578-
579-
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
580-
var output = result.getResult(String.class);
581-
assertNotNull(output);
582-
583-
// 3 of 5 orders succeed, 2 fail
584-
assertTrue(output.contains("succeeded=3"));
585-
assertTrue(output.contains("failed=2"));
586-
assertTrue(output.contains("Processed order-1 for Alice"));
587-
assertTrue(output.contains("Processed order-3 for Alice"));
588-
assertTrue(output.contains("Processed order-5 for Alice"));
589-
}
590-
591-
@Test
592-
void testMapConfigExample() {
593-
var runner = CloudDurableTestRunner.create(
594-
arn("map-config-example"), GreetingRequest.class, String.class, lambdaClient);
574+
void testComplexMapExample() {
575+
var runner = CloudDurableTestRunner.create(arn("complex-map-example"), GreetingRequest.class, String.class);
595576
var result = runner.run(new GreetingRequest("Alice"));
596577

597578
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
598579
var output = result.getResult(String.class);
599580
assertNotNull(output);
600581

601-
// Sequential part: all 3 items processed
602-
assertTrue(output.contains("ALPHA-Alice"));
603-
assertTrue(output.contains("BETA-Alice"));
604-
assertTrue(output.contains("GAMMA-Alice"));
582+
// Part 1: Concurrent order processing with step + wait + step
583+
assertTrue(output.contains("done:validated:order-1:Alice"));
584+
assertTrue(output.contains("done:validated:order-2:Alice"));
585+
assertTrue(output.contains("done:validated:order-3:Alice"));
605586

606-
// Early termination part
587+
// Part 2: Early termination — find 2 healthy servers then stop
588+
assertTrue(output.contains("healthy"));
607589
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
608590
}
609591
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.model.ExecutionStatus;
9+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
10+
11+
class ComplexMapExampleTest {
12+
13+
@Test
14+
void testComplexMapExample() {
15+
var handler = new ComplexMapExample();
16+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
17+
18+
var result = runner.runUntilComplete(new GreetingRequest("Alice"));
19+
20+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
21+
var output = result.getResult(String.class);
22+
23+
// Part 1: all 3 orders processed with step + wait + step
24+
assertTrue(output.contains("done:validated:order-1:Alice"));
25+
assertTrue(output.contains("done:validated:order-2:Alice"));
26+
assertTrue(output.contains("done:validated:order-3:Alice"));
27+
28+
// Part 2: early termination after 2 healthy servers
29+
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
30+
assertTrue(output.contains("healthy"));
31+
}
32+
33+
@Test
34+
void testReplay() {
35+
var handler = new ComplexMapExample();
36+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
37+
38+
var input = new GreetingRequest("Bob");
39+
var result1 = runner.runUntilComplete(input);
40+
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
41+
42+
// Replay — should use cached results.
43+
// Structural assertion because the first map has wait() inside branches with unlimited
44+
// concurrency, which can cause non-deterministic thread scheduling across invocations.
45+
var result2 = runner.runUntilComplete(input);
46+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
47+
var output = result2.getResult(String.class);
48+
assertTrue(output.contains("done:validated:order-1:Bob"));
49+
assertTrue(output.contains("done:validated:order-2:Bob"));
50+
assertTrue(output.contains("done:validated:order-3:Bob"));
51+
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
52+
assertTrue(output.contains("healthy"));
53+
}
54+
}

0 commit comments

Comments
 (0)