Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 175 additions & 2 deletions docs/adr/003-completable-future-based-coordination.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,177 @@
# ADR-003: CompletableFuture-Based Operation Coordination

**Status:** Todo
**Date:** 2026-02-18
**Status:** Review
**Date:** 2026-02-18

## Context

Currently, the SDK employs a Phaser-based mechanism for coordinating operations. The design is detailed in [ADR-002: Phaser-Based Operation Coordination](002-phaser-based-coordination.md).

With this design, we can:

- Register a thread when it begins and deregister it when it completes;
- Block `DurableFuture::get()` calls until the operation completes;
- Suspend execution when no registered thread exists.

However, this design has a few issues:

- We allow the Phasers to advance over predefined phase ranges (0 - RUNNING, 1 - COMPLETE). If we received duplicate completion updates from local runner or backend API, the phase could be advanced to 2, 3, and so on.
- We assume that there is only one party during operation replay, and two parties when receiving an operation state from checkpoint API. We call Phaser `arriveAndAwaitAdvance` once or twice based on this assumption, but it could be incorrect. In complex scenarios, this could lead to a deadlock (not enough arrive calls) or exceeding the phase range (too many arrive calls).
- The Phaser has higher complexity and cognitive overhead compared to other synchronization mechanisms.

## Decision

We will implement operation coordination using `CompletableFuture`.,

### Threads

Each piece of user code (e.g. the main Lambda function body, a step body, a child context body) runs in its own user thread from the user thread pool.
Execution manager tracks active running user threads.
When a new step or a new child context is created, a new thread is created and registered in execution manager.
When the step or the child context completes, the corresponding thread is deregistered from execution manager.
When the user code is blocked on `DurableFuture::get()` or another synchronous durable operation (e.g., `wait()`), the caller thread is deregistered from execution manager.
When there is no registered thread in execution manager, the durable execution is suspended.

A special SDK thread is created and managed by the SDK to make checkpoint API requests.

### CompletableFuture

The `CompletableFuture` is used to manage the completion of operations. It allows us to track the progress of operations and handle their completion in a more flexible and readable manner.

Each durable operation has a `CompletableFuture` field.
This field is used by user threads and the SDK thread communicate the completion of operations.

For example, when a context executes a step, the communication occurs as follows

```mermaid
sequenceDiagram
participant Context as Context Thread
participant Future as CompletableFuture
participant EM as Execution Manager
participant SDK as SDK Thread
participant Step as Step Thread

Note over Context: calling context.stepAsync()
Context->>Context: create StepOperation
Context->>Future: create CompletableFuture
Note over EM: Step Thread lifecycle in EM
Context->>EM: register Step Thread
activate Step
activate EM
Context->>+Step: create Step Thread
Note over Context: calling step.get()
Context->>Future: check if CompletableFuture is done
alt is not done
Context->>EM: deregister Context Thread
Context->>Future: attach a callback to register context thread when CompletableFuture is done
Context->>Future: wait for CompletableFuture to complete
Note over Context: (BLOCKED)
end

Note over Step: executing Step logic
Step->>Step: execute user function
Step->>+SDK: checkpoint SUCCESS
SDK->>SDK: call checkpoint API
SDK->>SDK: handle checkpoint response
SDK->>+Future: complete CompletableFuture
alt callback attached
Future->>EM: register Context Thread
Future->>Context: unblock Context Thread
Note over Context: (UNBLOCKED)
end
Future-->>-SDK: CompletableFuture completed
SDK-->>-Step: checkpoint done
Context->>Context: retrieve the step result
Step->>EM: deregister Step thread
deactivate Step
deactivate EM

```

| | Context Thread | Step Thread | SDK Thread |
|---|-------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 | create StepOperation (a CompletableFuture is created) | (not created) | (idle) |
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
| 3 | create and register the Step thread | execute user code for the step | (idle) |
| 4 | call `DurableFuture::get()`, deregister the context thread and wait for the `CompletableFuture` to complete | (continue) | (idle) |
| 5 | (blocked) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, complete the step operation CompletableFuture, register and unblock the context thread. |
| 6 | retrieve the result of the step | deregister and terminate the Step thread | (idle) |

