Skip to content

Commit d9b69ce

Browse files
authored
[Feature]: add waitForCallback (#147)
* add waitForCallback * fix cloud test cases for waitForCallback * add unit tests * fix exception type * update step name * fix error handling * add name length validator
1 parent 482ff31 commit d9b69ce

File tree

28 files changed

+1037
-116
lines changed

28 files changed

+1037
-116
lines changed

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@ Build resilient, long-running AWS Lambda functions that automatically checkpoint
1818
Your durable function extends `DurableHandler<I, O>` and implements `handleRequest(I input, DurableContext ctx)`. The `DurableContext` is your interface to durable operations:
1919

2020
- `ctx.step()` – Execute code and checkpoint the result
21-
- `ctx.stepAsync()` – Start a concurrent step
2221
- `ctx.wait()` – Suspend execution without compute charges
2322
- `ctx.createCallback()` – Wait for external events (approvals, webhooks)
23+
- `ctx.waitForCallback()` – Simplify callback handling by combining callback creation and submission in one operation
2424
- `ctx.invoke()` – Invoke another Lambda function and wait for the result
25-
- `ctx.invokeAsync()` – Start a concurrent Lambda function invocation
2625
- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log
27-
- `ctx.runInChildContextAsync()` – Start a concurrent child context
2826

2927
## Quick Start
3028

docs/core/callbacks.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Callbacks suspend execution until an external system sends a result. Use this fo
77
DurableCallbackFuture<String> callback = ctx.createCallback("approval", String.class);
88

99
// Send the callback ID to an external system within a step
10-
ctx.step("send-notification", String.class, () -> {
10+
ctx.step("send-notification", String.class, stepCtx -> {
1111
notificationService.sendApprovalRequest(callback.callbackId(), requestDetails);
1212
return "notification-sent";
1313
});
@@ -18,6 +18,16 @@ String approvalResult = callback.get();
1818

1919
The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload.
2020

21+
#### waitForCallback() ####
22+
23+
`waitForCallback` simplifies callback handling by combining callback creation and submission in one operation. The SDK creates the callback, executes your submitter function with the callback ID, and waits for the result.
24+
25+
```java
26+
ctx.waitForCallback("send-notification", String.class, (callbackId, stepCtx) -> {
27+
notificationService.sendApprovalRequest(callbackId, requestDetails);
28+
})
29+
```
30+
2131
#### Callback Configuration
2232

2333
Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization:
@@ -30,6 +40,12 @@ var config = CallbackConfig.builder()
3040
.build();
3141

3242
var callback = ctx.createCallback("approval", String.class, config);
43+
44+
var waitForCallbackConfig = WaitForCallbackConfig.builder()
45+
.callbackConfig(config)
46+
.stepConfig(StepConfig.builder().retryStrategy(...).build())
47+
.build();
48+
ctx.waitForCallback("approval", String.class, callbackId -> sendApprovalRequest(callbackId), waitForCallbackConfig);
3349
```
3450

3551
| Option | Description |

examples/src/main/java/software/amazon/lambda/durable/examples/CallbackExample.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
4444

4545
var config = CallbackConfig.builder().timeout(timeout).build();
4646

47+
var preapprovalCallback = context.waitForCallbackAsync("preapproval", String.class, (callbackId, ctx) -> {
48+
ctx.getLogger().info("Sending callback {} to preapproval system", callbackId);
49+
});
50+
4751
var callback = context.createCallback("approval", String.class, config);
4852

4953
// Step 2.5: Log AWS CLI command to complete the callback
@@ -57,15 +61,14 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
5761
return null;
5862
});
5963

64+
var preapprovalResult = preapprovalCallback.get();
65+
6066
// Step 3: Wait for external approval (suspends execution)
6167
var approvalResult = callback.get();
6268

6369
// Step 4: Process the approval
64-
var result = context.step("process-approval", String.class, () -> {
65-
return prepared + " - " + approvalResult;
66-
});
67-
68-
return result;
70+
return context.step(
71+
"process-approval", String.class, () -> prepared + " - " + preapprovalResult + " - " + approvalResult);
6972
}
7073
}
7174

examples/src/test/java/software/amazon/lambda/durable/examples/CallbackExampleTest.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.junit.jupiter.api.Assertions.*;
66

77
import org.junit.jupiter.api.Test;
8+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
89
import software.amazon.awssdk.services.lambda.model.OperationStatus;
910
import software.amazon.awssdk.services.lambda.model.OperationType;
1011
import software.amazon.lambda.durable.model.ExecutionStatus;
@@ -46,11 +47,53 @@ void testCallbackExampleCompletesAfterApproval() {
4647
var callbackId = runner.getCallbackId("approval");
4748
runner.completeCallback(callbackId, "\"Approved by manager\"");
4849

49-
// Second run - callback complete, finishes processing
50+
result = runner.run(input);
51+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
52+
53+
// second run - pending preapproval
54+
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
55+
runner.completeCallback(preapprovalCallbackId, "\"Sent to preapprover\"");
56+
57+
// third run - callback complete, finishes processing
5058
result = runner.run(input);
5159

5260
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
5361
assertEquals(
54-
"Approval request for: New laptop ($1500.0) - Approved by manager", result.getResult(String.class));
62+
"Approval request for: New laptop ($1500.0) - Sent to preapprover - Approved by manager",
63+
result.getResult(String.class));
64+
}
65+
66+
@Test
67+
void testCallbackExampleFail() {
68+
var handler = new CallbackExample();
69+
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);
70+
71+
var input = new ApprovalRequest("New laptop", 1500.00);
72+
73+
// First run - suspends waiting for callback
74+
var result = runner.run(input);
75+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
76+
77+
// Simulate external system approving the request
78+
var callbackId = runner.getCallbackId("approval");
79+
runner.completeCallback(callbackId, "\"Approved by manager\"");
80+
81+
result = runner.run(input);
82+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
83+
84+
// second run - pending preapproval
85+
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
86+
runner.failCallback(
87+
preapprovalCallbackId,
88+
ErrorObject.builder()
89+
.errorType("error type")
90+
.errorMessage("error message")
91+
.build());
92+
93+
// third run - callback complete, finishes processing
94+
result = runner.run(input);
95+
96+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
97+
assertEquals("error message", result.getError().get().errorMessage());
5598
}
5699
}

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,20 @@ void testErrorHandlingExample() {
273273
}
274274

275275
@Test
276-
void testCallbackExample() throws Exception {
276+
void testCallbackExample() {
277+
// happy case covering both createCallback (approval) and waitForCallback (preapproval-callback)
277278
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
279+
var lambda = LambdaClient.create();
278280

279281
// Start async execution
280282
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));
281283

284+
// Complete the preapproval callback
285+
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
286+
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
287+
lambda.sendDurableExecutionCallbackSuccess(
288+
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));
289+
282290
// Wait for callback to appear
283291
execution.pollUntil(exec -> exec.hasCallback("approval"));
284292

@@ -287,7 +295,6 @@ void testCallbackExample() throws Exception {
287295
assertNotNull(callbackId);
288296

289297
// Complete the callback using AWS SDK
290-
var lambda = LambdaClient.create();
291298
lambda.sendDurableExecutionCallbackSuccess(
292299
req -> req.callbackId(callbackId).result(SdkBytes.fromUtf8String("\"approved\"")));
293300

@@ -297,6 +304,7 @@ void testCallbackExample() throws Exception {
297304

298305
var finalResult = result.getResult(String.class);
299306
assertNotNull(finalResult);
307+
assertTrue(finalResult.contains("preapproved"));
300308
assertTrue(finalResult.contains("Approval request for: Purchase order"));
301309
assertTrue(finalResult.contains("5000"));
302310
assertTrue(finalResult.contains("approved"));
@@ -310,10 +318,16 @@ void testCallbackExample() throws Exception {
310318
@Test
311319
void testCallbackExampleWithFailure() {
312320
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
321+
var lambda = LambdaClient.create();
313322

314323
// Start async execution
315324
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));
316325

326+
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
327+
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
328+
lambda.sendDurableExecutionCallbackSuccess(
329+
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));
330+
317331
// Wait for callback to appear
318332
execution.pollUntil(exec -> exec.hasCallback("approval"));
319333

@@ -322,7 +336,6 @@ void testCallbackExampleWithFailure() {
322336
assertNotNull(callbackId);
323337

324338
// Fail the callback using AWS SDK
325-
var lambda = LambdaClient.create();
326339
lambda.sendDurableExecutionCallbackFailure(req -> req.callbackId(callbackId)
327340
.error(err -> err.errorType("ApprovalRejected").errorMessage("Approval rejected by manager")));
328341

@@ -343,10 +356,16 @@ void testCallbackExampleWithFailure() {
343356
@Test
344357
void testCallbackExampleWithTimeout() {
345358
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
359+
var lambda = LambdaClient.create();
346360

347361
// Start async execution with 10 second timeout
348362
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));
349363

364+
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
365+
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
366+
lambda.sendDurableExecutionCallbackSuccess(
367+
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));
368+
350369
// Wait for callback to appear
351370
execution.pollUntil(exec -> exec.hasCallback("approval"));
352371

@@ -364,6 +383,37 @@ void testCallbackExampleWithTimeout() {
364383
assertEquals(OperationStatus.TIMED_OUT, approvalOp.getStatus());
365384
}
366385

386+
@Test
387+
void testCallbackExampleWithWaitForCallbackFailure() {
388+
// fail the waitForCallback (preapproval-callback) callback
389+
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
390+
var lambda = LambdaClient.create();
391+
392+
// Start async execution with 10 second timeout
393+
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));
394+
395+
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
396+
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
397+
lambda.sendDurableExecutionCallbackFailure(
398+
req -> req.callbackId(preapprovalCallbackId).error(err -> err.errorMessage("preapproval denied")));
399+
400+
// Wait for callback to appear
401+
execution.pollUntil(exec -> exec.hasCallback("approval"));
402+
403+
// Get callback ID but don't complete it - let it timeout
404+
var callbackId = execution.getCallbackId("approval");
405+
assertNotNull(callbackId);
406+
407+
// Wait for execution to complete (should timeout after 10 seconds)
408+
var result = execution.pollUntilComplete();
409+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
410+
411+
// Verify the callback operation shows timeout status
412+
var approvalOp = execution.getOperation("preapproval-callback");
413+
assertNotNull(approvalOp);
414+
assertEquals(OperationStatus.FAILED, approvalOp.getStatus());
415+
}
416+
367417
@Test
368418
void testChildContextExample() {
369419
var runner = CloudDurableTestRunner.create(arn("child-context-example"), GreetingRequest.class, String.class);

examples/src/test/java/software/amazon/lambda/durable/examples/GenericTypesExampleTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ class GenericTypesExampleTest {
1212
@Test
1313
void testGenericTypesExample() {
1414
var handler = new GenericTypesExample();
15-
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
16-
.withSkipTime(true);
15+
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);
1716

1817
var input = new GenericTypesExample.Input("user123");
1918
var result = runner.run(input);
@@ -48,8 +47,7 @@ void testGenericTypesExample() {
4847
@Test
4948
void testOperationTracking() {
5049
var handler = new GenericTypesExample();
51-
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
52-
.withSkipTime(true);
50+
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);
5351

5452
var input = new GenericTypesExample.Input("user456");
5553
var result = runner.run(input);

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1111
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1212
import software.amazon.awssdk.services.lambda.model.OperationType;
13+
import software.amazon.lambda.durable.exception.CallbackFailedException;
14+
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
1315
import software.amazon.lambda.durable.model.ExecutionStatus;
1416
import software.amazon.lambda.durable.serde.JacksonSerDes;
1517
import software.amazon.lambda.durable.serde.SerDes;
@@ -259,4 +261,92 @@ void callbackFailedExceptionHandlesVariousErrorFormats() {
259261
assertNotNull(result.getError().get().stackTrace());
260262
assertEquals(1, result.getError().get().stackTrace().size());
261263
}
264+
265+
@Test
266+
void waitForCallbackCallbackFailed() {
267+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
268+
try {
269+
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
270+
fail();
271+
return "should not reach here";
272+
} catch (Exception e) {
273+
assertInstanceOf(CallbackFailedException.class, e);
274+
throw e;
275+
}
276+
});
277+
278+
// First run - creates callback
279+
runner.run("test");
280+
281+
// Fail callback with errorType, errorMessage, and stack trace
282+
var callbackId = runner.getCallbackId("approval-callback");
283+
var error = ErrorObject.builder()
284+
.errorType("ValidationError")
285+
.errorMessage("Invalid input data")
286+
.stackTrace(java.util.List.of("com.example.Service|validate|Service.java|42"))
287+
.build();
288+
runner.failCallback(callbackId, error);
289+
290+
// Second run - should fail with formatted message and preserved stack trace
291+
var result = runner.run("test");
292+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
293+
assertTrue(result.getError().isPresent());
294+
assertEquals("ValidationError", result.getError().get().errorType());
295+
assertEquals("Invalid input data", result.getError().get().errorMessage());
296+
assertNotNull(result.getError().get().stackTrace());
297+
assertEquals(1, result.getError().get().stackTrace().size());
298+
}
299+
300+
@Test
301+
void waitForCallbackCallbackTimeout() {
302+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
303+
try {
304+
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
305+
fail();
306+
return "should not reach here";
307+
} catch (Exception e) {
308+
assertInstanceOf(CallbackTimeoutException.class, e);
309+
throw e;
310+
}
311+
});
312+
313+
// First run - creates callback
314+
runner.run("test");
315+
316+
// Fail callback with errorType, errorMessage, and stack trace
317+
var callbackId = runner.getCallbackId("approval-callback");
318+
runner.timeoutCallback(callbackId);
319+
320+
// Second run - should fail with formatted message and preserved stack trace
321+
var result = runner.run("test");
322+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
323+
}
324+
325+
@Test
326+
void waitForCallbackCallbackFailedWithUserException() {
327+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
328+
var errorMessage = "user exception";
329+
try {
330+
return ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {
331+
// original exception
332+
throw new IllegalArgumentException(errorMessage);
333+
});
334+
} catch (Exception e) {
335+
assertInstanceOf(IllegalArgumentException.class, e);
336+
assertEquals(errorMessage, e.getMessage());
337+
throw e;
338+
}
339+
});
340+
341+
// First run - creates callback
342+
runner.run("test");
343+
344+
// Fail callback with errorType, errorMessage, and stack trace
345+
var callbackId = runner.getCallbackId("approval-callback");
346+
runner.timeoutCallback(callbackId);
347+
348+
// Second run - should fail with formatted message and preserved stack trace
349+
var result = runner.runUntilComplete("test");
350+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
351+
}
262352
}

0 commit comments

Comments
 (0)