Commit a49562d

add docs for CompletableFuture
fix local runner with skipTime fix handling of SuspendExecutionException in ChildContextOperation
1 parent f40431b commit a49562d

17 files changed

+429
-296
lines changed

Lines changed: 175 additions & 2 deletions
@@ -1,4 +1,177 @@
11
# ADR-003: CompletableFuture-Based Operation Coordination
22

3-
**Status:** Todo
4-
**Date:** 2026-02-18
3+
**Status:** Review
4+
**Date:** 2026-02-18
5+
6+
## Context
7+
8+
Currently, the SDK employs a Phaser-based mechanism for coordinating operations. The design is detailed in [ADR-002: Phaser-Based Operation Coordination](002-phaser-based-coordination.md).
9+
10+
With this design, we can:
11+
12+
- Register a thread when it begins and deregister it when it completes;
13+
- Block `DurableFuture::get()` calls until the operation completes;
14+
- Suspend execution when no registered thread exists.
15+
16+
However, this design has a few issues:
17+
18+
- Nothing stops a Phaser from advancing beyond the predefined phase range (0 - RUNNING, 1 - COMPLETE). If we receive duplicate completion updates from the local runner or the backend API, the phase can advance to 2, 3, and so on.
19+
- We assume that there is only one party during operation replay, and two parties when an operation state is received from the checkpoint API. We call the Phaser's `arriveAndAwaitAdvance` once or twice based on this assumption, but the assumption does not always hold. In complex scenarios, this can lead to a deadlock (too few arrive calls) or to exceeding the phase range (too many arrive calls).
20+
- The Phaser has higher complexity and cognitive overhead compared to other synchronization mechanisms.
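
To make the first issue concrete, here is a minimal sketch using only the JDK. It is not SDK code, and the class and method names are made up for illustration. It contrasts how a single-party `Phaser` and a `CompletableFuture` react to duplicate completion signals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

// Illustrative only: hypothetical names, not part of the SDK.
class DuplicateCompletionDemo {

    /** Returns the phase a single-party Phaser reaches after the given number of arrivals. */
    static int phaseAfterArrivals(int arrivals) {
        Phaser phaser = new Phaser(1); // one registered party, starts at phase 0
        for (int i = 0; i < arrivals; i++) {
            phaser.arrive(); // every arrival of the only party advances the phase
        }
        return phaser.getPhase();
    }

    /** Returns how many of the given completion attempts actually changed the future. */
    static int effectiveCompletions(int attempts) {
        CompletableFuture<String> future = new CompletableFuture<>();
        int effective = 0;
        for (int i = 0; i < attempts; i++) {
            if (future.complete("result-" + i)) { // returns false once the future is completed
                effective++;
            }
        }
        return effective;
    }

    public static void main(String[] args) {
        System.out.println(phaseAfterArrivals(3));   // phase moves past 1 (COMPLETE)
        System.out.println(effectiveCompletions(3)); // only the first completion takes effect
    }
}
```

With three signals, the Phaser ends up in phase 3, while only one of the three `complete` calls has any effect on the future.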
21+
22+
## Decision
23+
24+
We will implement operation coordination using `CompletableFuture`.
25+
26+
### Threads
27+
28+
Each piece of user code (e.g. the main Lambda function body, a step body, a child context body) runs in its own user thread from the user thread pool.
29+
The execution manager tracks the active user threads.
30+
When a new step or child context is created, a new thread is created and registered with the execution manager.
31+
When the step or child context completes, the corresponding thread is deregistered from the execution manager.
32+
When user code blocks on `DurableFuture::get()` or another synchronous durable operation (e.g., `wait()`), the calling thread is deregistered from the execution manager.
33+
When no thread remains registered with the execution manager, the durable execution is suspended.
34+
35+
A special SDK thread is created and managed by the SDK to make checkpoint API requests.
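
The registration rules above can be sketched roughly as follows. This is a simplified stand-in, not the SDK's execution manager: the class name `ThreadRegistrySketch`, its methods, and the use of string thread IDs are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the execution manager; not the SDK's actual class.
class ThreadRegistrySketch {
    private final Set<String> activeThreads = new HashSet<>();
    // Completed once no user thread is registered any more.
    private final CompletableFuture<Void> suspended = new CompletableFuture<>();

    /** Marks a user thread as actively running. */
    synchronized void register(String threadId) {
        activeThreads.add(threadId);
    }

    /** Marks a user thread as finished or blocked; suspends when none remain. */
    synchronized void deregister(String threadId) {
        activeThreads.remove(threadId);
        if (activeThreads.isEmpty()) {
            suspended.complete(null);
        }
    }

    /** True once the last registered thread has deregistered. */
    boolean isSuspended() {
        return suspended.isDone();
    }
}
```

In this sketch, ordering matters: a new step thread has to be registered before the context thread that waits on it deregisters, otherwise the registry is briefly empty and the execution would be suspended prematurely.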
36+
37+
### CompletableFuture
38+
39+
The `CompletableFuture` is used to manage the completion of operations. It allows us to track the progress of operations and handle their completion in a more flexible and readable manner.
40+
41+
Each durable operation has a `CompletableFuture` field.
42+
This field is used by user threads and the SDK thread to communicate the completion of operations.
43+
44+
For example, when a context executes a step, the communication occurs as follows:
45+
46+
```mermaid
47+
sequenceDiagram
48+
participant Context as Context Thread
49+
participant Future as CompletableFuture
50+
participant EM as Execution Manager
51+
participant SDK as SDK Thread
52+
participant Step as Step Thread
53+
54+
Note over Context: calling context.stepAsync()
55+
Context->>Context: create StepOperation
56+
Context->>Future: create CompletableFuture
57+
Note over EM: Step Thread lifecycle in EM
58+
Context->>EM: register Step Thread
59+
activate Step
60+
activate EM
61+
Context->>+Step: create Step Thread
62+
Note over Context: calling step.get()
63+
Context->>Future: check if CompletableFuture is done
64+
alt is not done
65+
Context->>EM: deregister Context Thread
66+
Context->>Future: attach a callback to register context thread when CompletableFuture is done
67+
Context->>Future: wait for CompletableFuture to complete
68+
Note over Context: (BLOCKED)
69+
end
70+
71+
Note over Step: executing Step logic
72+
Step->>Step: execute user function
73+
Step->>+SDK: checkpoint SUCCESS
74+
SDK->>SDK: call checkpoint API
75+
SDK->>SDK: handle checkpoint response
76+
SDK->>+Future: complete CompletableFuture
77+
alt callback attached
78+
Future->>EM: register Context Thread
79+
Future->>Context: unblock Context Thread
80+
Note over Context: (UNBLOCKED)
81+
end
82+
Future-->>-SDK: CompletableFuture completed
83+
SDK-->>-Step: checkpoint done
84+
Context->>Context: retrieve the step result
85+
Step->>EM: deregister Step thread
86+
deactivate Step
87+
deactivate EM
88+
89+
```
90+
91+
| | Context Thread | Step Thread | SDK Thread |
92+
|---|-------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
93+
| 1 | create StepOperation (a CompletableFuture is created) | (not created) | (idle) |
94+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
95+
| 3 | create and register the Step thread | execute user code for the step | (idle) |
96+
| 4 | call `DurableFuture::get()`, deregister the context thread and wait for the `CompletableFuture` to complete | (continue) | (idle) |
97+
| 5 | (blocked) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, complete the step operation CompletableFuture, register and unblock the context thread. |
98+
| 6 | retrieve the result of the step | deregister and terminate the Step thread | (idle) |
99+
100+
If the step code completes quickly, an alternative scenario could happen as follows:
101+
102+
```mermaid
103+
sequenceDiagram
104+
participant Context as Context Thread
105+
participant Future as CompletableFuture
106+
participant EM as Execution Manager
107+
participant SDK as SDK Thread
108+
participant Step as Step Thread
109+
110+
Note over Context: calling context.stepAsync()
111+
Context->>Context: create StepOperation
112+
Context->>Future: create CompletableFuture
113+
Note over EM: Step Thread lifecycle in EM
114+
Context->>EM: register Step Thread
115+
activate EM
116+
Context->>Step: create Step Thread
117+
activate Step
118+
Step->>Step: execute user function
119+
Step->>EM: checkpoint SUCCESS
120+
EM->>SDK: checkpoint SUCCESS
121+
activate SDK
122+
SDK->>SDK: call checkpoint API
123+
SDK->>SDK: handle checkpoint response
124+
SDK->>+Future: complete CompletableFuture
125+
Note over Future: no callback attached
126+
Future-->>-SDK: CompletableFuture completed
127+
SDK-->>Step: checkpoint done
128+
deactivate SDK
129+
Step->>EM: deregister Step thread
130+
deactivate EM
131+
deactivate Step
132+
133+
Note over Context: calling step.get()
134+
Context->>Future: check if CompletableFuture is done
135+
alt is done
136+
Context->>Context: retrieve the step result
137+
end
138+
139+
140+
```
141+
142+
| | Context Thread | Step Thread | SDK Thread |
143+
|---|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
144+
| 1 | create StepOperation (a CompletableFuture is created) | (not created) | (idle) |
145+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
146+
| 3 | create and register the Step thread | execute user code for the step and complete quickly | (idle) |
147+
| 5 | (do something else or just get starved) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, complete the Step operation CompletableFuture. |
148+
| 4 | call `DurableFuture::get()` (non-blocking because `CompletableFuture` is already completed) | deregister and terminate the Step thread | (idle) |
149+
| 6 | retrieve the result of the step | (ended) | (idle) |
150+
151+
The following two key mechanisms make the `CompletableFuture`-based solution work correctly:
152+
153+
- Strict ordering of `register and unblock the context thread` and `deregister and terminate the Step thread`.
154+
- When a step completes, it calls the checkpoint API to checkpoint the result and waits for the checkpoint call to complete.
155+
- SDK thread receives the checkpoint request, makes the API call, and processes the API response.
156+
- If the response contains a terminal operation state (as it should for a succeeded or failed step), the SDK thread passes the response to the `StepOperation`, which completes its `CompletableFuture`. Completing the future runs the attached completion stages synchronously, which registers any context threads waiting for the step result.
157+
- Once the SDK thread has completed the API request and registered all waiting threads, the step thread proceeds to deregister itself from the execution manager.
158+
- Synchronized access to `CompletableFuture`.
159+
- When a context thread calls `DurableFuture::get()`, it checks if `CompletableFuture` is done.
160+
1. If the future is done, `get()` will return the operation result. Otherwise, the context thread will
161+
2. deregister itself from execution manager;
162+
3. attach a completion stage to `CompletableFuture` that re-registers the context thread when the future later completes;
163+
4. wait for `CompletableFuture` to complete.
164+
- Meanwhile, the SDK thread can complete the `CompletableFuture` while handling checkpoint API responses.
165+
- A race condition occurs if this happens while the context thread is between steps 2 and 4 above.
166+
- To prevent the race condition, all mutating access to `CompletableFuture`, whether completing the future or attaching a completion stage, is synchronized.
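
The synchronized check-then-attach sequence described above could look roughly like the sketch below. It is an illustration under stated assumptions, not the SDK's implementation: `GuardedOperationSketch`, `registryLog`, and the string thread IDs are hypothetical, and the execution manager is reduced to a log of register/deregister calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of an operation guarding its CompletableFuture; not SDK code.
class GuardedOperationSketch<T> {
    /** Records register/deregister calls so their ordering can be inspected. */
    final List<String> registryLog = new CopyOnWriteArrayList<>();

    private final Object lock = new Object();
    private final CompletableFuture<T> future = new CompletableFuture<>();

    /** Called by a context thread; blocks until the operation has a result. */
    T get(String callerThread) {
        synchronized (lock) {
            // The "is done" check and the callback attachment happen atomically
            // with respect to complete(), so a completion cannot slip in between.
            if (!future.isDone()) {
                registryLog.add("deregister:" + callerThread);
                // Non-async stage: normally runs on the thread that completes the future.
                future.thenRun(() -> registryLog.add("register:" + callerThread));
            }
        }
        // Wait outside the lock so the completing thread can acquire it.
        return future.join();
    }

    /** Called by the SDK thread on a terminal checkpoint response; false for duplicates. */
    boolean complete(T result) {
        synchronized (lock) {
            return future.complete(result);
        }
    }
}
```

If `complete` runs first, `get` finds the future done and never touches the registry. If `get` runs first, the caller is deregistered and the re-registration stage runs as part of the later `complete` call.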
167+
168+
## Consequences
169+
170+
Enables:
171+
- Support for complex scenarios which were not supported by Phaser
172+
- Reduced implementation complexity and improved readability
173+
- `CompletableFuture` based implementation of `DurableFuture::allOf` and `DurableFuture::anyOf`
174+
175+
Cost:
176+
- Synchronized access to `CompletableFuture`
177+
- Obscured ordering of thread registration/deregistration
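
As a rough idea of how combinators could be layered on `CompletableFuture`, the sketch below builds `allOf` and `anyOf` helpers on plain JDK futures. It is only an assumption about the general shape: the class `CombinatorSketch` is hypothetical, the actual `DurableFuture::allOf` and `DurableFuture::anyOf` signatures are not shown in this ADR, and failure propagation in `anyOf` is deliberately left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helpers on plain CompletableFuture; not the DurableFuture API.
class CombinatorSketch {

    /** Completes with all results, in input order, once every future has completed. */
    static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every future is done here, so join() does not block.
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }

    /** Completes with the result of whichever future succeeds first. */
    static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> first = new CompletableFuture<>();
        // complete() is a no-op after the first success, so later results are ignored.
        futures.forEach(f -> f.thenAccept(first::complete));
        return first;
    }
}
```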

sdk-integration-tests/src/test/java/com/amazonaws/lambda/durable/ChildContextIntegrationTest.java

Lines changed: 1 addition & 1 deletion
@@ -221,7 +221,7 @@ void waitInsideChildContextReturnsPendingThenCompletes() {
221221
runner.advanceTime();
222222

223223
// Second run - should complete
224-
var result2 = runner.run("test");
224+
var result2 = runner.runUntilComplete("test");
225225
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
226226
assertEquals("done", result2.getResult(String.class));
227227
}

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/HistoryEventProcessor.java

Lines changed: 30 additions & 4 deletions
@@ -26,6 +26,7 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
2626
var operationEvents = new HashMap<String, List<Event>>();
2727
var status = ExecutionStatus.PENDING;
2828
String result = null;
29+
ErrorObject error = null;
2930

3031
for (var event : events) {
3132
var eventType = event.eventType();
@@ -51,9 +52,34 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
5152
result = details.result().payload();
5253
}
5354
}
54-
case EXECUTION_FAILED -> status = ExecutionStatus.FAILED;
55-
case EXECUTION_TIMED_OUT -> status = ExecutionStatus.FAILED;
56-
case EXECUTION_STOPPED -> status = ExecutionStatus.FAILED;
55+
case EXECUTION_FAILED -> {
56+
status = ExecutionStatus.FAILED;
57+
var details = event.executionFailedDetails();
58+
if (details != null
59+
&& details.error() != null
60+
&& details.error().payload() != null) {
61+
error = details.error().payload();
62+
}
63+
}
64+
case EXECUTION_TIMED_OUT -> {
65+
status = ExecutionStatus.FAILED;
66+
var details = event.executionTimedOutDetails();
67+
if (details != null
68+
&& details.error() != null
69+
&& details.error().payload() != null) {
70+
error = details.error().payload();
71+
}
72+
}
73+
case EXECUTION_STOPPED -> {
74+
status = ExecutionStatus.FAILED;
75+
76+
var details = event.executionStoppedDetails();
77+
if (details != null
78+
&& details.error() != null
79+
&& details.error().payload() != null) {
80+
error = details.error().payload();
81+
}
82+
}
5783
case STEP_STARTED -> {
5884
if (operationId != null) {
5985
operations.putIfAbsent(
@@ -186,7 +212,7 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
186212
testOperations.add(new TestOperation(entry.getValue(), opEvents, serDes));
187213
}
188214

189-
return new TestResult<>(status, result, null, testOperations, events, serDes);
215+
return new TestResult<>(status, result, error, testOperations, events, serDes);
190216
}
191217

192218
private Operation createStepOperation(

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 15 additions & 9 deletions
@@ -172,20 +172,26 @@ public TestResult<O> run(I input) {
172172
return storage.toTestResult(output);
173173
}
174174

175-
/** Run until completion (SUCCEEDED or FAILED), simulating Lambda re-invocations. */
175+
/**
176+
* Run until completion (SUCCEEDED or FAILED) or pending manual intervention, simulating Lambda re-invocations.
177+
* Operations that don't require manual intervention (like WAIT in STARTED or STEP in PENDING) will be automatically
178+
* advanced.
179+
*
180+
* @param input The input to process
181+
* @return Final test result (SUCCEEDED or FAILED), or PENDING if operations are pending manual intervention
182+
*/
176183
public TestResult<O> runUntilComplete(I input) {
177184
TestResult<O> result = null;
178185
for (int i = 0; i < MAX_INVOCATIONS; i++) {
179186
result = run(input);
180187

181-
if (result.getStatus() != ExecutionStatus.PENDING) {
182-
return result; // SUCCEEDED or FAILED - we're done
183-
}
184-
185-
if (skipTime) {
186-
storage.advanceReadyOperations(); // Auto-advance and continue loop
187-
} else {
188-
return result; // Return PENDING - let test manually advance time
188+
if (result.getStatus() != ExecutionStatus.PENDING || !skipTime || !storage.advanceReadyOperations()) {
189+
// Stop looping when:
190+
// - the run returned SUCCEEDED or FAILED (we're done), or
191+
// - the run returned PENDING and the test must advance operations manually because
192+
// - auto advance is disabled, or
193+
// - no operation could be auto-advanced
194+
break;
189195
}
190196
}
191197
return result;
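
The new termination condition can be exercised in isolation with a small stand-alone sketch. Everything here is a simplified assumption rather than the test runner's real code: `RunLoopSketch`, its local `Status` enum, the functional parameters, and the value of `MAX_INVOCATIONS` are all hypothetical.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Stand-alone model of the loop; names and MAX_INVOCATIONS value are assumptions.
class RunLoopSketch {
    enum Status { PENDING, SUCCEEDED, FAILED }

    static final int MAX_INVOCATIONS = 100; // assumed limit, not taken from the source

    /**
     * Re-invokes run until it reports a terminal status, auto-advance is disabled,
     * or advanceReadyOperations reports that nothing could be advanced.
     */
    static Status runUntilComplete(
            Supplier<Status> run, boolean skipTime, BooleanSupplier advanceReadyOperations) {
        Status result = null;
        for (int i = 0; i < MAX_INVOCATIONS; i++) {
            result = run.get();
            // Short-circuit evaluation: operations are only advanced when the run is
            // still PENDING and skipTime is enabled.
            if (result != Status.PENDING || !skipTime || !advanceReadyOperations.getAsBoolean()) {
                break;
            }
        }
        return result;
    }
}
```

The case the old loop did not handle is the last clause: a PENDING run where nothing can be auto-advanced now returns PENDING immediately instead of re-invoking until the invocation limit.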

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalMemoryExecutionClient.java

Lines changed: 11 additions & 2 deletions
@@ -13,6 +13,7 @@
1313
import java.util.UUID;
1414
import java.util.concurrent.ConcurrentHashMap;
1515
import java.util.concurrent.CopyOnWriteArrayList;
16+
import java.util.concurrent.atomic.AtomicBoolean;
1617
import java.util.concurrent.atomic.AtomicReference;
1718
import software.amazon.awssdk.services.lambda.model.*;
1819

@@ -63,10 +64,16 @@ public List<Event> getEventsForOperation(String operationId) {
6364
return allEvents.stream().filter(e -> operationId.equals(e.id())).toList();
6465
}
6566

66-
/** Advance all operations (simulates time passing for retries/waits). */
67-
public void advanceReadyOperations() {
67+
/**
68+
* Advance all operations (simulates time passing for retries/waits).
69+
*
70+
* @return true if any operations were advanced, false otherwise
71+
*/
72+
public boolean advanceReadyOperations() {
73+
var replaced = new AtomicBoolean(false);
6874
operations.replaceAll((key, op) -> {
6975
if (op.status() == OperationStatus.PENDING) {
76+
replaced.set(true);
7077
return op.toBuilder().status(OperationStatus.READY).build();
7178
}
7279
if (op.status() == OperationStatus.STARTED && op.type() == OperationType.WAIT) {
@@ -81,10 +88,12 @@ public void advanceReadyOperations() {
8188
.build();
8289
var event = eventProcessor.processUpdate(update, succeededOp);
8390
allEvents.add(event);
91+
replaced.set(true);
8492
return succeededOp;
8593
}
8694
return op;
8795
});
96+
return replaced.get();
8897
}
8998

9099
public void completeChainedInvoke(String name, OperationResult result) {
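
The change-tracking pattern used in `advanceReadyOperations` can be shown on its own. The sketch below is a reduced illustration with hypothetical names (`AdvanceSketch`, string statuses in place of the `Operation` model), not the client's actual code. A lambda can only capture effectively final locals, so a mutable `AtomicBoolean` holder carries the flag out of `replaceAll`.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Reduced illustration with string statuses; not the LocalMemoryExecutionClient code.
class AdvanceSketch {

    /** Moves every PENDING entry to READY and reports whether anything changed. */
    static boolean advance(Map<String, String> operations) {
        var changed = new AtomicBoolean(false); // mutable holder the lambda can write to
        operations.replaceAll((id, status) -> {
            if (status.equals("PENDING")) {
                changed.set(true); // idempotent, so a repeated call would be harmless
                return "READY";
            }
            return status; // untouched entries keep their value
        });
        return changed.get();
    }
}
```

A possible usage: calling `advance` on a map containing a `PENDING` entry returns `true` and rewrites that entry to `READY`, and a second call on the same map returns `false`. If the map is a `ConcurrentHashMap`, the function passed to `replaceAll` may be applied more than once for a key when another thread modifies the map concurrently, which is one reason to keep side effects inside it idempotent.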