If the step code completes quickly, an alternative scenario could happen as follows

```mermaid
sequenceDiagram
participant Context as Context Thread
participant Future as CompletableFuture
participant EM as Execution Manager
participant SDK as SDK Thread
participant Step as Step Thread

Note over Context: calling context.stepAsync()
Context->>Context: create StepOperation
Context->>Future: create CompletableFuture
Note over EM: Step Thread lifecycle in EM
Context->>EM: register Step Thread
activate EM
Context->>Step: create Step Thread
activate Step
Step->>Step: execute user function
Step->>EM: checkpoint SUCCESS
EM->>SDK: checkpoint SUCCESS
activate SDK
SDK->>SDK: call checkpoint API
SDK->>SDK: handle checkpoint response
SDK->>+Future: complete CompletableFuture
Note over Future: no callback attached
Future-->>-SDK: CompletableFuture completed
SDK-->>Step: checkpoint done
deactivate SDK
Step->>EM: deregister Step thread
deactivate EM
deactivate Step

Note over Context: calling step.get()
Context->>Future: check if CompletableFuture is done
alt is done
Context->>Context: retrieve the step result
end


```

| | Context Thread | Step Thread | SDK Thread |
|---|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
| 1 | create StepOperation (a CompletableFuture is created) | (not created) | (idle) |
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
| 3 | create and register the Step thread | execute user code for the step and complete quickly | (idle) |
| 5 | (do something else or just get starved) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, complete the Step operation CompletableFuture. |
| 4 | call `DurableFuture::get()` (non-blocking because `CompletableFuture` is already completed) | deregister and terminate the Step thread | (idle) |
| 6 | retrieve the result of the step | (ended) | (idle) |

The following two key mechanisms make `CompletableFuture` based solution work properly.

- Strict ordering of `register and unblock the context thread` and `deregister and terminate the Step thread`.
- When a step completes, it calls checkpoint API to checkpoint the result and wait for the checkpoint call to complete.
- SDK thread receives the checkpoint request, makes the API call, and processes the API response.
- If the response contains a terminal operation state (it should for a succeeded or failed step), it will send the response to the `StepOperation` to complete `CompletableFuture`. When completing the future, the attached completion stages will be executed synchronously, which will register any context threads that are waiting for the result of the step.
- When SDK thread completes the API request and registers all waiting threads, the step thread continues to deregister itself from execution manager.
- Synchronized access to `CompletableFuture`.
- When a context thread calls `DurableFuture::get()`, it checks if `CompletableFuture` is done.
1. If the future is done, `get()` will return the operation result. Otherwise, the context thread will
2. deregister itself from execution manager;
3. attach a completion stage to `CompletableFuture` that will re-register the context thread when later the future is completed;
4. wait for `CompletableFuture` to complete.
- Meantime, `CompletableFuture` can be completed by SDK thread when handling the checkpoint API responses.
- A race condition will occur if this happens when the context thread is between the step `a` and `c`.
- To prevent the race condition, all the mutating access to `CompletableFuture` either to complete the future or to attach a completion stage is synchronized.

## Consequences

Enables:
- Support for complex scenarios which were not supported by Phaser
- Reduced implementation complexity and improved readability
- `CompletableFuture` based implementation of `DurableFuture::allOf` and `DurableFuture::anyOf`

