diff --git a/examples/src/main/java/com/amazonaws/lambda/durable/examples/NoopExample.java b/examples/src/main/java/com/amazonaws/lambda/durable/examples/NoopExample.java new file mode 100644 index 000000000..ba7449c7d --- /dev/null +++ b/examples/src/main/java/com/amazonaws/lambda/durable/examples/NoopExample.java @@ -0,0 +1,19 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.examples; + +import com.amazonaws.lambda.durable.DurableContext; +import com.amazonaws.lambda.durable.DurableHandler; + +/** + * Simple example demonstrating a durable function doesn't have any durable operation + * + *

This handler processes a greeting request and returns a greeting message + */ +public class NoopExample extends DurableHandler { + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + return "HELLO, " + input.getName() + "!"; + } +} diff --git a/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java index d7d66ca25..6c4bce392 100644 --- a/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -71,6 +71,17 @@ void testSimpleStepExample() { assertEquals("create-greeting", createGreetingOp.getName()); } + @Test + void testNoopExampleWithLargeInput() { + var runner = CloudDurableTestRunner.create(arn("noop-example"), Map.class, String.class); + // 6MB large input + var largeInput = "A".repeat(1024 * 1024 * 6 - 12); + var result = runner.run(Map.of("name", largeInput)); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class)); + } + @Test void testSimpleInvokeExample() { var runner = CloudDurableTestRunner.create(arn("simple-invoke-example"), Map.class, String.class); diff --git a/examples/src/test/java/com/amazonaws/lambda/durable/examples/SimpleStepExampleTest.java b/examples/src/test/java/com/amazonaws/lambda/durable/examples/SimpleStepExampleTest.java index 7ec2a4257..0678ef1c9 100644 --- a/examples/src/test/java/com/amazonaws/lambda/durable/examples/SimpleStepExampleTest.java +++ b/examples/src/test/java/com/amazonaws/lambda/durable/examples/SimpleStepExampleTest.java @@ -27,6 +27,25 @@ void testSimpleStepExample() { assertEquals("HELLO, ALICE!", result.getResult(String.class)); } + @Test + void testWithLargePayload() { + // Create handler + var handler = new SimpleStepExample(); + + // Create test runner + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + // 6MB large input + var largeInput = "A".repeat(1024).repeat(1024).repeat(6); + + // Run with input + var input = new GreetingRequest(largeInput); + var result = runner.run(input); + + // Verify result + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class)); + } + @Test void testWithDefaultName() { var handler = new SimpleStepExample(); diff --git a/examples/template.yaml b/examples/template.yaml index a96e84a45..bf863a91d 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -27,6 +27,24 @@ Globals: - !Ref Architecture Resources: + NoopExampleFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + FunctionName: !Join + - '' + - - noop-example + - !Ref FunctionNameSuffix + ImageConfig: + Command: [ "com.amazonaws.lambda.durable.examples.NoopExample::handleRequest" ] + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 7 + Metadata: + Dockerfile: !Ref DockerFile + DockerContext: ../ + DockerTag: durable-examples + SimpleStepExampleFunction: Type: AWS::Serverless::Function Properties: @@ -351,6 +369,14 @@ Resources: DockerTag: durable-examples Outputs: + NoopExampleFunction: + Description: Noop Example Function ARN + Value: !GetAtt NoopExampleFunction.Arn + + NoopExampleFunctionName: + Description: Noop Example Function Name + Value: !Ref NoopExampleFunction + SimpleStepExampleFunction: Description: Simple Step Example Function ARN Value: !GetAtt SimpleStepExampleFunction.Arn diff --git a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/TestResult.java b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/TestResult.java index f0ce51b26..5b9ddee07 100644 --- a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/TestResult.java +++ b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/TestResult.java @@ -12,6 +12,7 @@ import java.util.stream.Collectors; import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Event; +import software.amazon.awssdk.services.lambda.model.EventType; import software.amazon.awssdk.services.lambda.model.OperationStatus; public class TestResult { @@ -50,7 +51,12 @@ public T getResult(Class resultType) { if (status != ExecutionStatus.SUCCEEDED) { throw new IllegalStateException("Execution did not succeed: " + status); } - if (resultPayload == null) { + if (resultPayload == null || resultPayload.isEmpty()) { + var lastEvent = allEvents.get(allEvents.size() - 1); + if (lastEvent.eventType() == EventType.EXECUTION_SUCCEEDED) { + return serDes.deserialize( + lastEvent.executionSucceededDetails().result().payload(), TypeToken.get(resultType)); + } return null; } return serDes.deserialize(resultPayload, TypeToken.get(resultType)); diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java b/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java index bb04a3db0..f0f0d8c69 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java @@ -36,30 +36,9 @@ public static DurableExecutionOutput execute( Class inputType, BiFunction handler, DurableConfig config) { - logger.debug("DurableExecution.execute() called"); - logger.debug("DurableExecutionArn: {}", input.durableExecutionArn()); - logger.debug( - "Initial operations count: {}", - input.initialExecutionState() != null - && input.initialExecutionState().operations() != null - ? input.initialExecutionState().operations().size() - : 0); - - // Validate initial operation is an EXECUTION operation - if (input.initialExecutionState() == null - || input.initialExecutionState().operations() == null - || input.initialExecutionState().operations().isEmpty() - || input.initialExecutionState().operations().get(0).type() != OperationType.EXECUTION) { - throw new IllegalStateException("First operation must be EXECUTION"); - } - var executionManager = new ExecutionManager( input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config); - logger.debug( - "EXECUTION operation found: {}", - executionManager.getExecutionOperation().id()); - var handlerFuture = CompletableFuture.supplyAsync( () -> { var userInput = diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java index 03a845bff..726867c7f 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java @@ -18,6 +18,7 @@ import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; /** @@ -44,7 +45,7 @@ public class ExecutionManager { // ===== Execution State ===== private final Map operationStorage; - private final String executionOperationId; + private final Operation executionOp; private final String durableExecutionArn; private final AtomicReference executionMode; @@ -64,7 +65,6 @@ public ExecutionManager( CheckpointUpdatedExecutionState initialExecutionState, DurableConfig config) { this.durableExecutionArn = durableExecutionArn; - this.executionOperationId = initialExecutionState.operations().get(0).id(); // Create checkpoint batcher for internal coordination this.checkpointBatcher = @@ -76,6 +76,17 @@ public ExecutionManager( // Start in REPLAY mode if we have more than just the initial EXECUTION operation this.executionMode = new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION); + + executionOp = findExecutionOp(initialExecutionState); + + // Validate initial operation is an EXECUTION operation + if (executionOp == null) { + throw new IllegalStateException("First operation must be EXECUTION"); + } + logger.debug("DurableExecution.execute() called"); + logger.debug("DurableExecutionArn: {}", durableExecutionArn); + logger.debug("Initial operations count: {}", operationStorage.size()); + logger.debug("EXECUTION operation found: {}", executionOp.id()); } // ===== State Management ===== @@ -126,7 +137,27 @@ public Operation getOperationAndUpdateReplayState(String operationId) { } public Operation getExecutionOperation() { - return operationStorage.get(executionOperationId); + return executionOp; + } + + private Operation findExecutionOp(CheckpointUpdatedExecutionState initialExecutionState) { + // find execution OP in the input + if (initialExecutionState != null + && initialExecutionState.operations() != null + && !initialExecutionState.operations().isEmpty()) { + var op = initialExecutionState.operations().get(0); + if (op.type() != OperationType.EXECUTION) { + throw new IllegalStateException("First operation must be EXECUTION"); + } + return op; + } + // find execution OP in the checkpoint result + for (Operation op : operationStorage.values()) { + if (op.type() == OperationType.EXECUTION) { + return op; + } + } + return null; } // ===== Thread Coordination ===== diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java index dfd71115a..8cef35b59 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java @@ -8,26 +8,28 @@ import com.amazonaws.lambda.durable.execution.SuspendExecutionException; import com.amazonaws.lambda.durable.retry.RetryStrategies; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.*; class DurableContextTest { + private static final Operation EXECUTION_OP = Operation.builder() + .id("0") + .type(OperationType.EXECUTION) + .status(OperationStatus.STARTED) + .build(); private DurableContext createTestContext() { - var executionOp = Operation.builder() - .id("0") - .type(OperationType.EXECUTION) - .status(OperationStatus.STARTED) - .build(); - return createTestContext(List.of(executionOp)); + return createTestContext(List.of()); } private DurableContext createTestContext(List initialOperations) { var client = TestUtils.createMockClient(); - var initialExecutionState = CheckpointUpdatedExecutionState.builder() - .operations(initialOperations) - .build(); + var operations = new ArrayList<>(List.of(EXECUTION_OP)); + operations.addAll(initialOperations); + var initialExecutionState = + CheckpointUpdatedExecutionState.builder().operations(operations).build(); var executionManager = new ExecutionManager( "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" + "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d", diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/execution/ExecutionManagerTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/execution/ExecutionManagerTest.java index 61e312cc8..8c1ca9169 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/execution/ExecutionManagerTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/execution/ExecutionManagerTest.java @@ -3,20 +3,26 @@ package com.amazonaws.lambda.durable.execution; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.amazonaws.lambda.durable.DurableConfig; import com.amazonaws.lambda.durable.TestUtils; +import com.amazonaws.lambda.durable.client.DurableExecutionClient; import java.util.List; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState; +import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.awssdk.services.lambda.model.OperationType; class ExecutionManagerTest { + private DurableExecutionClient client; private ExecutionManager createManager(List operations) { - var client = TestUtils.createMockClient(); + client = TestUtils.createMockClient(); var initialState = CheckpointUpdatedExecutionState.builder().operations(operations).build(); return new ExecutionManager( @@ -109,4 +115,26 @@ void staysInReplayModeForFailedOperation() { assertNotNull(op); assertTrue(manager.isReplaying()); } + + @Test + void emptyInitialState() { + client = mock(DurableExecutionClient.class); + when(client.getExecutionState(any(), any(), any())) + .thenReturn(GetDurableExecutionStateResponse.builder() + .operations(List.of(executionOp())) + .nextMarker(null) + .build()); + var initialState = CheckpointUpdatedExecutionState.builder() + .operations(List.of()) + .nextMarker("marker") + .build(); + var executionManager = new ExecutionManager( + "arn:aws:lambda:us-east-1:123456789012:function:test", + "test-token", + initialState, + DurableConfig.builder().withDurableExecutionClient(client).build()); + + assertNotNull(executionManager.getExecutionOperation()); + assertEquals("0", executionManager.getExecutionOperation().id()); + } } diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/CallbackOperationTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/CallbackOperationTest.java index 4903105f7..cee882271 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/CallbackOperationTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/CallbackOperationTest.java @@ -17,6 +17,7 @@ import com.amazonaws.lambda.durable.serde.JacksonSerDes; import com.amazonaws.lambda.durable.serde.SerDes; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; @@ -60,9 +61,15 @@ public T deserialize(String data, TypeToken typeToken) { private ExecutionManager createExecutionManager(List initialOperations) { var client = TestUtils.createMockClient(); - var initialState = CheckpointUpdatedExecutionState.builder() - .operations(initialOperations) - .build(); + var operations = new ArrayList(); + operations.add(Operation.builder() + .id("0") + .type(OperationType.EXECUTION) + .status(OperationStatus.STARTED) + .build()); + operations.addAll(initialOperations); + var initialState = + CheckpointUpdatedExecutionState.builder().operations(operations).build(); var executionManager = new ExecutionManager( "arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", @@ -74,12 +81,7 @@ private ExecutionManager createExecutionManager(List initialOperation @Test void executeCreatesCheckpointAndGetsCallbackId() { - var executionOp = Operation.builder() - .id("0") - .type(OperationType.EXECUTION) - .status(OperationStatus.STARTED) - .build(); - var executionManager = createExecutionManager(List.of(executionOp)); + var executionManager = createExecutionManager(List.of()); var serDes = new JacksonSerDes(); var operation = new CallbackOperation<>( @@ -95,12 +97,7 @@ void executeCreatesCheckpointAndGetsCallbackId() { @Test void executeWithConfigSetsOptions() { - var executionOp = Operation.builder() - .id("0") - .type(OperationType.EXECUTION) - .status(OperationStatus.STARTED) - .build(); - var executionManager = createExecutionManager(List.of(executionOp)); + var executionManager = createExecutionManager(List.of()); var serDes = new JacksonSerDes(); var config = CallbackConfig.builder() .timeout(Duration.ofMinutes(5)) diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java index 520d77859..2e750d258 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java @@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when;