Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Your durable function extends `DurableHandler<I, O>` and implements `handleReque
- `ctx.stepAsync()` – Start a concurrent step
- `ctx.wait()` – Suspend execution without compute charges
- `ctx.createCallback()` – Wait for external events (approvals, webhooks)
- `ctx.waitForCallback()` – Run an isolated child context with its own checkpoint log
Comment thread
zhongkechen marked this conversation as resolved.
Outdated
- `ctx.waitForCallbackAsync()` – Start a concurrent child context
- `ctx.invoke()` – Invoke another Lambda function and wait for the result
- `ctx.invokeAsync()` – Start a concurrent Lambda function invocation
- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log
Expand Down
18 changes: 17 additions & 1 deletion docs/core/callbacks.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Callbacks suspend execution until an external system sends a result. Use this fo
DurableCallbackFuture<String> callback = ctx.createCallback("approval", String.class);

// Send the callback ID to an external system within a step
ctx.step("send-notification", String.class, () -> {
ctx.step("send-notification", String.class, stepCtx -> {
notificationService.sendApprovalRequest(callback.callbackId(), requestDetails);
return "notification-sent";
});
Expand All @@ -18,6 +18,16 @@ String approvalResult = callback.get();

The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload.

#### waitForCallback() ####

`waitForCallback` simplifies callback handling by combining callback creation and submission in one operation. The SDK creates the callback, executes your submitter function with the callback ID, and waits for the result.

```java
ctx.waitForCallback("send-notification", String.class, (callbackId, stepCtx) -> {
notificationService.sendApprovalRequest(callbackId, requestDetails);
})
```

#### Callback Configuration

Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization:
Expand All @@ -30,6 +40,12 @@ var config = CallbackConfig.builder()
.build();

var callback = ctx.createCallback("approval", String.class, config);

var waitForCallbackConfig = WaitForCallbackConfig.builder()
.callbackConfig(config)
.stepConfig(StepConfig.builder().retryStrategy(...).build())
.build();
ctx.waitForCallback("approval", String.class, callbackId -> sendApprovalRequest(callbackId), waitForCallbackConfig);
```

| Option | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {

var config = CallbackConfig.builder().timeout(timeout).build();

var preapprovalCallback = context.waitForCallbackAsync("preapproval", String.class, (callbackId, ctx) -> {
ctx.getLogger().info("Sending callback {} to preapproval system", callbackId);
});

var callback = context.createCallback("approval", String.class, config);

// Step 2.5: Log AWS CLI command to complete the callback
Expand All @@ -57,15 +61,14 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
return null;
});

var preapprovalResult = preapprovalCallback.get();

// Step 3: Wait for external approval (suspends execution)
var approvalResult = callback.get();

// Step 4: Process the approval
var result = context.step("process-approval", String.class, () -> {
return prepared + " - " + approvalResult;
});

