diff --git a/AGENTS.md b/AGENTS.md index a9d0fdf8c..371259e50 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -208,7 +208,7 @@ void testAgainstRealLambda() { | `StepOperation` | Executes steps with retry logic | | `WaitOperation` | Handles wait checkpointing | | `MapOperation` | Applies a function across items concurrently via child contexts | -| `BaseConcurrentOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation | +| `ConcurrencyOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation | ## Common Tasks diff --git a/docs/core/map.md b/docs/core/map.md index d26bd3643..0037e740b 100644 --- a/docs/core/map.md +++ b/docs/core/map.md @@ -41,14 +41,14 @@ MapResult result = future.get(); | Method | Description | |--------|-------------| | `getResult(i)` | Result at index `i`, or `null` if that item failed | -| `getError(i)` | Error at index `i`, or `null` if that item succeeded | +| `getError(i)` | `ErrorObject` at index `i`, or `null` if that item succeeded | | `getItem(i)` | The `MapResultItem` at index `i` with status, result, and error | | `allSucceeded()` | `true` if every item succeeded | | `size()` | Number of items in the result | | `items()` | All result items as an unmodifiable list | | `results()` | All results as an unmodifiable list (nulls for failed items) | | `succeeded()` | Only the non-null (successful) results | -| `failed()` | Only the non-null errors | +| `failed()` | Only the non-null `ErrorObject`s | | `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | ### MapResultItem @@ -57,9 +57,9 @@ Each `MapResultItem` contains: | Field | Description | |-------|-------------| -| `status()` | `SUCCEEDED`, `FAILED`, or `null` (not started) | +| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` | | `result()` | The result value, or `null` if failed/not started | -| `error()` | The error, or `null` if succeeded/not started | +| `error()` | The error details as `ErrorObject`, or `null` if succeeded/not started | ### Error Isolation @@ -126,7 +126,7 @@ var result = ctx.map("find-two", items, String.class, fn, config); assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); ``` -When early termination triggers, items that were never started have `null` for both result and error in the `MapResult`. +When early termination triggers, items that were never started have `NOT_STARTED` status with `null` for both result and error in the `MapResult`. ### Checkpoint-and-Replay diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/ComplexMapExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/ComplexMapExample.java new file mode 100644 index 000000000..e7ba2726d --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/ComplexMapExample.java @@ -0,0 +1,63 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import software.amazon.lambda.durable.CompletionConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.MapConfig; + +/** + * Example demonstrating advanced map features: wait operations inside branches, error handling, and early termination. + * + *
    + *
  1. Concurrent map with step + wait + step inside each branch — simulates multi-stage order processing with a + * cooldown between stages + *
  2. Early termination with {@code minSuccessful(2)} — finds 2 healthy servers then stops + *
+ */ +public class ComplexMapExample extends DurableHandler { + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + var name = input.getName(); + context.getLogger().info("Starting complex map example for {}", name); + + // Part 1: Concurrent map with step + wait inside each branch + var orderIds = List.of("order-1", "order-2", "order-3"); + + var orderResult = context.map("process-orders", orderIds, String.class, (orderId, index, ctx) -> { + // Step 1: validate the order + var validated = ctx.step("validate-" + index, String.class, stepCtx -> "validated:" + orderId + ":" + name); + + // Wait between stages (simulates a cooldown or external dependency) + ctx.wait("cooldown-" + index, Duration.ofSeconds(1)); + + // Step 2: finalize the order + return ctx.step("finalize-" + index, String.class, stepCtx -> "done:" + validated); + }); + + var orderSummary = String.join(", ", orderResult.results()); + + // Part 2: Early termination — find 2 healthy servers then stop + var servers = List.of("server-1", "server-2", "server-3", "server-4", "server-5"); + var earlyTermConfig = MapConfig.builder() + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + + var serverResult = context.map( + "find-healthy-servers", + servers, + String.class, + (server, index, ctx) -> ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"), + earlyTermConfig); + + var healthyServers = serverResult.succeeded().stream().collect(Collectors.joining(", ")); + + return String.format( + "orders=[%s] | servers=[%s] reason=%s", orderSummary, healthyServers, serverResult.completionReason()); + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java deleted file mode 100644 index 5ad8edd04..000000000 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import java.util.List; -import java.util.stream.Collectors; -import software.amazon.lambda.durable.CompletionConfig; -import software.amazon.lambda.durable.DurableContext; -import software.amazon.lambda.durable.DurableHandler; -import software.amazon.lambda.durable.MapConfig; - -/** - * Example demonstrating MapConfig options: concurrency limiting and completion strategies. - * - *

This handler runs two map operations to showcase different configurations: - * - *

    - *
  1. Sequential processing with {@code maxConcurrency(1)} — items run one at a time - *
  2. Early termination with {@code minSuccessful(2)} — stops after 2 items succeed - *
- */ -public class MapConfigExample extends DurableHandler { - - @Override - public String handleRequest(GreetingRequest input, DurableContext context) { - var name = input.getName(); - context.getLogger().info("Starting map config example for {}", name); - - // Part 1: Sequential execution with maxConcurrency=1 - var items = List.of("alpha", "beta", "gamma"); - var sequentialConfig = MapConfig.builder().maxConcurrency(1).build(); - - var sequentialResult = context.map( - "sequential-processing", - items, - String.class, - (item, index, ctx) -> { - return ctx.step("transform-" + index, String.class, stepCtx -> item.toUpperCase() + "-" + name); - }, - sequentialConfig); - - var sequentialOutput = String.join(", ", sequentialResult.results()); - context.getLogger().info("Sequential result: {}", sequentialOutput); - - // Part 2: Early termination with minSuccessful(2) - var candidates = List.of("server-1", "server-2", "server-3", "server-4", "server-5"); - var earlyTermConfig = MapConfig.builder() - .maxConcurrency(1) - .completionConfig(CompletionConfig.minSuccessful(2)) - .build(); - - var earlyTermResult = context.map( - "find-healthy-servers", - candidates, - String.class, - (server, index, ctx) -> { - return ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"); - }, - earlyTermConfig); - - context.getLogger() - .info( - "Early termination: reason={}, succeeded={}", - earlyTermResult.completionReason(), - earlyTermResult.succeeded().size()); - - var healthyServers = earlyTermResult.succeeded().stream().collect(Collectors.joining(", ")); - - return String.format( - "sequential=[%s] | earlyTerm=[%s] reason=%s", - sequentialOutput, healthyServers, earlyTermResult.completionReason()); - } -} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java deleted file mode 100644 index f5de51853..000000000 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import java.util.List; -import java.util.stream.Collectors; -import software.amazon.lambda.durable.DurableContext; -import software.amazon.lambda.durable.DurableHandler; -import software.amazon.lambda.durable.StepConfig; -import software.amazon.lambda.durable.retry.RetryStrategies; - -/** - * Example demonstrating error handling with the map operation. - * - *

Shows how individual item failures are isolated and captured in the {@code MapResult}, while other items continue - * to succeed. Demonstrates inspecting partial results using {@code allSucceeded()}, {@code getError()}, - * {@code succeeded()}, and {@code failed()}. - * - *

    - *
  1. Map over a list of order IDs concurrently - *
  2. Some orders intentionally fail to simulate real-world partial failures - *
  3. Inspect the MapResult to handle successes and failures separately - *
- */ -public class MapErrorHandlingExample extends DurableHandler { - - @Override - public String handleRequest(GreetingRequest input, DurableContext context) { - var name = input.getName(); - context.getLogger().info("Starting map error handling example for {}", name); - - var orderIds = List.of("order-1", "order-INVALID", "order-3", "order-ERROR", "order-5"); - - // Map over orders — some will fail, but others continue processing - var result = context.map("process-orders", orderIds, String.class, (orderId, index, ctx) -> { - return ctx.step( - "process-" + index, - String.class, - stepCtx -> { - if (orderId.contains("INVALID")) { - throw new IllegalArgumentException("Invalid order: " + orderId); - } - if (orderId.contains("ERROR")) { - throw new RuntimeException("Processing error for: " + orderId); - } - return "Processed " + orderId + " for " + name; - }, - StepConfig.builder() - .retryStrategy(RetryStrategies.Presets.NO_RETRY) - .build()); - }); - - context.getLogger() - .info( - "Map completed: allSucceeded={}, succeeded={}, failed={}", - result.allSucceeded(), - result.succeeded().size(), - result.failed().size()); - - // Build a summary showing successful results and error messages - var successSummary = result.succeeded().stream().collect(Collectors.joining(", ")); - - var errorSummary = new StringBuilder(); - for (int i = 0; i < result.size(); i++) { - if (result.getError(i) != null) { - errorSummary.append( - String.format("index %d: %s; ", i, result.getError(i).getMessage())); - } - } - - return String.format( - "succeeded=%d, failed=%d | results=[%s] | errors=[%s]", - result.succeeded().size(), - result.failed().size(), - successSummary, - errorSummary.toString().trim()); - } -} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index 56849ffd3..c0e703cc4 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -571,39 +571,21 @@ void testSimpleMapExample() { } @Test - void testMapErrorHandlingExample() { - var runner = CloudDurableTestRunner.create( - arn("map-error-handling-example"), GreetingRequest.class, String.class, lambdaClient); - var result = runner.run(new GreetingRequest("Alice")); - - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - var output = result.getResult(String.class); - assertNotNull(output); - - // 3 of 5 orders succeed, 2 fail - assertTrue(output.contains("succeeded=3")); - assertTrue(output.contains("failed=2")); - assertTrue(output.contains("Processed order-1 for Alice")); - assertTrue(output.contains("Processed order-3 for Alice")); - assertTrue(output.contains("Processed order-5 for Alice")); - } - - @Test - void testMapConfigExample() { - var runner = CloudDurableTestRunner.create( - arn("map-config-example"), GreetingRequest.class, String.class, lambdaClient); + void testComplexMapExample() { + var runner = CloudDurableTestRunner.create(arn("complex-map-example"), GreetingRequest.class, String.class); var result = runner.run(new GreetingRequest("Alice")); assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); var output = result.getResult(String.class); assertNotNull(output); - // Sequential part: all 3 items processed - assertTrue(output.contains("ALPHA-Alice")); - assertTrue(output.contains("BETA-Alice")); - assertTrue(output.contains("GAMMA-Alice")); + // Part 1: Concurrent order processing with step + wait + step + assertTrue(output.contains("done:validated:order-1:Alice")); + assertTrue(output.contains("done:validated:order-2:Alice")); + assertTrue(output.contains("done:validated:order-3:Alice")); - // Early termination part + // Part 2: Early termination — find 2 healthy servers then stop + assertTrue(output.contains("healthy")); assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/ComplexMapExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/ComplexMapExampleTest.java new file mode 100644 index 000000000..1504b35ff --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/ComplexMapExampleTest.java @@ -0,0 +1,54 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class ComplexMapExampleTest { + + @Test + void testComplexMapExample() { + var handler = new ComplexMapExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("Alice")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + var output = result.getResult(String.class); + + // Part 1: all 3 orders processed with step + wait + step + assertTrue(output.contains("done:validated:order-1:Alice")); + assertTrue(output.contains("done:validated:order-2:Alice")); + assertTrue(output.contains("done:validated:order-3:Alice")); + + // Part 2: early termination after 2 healthy servers + assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); + assertTrue(output.contains("healthy")); + } + + @Test + void testReplay() { + var handler = new ComplexMapExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var input = new GreetingRequest("Bob"); + var result1 = runner.runUntilComplete(input); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + + // Replay — should use cached results. + // Structural assertion because the first map has wait() inside branches with unlimited + // concurrency, which can cause non-deterministic thread scheduling across invocations. + var result2 = runner.runUntilComplete(input); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + var output = result2.getResult(String.class); + assertTrue(output.contains("done:validated:order-1:Bob")); + assertTrue(output.contains("done:validated:order-2:Bob")); + assertTrue(output.contains("done:validated:order-3:Bob")); + assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); + assertTrue(output.contains("healthy")); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java deleted file mode 100644 index f2df81798..000000000 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; -import software.amazon.lambda.durable.model.ExecutionStatus; -import software.amazon.lambda.durable.testing.LocalDurableTestRunner; - -class MapConfigExampleTest { - - @Test - void testSequentialAndEarlyTermination() { - var handler = new MapConfigExample(); - var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); - - var result = runner.runUntilComplete(new GreetingRequest("Alice")); - - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - var output = result.getResult(String.class); - - assertTrue(output.contains("ALPHA-Alice")); - assertTrue(output.contains("BETA-Alice")); - assertTrue(output.contains("GAMMA-Alice")); - assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); - assertTrue(output.contains("server-1:healthy")); - assertTrue(output.contains("server-2:healthy")); - } - - @Test - void testReplay() { - var handler = new MapConfigExample(); - var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); - - var input = new GreetingRequest("Bob"); - var result1 = runner.runUntilComplete(input); - assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); - - // Replay — should use cached results - var result2 = runner.runUntilComplete(input); - assertEquals(result1.getResult(String.class), result2.getResult(String.class)); - } -} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java deleted file mode 100644 index b899edd30..000000000 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; -import software.amazon.lambda.durable.model.ExecutionStatus; -import software.amazon.lambda.durable.testing.LocalDurableTestRunner; - -class MapErrorHandlingExampleTest { - - @Test - void testPartialFailure() { - var handler = new MapErrorHandlingExample(); - var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); - - var result = runner.runUntilComplete(new GreetingRequest("Alice")); - - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - var output = result.getResult(String.class); - - // 3 of 5 orders succeed, 2 fail - assertTrue(output.contains("succeeded=3")); - assertTrue(output.contains("failed=2")); - assertTrue(output.contains("Processed order-1 for Alice")); - assertTrue(output.contains("Processed order-3 for Alice")); - assertTrue(output.contains("Processed order-5 for Alice")); - assertTrue(output.contains("Invalid order: order-INVALID")); - assertTrue(output.contains("Processing error for: order-ERROR")); - } - - @Test - void testReplay() { - var handler = new MapErrorHandlingExample(); - var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); - - var input = new GreetingRequest("Bob"); - var result1 = runner.runUntilComplete(input); - assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); - - var result2 = runner.runUntilComplete(input); - assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); - var output = result2.getResult(String.class); - // Replay — errors are not preserved in checkpoints (Throwable is not serializable), - // so the replay result will show failed=0 instead of failed=2. - // The successful results should still match. - assertTrue(output.contains("succeeded=3")); - assertTrue(output.contains("Processed order-1 for Bob")); - } -} diff --git a/examples/template.yaml b/examples/template.yaml index 0487db95c..842178440 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -493,16 +493,16 @@ Resources: DockerContext: ../ DockerTag: durable-examples - MapErrorHandlingExampleFunction: + ComplexMapExampleFunction: Type: AWS::Serverless::Function Properties: PackageType: Image FunctionName: !Join - '' - - - 'map-error-handling-example' + - - 'complex-map-example' - !Ref FunctionNameSuffix ImageConfig: - Command: ["software.amazon.lambda.durable.examples.MapErrorHandlingExample::handleRequest"] + Command: ["software.amazon.lambda.durable.examples.ComplexMapExample::handleRequest"] DurableConfig: ExecutionTimeout: 300 RetentionPeriodInDays: 7 @@ -512,32 +512,7 @@ Resources: Action: - lambda:CheckpointDurableExecutions - lambda:GetDurableExecutionState - Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:map-error-handling-example${FunctionNameSuffix}" - Metadata: - Dockerfile: !Ref DockerFile - DockerContext: ../ - DockerTag: durable-examples - - MapConfigExampleFunction: - Type: AWS::Serverless::Function - Properties: - PackageType: Image - FunctionName: !Join - - '' - - - 'map-config-example' - - !Ref FunctionNameSuffix - ImageConfig: - Command: ["software.amazon.lambda.durable.examples.MapConfigExample::handleRequest"] - DurableConfig: - ExecutionTimeout: 300 - RetentionPeriodInDays: 7 - Policies: - - Statement: - - Effect: Allow - Action: - - lambda:CheckpointDurableExecutions - - lambda:GetDurableExecutionState - Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:map-config-example${FunctionNameSuffix}" + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:complex-map-example${FunctionNameSuffix}" Metadata: Dockerfile: !Ref DockerFile DockerContext: ../ @@ -696,19 +671,11 @@ Outputs: Description: Simple Map Example Function Name Value: !Ref SimpleMapExampleFunction - MapErrorHandlingExampleFunction: - Description: Map Error Handling Example Function ARN - Value: !GetAtt MapErrorHandlingExampleFunction.Arn - - MapErrorHandlingExampleFunctionName: - Description: Map Error Handling Example Function Name - Value: !Ref MapErrorHandlingExampleFunction - - MapConfigExampleFunction: - Description: Map Config Example Function ARN - Value: !GetAtt MapConfigExampleFunction.Arn + ComplexMapExampleFunction: + Description: Complex Map Example Function ARN + Value: !GetAtt ComplexMapExampleFunction.Arn - MapConfigExampleFunctionName: - Description: Map Config Example Function Name - Value: !Ref MapConfigExampleFunction + ComplexMapExampleFunctionName: + Description: Complex Map Example Function Name + Value: !Ref ComplexMapExampleFunction diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java index b141d5285..e270bcdaa 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java @@ -4,12 +4,14 @@ import static org.junit.jupiter.api.Assertions.*; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import software.amazon.lambda.durable.model.CompletionReason; import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.model.MapResultItem; import software.amazon.lambda.durable.testing.LocalDurableTestRunner; class MapIntegrationTest { @@ -72,7 +74,7 @@ void testMapPartialFailure_failedItemDoesNotPreventOthers() { assertEquals("A", result.getResult(0)); assertNull(result.getResult(1)); assertNotNull(result.getError(1)); - assertTrue(result.getError(1).getMessage().contains("item failed")); + assertTrue(result.getError(1).errorMessage().contains("item failed")); assertEquals("C", result.getResult(2)); // successful items have no error @@ -111,10 +113,10 @@ void testMapMultipleFailures_allCapturedAtCorrectIndices() { // Failed items at correct indices assertNull(result.getResult(1)); assertNotNull(result.getError(1)); - assertTrue(result.getError(1).getMessage().contains("bad1")); + assertTrue(result.getError(1).errorMessage().contains("bad1")); assertNull(result.getResult(3)); assertNotNull(result.getError(3)); - assertTrue(result.getError(3).getMessage().contains("bad2")); + assertTrue(result.getError(3).errorMessage().contains("bad2")); assertEquals(2, result.succeeded().size()); assertEquals(2, result.failed().size()); @@ -143,8 +145,8 @@ void testMapAllItemsFail() { assertNull(result.getResult(i)); assertNotNull(result.getError(i)); } - assertTrue(result.getError(0).getMessage().contains("fail-x")); - assertTrue(result.getError(1).getMessage().contains("fail-y")); + assertTrue(result.getError(0).errorMessage().contains("fail-x")); + assertTrue(result.getError(1).errorMessage().contains("fail-y")); return "done"; }); @@ -231,10 +233,72 @@ void testMapWithMaxConcurrency2_limitedConcurrency() { } @Test - void testMapWithMaxConcurrencyNull_unlimitedConcurrency() { + void testMapWithToleratedFailureCount_earlyTermination() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok", "FAIL1", "FAIL2", "ok2", "ok3"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.toleratedFailureCount(1)) + .build(); + var result = context.map( + "tolerated-fail", + items, + String.class, + (item, index, ctx) -> { + if (item.startsWith("FAIL")) { + throw new RuntimeException("failed: " + item); + } + return item.toUpperCase(); + }, + config); + + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); + assertFalse(result.allSucceeded()); + assertEquals(5, result.size()); + assertEquals("OK", result.getResult(0)); + assertNull(result.getResult(1)); + assertNotNull(result.getError(1)); + assertNull(result.getResult(2)); + assertNotNull(result.getError(2)); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithMinSuccessful_earlyTermination() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c", "d", "e"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + var result = context.map( + "min-successful", items, String.class, (item, index, ctx) -> item.toUpperCase(), config); + + assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); + assertEquals(5, result.size()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapReplayAfterInterruption_cachedResultsUsed() { + var executionCounts = new AtomicInteger(0); + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { var items = List.of("a", "b", "c"); - var result = context.map("unlimited-map", items, String.class, (item, index, ctx) -> { + var result = context.map("replay-map", items, String.class, (item, index, ctx) -> { + executionCounts.incrementAndGet(); return item.toUpperCase(); }); @@ -247,71 +311,232 @@ void testMapWithMaxConcurrencyNull_unlimitedConcurrency() { return String.join(",", result.results()); }); + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("A,B,C", result1.getResult(String.class)); + var firstRunCount = executionCounts.get(); + assertTrue(firstRunCount >= 3, "Expected at least 3 executions on first run but got " + firstRunCount); + + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("A,B,C", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCounts.get(), "Map functions should not re-execute on replay"); + } + + @Test + void testNestedMap_mapInsideMapBranch() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var outerItems = List.of("group1", "group2"); + var outerResult = context.map("outer-map", outerItems, String.class, (group, outerIndex, outerCtx) -> { + var innerItems = List.of(group + "-a", group + "-b"); + var innerResult = outerCtx.map( + "inner-map-" + outerIndex, + innerItems, + String.class, + (item, innerIndex, innerCtx) -> item.toUpperCase()); + + assertTrue(innerResult.allSucceeded()); + return String.join("+", innerResult.results()); + }); + + assertTrue(outerResult.allSucceeded()); + assertEquals(2, outerResult.size()); + assertEquals("GROUP1-A+GROUP1-B", outerResult.getResult(0)); + assertEquals("GROUP2-A+GROUP2-B", outerResult.getResult(1)); + + var combined = new ArrayList(); + for (int i = 0; i < outerResult.size(); i++) { + combined.add(outerResult.getResult(i)); + } + return String.join("|", combined); + }); + var result = runner.runUntilComplete("test"); assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - assertEquals("A,B,C", result.getResult(String.class)); + assertEquals("GROUP1-A+GROUP1-B|GROUP2-A+GROUP2-B", result.getResult(String.class)); } @Test - void testMapWithMaxConcurrency1_partialFailure() { + void testMapWithWaitInsideBranches() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "FAIL", "c"); - var config = MapConfig.builder().maxConcurrency(1).build(); + var items = List.of("a", "b"); + var result = context.map("map-with-wait", items, String.class, (item, index, ctx) -> { + var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }); + + assertTrue(result.allSucceeded()); + assertEquals("A-done", result.getResult(0)); + assertEquals("B-done", result.getResult(1)); + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A-done,B-done", result.getResult(String.class)); + } + + @Test + void testMapAsyncWithInterleavedWork() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("x", "y"); + var future = context.mapAsync("async-map", items, String.class, (item, index, ctx) -> { + return ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + }); + + // Do other work while map runs + var other = context.step("other-work", String.class, stepCtx -> "OTHER"); + + // Now collect map results + var mapResult = future.get(); + assertTrue(mapResult.allSucceeded()); + + return other + ":" + String.join(",", mapResult.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("OTHER:X,Y", result.getResult(String.class)); + } + + @Test + void testMapUnlimitedConcurrencyWithToleratedFailureCount() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3"); + var config = MapConfig.builder() + .completionConfig(CompletionConfig.toleratedFailureCount(1)) + .build(); var result = context.map( - "sequential-partial-fail", + "unlimited-tolerated", items, String.class, (item, index, ctx) -> { - if ("FAIL".equals(item)) { - throw new RuntimeException("item failed"); + if (item.startsWith("FAIL")) { + throw new RuntimeException("failed: " + item); } return item.toUpperCase(); }, config); + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); assertFalse(result.allSucceeded()); - assertEquals(3, result.size()); - assertEquals("A", result.getResult(0)); - assertNull(result.getResult(1)); - assertNotNull(result.getError(1)); - assertEquals("C", result.getResult(2)); + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapReplayWithFailedBranches() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok", "FAIL", "ok2"); + var result = context.map("replay-fail-map", items, String.class, (item, index, ctx) -> { + executionCount.incrementAndGet(); + if ("FAIL".equals(item)) { + throw new RuntimeException("item failed"); + } + return item.toUpperCase(); + }); + // Errors survive replay since they are stored as ErrorObject (not raw Throwable) + assertEquals("OK", result.getResult(0)); + assertEquals("OK2", result.getResult(2)); return "done"; }); + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + var firstRunCount = executionCount.get(); + + // Replay — functions should not re-execute + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); + } + + @Test + void testMapWithSingleItem() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("only"); + var result = context.map("single-item", items, String.class, (item, index, ctx) -> { + return ctx.step("process", String.class, stepCtx -> item.toUpperCase()); + }); + + assertTrue(result.allSucceeded()); + assertEquals(1, result.size()); + assertEquals("ONLY", result.getResult(0)); + assertEquals(0, result.failed().size()); + return result.getResult(0); + }); + var result = runner.runUntilComplete("test"); assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("ONLY", result.getResult(String.class)); } @Test - void testMapWithToleratedFailureCount_earlyTermination() { + void testStepBeforeAndAfterMap() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("ok", "FAIL1", "FAIL2", "ok2", "ok3"); + var before = context.step("before", String.class, stepCtx -> "BEFORE"); + + var items = List.of("a", "b"); + var mapResult = context.map("middle-map", items, String.class, (item, index, ctx) -> item.toUpperCase()); + + var after = context.step("after", String.class, stepCtx -> "AFTER"); + + return before + ":" + String.join(",", mapResult.results()) + ":" + after; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("BEFORE:A,B:AFTER", result.getResult(String.class)); + } + + @Test + void testSequentialMaps() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result1 = + context.map("map-1", List.of("a", "b"), String.class, (item, index, ctx) -> item.toUpperCase()); + var result2 = context.map("map-2", List.of("x", "y"), String.class, (item, index, ctx) -> item + "!"); + + return String.join(",", result1.results()) + "|" + String.join(",", result2.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A,B|x!,y!", result.getResult(String.class)); + } + + @Test + void testMapWithAllSuccessfulCompletionConfig_stopsOnFirstFailure() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok1", "FAIL", "ok2", "ok3"); var config = MapConfig.builder() .maxConcurrency(1) - .completionConfig(CompletionConfig.toleratedFailureCount(1)) + .completionConfig(CompletionConfig.allSuccessful()) .build(); var result = context.map( - "tolerated-fail", + "all-successful", items, String.class, (item, index, ctx) -> { if (item.startsWith("FAIL")) { - throw new RuntimeException("failed: " + item); + throw new RuntimeException("failed"); } return item.toUpperCase(); }, config); assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); - assertFalse(result.allSucceeded()); - assertEquals(5, result.size()); - assertEquals("OK", result.getResult(0)); - assertNull(result.getResult(1)); + assertEquals("OK1", result.getResult(0)); assertNotNull(result.getError(1)); - assertNull(result.getResult(2)); - assertNotNull(result.getError(2)); - + // Items after the failure should be NOT_STARTED + assertEquals(MapResultItem.Status.NOT_STARTED, result.getItem(2).status()); + assertEquals(MapResultItem.Status.NOT_STARTED, result.getItem(3).status()); return "done"; }); @@ -320,21 +545,84 @@ void testMapWithToleratedFailureCount_earlyTermination() { } @Test - void testMapWithMinSuccessful_earlyTermination() { + void testMapWithWaitInsideBranches_replay() { + var executionCount = new AtomicInteger(0); + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "b", "c", "d", "e"); + var items = List.of("a", "b"); + var result = context.map("wait-replay-map", items, String.class, (item, index, ctx) -> { + executionCount.incrementAndGet(); + var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }); + + assertTrue(result.allSucceeded()); + return String.join(",", result.results()); + }); + + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("A-done,B-done", result1.getResult(String.class)); + var firstRunCount = executionCount.get(); + + // Replay — should use cached results, not re-execute + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("A-done,B-done", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); + } + + @Test + void testNestedMap_replay() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var outerItems = List.of("g1", "g2"); + var outerResult = context.map("outer", outerItems, String.class, (group, outerIdx, outerCtx) -> { + var innerItems = List.of(group + "-a", group + "-b"); + var innerResult = + outerCtx.map("inner-" + outerIdx, innerItems, String.class, (item, innerIdx, innerCtx) -> { + executionCount.incrementAndGet(); + return item.toUpperCase(); + }); + return String.join("+", innerResult.results()); + }); + + return String.join("|", outerResult.results()); + }); + + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("G1-A+G1-B|G2-A+G2-B", result1.getResult(String.class)); + var firstRunCount = executionCount.get(); + + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("G1-A+G1-B|G2-A+G2-B", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCount.get(), "Nested map should not re-execute on replay"); + } + + @Test + void testMapWithToleratedFailurePercentage() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3", "FAIL3", "ok4"); var config = MapConfig.builder() - .maxConcurrency(1) - .completionConfig(CompletionConfig.minSuccessful(2)) + .completionConfig(CompletionConfig.toleratedFailurePercentage(0.3)) .build(); var result = context.map( - "min-successful", items, String.class, (item, index, ctx) -> item.toUpperCase(), config); - - assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); - assertEquals(5, result.size()); - assertEquals("A", result.getResult(0)); - assertEquals("B", result.getResult(1)); + "pct-fail", + items, + String.class, + (item, index, ctx) -> { + if (item.startsWith("FAIL")) { + throw new RuntimeException("failed: " + item); + } + return item.toUpperCase(); + }, + config); + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); return "done"; }); @@ -343,120 +631,199 @@ void testMapWithMinSuccessful_earlyTermination() { } @Test - void testMapWithFirstSuccessful_stopsAfterFirstSuccess() { + void testMapAsyncWithWaitInsideBranches() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "b", "c"); - var config = MapConfig.builder() - .maxConcurrency(1) - .completionConfig(CompletionConfig.firstSuccessful()) - .build(); - var result = context.map( - "first-successful", items, String.class, (item, index, ctx) -> item.toUpperCase(), config); + var items = List.of("a", "b"); + var future = context.mapAsync("async-wait-map", items, String.class, (item, index, ctx) -> { + var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }); - assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); - assertEquals(3, result.size()); - assertEquals("A", result.getResult(0)); - assertNull(result.getResult(1)); - assertNull(result.getResult(2)); + var other = context.step("other", String.class, stepCtx -> "OTHER"); + var mapResult = future.get(); + assertTrue(mapResult.allSucceeded()); - return "done"; + return other + ":" + String.join(",", mapResult.results()); }); var result = runner.runUntilComplete("test"); assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("OTHER:A-done,B-done", result.getResult(String.class)); } @Test - void testMapReplayAfterInterruption_cachedResultsUsed() { - var executionCounts = new AtomicInteger(0); + void testMapWithCustomSerDes() { + var customSerDes = new software.amazon.lambda.durable.serde.JacksonSerDes(); + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var config = MapConfig.builder().serDes(customSerDes).build(); + var result = context.map( + "custom-serdes-map", items, String.class, (item, index, ctx) -> item.toUpperCase(), config); + + assertTrue(result.allSucceeded()); + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A,B", result.getResult(String.class)); + } + @Test + void testMapWithGenericResultType() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "b", "c"); - var result = context.map("replay-map", items, String.class, (item, index, ctx) -> { - executionCounts.incrementAndGet(); - return item.toUpperCase(); + var items = List.of("a,b", "c,d"); + var result = context.map("generic-map", items, new TypeToken>() {}, (item, index, ctx) -> { + return ctx.step( + "split-" + index, new TypeToken>() {}, stepCtx -> List.of(item.split(","))); }); assertTrue(result.allSucceeded()); - assertEquals(3, result.size()); - assertEquals("A", result.getResult(0)); - assertEquals("B", result.getResult(1)); - assertEquals("C", result.getResult(2)); + assertEquals(List.of("a", "b"), result.getResult(0)); + assertEquals(List.of("c", "d"), result.getResult(1)); + return "ok"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithWaitInsideBranches_maxConcurrency1() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var config = MapConfig.builder().maxConcurrency(1).build(); + var result = context.map( + "seq-wait-map", + items, + String.class, + (item, index, ctx) -> { + var stepped = ctx.step("step-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }, + config); + assertTrue(result.allSucceeded()); + assertEquals(2, result.size()); + assertEquals("A-done", result.getResult(0)); + assertEquals("B-done", result.getResult(1)); return String.join(",", result.results()); }); + // With maxConcurrency=1, each invocation processes one branch's wait. + // Use explicit run() + advanceTime() loop due to a known thread coordination race + // (same as ChildContextIntegrationTest.twoAsyncChildContextsBothWaitSuspendAndResume). + for (int i = 0; i < 10; i++) { + var result = runner.run("test"); + if (result.getStatus() != ExecutionStatus.PENDING) { + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A-done,B-done", result.getResult(String.class)); + return; + } + runner.advanceTime(); + } + fail("Expected SUCCEEDED within 10 invocations"); + } + + @Test + void testMapWithMinSuccessful_replay() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b", "c", "d", "e"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + var result = context.map( + "min-success-replay", + items, + String.class, + (item, index, ctx) -> { + executionCount.incrementAndGet(); + return item.toUpperCase(); + }, + config); + + assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); + assertEquals("A", result.getResult(0)); + assertEquals("B", result.getResult(1)); + return "done"; + }); + var result1 = runner.runUntilComplete("test"); assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); - assertEquals("A,B,C", result1.getResult(String.class)); - var firstRunCount = executionCounts.get(); - assertTrue(firstRunCount >= 3, "Expected at least 3 executions on first run but got " + firstRunCount); + var firstRunCount = executionCount.get(); + // Replay — small result path: deserialize MapResult from payload, no child replay var result2 = runner.run("test"); assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); - assertEquals("A,B,C", result2.getResult(String.class)); - assertEquals(firstRunCount, executionCounts.get(), "Map functions should not re-execute on replay"); + assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); } @Test - void testMapReplayWithSteps_cachedStepResultsUsed() { - var stepExecutionCount = new AtomicInteger(0); + void testMapAsyncWithInterleavedWork_replay() { + var executionCount = new AtomicInteger(0); var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("hello", "world"); - var result = context.map("replay-steps-map", items, String.class, (item, index, ctx) -> { - return ctx.step("step-" + index, String.class, stepCtx -> { - stepExecutionCount.incrementAndGet(); - return item.toUpperCase(); - }); + var items = List.of("x", "y"); + var future = context.mapAsync("async-replay-map", items, String.class, (item, index, ctx) -> { + executionCount.incrementAndGet(); + return ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); }); - assertTrue(result.allSucceeded()); - return String.join(" ", result.results()); + var other = context.step("other-work", String.class, stepCtx -> "OTHER"); + var mapResult = future.get(); + assertTrue(mapResult.allSucceeded()); + + return other + ":" + String.join(",", mapResult.results()); }); var result1 = runner.runUntilComplete("test"); assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); - assertEquals("HELLO WORLD", result1.getResult(String.class)); - var firstRunStepCount = stepExecutionCount.get(); - assertTrue(firstRunStepCount >= 2, "Expected at least 2 step executions but got " + firstRunStepCount); + assertEquals("OTHER:X,Y", result1.getResult(String.class)); + var firstRunCount = executionCount.get(); + // Replay — async map + interleaved step should all use cached results var result2 = runner.run("test"); assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); - assertEquals("HELLO WORLD", result2.getResult(String.class)); - assertEquals(firstRunStepCount, stepExecutionCount.get(), "Steps should not re-execute on replay"); + assertEquals("OTHER:X,Y", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); } @Test - void testNestedMap_mapInsideMapBranch() { - var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var outerItems = List.of("group1", "group2"); - var outerResult = context.map("outer-map", outerItems, String.class, (group, outerIndex, outerCtx) -> { - var innerItems = List.of(group + "-a", group + "-b"); - var innerResult = outerCtx.map( - "inner-map-" + outerIndex, - innerItems, - String.class, - (item, innerIndex, innerCtx) -> item.toUpperCase()); + void testMapWithLargeResult_replayChildren() { + var executionCount = new AtomicInteger(0); + // Generate items that produce results exceeding 256KB total to trigger replayChildren path + var items = new ArrayList(); + for (int i = 0; i < 100; i++) { + items.add("item-" + i); + } - assertTrue(innerResult.allSucceeded()); - return String.join("+", innerResult.results()); + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("large-result-map", items, String.class, (item, index, ctx) -> { + executionCount.incrementAndGet(); + // Each item returns ~3KB string to push total well over 256KB + return item + "-" + "x".repeat(3000); }); - assertTrue(outerResult.allSucceeded()); - assertEquals(2, outerResult.size()); - assertEquals("GROUP1-A+GROUP1-B", outerResult.getResult(0)); - assertEquals("GROUP2-A+GROUP2-B", outerResult.getResult(1)); - - var combined = new ArrayList(); - for (int i = 0; i < outerResult.size(); i++) { - combined.add(outerResult.getResult(i)); - } - return String.join("|", combined); + assertTrue(result.allSucceeded()); + assertEquals(100, result.size()); + assertTrue(result.getResult(0).startsWith("item-0-")); + assertTrue(result.getResult(99).startsWith("item-99-")); + return "ok"; }); - var result = runner.runUntilComplete("test"); - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - assertEquals("GROUP1-A+GROUP1-B|GROUP2-A+GROUP2-B", result.getResult(String.class)); + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + var firstRunCount = executionCount.get(); + assertTrue(firstRunCount >= 100); + + // Replay — large result path: replayChildren=true, children replay from cache + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java index df8de8f99..fc52cd1a3 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java @@ -36,16 +36,26 @@ public static CompletionConfig firstSuccessful() { /** Complete when the specified number of items have succeeded. */ public static CompletionConfig minSuccessful(int count) { + if (count < 1) { + throw new IllegalArgumentException("minSuccessful must be at least 1, got: " + count); + } return new CompletionConfig(count, null, null); } /** Complete when more than the specified number of failures have occurred. */ public static CompletionConfig toleratedFailureCount(int count) { + if (count < 0) { + throw new IllegalArgumentException("toleratedFailureCount must be non-negative, got: " + count); + } return new CompletionConfig(null, count, null); } /** Complete when the failure percentage exceeds the specified threshold (0.0 to 1.0). */ public static CompletionConfig toleratedFailurePercentage(double percentage) { + if (percentage < 0.0 || percentage > 1.0) { + throw new IllegalArgumentException( + "toleratedFailurePercentage must be between 0.0 and 1.0, got: " + percentage); + } return new CompletionConfig(null, null, percentage); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java index fa0e8b397..d4f6b583c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java @@ -71,6 +71,9 @@ public Builder serDes(SerDes serDes) { } public MapConfig build() { + if (maxConcurrency != null && maxConcurrency < 1) { + throw new IllegalArgumentException("maxConcurrency must be at least 1, got: " + maxConcurrency); + } return new MapConfig(this); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java b/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java index a349ac9ea..041dccfc9 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java @@ -22,7 +22,6 @@ public interface MapFunction { * @param index the zero-based index of the item in the input collection * @param context the durable context for this item's execution * @return the result of processing the item - * @throws Exception if the function fails */ - O apply(I item, int index, DurableContext context) throws Exception; + O apply(I item, int index, DurableContext context); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java index f6a510191..7e6b61c8a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -540,7 +540,13 @@ public DurableFuture> mapAsync( var itemList = List.copyOf(items); var operationId = nextOperationId(); - var operation = new MapOperation<>(operationId, name, itemList, function, resultType, config, this); + var operation = new MapOperation<>( + OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.MAP), + itemList, + function, + resultType, + config, + this); operation.execute(); return operation; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/MapError.java b/sdk/src/main/java/software/amazon/lambda/durable/model/MapError.java new file mode 100644 index 000000000..478a48edd --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/MapError.java @@ -0,0 +1,17 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.model; + +import java.util.List; + +/** + * Error details for a failed map item. + * + *

Stores error information as plain strings so that {@link MapResult} can be serialized through the user's SerDes + * without requiring AWS SDK-specific Jackson modules. + * + * @param errorType the fully qualified exception class name + * @param errorMessage the error message + * @param stackTrace the stack trace frames, or null + */ +public record MapError(String errorType, String errorMessage, List stackTrace) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java index 1d735d8a2..2c7009fcb 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java @@ -12,24 +12,19 @@ * item is represented as a {@link MapResultItem} containing its status, result, and error. Includes the * {@link CompletionReason} indicating why the operation completed. * - *

When serialized for checkpointing, only status and result fields of each item are included. Error fields are - * transient because Throwable objects are not reliably serializable. On replay from a small-result checkpoint, errors - * will be null; on replay from a large-result checkpoint (replayChildren), errors are reconstructed from individual - * child context checkpoints. + *

Errors are stored as {@link MapError} rather than raw Throwable, so they survive serialization across + * checkpoint-and-replay cycles without requiring AWS SDK-specific Jackson modules. * + * @param items ordered result items from the map operation + * @param completionReason why the operation completed * @param the result type of each item */ -public class MapResult { +public record MapResult(List> items, CompletionReason completionReason) { - private List> items; - private CompletionReason completionReason; - - /** Default constructor for deserialization. */ - public MapResult() {} - - public MapResult(List> items, CompletionReason completionReason) { - this.items = items != null ? List.copyOf(items) : Collections.emptyList(); - this.completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED; + /** Compact constructor that applies defensive copy and defaults. */ + public MapResult { + items = items != null ? List.copyOf(items) : Collections.emptyList(); + completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED; } /** Returns an empty MapResult with no items. */ @@ -48,23 +43,13 @@ public T getResult(int index) { } /** Returns the error at the given index, or null if that item succeeded or was not started. */ - public Throwable getError(int index) { + public MapError getError(int index) { return items.get(index).error(); } - /** Returns true if all items succeeded (no errors). */ + /** Returns true if all items succeeded (no failures or not-started items). */ public boolean allSucceeded() { - return items.stream().noneMatch(item -> item.error() != null); - } - - /** Returns the reason the operation completed. */ - public CompletionReason getCompletionReason() { - return completionReason; - } - - /** Returns all result items as an unmodifiable list. */ - public List> getItems() { - return items; + return items.stream().allMatch(item -> item.status() == MapResultItem.Status.SUCCEEDED); } /** Returns the number of items in this result. */ @@ -72,31 +57,25 @@ public int size() { return items.size(); } - // Convenience accessors matching the original API style - - /** Returns the reason the operation completed. */ - public CompletionReason completionReason() { - return completionReason; - } - - /** Returns all result items as an unmodifiable list. */ - public List> items() { - return items; - } - /** Returns all results as an unmodifiable list (nulls for failed/not-started items). */ public List results() { return Collections.unmodifiableList( items.stream().map(MapResultItem::result).toList()); } - /** Returns results that succeeded (non-null results). */ + /** Returns results from items that succeeded (includes null results from successful items). */ public List succeeded() { - return items.stream().map(MapResultItem::result).filter(r -> r != null).toList(); + return items.stream() + .filter(item -> item.status() == MapResultItem.Status.SUCCEEDED) + .map(MapResultItem::result) + .toList(); } - /** Returns errors that occurred (non-null errors). */ - public List failed() { - return items.stream().map(MapResultItem::error).filter(e -> e != null).toList(); + /** Returns errors from items that failed. */ + public List failed() { + return items.stream() + .filter(item -> item.status() == MapResultItem.Status.FAILED) + .map(MapResultItem::error) + .toList(); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java index 1ae3b1652..86cb8ce79 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java @@ -5,34 +5,24 @@ /** * Represents the outcome of a single item in a map operation. * - *

Each item either succeeds with a result or fails with an error. The status field indicates which case applies. + *

Each item either succeeds with a result, fails with an error, or was never started. The status field indicates + * which case applies. * - *

When serialized for checkpointing, only status and result are included. The error field is transient because - * Throwable objects are not reliably serializable across different serializers. On replay from a small-result - * checkpoint, errors will be null; on replay from a large-result checkpoint (replayChildren), errors are reconstructed - * from individual child context checkpoints. + *

Errors are stored as {@link MapError} (plain strings) rather than raw Throwable, so they survive serialization + * across checkpoint-and-replay cycles without requiring AWS SDK-specific Jackson modules. * + * @param status the status of this item + * @param result the result value, or null if failed/not started + * @param error the error details, or null if succeeded/not started * @param the result type */ -public class MapResultItem { +public record MapResultItem(Status status, T result, MapError error) { /** Status of an individual map item. */ public enum Status { SUCCEEDED, - FAILED - } - - private Status status; - private T result; - private transient Throwable error; - - /** Default constructor for deserialization. */ - public MapResultItem() {} - - private MapResultItem(Status status, T result, Throwable error) { - this.status = status; - this.result = result; - this.error = error; + FAILED, + NOT_STARTED } /** Creates a successful result item. */ @@ -41,44 +31,12 @@ public static MapResultItem success(T result) { } /** Creates a failed result item. */ - public static MapResultItem failure(Throwable error) { + public static MapResultItem failure(MapError error) { return new MapResultItem<>(Status.FAILED, null, error); } - /** Creates an empty (not started) result item. */ + /** Creates a not-started result item. */ public static MapResultItem notStarted() { - return new MapResultItem<>(null, null, null); - } - - /** Returns the status of this item, or null if the item was never started. */ - public Status getStatus() { - return status; - } - - /** Returns the result, or null if the item failed or was not started. */ - public T getResult() { - return result; - } - - /** Returns the error, or null if the item succeeded or was not started. */ - public Throwable getError() { - return error; - } - - // Convenience accessors matching the original API style - - /** Returns the status of this item, or null if the item was never started. */ - public Status status() { - return status; - } - - /** Returns the result, or null if the item failed or was not started. */ - public T result() { - return result; - } - - /** Returns the error, or null if the item succeeded or was not started. */ - public Throwable error() { - return error; + return new MapResultItem<>(Status.NOT_STARTED, null, null); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java deleted file mode 100644 index a7abbfb6b..000000000 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java +++ /dev/null @@ -1,334 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.operation; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.lambda.model.ContextOptions; -import software.amazon.awssdk.services.lambda.model.Operation; -import software.amazon.awssdk.services.lambda.model.OperationAction; -import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; -import software.amazon.awssdk.services.lambda.model.OperationUpdate; -import software.amazon.lambda.durable.CompletionConfig; -import software.amazon.lambda.durable.DurableContext; -import software.amazon.lambda.durable.TypeToken; -import software.amazon.lambda.durable.context.DurableContextImpl; -import software.amazon.lambda.durable.execution.OperationIdGenerator; -import software.amazon.lambda.durable.model.CompletionReason; -import software.amazon.lambda.durable.model.OperationIdentifier; -import software.amazon.lambda.durable.model.OperationSubType; -import software.amazon.lambda.durable.serde.SerDes; - -/** - * Abstract base class for concurrent operations (map, parallel). - * - *

Provides the shared concurrent execution framework: root child context creation, queue-based concurrency limiting, - * success/failure tracking, completion criteria evaluation, and thread registration ordering. - * - *

Subclasses implement {@link #startBranches()} to create branches via {@link #branchInternal} and - * {@link #aggregateResults()} to collect branch results into the final result type. - * - * @param the aggregate result type (e.g., {@code MapResult}) - */ -public abstract class BaseConcurrentOperation extends BaseDurableOperation { - - private static final Logger logger = LoggerFactory.getLogger(BaseConcurrentOperation.class); - private static final int LARGE_RESULT_THRESHOLD = 256 * 1024; - - private final List> branches = new ArrayList<>(); - private final Queue> pendingQueue = new ConcurrentLinkedQueue<>(); - private final Set> startedBranches = ConcurrentHashMap.newKeySet(); - private final AtomicInteger activeBranches = new AtomicInteger(0); - private final AtomicInteger succeeded = new AtomicInteger(0); - private final AtomicInteger failed = new AtomicInteger(0); - private final Integer maxConcurrency; - private final CompletionConfig completionConfig; - private final OperationSubType subType; - private volatile CompletionReason completionReason; - private volatile boolean earlyTermination = false; - private DurableContextImpl rootContext; - private OperationIdGenerator operationIdGenerator; - - protected BaseConcurrentOperation( - String operationId, - String name, - OperationSubType subType, - Integer maxConcurrency, - CompletionConfig completionConfig, - TypeToken resultTypeToken, - SerDes resultSerDes, - DurableContextImpl durableContext) { - super( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType), - resultTypeToken, - resultSerDes, - durableContext); - this.subType = subType; - this.maxConcurrency = maxConcurrency; - this.completionConfig = completionConfig; - } - - // ========== lifecycle ========== - - @Override - protected void start() { - sendOperationUpdateAsync( - OperationUpdate.builder().action(OperationAction.START).subType(subType.getValue())); - this.rootContext = getContext().createChildContext(getOperationId(), getName()); - this.operationIdGenerator = new OperationIdGenerator(getOperationId()); - startBranches(); - } - - @Override - protected void replay(Operation existing) { - switch (existing.status()) { - case SUCCEEDED -> { - if (existing.contextDetails() != null - && Boolean.TRUE.equals(existing.contextDetails().replayChildren())) { - // Large result: reconstruct by replaying child contexts - this.rootContext = getContext().createChildContext(getOperationId(), getName()); - this.operationIdGenerator = new OperationIdGenerator(getOperationId()); - startBranches(); - } else { - markAlreadyCompleted(); - } - } - case FAILED -> markAlreadyCompleted(); - case STARTED -> { - // Interrupted mid-execution: resume from last checkpoint - this.rootContext = getContext().createChildContext(getOperationId(), getName()); - this.operationIdGenerator = new OperationIdGenerator(getOperationId()); - startBranches(); - } - default -> - terminateExecutionWithIllegalDurableOperationException( - "Unexpected concurrent operation status: " + existing.status()); - } - } - - // ========== abstract methods for subclasses ========== - - protected abstract void startBranches(); - - protected abstract R aggregateResults(); - - // ========== branch creation ========== - - protected ChildContextOperation branchInternal( - String branchName, - OperationSubType branchSubType, - TypeToken typeToken, - SerDes serDes, - Function function) { - var branchOpId = operationIdGenerator.nextOperationId(); - var branch = new ChildContextOperation<>( - OperationIdentifier.of(branchOpId, branchName, OperationType.CONTEXT, branchSubType), - function, - typeToken, - serDes, - rootContext); - branches.add(branch); - - // Attach callback BEFORE execution starts (or before future can complete). - // The thenRun runs synchronously inside the synchronized(completionFuture) block - // when completionFuture.complete(null) is called, so it executes on the checkpoint - // processing thread. This callback only does lightweight work: update counters, - // evaluate CompletionConfig, dequeue and start next branch. - branch.completionFuture.thenRun(() -> { - var op = branch.getOperation(); - boolean success = op != null && op.status() == OperationStatus.SUCCEEDED; - onChildContextComplete(branch, success); - }); - - if (!earlyTermination && (maxConcurrency == null || activeBranches.get() < maxConcurrency)) { - activeBranches.incrementAndGet(); - startedBranches.add(branch); - branch.execute(); - } else { - pendingQueue.add(branch); - } - return branch; - } - - // ========== completion callback ========== - - /** - * Called on the checkpoint processing thread when a branch's completionFuture completes. Only does lightweight - * work: update counters, evaluate CompletionConfig, dequeue and start next branch. Does NOT call - * finalizeOperation() or checkpointResult() — those happen in get() on the context thread. - */ - protected void onChildContextComplete(ChildContextOperation branch, boolean success) { - if (success) { - succeeded.incrementAndGet(); - } else { - failed.incrementAndGet(); - } - - // Evaluate completion criteria - if (!earlyTermination && shouldTerminateEarly()) { - earlyTermination = true; - completionReason = evaluateCompletionReason(); - logger.trace("Early termination triggered for operation {}: reason={}", getOperationId(), completionReason); - } - - // Start next queued branch with correct thread ordering: - // register new branch thread BEFORE deregistering completed branch thread - if (!earlyTermination) { - var next = pendingQueue.poll(); - if (next != null) { - // activeBranches stays the same (one completing, one starting) - startedBranches.add(next); - next.execute(); // registers new thread internally via ChildContextOperation.start() - } else { - activeBranches.decrementAndGet(); - } - } else { - activeBranches.decrementAndGet(); - } - // completed branch's thread is deregistered by ChildContextOperation's close() in BaseContext - } - - // ========== completion evaluation ========== - - private boolean shouldTerminateEarly() { - // Check minSuccessful - if (completionConfig.minSuccessful() != null && succeeded.get() >= completionConfig.minSuccessful()) { - return true; - } - - // Check toleratedFailureCount - if (completionConfig.toleratedFailureCount() != null - && failed.get() > completionConfig.toleratedFailureCount()) { - return true; - } - - // Check toleratedFailurePercentage - int totalCompleted = succeeded.get() + failed.get(); - if (completionConfig.toleratedFailurePercentage() != null - && totalCompleted > 0 - && ((double) failed.get() / totalCompleted) > completionConfig.toleratedFailurePercentage()) { - return true; - } - - return false; - } - - protected CompletionReason evaluateCompletionReason() { - if (completionConfig.minSuccessful() != null && succeeded.get() >= completionConfig.minSuccessful()) { - return CompletionReason.MIN_SUCCESSFUL_REACHED; - } - return CompletionReason.FAILURE_TOLERANCE_EXCEEDED; - } - - private void finalizeOperation() { - if (completionReason == null) { - completionReason = CompletionReason.ALL_COMPLETED; - } - - R result = aggregateResults(); - checkpointResult(result); - } - - /** - * Checkpoints the parent concurrent operation as SUCCEEDED. Uses synchronous {@code sendOperationUpdate} because - * this is called from the context thread in {@code get()}, where it is safe to block. - * - *

Small results (<256KB) are checkpointed directly as payload. Large results are checkpointed with - * {@code replayChildren=true} and an empty payload, so on replay the result is reconstructed from child contexts. - */ - protected void checkpointResult(R result) { - var serialized = serializeResult(result); - var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8); - - if (serializedBytes.length < LARGE_RESULT_THRESHOLD) { - sendOperationUpdate(OperationUpdate.builder() - .action(OperationAction.SUCCEED) - .subType(subType.getValue()) - .payload(serialized)); - } else { - // Large result: checkpoint with empty payload + replayChildren flag - sendOperationUpdate(OperationUpdate.builder() - .action(OperationAction.SUCCEED) - .subType(subType.getValue()) - .payload("") - .contextOptions( - ContextOptions.builder().replayChildren(true).build())); - } - } - - // ========== get ========== - - @Override - public R get() { - var op = waitForOperationCompletion(); - - if (op.status() == OperationStatus.SUCCEEDED) { - if (op.contextDetails() != null - && Boolean.TRUE.equals(op.contextDetails().replayChildren())) { - // Large result was reconstructed via replay — aggregate from branches - return aggregateResults(); - } - var contextDetails = op.contextDetails(); - var result = (contextDetails != null) ? contextDetails.result() : null; - return deserializeResult(result); - } else if (op.status() == OperationStatus.FAILED) { - var contextDetails = op.contextDetails(); - var errorObject = (contextDetails != null) ? contextDetails.error() : null; - var original = deserializeException(errorObject); - if (original != null) { - throw new RuntimeException(original); - } - throw new RuntimeException("Concurrent operation failed: " + getOperationId()); - } else { - return terminateExecutionWithIllegalDurableOperationException( - "Unexpected operation status after completion: " + op.status()); - } - } - - // ========== protected accessors for subclasses ========== - - protected List> getBranches() { - return Collections.unmodifiableList(branches); - } - - protected CompletionReason getCompletionReason() { - return completionReason; - } - - protected AtomicInteger getSucceeded() { - return succeeded; - } - - protected AtomicInteger getFailed() { - return failed; - } - - protected boolean isEarlyTermination() { - return earlyTermination; - } - - protected DurableContext getRootContext() { - return rootContext; - } - - /** Returns the pending queue of branches that have not yet been started. */ - protected Queue> getPendingQueue() { - return pendingQueue; - } - - /** Returns the set of branches that have been started (had execute() called). */ - protected Set> getStartedBranches() { - return startedBranches; - } -} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index f9b1b7205..dc5cbb2fb 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -26,7 +26,7 @@ * Abstract base class for concurrent execution of multiple child context operations. * *

Encapsulates shared concurrency logic: queue-based concurrency control, success/failure counting, and completion - * checking. Both {@code ParallelOperation} and a future {@code MapOperation} extend this base. + * checking. Both {@code ParallelOperation} and {@code MapOperation} extend this base. * *

Key design points: * @@ -55,7 +55,7 @@ public abstract class ConcurrencyOperation extends BaseDurableOperation { private final Queue> pendingQueue = new ConcurrentLinkedDeque<>(); private final List> childOperations = Collections.synchronizedList(new ArrayList<>()); private final Set completedOperations = Collections.synchronizedSet(new HashSet()); - private ConcurrencyCompletionStatus completionStatus; + protected ConcurrencyCompletionStatus completionStatus; private OperationIdGenerator operationIdGenerator; private final DurableContextImpl rootContext; @@ -152,6 +152,40 @@ public ChildContextOperation addItem( return childOp; } + /** + * Creates and enqueues an item without starting execution. Use {@link #startPendingItems()} to begin execution + * after all items have been enqueued. This prevents early termination from blocking item creation when all items + * are known upfront (e.g., map operations). + */ + protected ChildContextOperation enqueueItem( + String name, Function function, TypeToken resultType, SerDes serDes) { + var operationId = this.operationIdGenerator.nextOperationId(); + var childOp = createItem(operationId, name, function, resultType, serDes, this.rootContext); + childOperations.add(childOp); + pendingQueue.add(childOp); + logger.debug("Item enqueued {}", name); + return childOp; + } + + /** + * Starts executing enqueued items up to maxConcurrency. Called after all items have been enqueued via + * {@link #enqueueItem}. + */ + protected void startPendingItems() { + // Start as many items as concurrency allows + while (true) { + synchronized (this) { + if (isOperationCompleted()) return; + if (maxConcurrency != -1 && runningCount.get() >= maxConcurrency) return; + var next = pendingQueue.poll(); + if (next == null) return; + runningCount.incrementAndGet(); + logger.debug("Executing operation {}", next.getName()); + next.execute(); + } + } + } + /** * Starts the next queued item if the running count is below maxConcurrency and the operation hasn't completed yet. * Must be called within {@code synchronized (pendingQueue)}. @@ -248,14 +282,15 @@ protected boolean canComplete() { } private void handleComplete() { - // We do not complete the futrure here, the furture is completed via checkpoint - if (isOperationCompleted()) { - return; - } - if (completionStatus.isSucceeded()) { - handleSuccess(); - } else { - handleFailure(completionStatus); + synchronized (this) { + if (isOperationCompleted()) { + return; + } + if (completionStatus.isSucceeded()) { + handleSuccess(); + } else { + handleFailure(completionStatus); + } } } @@ -292,4 +327,9 @@ protected int getTotalItems() { protected List> getChildOperations() { return Collections.unmodifiableList(childOperations); } + + /** Returns true if all items have finished (no pending, no running). Used by subclasses to override canComplete. */ + protected boolean isAllItemsFinished() { + return isJoined.get() && pendingQueue.isEmpty() && runningCount.get() == 0; + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java index e0f5e4760..170f15351 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java @@ -5,143 +5,285 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import software.amazon.awssdk.services.lambda.model.OperationStatus; +import java.util.function.Function; +import software.amazon.awssdk.services.lambda.model.ContextOptions; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationAction; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.awssdk.services.lambda.model.OperationUpdate; +import software.amazon.lambda.durable.CompletionConfig; +import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.MapConfig; import software.amazon.lambda.durable.MapFunction; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.model.CompletionReason; +import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus; +import software.amazon.lambda.durable.model.MapError; import software.amazon.lambda.durable.model.MapResult; import software.amazon.lambda.durable.model.MapResultItem; +import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.SerDes; +import software.amazon.lambda.durable.util.ExceptionHelper; /** * Executes a map operation: applies a function to each item in a collection concurrently, with each item running in its * own child context. * + *

Extends {@link ConcurrencyOperation} following the same pattern as {@link ParallelOperation}. All branches are + * created upfront in {@code start()}/{@code replay()}, and results are aggregated into a {@link MapResult} in + * {@code get()}. + * * @param the input item type * @param the output result type per item */ -public class MapOperation extends BaseConcurrentOperation> { +public class MapOperation extends ConcurrencyOperation> { + + private static final int LARGE_RESULT_THRESHOLD = 256 * 1024; private final List items; private final MapFunction function; private final TypeToken itemResultType; private final SerDes serDes; + private final CompletionConfig completionConfig; + private boolean replayFromPayload; public MapOperation( - String operationId, - String name, + OperationIdentifier operationIdentifier, List items, MapFunction function, TypeToken itemResultType, MapConfig config, DurableContextImpl durableContext) { super( - operationId, - name, - OperationSubType.MAP, - config.maxConcurrency(), - config.completionConfig(), + operationIdentifier, new TypeToken<>() {}, config.serDes(), - durableContext); + durableContext, + config.maxConcurrency() != null ? config.maxConcurrency() : -1, + config.completionConfig().minSuccessful() != null + ? config.completionConfig().minSuccessful() + : -1, + config.completionConfig().toleratedFailureCount() != null + ? config.completionConfig().toleratedFailureCount() + : Integer.MAX_VALUE, + config.completionConfig().toleratedFailurePercentage() != null + ? config.completionConfig().toleratedFailurePercentage() + : 100); this.items = List.copyOf(items); this.function = function; this.itemResultType = itemResultType; this.serDes = config.serDes(); + this.completionConfig = config.completionConfig(); + } + + @Override + protected ChildContextOperation createItem( + String operationId, + String name, + Function function, + TypeToken resultType, + SerDes serDes, + DurableContextImpl parentContext) { + return new ChildContextOperation<>( + OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.MAP_ITERATION), + function, + resultType, + serDes, + parentContext, + this); } @Override - protected void startBranches() { + protected void start() { + sendOperationUpdateAsync(OperationUpdate.builder() + .action(OperationAction.START) + .subType(getSubType().getValue())); + addAllItems(); + } + + @Override + protected void replay(Operation existing) { + switch (existing.status()) { + case SUCCEEDED -> { + if (existing.contextDetails() != null + && Boolean.TRUE.equals(existing.contextDetails().replayChildren())) { + // Large result: re-execute children to reconstruct MapResult + addAllItems(); + } else { + // Small result: MapResult is in the payload, skip child replay + replayFromPayload = true; + markAlreadyCompleted(); + } + } + case STARTED -> { + // Map was in progress when interrupted — re-create children without sending + // another START (the backend rejects duplicate START for existing operations) + addAllItems(); + } + default -> + terminateExecutionWithIllegalDurableOperationException( + "Unexpected map operation status: " + existing.status()); + } + } + + private void addAllItems() { + // Enqueue all items first, then start execution. This prevents early termination + // criteria (e.g., minSuccessful) from completing the operation mid-loop on replay, + // which would cause subsequent enqueue calls to fail with "completed operation". for (int i = 0; i < items.size(); i++) { var index = i; var item = items.get(i); - branchInternal("map-iteration-" + i, OperationSubType.MAP_ITERATION, itemResultType, serDes, childCtx -> { - try { - return function.apply(item, index, childCtx); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + enqueueItem( + "map-iteration-" + i, childCtx -> function.apply(item, index, childCtx), itemResultType, serDes); + } + startPendingItems(); + } + + @Override + protected void handleSuccess() { + checkpointMapResult(); + } + + @Override + protected void handleFailure(ConcurrencyCompletionStatus concurrencyCompletionStatus) { + checkpointMapResult(); + } + + @Override + protected void validateItemCount() { + if (completionConfig.minSuccessful() != null && completionConfig.minSuccessful() > getTotalItems()) { + throw new IllegalArgumentException("minSuccessful (" + completionConfig.minSuccessful() + + ") exceeds the number of items (" + getTotalItems() + ")"); } } /** - * Waits for all branches to complete and aggregates results, then checkpoints the parent MAP operation. - * - *

Handles three cases: - * - *

    - *
  • Replay with small result (parent SUCCEEDED, no replayChildren): deserialize cached MapResult directly - *
  • Replay with large result (parent SUCCEEDED + replayChildren): aggregate from child replays, no - * re-checkpoint needed - *
  • First execution or STARTED replay: aggregate from branches, then checkpoint parent result - *
+ * Overrides the default completion logic from {@link ConcurrencyOperation} to support Map's + * {@link CompletionConfig} semantics. Unlike Parallel (where {@code minSuccessful == -1} means "all must succeed"), + * Map's default {@code allCompleted()} allows failures without early termination. */ @Override - public MapResult get() { - // Check if parent operation already completed (replay with small result) - if (isOperationCompleted()) { - var op = getOperation(); - if (op != null && op.status() == OperationStatus.SUCCEEDED) { - if (op.contextDetails() != null - && Boolean.TRUE.equals(op.contextDetails().replayChildren())) { - // Large result on replay: aggregate from child replays - return aggregateResults(); - } - // Small result on replay: deserialize cached MapResult - var result = (op.contextDetails() != null) ? op.contextDetails().result() : null; - return deserializeResult(result); - } + protected boolean canComplete() { + int succeeded = getSucceededCount(); + int failed = getFailedCount(); + int totalCompleted = succeeded + failed; + + // Check minSuccessful + if (completionConfig.minSuccessful() != null && succeeded >= completionConfig.minSuccessful()) { + completionStatus = ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED; + return true; } - // First execution, STARTED replay, or SUCCEEDED+replayChildren replay: aggregate from branches - var mapResult = aggregateResults(); + // Check toleratedFailureCount + if (completionConfig.toleratedFailureCount() != null && failed > completionConfig.toleratedFailureCount()) { + completionStatus = ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED; + return true; + } + + // Check toleratedFailurePercentage + if (completionConfig.toleratedFailurePercentage() != null + && totalCompleted > 0 + && ((double) failed / totalCompleted) > completionConfig.toleratedFailurePercentage()) { + completionStatus = ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED; + return true; + } - // Check if parent is already SUCCEEDED (replayChildren case) — skip re-checkpointing - var existingOp = getOperation(); - if (existingOp == null || existingOp.status() != OperationStatus.SUCCEEDED) { - // First execution or STARTED: checkpoint parent result from context thread (safe to .join() here) - checkpointResult(mapResult); + // All items finished (no pending, no running) — complete with ALL_COMPLETED + if (isAllItemsFinished()) { + completionStatus = ConcurrencyCompletionStatus.ALL_COMPLETED; + return true; } - return mapResult; + return false; + } + + private void checkpointMapResult() { + var result = aggregateResults(); + var serialized = serializeResult(result); + var serializedBytes = serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8); + + if (serializedBytes.length < LARGE_RESULT_THRESHOLD) { + sendOperationUpdate(OperationUpdate.builder() + .action(OperationAction.SUCCEED) + .subType(getSubType().getValue()) + .payload(serialized)); + } else { + // Large result: checkpoint with empty payload + replayChildren flag + sendOperationUpdate(OperationUpdate.builder() + .action(OperationAction.SUCCEED) + .subType(getSubType().getValue()) + .payload("") + .contextOptions( + ContextOptions.builder().replayChildren(true).build())); + } } @Override + public MapResult get() { + if (replayFromPayload) { + // Small result replay: deserialize MapResult directly from checkpoint payload + var op = waitForOperationCompletion(); + var result = (op.contextDetails() != null) ? op.contextDetails().result() : null; + return deserializeResult(result); + } + // First execution or large result replay: wait for children, then aggregate + join(); + return aggregateResults(); + } + + /** + * Aggregates results from completed branches into a {@code MapResult}. + * + *

Called after all branches have completed. At this point every branch's {@code completionFuture} is already + * done, so {@code branch.get()} returns immediately without blocking. + */ @SuppressWarnings("unchecked") - protected MapResult aggregateResults() { - var branches = getBranches(); - var pendingQueue = getPendingQueue(); + private MapResult aggregateResults() { + var children = getChildOperations(); var resultItems = new ArrayList>(Collections.nCopies(items.size(), null)); - for (int i = 0; i < branches.size(); i++) { - var branch = (ChildContextOperation) branches.get(i); - // Skip branches still in the pending queue (never started due to early termination) - if (pendingQueue.contains(branch)) { + for (int i = 0; i < children.size(); i++) { + var branch = (ChildContextOperation) children.get(i); + if (!branch.isOperationCompleted()) { resultItems.set(i, MapResultItem.notStarted()); continue; } try { resultItems.set(i, MapResultItem.success(branch.get())); } catch (Exception e) { - resultItems.set(i, MapResultItem.failure(e)); + resultItems.set(i, MapResultItem.failure(buildMapError(e))); } } - // Fill any remaining null slots (items beyond branches size) with notStarted - for (int i = branches.size(); i < items.size(); i++) { + // Fill any remaining null slots (items beyond children size) with notStarted + for (int i = children.size(); i < items.size(); i++) { resultItems.set(i, MapResultItem.notStarted()); } - var reason = getCompletionReason(); - if (reason == null) { - reason = !pendingQueue.isEmpty() ? evaluateCompletionReason() : CompletionReason.ALL_COMPLETED; + return new MapResult<>(resultItems, toCompletionReason()); + } + + private CompletionReason toCompletionReason() { + if (completionConfig.minSuccessful() != null && getSucceededCount() >= completionConfig.minSuccessful()) { + return CompletionReason.MIN_SUCCESSFUL_REACHED; } - return new MapResult<>(resultItems, reason); + if (completionConfig.toleratedFailureCount() != null + && getFailedCount() > completionConfig.toleratedFailureCount()) { + return CompletionReason.FAILURE_TOLERANCE_EXCEEDED; + } + if (completionConfig.toleratedFailurePercentage() != null && getFailedCount() > 0) { + int total = getSucceededCount() + getFailedCount(); + if (total > 0 && ((double) getFailedCount() / total) > completionConfig.toleratedFailurePercentage()) { + return CompletionReason.FAILURE_TOLERANCE_EXCEEDED; + } + } + return CompletionReason.ALL_COMPLETED; + } + + private static MapError buildMapError(Exception e) { + return new MapError( + e.getClass().getName(), e.getMessage(), ExceptionHelper.serializeStackTrace(e.getStackTrace())); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java index febd16333..e9493ba31 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java @@ -71,6 +71,10 @@ private void pollForWaitExpiration() { && existing.waitDetails().scheduledEndTimestamp() != null) { remainingWaitTime = Duration.between(Instant.now(), existing.waitDetails().scheduledEndTimestamp()); + // If the wait has already elapsed, poll immediately with a minimal positive interval + if (remainingWaitTime.isNegative() || remainingWaitTime.isZero()) { + remainingWaitTime = Duration.ofMillis(1); + } } logger.debug("Remaining wait time: {} ms", remainingWaitTime.toMillis()); pollForOperationUpdates(remainingWaitTime); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java b/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java index d1443c805..97222a804 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; +import java.util.LinkedHashSet; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; @@ -118,8 +119,9 @@ public static void validateOperationName(String name, int maxLength) { /** * Validates that a collection has deterministic iteration order. * - *

Rejects known unordered collection types: {@link HashSet} (and subclasses), and views returned by - * {@link HashMap}, {@link IdentityHashMap}, {@link WeakHashMap}, and {@link ConcurrentHashMap}. + *

Rejects known unordered collection types: {@link HashSet} (but not {@link LinkedHashSet}, which has stable + * insertion-order iteration), and views returned by {@link HashMap}, {@link IdentityHashMap}, {@link WeakHashMap}, + * and {@link ConcurrentHashMap}. * * @param items the collection to validate * @throws IllegalArgumentException if items is null or has non-deterministic iteration order @@ -128,6 +130,10 @@ public static void validateOrderedCollection(Collection items) { if (items == null) { throw new IllegalArgumentException("items cannot be null"); } + // LinkedHashSet extends HashSet but has stable insertion-order iteration — allow it + if (items instanceof LinkedHashSet) { + return; + } if (items instanceof HashSet || isUnorderedMapView(items)) { throw new IllegalArgumentException("items must have deterministic iteration order"); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java index f57ce3edf..710bf9b1f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java @@ -61,4 +61,48 @@ void toleratedFailurePercentage_setsPercentage() { assertNull(config.toleratedFailureCount()); assertEquals(0.25, config.toleratedFailurePercentage()); } + + @Test + void minSuccessful_withZero_shouldThrow() { + var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.minSuccessful(0)); + assertEquals("minSuccessful must be at least 1, got: 0", exception.getMessage()); + } + + @Test + void minSuccessful_withNegative_shouldThrow() { + var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.minSuccessful(-1)); + assertEquals("minSuccessful must be at least 1, got: -1", exception.getMessage()); + } + + @Test + void toleratedFailureCount_withNegative_shouldThrow() { + var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailureCount(-1)); + assertEquals("toleratedFailureCount must be non-negative, got: -1", exception.getMessage()); + } + + @Test + void toleratedFailurePercentage_withNegative_shouldThrow() { + var exception = + assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailurePercentage(-0.1)); + assertEquals("toleratedFailurePercentage must be between 0.0 and 1.0, got: -0.1", exception.getMessage()); + } + + @Test + void toleratedFailurePercentage_aboveOne_shouldThrow() { + var exception = + assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailurePercentage(1.5)); + assertEquals("toleratedFailurePercentage must be between 0.0 and 1.0, got: 1.5", exception.getMessage()); + } + + @Test + void toleratedFailurePercentage_atBoundaries_shouldPass() { + assertDoesNotThrow(() -> CompletionConfig.toleratedFailurePercentage(0.0)); + assertDoesNotThrow(() -> CompletionConfig.toleratedFailurePercentage(1.0)); + } + + @Test + void toleratedFailureCount_withZero_shouldPass() { + var config = CompletionConfig.toleratedFailureCount(0); + assertEquals(0, config.toleratedFailureCount()); + } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java index c2a507594..11c567d8f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java @@ -101,4 +101,26 @@ void toBuilder_canOverrideValues() { assertEquals(10, modified.maxConcurrency()); assertEquals(4, original.maxConcurrency()); } + + @Test + void builderWithZeroMaxConcurrency_shouldThrow() { + var exception = assertThrows( + IllegalArgumentException.class, + () -> MapConfig.builder().maxConcurrency(0).build()); + assertEquals("maxConcurrency must be at least 1, got: 0", exception.getMessage()); + } + + @Test + void builderWithNegativeMaxConcurrency_shouldThrow() { + var exception = assertThrows( + IllegalArgumentException.class, + () -> MapConfig.builder().maxConcurrency(-1).build()); + assertEquals("maxConcurrency must be at least 1, got: -1", exception.getMessage()); + } + + @Test + void builderWithNullMaxConcurrency_shouldPass() { + var config = MapConfig.builder().maxConcurrency(null).build(); + assertNull(config.maxConcurrency()); + } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java b/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java index ee87d409e..9bc98c1a1 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java @@ -14,7 +14,7 @@ void isFunctionalInterface() { } @Test - void canBeUsedAsLambda() throws Exception { + void canBeUsedAsLambda() { MapFunction fn = (item, index, ctx) -> item.toUpperCase(); var result = fn.apply("hello", 0, null); @@ -23,23 +23,13 @@ void canBeUsedAsLambda() throws Exception { } @Test - void receivesCorrectIndex() throws Exception { + void receivesCorrectIndex() { MapFunction fn = (item, index, ctx) -> index; assertEquals(0, fn.apply("a", 0, null)); assertEquals(5, fn.apply("b", 5, null)); } - @Test - void canThrowCheckedException() { - MapFunction fn = (item, index, ctx) -> { - throw new Exception("checked"); - }; - - var ex = assertThrows(Exception.class, () -> fn.apply("x", 0, null)); - assertEquals("checked", ex.getMessage()); - } - @Test void canThrowRuntimeException() { MapFunction fn = (item, index, ctx) -> { diff --git a/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java b/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java index fc797d3e3..86e7e3476 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java @@ -9,6 +9,10 @@ class MapResultTest { + private static MapError testError(String message) { + return new MapError("java.lang.RuntimeException", message, null); + } + @Test void empty_returnsZeroSizeResult() { var result = MapResult.empty(); @@ -36,7 +40,7 @@ void allSucceeded_trueWhenNoErrors() { @Test void allSucceeded_falseWhenAnyError() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error)), CompletionReason.ALL_COMPLETED); @@ -46,7 +50,7 @@ void allSucceeded_falseWhenAnyError() { @Test void getResult_returnsNullForFailedItem() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error)), CompletionReason.ALL_COMPLETED); @@ -57,7 +61,7 @@ void getResult_returnsNullForFailedItem() { @Test void getError_returnsNullForSucceededItem() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error)), CompletionReason.ALL_COMPLETED); @@ -71,7 +75,7 @@ void succeeded_filtersNullResults() { var result = new MapResult<>( List.of( MapResultItem.success("a"), - MapResultItem.failure(new RuntimeException()), + MapResultItem.failure(testError("fail")), MapResultItem.success("c")), CompletionReason.ALL_COMPLETED); @@ -80,7 +84,7 @@ void succeeded_filtersNullResults() { @Test void failed_filtersNullErrors() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error), MapResultItem.success("c")), CompletionReason.ALL_COMPLETED); @@ -107,7 +111,7 @@ void items_returnsUnmodifiableList() { @Test void getItem_returnsMapResultItem() { var result = new MapResult<>( - List.of(MapResultItem.success("a"), MapResultItem.failure(new RuntimeException("fail"))), + List.of(MapResultItem.success("a"), MapResultItem.failure(testError("fail"))), CompletionReason.ALL_COMPLETED); assertEquals(MapResultItem.Status.SUCCEEDED, result.getItem(0).status()); @@ -120,12 +124,12 @@ void getItem_returnsMapResultItem() { } @Test - void notStartedItems_haveNullStatusResultAndError() { + void notStartedItems_haveNotStartedStatusAndNullResultAndError() { var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.notStarted()), CompletionReason.MIN_SUCCESSFUL_REACHED); - assertNull(result.getItem(1).status()); + assertEquals(MapResultItem.Status.NOT_STARTED, result.getItem(1).status()); assertNull(result.getResult(1)); assertNull(result.getError(1)); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java index 8df56e796..7ae7ac718 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java @@ -63,6 +63,9 @@ void setUp() { when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext(CONTEXT_ID, ThreadType.CONTEXT)); when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)).thenReturn(OPERATION); + // Stub runUntilCompleteOrSuspend to pass through the user future — in unit tests there's + // no executionExceptionFuture to race against, so just wait on the completionFuture directly. + when(executionManager.runUntilCompleteOrSuspend(any())).thenAnswer(invocation -> invocation.getArgument(0)); } @Test diff --git a/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java b/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java index 0ee0ee0c0..ad281fe9a 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java @@ -331,11 +331,9 @@ void validateOrderedCollection_withOrderedSet_shouldPass() { } @Test - void validateOrderedCollection_withLinkedHashSet_shouldThrow() { - // LinkedHashSet extends HashSet, so it's rejected even though it has deterministic order - assertThrows( - IllegalArgumentException.class, - () -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b")))); + void validateOrderedCollection_withLinkedHashSet_shouldPass() { + // LinkedHashSet extends HashSet but has stable insertion-order iteration — allowed + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b")))); } @Test