Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazonaws.lambda.durable.examples;

import com.amazonaws.lambda.durable.DurableContext;
import com.amazonaws.lambda.durable.DurableHandler;

/**
* Simple example demonstrating a durable function doesn't have any durable operation
*
* <p>This handler processes a greeting request and returns a greeting message
*/
public class NoopExample extends DurableHandler<GreetingRequest, String> {

@Override
public String handleRequest(GreetingRequest input, DurableContext context) {
return "HELLO, " + input.getName() + "!";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ void testSimpleStepExample() {
assertEquals("create-greeting", createGreetingOp.getName());
}

@Test
void testNoopExampleWithLargeInput() {
var runner = CloudDurableTestRunner.create(arn("noop-example"), Map.class, String.class);
// 6MB large input
var largeInput = "A".repeat(1024 * 1024 * 6 - 12);
var result = runner.run(Map.of("name", largeInput));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class));
}

@Test
void testSimpleInvokeExample() {
var runner = CloudDurableTestRunner.create(arn("simple-invoke-example"), Map.class, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ void testSimpleStepExample() {
assertEquals("HELLO, ALICE!", result.getResult(String.class));
}

@Test
void testWithLargePayload() {
// Create handler
var handler = new SimpleStepExample();

// Create test runner
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
// 6MB large input
var largeInput = "A".repeat(1024).repeat(1024).repeat(6);

// Run with input
var input = new GreetingRequest(largeInput);
var result = runner.run(input);

// Verify result
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class));
}

