Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ Build resilient, long-running AWS Lambda functions that automatically checkpoint
Your durable function extends `DurableHandler<I, O>` and implements `handleRequest(I input, DurableContext ctx)`. The `DurableContext` is your interface to durable operations:

- `ctx.step()` – Execute code and checkpoint the result
- `ctx.stepAsync()` – Start a concurrent step
- `ctx.wait()` – Suspend execution without compute charges
- `ctx.createCallback()` – Wait for external events (approvals, webhooks)
- `ctx.waitForCallback()` – Simplify callback handling by combining callback creation and submission in one operation
- `ctx.invoke()` – Invoke another Lambda function and wait for the result
- `ctx.invokeAsync()` – Start a concurrent Lambda function invocation
- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log
- `ctx.runInChildContextAsync()` – Start a concurrent child context

## Quick Start

Expand Down
18 changes: 17 additions & 1 deletion docs/core/callbacks.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Callbacks suspend execution until an external system sends a result. Use this fo
DurableCallbackFuture<String> callback = ctx.createCallback("approval", String.class);

// Send the callback ID to an external system within a step
ctx.step("send-notification", String.class, () -> {
ctx.step("send-notification", String.class, stepCtx -> {
notificationService.sendApprovalRequest(callback.callbackId(), requestDetails);
return "notification-sent";
});
Expand All @@ -18,6 +18,16 @@ String approvalResult = callback.get();

The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload.

#### waitForCallback() ####

`waitForCallback` simplifies callback handling by combining callback creation and submission in one operation. The SDK creates the callback, executes your submitter function with the callback ID, and waits for the result.

```java
ctx.waitForCallback("send-notification", String.class, (callbackId, stepCtx) -> {
notificationService.sendApprovalRequest(callbackId, requestDetails);
})
```

#### Callback Configuration

Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization:
Expand All @@ -30,6 +40,12 @@ var config = CallbackConfig.builder()
.build();

var callback = ctx.createCallback("approval", String.class, config);

var waitForCallbackConfig = WaitForCallbackConfig.builder()
.callbackConfig(config)
.stepConfig(StepConfig.builder().retryStrategy(...).build())
.build();
ctx.waitForCallback("approval", String.class, callbackId -> sendApprovalRequest(callbackId), waitForCallbackConfig);
```

| Option | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {

var config = CallbackConfig.builder().timeout(timeout).build();

var preapprovalCallback = context.waitForCallbackAsync("preapproval", String.class, (callbackId, ctx) -> {
ctx.getLogger().info("Sending callback {} to preapproval system", callbackId);
});

var callback = context.createCallback("approval", String.class, config);

// Step 2.5: Log AWS CLI command to complete the callback
Expand All @@ -57,15 +61,14 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
return null;
});

var preapprovalResult = preapprovalCallback.get();

// Step 3: Wait for external approval (suspends execution)
var approvalResult = callback.get();

// Step 4: Process the approval
var result = context.step("process-approval", String.class, () -> {
return prepared + " - " + approvalResult;
});

return result;
return context.step(
"process-approval", String.class, () -> prepared + " - " + preapprovalResult + " - " + approvalResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.model.ExecutionStatus;
Expand Down Expand Up @@ -46,11 +47,53 @@ void testCallbackExampleCompletesAfterApproval() {
var callbackId = runner.getCallbackId("approval");
runner.completeCallback(callbackId, "\"Approved by manager\"");

// Second run - callback complete, finishes processing
result = runner.run(input);
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// second run - pending preapproval
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
runner.completeCallback(preapprovalCallbackId, "\"Sent to preapprover\"");

// third run - callback complete, finishes processing
result = runner.run(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(
"Approval request for: New laptop ($1500.0) - Approved by manager", result.getResult(String.class));
"Approval request for: New laptop ($1500.0) - Sent to preapprover - Approved by manager",
result.getResult(String.class));
}

@Test
void testCallbackExampleFail() {
var handler = new CallbackExample();
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);

var input = new ApprovalRequest("New laptop", 1500.00);

// First run - suspends waiting for callback
var result = runner.run(input);
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// Simulate external system approving the request
var callbackId = runner.getCallbackId("approval");
runner.completeCallback(callbackId, "\"Approved by manager\"");

result = runner.run(input);
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// second run - pending preapproval
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
runner.failCallback(
preapprovalCallbackId,
ErrorObject.builder()
.errorType("error type")
.errorMessage("error message")
.build());

// third run - callback complete, finishes processing
result = runner.run(input);

assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals("error message", result.getError().get().errorMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,20 @@ void testErrorHandlingExample() {
}

@Test
void testCallbackExample() throws Exception {
void testCallbackExample() {
// happy case covering both createCallback (approval) and waitForCallback (preapproval-callback)
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));

// Complete the preapproval callback
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

Expand All @@ -287,7 +295,6 @@ void testCallbackExample() throws Exception {
assertNotNull(callbackId);

// Complete the callback using AWS SDK
var lambda = LambdaClient.create();
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(callbackId).result(SdkBytes.fromUtf8String("\"approved\"")));

Expand All @@ -297,6 +304,7 @@ void testCallbackExample() throws Exception {

var finalResult = result.getResult(String.class);
assertNotNull(finalResult);
assertTrue(finalResult.contains("preapproved"));
assertTrue(finalResult.contains("Approval request for: Purchase order"));
assertTrue(finalResult.contains("5000"));
assertTrue(finalResult.contains("approved"));
Expand All @@ -310,10 +318,16 @@ void testCallbackExample() throws Exception {
@Test
void testCallbackExampleWithFailure() {
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));

execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

Expand All @@ -322,7 +336,6 @@ void testCallbackExampleWithFailure() {
assertNotNull(callbackId);

// Fail the callback using AWS SDK
var lambda = LambdaClient.create();
lambda.sendDurableExecutionCallbackFailure(req -> req.callbackId(callbackId)
.error(err -> err.errorType("ApprovalRejected").errorMessage("Approval rejected by manager")));

Expand All @@ -343,10 +356,16 @@ void testCallbackExampleWithFailure() {
@Test
void testCallbackExampleWithTimeout() {
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution with 10 second timeout
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));

execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

Expand All @@ -364,6 +383,37 @@ void testCallbackExampleWithTimeout() {
assertEquals(OperationStatus.TIMED_OUT, approvalOp.getStatus());
}

@Test
void testCallbackExampleWithWaitForCallbackFailure() {
// fail the waitForCallback (preapproval-callback) callback
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution with 10 second timeout
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));

execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackFailure(
req -> req.callbackId(preapprovalCallbackId).error(err -> err.errorMessage("preapproval denied")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

// Get callback ID but don't complete it - let it timeout
var callbackId = execution.getCallbackId("approval");
assertNotNull(callbackId);

// Wait for execution to complete (should timeout after 10 seconds)
var result = execution.pollUntilComplete();
assertEquals(ExecutionStatus.FAILED, result.getStatus());

// Verify the callback operation shows timeout status
var approvalOp = execution.getOperation("preapproval-callback");
assertNotNull(approvalOp);
assertEquals(OperationStatus.FAILED, approvalOp.getStatus());
}

@Test
void testChildContextExample() {
var runner = CloudDurableTestRunner.create(arn("child-context-example"), GreetingRequest.class, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ class GenericTypesExampleTest {
@Test
void testGenericTypesExample() {
var handler = new GenericTypesExample();
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
.withSkipTime(true);
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);

var input = new GenericTypesExample.Input("user123");
var result = runner.run(input);
Expand Down Expand Up @@ -48,8 +47,7 @@ void testGenericTypesExample() {
@Test
void testOperationTracking() {
var handler = new GenericTypesExample();
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
.withSkipTime(true);
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);

var input = new GenericTypesExample.Input("user456");
var result = runner.run(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.exception.CallbackFailedException;
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.serde.JacksonSerDes;
import software.amazon.lambda.durable.serde.SerDes;
Expand Down Expand Up @@ -259,4 +261,92 @@ void callbackFailedExceptionHandlesVariousErrorFormats() {
assertNotNull(result.getError().get().stackTrace());
assertEquals(1, result.getError().get().stackTrace().size());
}

@Test
void waitForCallbackCallbackFailed() {
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
try {
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
fail();
return "should not reach here";
} catch (Exception e) {
assertInstanceOf(CallbackFailedException.class, e);
throw e;
}
});

// First run - creates callback
runner.run("test");

// Fail callback with errorType, errorMessage, and stack trace
var callbackId = runner.getCallbackId("approval-callback");
var error = ErrorObject.builder()
.errorType("ValidationError")
.errorMessage("Invalid input data")
.stackTrace(java.util.List.of("com.example.Service|validate|Service.java|42"))
.build();
runner.failCallback(callbackId, error);

// Second run - should fail with formatted message and preserved stack trace
var result = runner.run("test");
assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertTrue(result.getError().isPresent());
assertEquals("ValidationError", result.getError().get().errorType());
assertEquals("Invalid input data", result.getError().get().errorMessage());
assertNotNull(result.getError().get().stackTrace());
assertEquals(1, result.getError().get().stackTrace().size());
}

@Test
void waitForCallbackCallbackTimeout() {
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
try {
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
fail();
return "should not reach here";
} catch (Exception e) {
assertInstanceOf(CallbackTimeoutException.class, e);
throw e;
}
});

// First run - creates callback
runner.run("test");

// Fail callback with errorType, errorMessage, and stack trace
var callbackId = runner.getCallbackId("approval-callback");
runner.timeoutCallback(callbackId);

// Second run - should fail with formatted message and preserved stack trace
var result = runner.run("test");
assertEquals(ExecutionStatus.FAILED, result.getStatus());
}

@Test
void waitForCallbackCallbackFailedWithUserException() {
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
var errorMessage = "user exception";
try {
return ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {
// original exception
throw new IllegalArgumentException(errorMessage);
});
} catch (Exception e) {
assertInstanceOf(IllegalArgumentException.class, e);
assertEquals(errorMessage, e.getMessage());
throw e;
}
});

// First run - creates callback
runner.run("test");

// Fail callback with errorType, errorMessage, and stack trace
var callbackId = runner.getCallbackId("approval-callback");
runner.timeoutCallback(callbackId);

// Second run - should fail with formatted message and preserved stack trace
var result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.FAILED, result.getStatus());
}
}
Loading