return result;
return context.step(
"process-approval", String.class, () -> prepared + " - " + preapprovalResult + " - " + approvalResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.model.ExecutionStatus;
Expand Down Expand Up @@ -46,11 +47,53 @@ void testCallbackExampleCompletesAfterApproval() {
var callbackId = runner.getCallbackId("approval");
runner.completeCallback(callbackId, "\"Approved by manager\"");

// Second run - callback complete, finishes processing
result = runner.run(input);
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// second run - pending preapproval
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
runner.completeCallback(preapprovalCallbackId, "\"Sent to preapprover\"");

// third run - callback complete, finishes processing
result = runner.run(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(
"Approval request for: New laptop ($1500.0) - Approved by manager", result.getResult(String.class));
"Approval request for: New laptop ($1500.0) - Sent to preapprover - Approved by manager",
result.getResult(String.class));
}

@Test
void testCallbackExampleFail() {
var handler = new CallbackExample();
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);

var input = new ApprovalRequest("New laptop", 1500.00);

// First run - suspends waiting for callback
var result = runner.run(input);
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// Simulate external system approving the request
var callbackId = runner.getCallbackId("approval");
runner.completeCallback(callbackId, "\"Approved by manager\"");

result = runner.run(input);
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// second run - pending preapproval
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
runner.failCallback(
preapprovalCallbackId,
ErrorObject.builder()
.errorType("error type")
.errorMessage("error message")
.build());

// third run - callback complete, finishes processing
result = runner.run(input);

assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals("error message", result.getError().get().errorMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,20 @@ void testErrorHandlingExample() {
}

@Test
void testCallbackExample() throws Exception {
void testCallbackExample() {
// happy case covering both createCallback (approval) and waitForCallback (preapproval-callback)
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));

// Complete the preapproval callback
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

Expand All @@ -287,7 +295,6 @@ void testCallbackExample() throws Exception {
assertNotNull(callbackId);

// Complete the callback using AWS SDK
var lambda = LambdaClient.create();
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(callbackId).result(SdkBytes.fromUtf8String("\"approved\"")));

Expand All @@ -297,6 +304,7 @@ void testCallbackExample() throws Exception {

var finalResult = result.getResult(String.class);
assertNotNull(finalResult);
assertTrue(finalResult.contains("preapproved"));
assertTrue(finalResult.contains("Approval request for: Purchase order"));
assertTrue(finalResult.contains("5000"));
assertTrue(finalResult.contains("approved"));
Expand All @@ -310,10 +318,16 @@ void testCallbackExample() throws Exception {
@Test
void testCallbackExampleWithFailure() {
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));

execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

Expand All @@ -322,7 +336,6 @@ void testCallbackExampleWithFailure() {
assertNotNull(callbackId);

// Fail the callback using AWS SDK
var lambda = LambdaClient.create();
lambda.sendDurableExecutionCallbackFailure(req -> req.callbackId(callbackId)
.error(err -> err.errorType("ApprovalRejected").errorMessage("Approval rejected by manager")));

Expand All @@ -343,10 +356,16 @@ void testCallbackExampleWithFailure() {
@Test
void testCallbackExampleWithTimeout() {
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution with 10 second timeout
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));

execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackSuccess(
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

Expand All @@ -364,6 +383,37 @@ void testCallbackExampleWithTimeout() {
assertEquals(OperationStatus.TIMED_OUT, approvalOp.getStatus());
}

@Test
void testCallbackExampleWithWaitForCallbackFailure() {
// fail the waitForCallback (preapproval-callback) callback
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
var lambda = LambdaClient.create();

// Start async execution with 10 second timeout
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));

execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
lambda.sendDurableExecutionCallbackFailure(
req -> req.callbackId(preapprovalCallbackId).error(err -> err.errorMessage("preapproval denied")));

// Wait for callback to appear
execution.pollUntil(exec -> exec.hasCallback("approval"));

// Get callback ID but don't complete it - let it timeout
var callbackId = execution.getCallbackId("approval");
assertNotNull(callbackId);

// Wait for execution to complete (should timeout after 10 seconds)
var result = execution.pollUntilComplete();
assertEquals(ExecutionStatus.FAILED, result.getStatus());

// Verify the callback operation shows timeout status
var approvalOp = execution.getOperation("preapproval-callback");
assertNotNull(approvalOp);
assertEquals(OperationStatus.FAILED, approvalOp.getStatus());
}

@Test
void testChildContextExample() {
var runner = CloudDurableTestRunner.create(arn("child-context-example"), GreetingRequest.class, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ class GenericTypesExampleTest {
@Test
void testGenericTypesExample() {
var handler = new GenericTypesExample();
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
.withSkipTime(true);
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);

var input = new GenericTypesExample.Input("user123");
var result = runner.run(input);
Expand Down Expand Up @@ -48,8 +47,7 @@ void testGenericTypesExample() {
@Test
void testOperationTracking() {
var handler = new GenericTypesExample();
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
.withSkipTime(true);
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);

var input = new GenericTypesExample.Input("user456");
var result = runner.run(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.exception.CallbackFailedException;
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.serde.JacksonSerDes;
import software.amazon.lambda.durable.serde.SerDes;
Expand Down Expand Up @@ -259,4 +261,92 @@ void callbackFailedExceptionHandlesVariousErrorFormats() {
assertNotNull(result.getError().get().stackTrace());
assertEquals(1, result.getError().get().stackTrace().size());
}

@Test
void waitForCallbackCallbackFailed() {
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
try {
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
fail();
return "should not reach here";
} catch (Exception e) {
assertInstanceOf(CallbackFailedException.class, e);
throw e;
}
});

// First run - creates callback
runner.run("test");

// Fail callback with errorType, errorMessage, and stack trace
var callbackId = runner.getCallbackId("approval-callback");
var error = ErrorObject.builder()
.errorType("ValidationError")
.errorMessage("Invalid input data")
.stackTrace(java.util.List.of("com.example.Service|validate|Service.java|42"))
.build();
runner.failCallback(callbackId, error);

// Second run - should fail with formatted message and preserved stack trace
var result = runner.run("test");
assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertTrue(result.getError().isPresent());
assertEquals("ValidationError", result.getError().get().errorType());
assertEquals("Invalid input data", result.getError().get().errorMessage());
assertNotNull(result.getError().get().stackTrace());
assertEquals(1, result.getError().get().stackTrace().size());
}

@Test
void waitForCallbackCallbackTimeout() {
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
try {
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
fail();
return "should not reach here";
} catch (Exception e) {
assertInstanceOf(CallbackTimeoutException.class, e);
throw e;
}
});

// First run - creates callback
runner.run("test");

// Fail callback with errorType, errorMessage, and stack trace
var callbackId = runner.getCallbackId("approval-callback");
runner.timeoutCallback(callbackId);

// Second run - should fail with formatted message and preserved stack trace
var result = runner.run("test");
assertEquals(ExecutionStatus.FAILED, result.getStatus());
}

@Test
void waitForCallbackCallbackFailedWithUserException() {
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
var errorMessage = "user exception";
try {
return ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {
// original exception
throw new IllegalArgumentException(errorMessage);
});
} catch (Exception e) {
assertInstanceOf(IllegalArgumentException.class, e);
assertEquals(errorMessage, e.getMessage());
throw e;
}
});

// First run - creates callback
runner.run("test");

// Fail callback with errorType, errorMessage, and stack trace
var callbackId = runner.getCallbackId("approval-callback");
runner.timeoutCallback(callbackId);

// Second run - should fail with formatted message and preserved stack trace
var result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.FAILED, result.getStatus());
}
}
Loading