AWS Lambda Durable Execution Java SDK - Internal Design

Note: This document is for SDK developers and contributors. For user-facing documentation, see the README.

Overview

This document explains the internal architecture, threading model, and extension points to help contributors understand how the SDK works under the hood. Core design decisions and advanced concepts are further outlined in the Architecture Decision Records.

Module Structure

aws-durable-execution-sdk-java/
├── sdk/                      # Core SDK - DurableHandler, DurableContext, operations
├── sdk-testing/              # Test utilities for local and cloud testing
├── sdk-integration-tests/    # Integration tests using LocalDurableTestRunner
└── examples/                 # Real-world usage patterns as customers would implement them

| Module | Purpose | Key Classes |
|--------|---------|-------------|
| sdk | Core runtime - extend DurableHandler, use DurableContext for durable operations | DurableHandler, DurableContext, DurableExecutor, ExecutionManager |
| sdk-testing | Test utilities: LocalDurableTestRunner (in-memory, simulates re-invocations and time-skipping) and CloudDurableTestRunner (executes against deployed Lambda) | LocalDurableTestRunner, CloudDurableTestRunner, LocalMemoryExecutionClient, TestResult |
| sdk-integration-tests | Dogfooding tests - validates the SDK using its own test utilities. Separate module keeps dependencies acyclic: sdk ← sdk-testing ← sdk-integration-tests. | Test classes only |
| examples | Real-world usage patterns as customers would implement them, with local and cloud tests | Example handlers, CloudBasedIntegrationTest |

API Surface

User-Facing (DurableContext)

// Synchronous step
T step(String name, Class<T> type, Supplier<T> func)
T step(String name, Class<T> type, Supplier<T> func, StepConfig config)
T step(String name, TypeToken<T> type, Supplier<T> func)
T step(String name, TypeToken<T> type, Supplier<T> func, StepConfig config)

// Asynchronous step
DurableFuture<T> stepAsync(String name, Class<T> type, Supplier<T> func)
DurableFuture<T> stepAsync(String name, Class<T> type, Supplier<T> func, StepConfig config)
DurableFuture<T> stepAsync(String name, TypeToken<T> type, Supplier<T> func)
DurableFuture<T> stepAsync(String name, TypeToken<T> type, Supplier<T> func, StepConfig config)

// Wait
void wait(String name, Duration duration)

// Asynchronous wait
DurableFuture<Void> waitAsync(String name, Duration duration)
    
// Invoke
T invoke(String name, String functionName, U payload, Class<T> resultType)
T invoke(String name, String functionName, U payload, TypeToken<T> resultType)
T invoke(String name, String functionName, U payload, Class<T> resultType, InvokeConfig config)
T invoke(String name, String functionName, U payload, TypeToken<T> resultType, InvokeConfig config)

DurableFuture<T> invokeAsync(String name, String functionName, U payload, Class<T> resultType)
DurableFuture<T> invokeAsync(String name, String functionName, U payload, Class<T> resultType, InvokeConfig config)
DurableFuture<T> invokeAsync(String name, String functionName, U payload, TypeToken<T> resultType)
DurableFuture<T> invokeAsync(String name, String functionName, U payload, TypeToken<T> resultType, InvokeConfig config)

// Lambda context access
Context getLambdaContext()

DurableFuture

T get()  // Blocks until complete, may suspend

Handler Configuration

public class MyHandler extends DurableHandler<Input, Output> {
    @Override
    protected DurableConfig createConfiguration() {
        return DurableConfig.builder()
            .withLambdaClient(customLambdaClient)
            .withSerDes(new CustomSerDes())
            .withExecutorService(Executors.newFixedThreadPool(4))
            .build();
    }
}

| Option | Default |
|--------|---------|
| lambdaClient | Auto-created LambdaClient for current region, primed for performance (see DurableConfig.java) |
| serDes | JacksonSerDes |
| executorService | Executors.newCachedThreadPool() (for user-defined operations only) |
| loggerConfig | LoggerConfig.defaults() (suppress replay logs) |

Thread Pool Architecture

The SDK uses two separate thread pools with distinct responsibilities:

User Executor (DurableConfig.executorService):

  • Runs user-defined operations (the code passed to ctx.step() and ctx.stepAsync())
  • Configurable via DurableConfig.builder().withExecutorService()
  • Default: cached daemon thread pool

Internal Executor (InternalExecutor.INSTANCE):

  • Runs SDK coordination tasks: checkpoint batching, polling for wait completion
  • Dedicated cached thread pool with daemon threads named durable-sdk-internal-*
  • Not configurable by users

Benefits of this separation:

| Benefit | Description |
|---------|-------------|
| Isolation | User operations can't starve SDK internals, and vice versa |
| No shutdown management | Internal pool uses daemon threads; SDK coordination continues even if the user's executor is shut down |
| Efficient resource usage | Cached thread pool creates threads on demand and reuses idle threads (60s timeout) |
| Daemon threads | Internal threads won't prevent JVM shutdown |
| Single configuration point | Changing InternalExecutor.INSTANCE in one place affects all SDK coordination |
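
The properties of the internal pool listed above (cached, daemon, named durable-sdk-internal-*) can be reproduced with the JDK alone. The following is a minimal sketch, not the SDK's actual InternalExecutor source: the class name InternalExecutorSketch and the describeWorker() helper are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for InternalExecutor; names are assumptions.
final class InternalExecutorSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Daemon threads never block JVM shutdown, so no explicit shutdown() is required.
    private static final ThreadFactory FACTORY = runnable -> {
        Thread thread = new Thread(runnable, "durable-sdk-internal-" + COUNTER.incrementAndGet());
        thread.setDaemon(true);
        return thread;
    };

    // Cached pool: threads are created on demand and reclaimed after 60s of idleness.
    static final ExecutorService INSTANCE = Executors.newCachedThreadPool(FACTORY);

    // Runs a probe task on the pool and reports "<thread name>|<daemon flag>".
    static String describeWorker() {
        return CompletableFuture.supplyAsync(() -> {
            Thread current = Thread.currentThread();
            return current.getName() + "|" + current.isDaemon();
        }, INSTANCE).join();
    }
}
```

Because the factory is the only place where thread naming and the daemon flag are decided, a change there applies to every coordination task submitted to the pool.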

Example: Custom thread pool for user operations:

@Override
protected DurableConfig createConfiguration() {
    var executor = new ThreadPoolExecutor(
        4, 10,                          // core/max threads
        60L, TimeUnit.SECONDS,          // idle timeout
        new LinkedBlockingQueue<>(100), // bounded queue
        new ThreadFactoryBuilder()
            .setNameFormat("order-processor-%d")
            .setDaemon(true)
            .build());

    return DurableConfig.builder()
        .withExecutorService(executor)
        .build();
}

Step Configuration

context.step("name", Type.class, supplier,
    StepConfig.builder()
        .serDes(stepSpecificSerDes)
        .retryStrategy(RetryStrategies.exponentialBackoff(3, Duration.ofSeconds(1)))
        .semantics(AT_MOST_ONCE_PER_RETRY)
        .build());
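
To illustrate what an exponential backoff strategy computes, here is a minimal sketch that mirrors the shouldRetry + delay shape of RetryDecision. It is not the SDK's RetryStrategies implementation: the BackoffSketch class, the Decision record, the zero-based attempt numbering, and the reading of the first argument as a maximum attempt count are all assumptions, and jitter is left out to keep the result deterministic.

```java
import java.time.Duration;

// Hypothetical illustration of exponential backoff; not the SDK's RetryStrategies code.
final class BackoffSketch {
    // Assumed mirror of RetryDecision: whether to retry, and after how long.
    record Decision(boolean shouldRetry, Duration delay) {}

    // attempt is zero-based: 0 is the first execution of the step.
    static Decision decide(int attempt, int maxAttempts, Duration initialDelay) {
        if (attempt + 1 >= maxAttempts) {
            return new Decision(false, Duration.ZERO); // attempts exhausted
        }
        // Delay doubles with every failed attempt; the shift is capped to avoid overflow.
        long multiplier = 1L << Math.min(attempt, 30);
        return new Decision(true, initialDelay.multipliedBy(multiplier));
    }
}
```

Under these assumptions, three attempts with a one-second initial delay retry after 1s and 2s, and the third failure is final.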

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                           Lambda Runtime                                │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  DurableHandler<I,O>                                                    │
│  - Entry point (RequestStreamHandler)                                   │
│  - Extracts input type via reflection                                   │
│  - Delegates to DurableExecutor                                         │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│  DurableExecutor                                                        │
│  - Creates ExecutionManager, DurableContext                             │
│  - Runs handler in executor                                             │
│  - Waits for completion OR suspension                                   │
│  - Returns SUCCESS/PENDING/FAILED                                       │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                    ┌───────────────┴───────────────┐
                    ▼                               ▼
┌──────────────────────────────┐    ┌─────────────────────────────────┐
│  DurableContext              │    │  ExecutionManager               │
│  - User-facing API           │    │  - State (ops, token)           │
│  - step(), stepAsync(), etc  │    │  - Thread coordination          │
│  - wait(), waitAsync()       │    │  - Checkpoint batching          │
│  - Operation ID counter      │    │  - Checkpoint response handling │
└──────────────────────────────┘    │  - Polling                      │
            │                       └─────────────────────────────────┘
            │                                       │
            ▼                                       ▼
┌──────────────────────────────┐    ┌──────────────────────────────┐
│  Operations                  │    │  CheckpointBatcher           │
│  - StepOperation<T>          │    │  - Queues requests           │
│  - WaitOperation             │    │  - Batches API calls (750KB) │
│  - execute() / get()         │    │  - Notifies via callback     │
└──────────────────────────────┘    └──────────────────────────────┘
                                                    │
                                                    ▼
                                    ┌──────────────────────────────┐
                                    │  DurableExecutionClient      │
                                    │  - checkpoint()              │
                                    │  - getExecutionState()       │
                                    └──────────────────────────────┘

Package Structure

software.amazon.lambda.durable
├── DurableHandler<I,O>      # Entry point
├── DurableExecutor          # Lifecycle orchestration
├── DurableContext           # User API
├── DurableFuture<T>         # Async handle
├── StepConfig               # Step configuration
├── TypeToken<T>             # Generic type capture
│
├── execution/
│   ├── ExecutionManager     # Central coordinator
│   ├── ExecutionMode        # REPLAY or EXECUTION state
│   ├── CheckpointBatcher    # Batching (package-private)
│   ├── CheckpointCallback   # Callback interface
│   ├── SuspendExecutionException
│   └── ThreadType           # CONTEXT, STEP
│
├── operation/
│   ├── BaseDurableOperation<T>  # Common operation logic
│   ├── StepOperation<T>         # Step logic
│   ├── InvokeOperation<T>       # Invoke logic
│   ├── CallbackOperation<T>     # Callback logic
│   └── WaitOperation            # Wait logic
│
├── logging/
│   ├── DurableLogger        # Context-aware logger wrapper (MDC-based)
│   └── LoggerConfig         # Replay suppression config
│
├── retry/
│   ├── RetryStrategy        # Interface
│   ├── RetryStrategies      # Presets
│   ├── RetryDecision        # shouldRetry + delay
│   └── JitterStrategy       # Jitter options
│
├── client/
│   ├── DurableExecutionClient        # Interface
│   └── LambdaDurableFunctionsClient  # AWS SDK impl
│
├── model/
│   ├── DurableExecutionInput   # Lambda input
│   ├── DurableExecutionOutput  # Lambda output
│   └── ExecutionStatus         # SUCCEEDED/PENDING/FAILED
│
├── serde/
│   ├── SerDes              # Interface
│   ├── JacksonSerDes       # Jackson impl
│   └── AwsSdkV2Module      # SDK type support
│
└── exception/
    ├── DurableExecutionException
    ├── NonDeterministicExecutionException
    ├── StepFailedException
    ├── StepInterruptedException
    └── SerDesException

Sequence Diagrams

Normal Step Execution

sequenceDiagram
    participant UC as User Code
    participant DC as DurableContext
    participant SO as StepOperation
    participant EM as ExecutionManager
    participant Backend

    UC->>DC: step("name", Type.class, func)
    DC->>SO: new StepOperation(...)
    DC->>SO: execute()
    SO->>EM: sendOperationUpdate(START)
    EM->>Backend: checkpoint(START)
    
    SO->>SO: func.get() [execute user code]
    
    SO->>EM: sendOperationUpdate(SUCCEED)
    EM->>Backend: checkpoint(SUCCEED)
    
    DC->>SO: get()
    SO-->>DC: result
    DC-->>UC: result

Replay Scenario

sequenceDiagram
    participant LR as Lambda Runtime
    participant DE as DurableExecutor
    participant UC as User Code
    participant DC as DurableContext
    participant SO as StepOperation
    participant EM as ExecutionManager

    Note over LR: Re-invocation with existing state
    
    LR->>DE: execute(input with operations)
    DE->>EM: new ExecutionManager(existingOps)
    
    UC->>DC: step("step1", ...)
    DC->>SO: execute()
    SO->>EM: getOperation("1")
    EM-->>SO: existing op (SUCCEEDED)
    Note over SO: Skip execution
    DC->>SO: get()
    SO-->>DC: cached result
    DC-->>UC: result

Wait with Suspension

sequenceDiagram
    participant UC as User Code
    participant DC as DurableContext
    participant WO as WaitOperation
    participant EM as ExecutionManager
    participant Backend

    UC->>DC: wait(null, Duration.ofMinutes(5))
    DC->>WO: execute()
    WO->>EM: sendOperationUpdate(WAIT, duration)
    EM->>Backend: checkpoint
    
    DC->>WO: get()
    WO->>EM: deregisterActiveThread("Root")
    
    Note over EM: No active threads!
    EM->>EM: executionExceptionFuture.completeExceptionally(SuspendExecutionException)
    EM-->>WO: throw SuspendExecutionException
    
    Note over UC: Execution suspended, returns PENDING

Exception Hierarchy

DurableExecutionException (base)
├── StepFailedException          # Step failed after all retries
├── StepInterruptedException     # Step interrupted (AT_MOST_ONCE)
├── NonDeterministicExecutionException  # Replay mismatch
└── SerDesException              # Serialization error

SuspendExecutionException        # Internal: triggers suspension (not user-facing)

| Exception | Trigger | Recovery |
|-----------|---------|----------|
| StepFailedException | Step throws after exhausting retries | Catch in handler or let fail |
| StepInterruptedException | AT_MOST_ONCE step interrupted mid-execution | Treat as failure |
| NonDeterministicExecutionException | Replay finds different operation than expected | Bug in handler (non-deterministic code) |
| SerDesException | Jackson fails to serialize/deserialize | Fix data model or custom SerDes |

Logging Internals

Replay Mode Tracking

ExecutionManager tracks whether we're replaying completed operations or executing new ones via ExecutionMode:

  • REPLAY: Starts in this mode if operations.size() > 1 (has checkpointed operations beyond the initial EXECUTION op)
  • EXECUTION: Transitions when getOperationAndUpdateReplayState() encounters:
    • An operation ID not in the checkpoint log (new operation)
    • An operation that is NOT in a terminal state (needs to continue executing)

Terminal states (SUCCEEDED, FAILED, CANCELLED, TIMED_OUT, STOPPED) stay in REPLAY mode since we're just returning cached results.

This is a one-way transition (REPLAY → EXECUTION, never back). DurableLogger checks isReplaying() to suppress duplicate logs during replay.
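
The one-way REPLAY → EXECUTION transition can be sketched as a small state holder. This is an illustration under assumptions, not ExecutionManager's code: the class ReplayTrackerSketch and its lookup() method are hypothetical, operation statuses are modelled as plain strings, and the map is assumed to exclude the initial EXECUTION op (so a non-empty map corresponds to operations.size() > 1).

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of replay-mode tracking; names and types are assumptions.
final class ReplayTrackerSketch {
    enum Mode { REPLAY, EXECUTION }

    private static final Set<String> TERMINAL =
        Set.of("SUCCEEDED", "FAILED", "CANCELLED", "TIMED_OUT", "STOPPED");

    private final Map<String, String> checkpointedStatusById;
    private Mode mode;

    ReplayTrackerSketch(Map<String, String> checkpointedStatusById) {
        this.checkpointedStatusById = checkpointedStatusById;
        // Start in REPLAY only when there is something checkpointed to replay.
        this.mode = checkpointedStatusById.isEmpty() ? Mode.EXECUTION : Mode.REPLAY;
    }

    // Looks up an operation and leaves REPLAY on the first new or unfinished operation.
    synchronized String lookup(String operationId) {
        String status = checkpointedStatusById.get(operationId);
        if (status == null || !TERMINAL.contains(status)) {
            mode = Mode.EXECUTION; // never reset back to REPLAY
        }
        return status;
    }

    synchronized boolean isReplaying() {
        return mode == Mode.REPLAY;
    }
}
```

A logger wrapper can then consult isReplaying() before emitting, which is how duplicate log lines are avoided for operations that only return cached results.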

MDC-Based Context Enrichment

DurableLogger uses SLF4J's MDC (Mapped Diagnostic Context) to enrich log entries with execution metadata. MDC is thread-local by design, so context is set once per thread rather than per log call for performance.

MDC Keys:

| Key | Set When | Description |
|-----|----------|-------------|
| durableExecutionArn | Logger construction | Execution ARN |
| requestId | Logger construction | Lambda request ID |
| operationId | Step start | Current operation ID |
| operationName | Step start | Step name |
| attempt | Step start | Retry attempt number |

Context Flow:

  1. DurableLogger constructor sets execution-level MDC (ARN, requestId) on the handler thread
  2. StepOperation.executeStepLogic() calls durableLogger.setOperationContext() before user code runs
  3. User code logs via context.getLogger() - MDC values automatically included
  4. clearOperationContext() called in finally block after step completes
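
The set-before / clear-in-finally pattern from steps 2 to 4 can be sketched without a logging dependency. In the sketch below a ThreadLocal map stands in for SLF4J's MDC so the example runs on the JDK alone; the class OperationContextSketch and its methods are hypothetical and are not DurableLogger's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration; a ThreadLocal map stands in for SLF4J's MDC.
final class OperationContextSketch {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    // Sets operation-level keys, runs the user code, and always clears the keys afterwards.
    static <T> T runWithOperationContext(
            String operationId, String operationName, int attempt, Supplier<T> userCode) {
        Map<String, String> context = CONTEXT.get();
        context.put("operationId", operationId);
        context.put("operationName", operationName);
        context.put("attempt", Integer.toString(attempt));
        try {
            return userCode.get();
        } finally {
            // Clearing in finally keeps stale values off a pooled thread that is reused later.
            context.remove("operationId");
            context.remove("operationName");
            context.remove("attempt");
        }
    }

    // Reads a key for the current thread, as a log layout would.
    static String current(String key) {
        return CONTEXT.get().get(key);
    }
}
```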

Log Pattern Example (Log4j2):

<PatternLayout pattern="%d %-5level %logger - %msg%notEmpty{ | arn=%X{durableExecutionArn}}%notEmpty{ id=%X{operationId}}%notEmpty{ op=%X{operationName}}%notEmpty{ attempt=%X{attempt}}%n"/>

Output:

12:34:56 INFO  c.a.l.d.DurableContext - Processing order | arn=arn:aws:lambda:us-east-1:123:function:test
12:34:56 DEBUG c.a.l.d.DurableContext - Validating items | arn=arn:aws:lambda:us-east-1:123:function:test id=1 op=validate attempt=0

Backend Integration

Large Response Handling

If the serialized result exceeds Lambda's 6 MB response limit:

  1. Checkpoint result to backend
  2. Return empty response
  3. Backend stores and returns result
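
The size check behind these steps can be sketched as follows. This is an assumption-laden illustration rather than the SDK's code: LargeResponseSketch is a hypothetical class, the limit is passed in as a parameter instead of being hard-coded, and the checkpoint call is modelled as a plain Consumer.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical illustration of the large-response fallback.
final class LargeResponseSketch {
    // Returns the payload to hand back to Lambda: the result itself when it fits,
    // or an empty string after the result has been passed to the checkpoint sink.
    static String respond(String serializedResult, int limitBytes, Consumer<String> checkpointSink) {
        int size = serializedResult.getBytes(StandardCharsets.UTF_8).length;
        if (size <= limitBytes) {
            return serializedResult;
        }
        checkpointSink.accept(serializedResult); // the backend now owns the result
        return "";
    }
}
```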

Checkpoint Batching

Multiple concurrent operations may checkpoint simultaneously. CheckpointBatcher batches these into single API calls to reduce latency and stay within the 750KB request limit.

The batcher currently uses micro-batching: a batch contains only the requests that accumulate while the polling thread is being scheduled. Early tests suggest this window may be too short for effective batching, so an artificial delay might need to be introduced.

StepOperation 1 ──┐
                  │
StepOperation 2 ──┼──► CheckpointBatcher ──► Backend
                  │
WaitOperation ────┘

Callback mechanism avoids cyclic dependency between ExecutionManager and CheckpointBatcher:

interface CheckpointCallback {
    void onComplete(String newToken, List<Operation> operations);
}
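
The size-bounded batching described in this section can be sketched as a drain loop over a queue of pending requests. This is a simplified illustration, not CheckpointBatcher's implementation: BatchingSketch and nextBatch() are hypothetical, requests are modelled as already-serialized strings, and the byte limit is a parameter (the 750KB figure would be passed in by the caller).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of size-bounded batching.
final class BatchingSketch {
    // Removes requests from the head of the queue until the next one would exceed maxBytes.
    static List<String> nextBatch(Queue<String> pending, int maxBytes) {
        List<String> batch = new ArrayList<>();
        int usedBytes = 0;
        while (!pending.isEmpty()) {
            int size = pending.peek().getBytes(StandardCharsets.UTF_8).length;
            // A single oversized request is still sent alone so the queue cannot stall.
            if (!batch.isEmpty() && usedBytes + size > maxBytes) {
                break;
            }
            batch.add(pending.poll());
            usedBytes += size;
        }
        return batch;
    }
}
```

Each returned batch would map to one checkpoint API call; whatever remains in the queue is picked up by the next call.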

Testing Infrastructure

LocalDurableTestRunner

In-memory test runner that simulates the full execution lifecycle without AWS.

// Default: auto-skip time
runner.runUntilComplete(input);  // Instantly completes waits

// Manual control
runner.withSkipTime(false);
runner.run(input);               // Returns PENDING at wait
runner.advanceTime();            // Move past wait
runner.run(input);               // Continues from wait

Failure Simulation

// Simulate checkpoint loss (fire-and-forget START lost)
runner.simulateFireAndForgetCheckpointLoss("step-name");

// Reset step to STARTED (simulate crash after START checkpoint)
runner.resetCheckpointToStarted("step-name");

CloudDurableTestRunner

Tests against deployed Lambda:

var runner = CloudDurableTestRunner.create(arn, Input.class, Output.class)
    .withPollInterval(Duration.ofSeconds(2))
    .withTimeout(Duration.ofMinutes(5));

TestResult<Output> result = runner.run(input);

Extension Points for Testing

DurableExecutionClient Interface - Backend abstraction for testing or alternative implementations:

public interface DurableExecutionClient {
    CheckpointDurableExecutionResponse checkpoint(
        String arn, String token, List<OperationUpdate> updates);
    
    GetDurableExecutionStateResponse getExecutionState(String arn, String marker);
}

Implementations:

  • LambdaDurableFunctionsClient - Production (wraps AWS SDK)
  • LocalMemoryExecutionClient - Testing (in-memory)

For production customization, use DurableConfig.builder().withLambdaClient(lambdaClient). For testing, use DurableConfig.builder().withDurableExecutionClient(localMemoryClient).


Custom SerDes and TypeToken

Custom SerDes Interface:

public interface SerDes {
    String serialize(Object value);
    <T> T deserialize(String data, Class<T> type);
    <T> T deserialize(String data, TypeToken<T> typeToken);
}

TypeToken and Type Erasure:

Java's type erasure removes generic type parameters at runtime (List<User> becomes List). This is problematic for deserialization—Jackson needs the full type to reconstruct objects correctly.

TypeToken<T> solves this by capturing generic types at compile time. Creating new TypeToken<List<User>>() {} produces an anonymous subclass whose superclass type parameter is preserved in bytecode and accessible via reflection (getGenericSuperclass()).

The SerDes interface provides both Class<T> and TypeToken<T> overloads:

  • Use Class<T> for simple types: String.class, User.class
  • Use TypeToken<T> for parameterized types: new TypeToken<List<User>>() {}
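
The anonymous-subclass trick can be demonstrated in a few lines. The class below is a hypothetical re-creation named TypeTokenSketch, written only to show the getGenericSuperclass() mechanism; the SDK's own TypeToken may differ in its details.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical re-creation of the type-capture mechanism; not the SDK's TypeToken.
abstract class TypeTokenSketch<T> {
    private final Type type;

    protected TypeTokenSketch() {
        // For an anonymous subclass, the generic superclass still carries the type argument.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("Create an anonymous subclass with a type argument");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}
```

Usage: new TypeTokenSketch<java.util.List<String>>() {}.getType() yields a ParameterizedType describing List<String>, which a deserializer can use to rebuild the element type that erasure would otherwise hide.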

Thread Coordination and Suspension Mechanism (Advanced)

The SDK uses a threaded execution model where the handler runs in a background thread, racing against a suspension future. This enables immediate suspension when operations need to pause execution (waits, retries), without waiting for the handler to complete naturally.

Complete Suspension Flow

1. Handler Level - The Suspension Race

Handler runs in background thread, racing against suspension detection:

// DurableExecutor - which completes first?
CompletableFuture.anyOf(suspendFuture, handlerFuture).get();

Returns PENDING if suspension wins, SUCCESS if handler completes. See ADR-001: Threaded Handler Execution.
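
A minimal sketch of this race, using only CompletableFuture, is shown below. SuspensionRaceSketch and its Outcome enum are hypothetical names; the mapping to execution statuses and the handling of handler failures are left out, and preferring the handler when both futures are already complete is a choice made for this sketch only.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of the suspension race.
final class SuspensionRaceSketch {
    enum Outcome { HANDLER_COMPLETED, SUSPENDED }

    // Blocks until either future completes, then reports which side won.
    static Outcome race(CompletableFuture<Void> suspendFuture, CompletableFuture<?> handlerFuture) {
        // join() rethrows if the winning future failed; error handling is omitted here.
        CompletableFuture.anyOf(suspendFuture, handlerFuture).join();
        // If both are done, this sketch prefers the handler result.
        return handlerFuture.isDone() ? Outcome.HANDLER_COMPLETED : Outcome.SUSPENDED;
    }
}
```

The key property is that the caller never waits for a handler that is blocked on a long wait: completing the suspension future is enough to release it.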

2. Suspension Detection - Unified Thread Counting

We use thread counting as the suspension trigger because threads naturally deregister when they cannot make progress on durable operations. This provides a simple, unified mechanism that works across all operation types.

// ExecutionManager.deregisterActiveThread() - ONLY suspension trigger
synchronized (this) {
    activeThreads.remove(threadId);
    if (activeThreads.isEmpty()) {
        suspendExecutionFuture.complete(null); // Suspension wins the race
        throw new SuspendExecutionException();
    }
}

Suspension triggers when: There are no active threads (all have deregistered). The SDK tracks two types of threads:

| Thread Type | Purpose | Deregisters When |
|-------------|---------|------------------|
| Root thread | Main execution thread running the handler function | Calling future.get() (to allow suspension while blocked); calling context.wait() or context.waitAsync().get() (to trigger suspension) |
| Step threads | Background threads executing individual step operations | Completing work: after checkpointing the result (success or failure) |

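Why root thread deregistration matters: it is critical for allowing suspension when steps are retrying or when multiple operations depend on each other. A root thread that stayed registered while blocked would keep the active set non-empty, so suspension could never trigger. This approach ensures suspension happens precisely when no thread can make progress on durable operations.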
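
The thread-counting rule can be modelled with a set and a future. The sketch below is a simplified stand-in for ExecutionManager's bookkeeping: ActiveThreadRegistrySketch is a hypothetical class, thread IDs are plain strings, and it returns a boolean where the real code throws SuspendExecutionException.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of active-thread counting.
final class ActiveThreadRegistrySketch {
    private final Set<String> activeThreads = new HashSet<>();
    private final CompletableFuture<Void> suspendFuture = new CompletableFuture<>();

    synchronized void register(String threadId) {
        activeThreads.add(threadId);
    }

    // Returns true when this call removed the last active thread and triggered suspension.
    synchronized boolean deregister(String threadId) {
        activeThreads.remove(threadId);
        if (activeThreads.isEmpty()) {
            suspendFuture.complete(null); // lets the suspension side win the race
            return true;
        }
        return false;
    }

    boolean isSuspended() {
        return suspendFuture.isDone();
    }
}
```

In this model a root thread blocking on get() while a step thread is still running does not suspend the execution; suspension only happens once the step thread also deregisters.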

Advanced Feature: In-Process Completion

In scenarios where waits or step retries would normally suspend execution, but other active threads prevent suspension, the SDK automatically switches to in-process completion by polling the backend until timing conditions are met. This allows complex concurrent workflows to complete efficiently without unnecessary Lambda re-invocations or extended waiting periods.

Active Thread Tracking and Operation Completion Coordination

Each piece of user code (the main function body, a step body, or a child context body) runs in its own thread. The execution manager tracks the active threads. When a new step or child context is created, a new thread is created and registered with the execution manager. When user code blocks on get() or on a synchronous durable operation, its thread is deregistered from the execution manager. When no active thread remains, the function execution is suspended.

These user threads and the system thread use CompletableFuture to communicate the completion of operations. When a context executes a step, the communication happens as shown below:

| Sequence | Context thread | Step thread | System thread |
|----------|----------------|-------------|---------------|
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
| 3 | create and register the Step thread | execute user code for the step | (idle) |
| 4 | call get(), deregister the context thread and wait for the CompletableFuture to complete | (continue) | (idle) |
| 5 | (blocked) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture, register and unblock the context thread. |
| 6 | retrieve the result of the step | deregister and terminate the Step thread | (idle) |

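If the user code completes quickly, the following alternative ordering can occur (sequence numbers refer to the corresponding rows of the previous table, listed here in the order they happen):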

| Sequence | Context thread | Step thread | System thread |
|----------|----------------|-------------|---------------|
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
| 3 | create and register the Step thread | execute user code for the step and complete quickly | (idle) |
| 5 | (do something else or just get starved) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture. |
| 4 | call get(). It's not blocked because CompletableFuture is already completed | deregister and terminate the Step thread | (idle) |
| 6 | retrieve the result of the step | (ended) | (idle) |
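
Both orderings work because the context thread and the step thread only meet through a CompletableFuture. The sketch below illustrates that hand-off with the JDK alone. StepHandoffSketch is a hypothetical class, the checkpoint round trip through the system thread is collapsed into a direct complete() call, and thread registration is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical illustration of the context-thread / step-thread hand-off.
final class StepHandoffSketch {
    static <T> T runStep(Supplier<T> userCode) {
        ExecutorService stepPool = Executors.newSingleThreadExecutor();
        try {
            // Sequence 1: the context thread creates the future for the operation.
            CompletableFuture<T> operationFuture = new CompletableFuture<>();
            // Sequence 3: the step thread executes the user code.
            stepPool.execute(() -> {
                try {
                    // Sequence 5, simplified: completing the future stands in for the
                    // checkpoint call and the terminal response handled by the system thread.
                    operationFuture.complete(userCode.get());
                } catch (RuntimeException e) {
                    operationFuture.completeExceptionally(e);
                }
            });
            // Sequences 4 and 6: join() blocks only if the step has not finished yet.
            return operationFuture.join();
        } finally {
            stepPool.shutdown();
        }
    }
}
```

Whether the step finishes before or after the context thread reaches join(), the caller observes the same result, which is the property both tables describe.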