Cost:
- Synchronized access to `CompletableFuture`
- Obscured ordering of thread registration/deregistration
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void waitInsideChildContextReturnsPendingThenCompletes() {
runner.advanceTime();

// Second run - should complete
var result2 = runner.run("test");
var result2 = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
assertEquals("done", result2.getResult(String.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
var operationEvents = new HashMap<String, List<Event>>();
var status = ExecutionStatus.PENDING;
String result = null;
ErrorObject error = null;

for (var event : events) {
var eventType = event.eventType();
Expand All @@ -51,9 +52,34 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
result = details.result().payload();
}
}
case EXECUTION_FAILED -> status = ExecutionStatus.FAILED;
case EXECUTION_TIMED_OUT -> status = ExecutionStatus.FAILED;
case EXECUTION_STOPPED -> status = ExecutionStatus.FAILED;
case EXECUTION_FAILED -> {
status = ExecutionStatus.FAILED;
var details = event.executionFailedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
error = details.error().payload();
}
}
case EXECUTION_TIMED_OUT -> {
status = ExecutionStatus.FAILED;
var details = event.executionTimedOutDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
error = details.error().payload();
}
}
case EXECUTION_STOPPED -> {
status = ExecutionStatus.FAILED;

var details = event.executionStoppedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
error = details.error().payload();
}
}
case STEP_STARTED -> {
if (operationId != null) {
operations.putIfAbsent(
Expand Down Expand Up @@ -186,7 +212,7 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
testOperations.add(new TestOperation(entry.getValue(), opEvents, serDes));
}

return new TestResult<>(status, result, null, testOperations, events, serDes);
return new TestResult<>(status, result, error, testOperations, events, serDes);
}

private Operation createStepOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,26 @@ public TestResult<O> run(I input) {
return storage.toTestResult(output);
}

/** Run until completion (SUCCEEDED or FAILED), simulating Lambda re-invocations. */
/**
* Run until completion (SUCCEEDED or FAILED) or pending manual intervention, simulating Lambda re-invocations.
* Operations that don't require manual intervention (like WAIT in STARTED or STEP in PENDING) will be automatically
* advanced.
*
* @param input The input to process
* @return Final test result (SUCCEEDED or FAILED) or PENDING if operations pending manual intervention
*/
public TestResult<O> runUntilComplete(I input) {
TestResult<O> result = null;
for (int i = 0; i < MAX_INVOCATIONS; i++) {
result = run(input);

if (result.getStatus() != ExecutionStatus.PENDING) {
return result; // SUCCEEDED or FAILED - we're done
}

if (skipTime) {
storage.advanceReadyOperations(); // Auto-advance and continue loop
} else {
return result; // Return PENDING - let test manually advance time
if (result.getStatus() != ExecutionStatus.PENDING || !skipTime || !storage.advanceReadyOperations()) {
// break the loop if
// - Return SUCCEEDED or FAILED - we're done
// - Return PENDING and let test manually advance operations if
// - auto advance is disabled, or
// - no operations can be auto advanced
break;
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import software.amazon.awssdk.services.lambda.model.*;

Expand Down Expand Up @@ -63,10 +64,16 @@ public List<Event> getEventsForOperation(String operationId) {
return allEvents.stream().filter(e -> operationId.equals(e.id())).toList();
}

/** Advance all operations (simulates time passing for retries/waits). */
public void advanceReadyOperations() {
/**
* Advance all operations (simulates time passing for retries/waits).
*
* @return true if any operations were advanced, false otherwise
*/
public boolean advanceReadyOperations() {
var replaced = new AtomicBoolean(false);
operations.replaceAll((key, op) -> {
if (op.status() == OperationStatus.PENDING) {
replaced.set(true);
return op.toBuilder().status(OperationStatus.READY).build();
}
if (op.status() == OperationStatus.STARTED && op.type() == OperationType.WAIT) {
Expand All @@ -81,10 +88,12 @@ public void advanceReadyOperations() {
.build();
var event = eventProcessor.processUpdate(update, succeededOp);
allEvents.add(event);
replaced.set(true);
return succeededOp;
}
return op;
});
return replaced.get();
}

public void completeChainedInvoke(String name, OperationResult result) {
Expand Down
Loading
Loading