@Test
void testWithDefaultName() {
var handler = new SimpleStepExample();
Expand Down
26 changes: 26 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ Globals:
- !Ref Architecture

Resources:
NoopExampleFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
FunctionName: !Join
- ''
- - noop-example
- !Ref FunctionNameSuffix
ImageConfig:
Command: [ "com.amazonaws.lambda.durable.examples.NoopExample::handleRequest" ]
DurableConfig:
ExecutionTimeout: 300
RetentionPeriodInDays: 7
Metadata:
Dockerfile: !Ref DockerFile
DockerContext: ../
DockerTag: durable-examples

SimpleStepExampleFunction:
Type: AWS::Serverless::Function
Properties:
Expand Down Expand Up @@ -351,6 +369,14 @@ Resources:
DockerTag: durable-examples

Outputs:
NoopExampleFunction:
Description: Noop Example Function ARN
Value: !GetAtt NoopExampleFunction.Arn

NoopExampleFunctionName:
Description: Noop Example Function Name
Value: !Ref NoopExampleFunction

SimpleStepExampleFunction:
Description: Simple Step Example Function ARN
Value: !GetAtt SimpleStepExampleFunction.Arn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.stream.Collectors;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.Event;
import software.amazon.awssdk.services.lambda.model.EventType;
import software.amazon.awssdk.services.lambda.model.OperationStatus;

public class TestResult<O> {
Expand Down Expand Up @@ -50,7 +51,12 @@ public <T> T getResult(Class<T> resultType) {
if (status != ExecutionStatus.SUCCEEDED) {
throw new IllegalStateException("Execution did not succeed: " + status);
}
if (resultPayload == null) {
if (resultPayload == null || resultPayload.isEmpty()) {
var lastEvent = allEvents.get(allEvents.size() - 1);
if (lastEvent.eventType() == EventType.EXECUTION_SUCCEEDED) {
return serDes.deserialize(
lastEvent.executionSucceededDetails().result().payload(), TypeToken.get(resultType));
}
return null;
}
return serDes.deserialize(resultPayload, TypeToken.get(resultType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,9 @@ public static <I, O> DurableExecutionOutput execute(
Class<I> inputType,
BiFunction<I, DurableContext, O> handler,
DurableConfig config) {
logger.debug("DurableExecution.execute() called");
logger.debug("DurableExecutionArn: {}", input.durableExecutionArn());
logger.debug(
"Initial operations count: {}",
input.initialExecutionState() != null
&& input.initialExecutionState().operations() != null
? input.initialExecutionState().operations().size()
: 0);

// Validate initial operation is an EXECUTION operation
if (input.initialExecutionState() == null
|| input.initialExecutionState().operations() == null
|| input.initialExecutionState().operations().isEmpty()
|| input.initialExecutionState().operations().get(0).type() != OperationType.EXECUTION) {
throw new IllegalStateException("First operation must be EXECUTION");
}

var executionManager = new ExecutionManager(
input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config);

logger.debug(
"EXECUTION operation found: {}",
executionManager.getExecutionOperation().id());

var handlerFuture = CompletableFuture.supplyAsync(
() -> {
var userInput =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;

/**
Expand All @@ -44,7 +45,7 @@ public class ExecutionManager {

// ===== Execution State =====
private final Map<String, Operation> operationStorage;
private final String executionOperationId;
private final Operation executionOp;
private final String durableExecutionArn;
private final AtomicReference<ExecutionMode> executionMode;

Expand All @@ -64,7 +65,6 @@ public ExecutionManager(
CheckpointUpdatedExecutionState initialExecutionState,
DurableConfig config) {
this.durableExecutionArn = durableExecutionArn;
this.executionOperationId = initialExecutionState.operations().get(0).id();

// Create checkpoint batcher for internal coordination
this.checkpointBatcher =
Expand All @@ -76,6 +76,17 @@ public ExecutionManager(
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
this.executionMode =
new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);

executionOp = findExecutionOp(initialExecutionState);

// Validate initial operation is an EXECUTION operation
if (executionOp == null) {
throw new IllegalStateException("First operation must be EXECUTION");
}
logger.debug("DurableExecution.execute() called");
logger.debug("DurableExecutionArn: {}", durableExecutionArn);
logger.debug("Initial operations count: {}", operationStorage.size());
logger.debug("EXECUTION operation found: {}", executionOp.id());
}

// ===== State Management =====
Expand Down Expand Up @@ -126,7 +137,27 @@ public Operation getOperationAndUpdateReplayState(String operationId) {
}

public Operation getExecutionOperation() {
return operationStorage.get(executionOperationId);
return executionOp;
}

private Operation findExecutionOp(CheckpointUpdatedExecutionState initialExecutionState) {
// find execution OP in the input
if (initialExecutionState != null
&& initialExecutionState.operations() != null
&& !initialExecutionState.operations().isEmpty()) {
var op = initialExecutionState.operations().get(0);
if (op.type() != OperationType.EXECUTION) {
throw new IllegalStateException("First operation must be EXECUTION");
}
return op;
}
// find execution OP in the checkpoint result
for (Operation op : operationStorage.values()) {
if (op.type() == OperationType.EXECUTION) {
return op;
}
}
return null;
}

// ===== Thread Coordination =====
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,28 @@
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
import com.amazonaws.lambda.durable.retry.RetryStrategies;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.*;

class DurableContextTest {
private static final Operation EXECUTION_OP = Operation.builder()
.id("0")
.type(OperationType.EXECUTION)
.status(OperationStatus.STARTED)
.build();

private DurableContext createTestContext() {
var executionOp = Operation.builder()
.id("0")
.type(OperationType.EXECUTION)
.status(OperationStatus.STARTED)
.build();
return createTestContext(List.of(executionOp));
return createTestContext(List.of());
}

private DurableContext createTestContext(List<Operation> initialOperations) {
var client = TestUtils.createMockClient();
var initialExecutionState = CheckpointUpdatedExecutionState.builder()
.operations(initialOperations)
.build();
var operations = new ArrayList<>(List.of(EXECUTION_OP));
operations.addAll(initialOperations);
var initialExecutionState =
CheckpointUpdatedExecutionState.builder().operations(operations).build();
var executionManager = new ExecutionManager(
"arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
+ "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@
package com.amazonaws.lambda.durable.execution;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.amazonaws.lambda.durable.DurableConfig;
import com.amazonaws.lambda.durable.TestUtils;
import com.amazonaws.lambda.durable.client.DurableExecutionClient;
import java.util.List;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;

class ExecutionManagerTest {
private DurableExecutionClient client;

private ExecutionManager createManager(List<Operation> operations) {
var client = TestUtils.createMockClient();
client = TestUtils.createMockClient();
var initialState =
CheckpointUpdatedExecutionState.builder().operations(operations).build();
return new ExecutionManager(
Expand Down Expand Up @@ -109,4 +115,26 @@ void staysInReplayModeForFailedOperation() {
assertNotNull(op);
assertTrue(manager.isReplaying());
}

@Test
void emptyInitialState() {
client = mock(DurableExecutionClient.class);
when(client.getExecutionState(any(), any(), any()))
.thenReturn(GetDurableExecutionStateResponse.builder()
.operations(List.of(executionOp()))
.nextMarker(null)
.build());
var initialState = CheckpointUpdatedExecutionState.builder()
.operations(List.of())
.nextMarker("marker")
.build();
var executionManager = new ExecutionManager(
"arn:aws:lambda:us-east-1:123456789012:function:test",
"test-token",
initialState,
DurableConfig.builder().withDurableExecutionClient(client).build());

assertNotNull(executionManager.getExecutionOperation());
assertEquals("0", executionManager.getExecutionOperation().id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
import com.amazonaws.lambda.durable.serde.SerDes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -60,9 +61,15 @@ public <T> T deserialize(String data, TypeToken<T> typeToken) {

private ExecutionManager createExecutionManager(List<Operation> initialOperations) {
var client = TestUtils.createMockClient();
var initialState = CheckpointUpdatedExecutionState.builder()
.operations(initialOperations)
.build();
var operations = new ArrayList<Operation>();
operations.add(Operation.builder()
.id("0")
.type(OperationType.EXECUTION)
.status(OperationStatus.STARTED)
.build());
operations.addAll(initialOperations);
var initialState =
CheckpointUpdatedExecutionState.builder().operations(operations).build();
var executionManager = new ExecutionManager(
"arn:aws:lambda:us-east-1:123456789012:function:test",
"test-token",
Expand All @@ -74,12 +81,7 @@ private ExecutionManager createExecutionManager(List<Operation> initialOperation

@Test
void executeCreatesCheckpointAndGetsCallbackId() {
var executionOp = Operation.builder()
.id("0")
.type(OperationType.EXECUTION)
.status(OperationStatus.STARTED)
.build();
var executionManager = createExecutionManager(List.of(executionOp));
var executionManager = createExecutionManager(List.of());
var serDes = new JacksonSerDes();

var operation = new CallbackOperation<>(
Expand All @@ -95,12 +97,7 @@ void executeCreatesCheckpointAndGetsCallbackId() {

@Test
void executeWithConfigSetsOptions() {
var executionOp = Operation.builder()
.id("0")
.type(OperationType.EXECUTION)
.status(OperationStatus.STARTED)
.build();
var executionManager = createExecutionManager(List.of(executionOp));
var executionManager = createExecutionManager(List.of());
var serDes = new JacksonSerDes();
var config = CallbackConfig.builder()
.timeout(Duration.ofMinutes(5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Loading