diff --git a/README.md b/README.md index 9fc7b648e..879ce10ba 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ Your durable function extends `DurableHandler` and implements `handleReque - `ctx.createCallback()` – Wait for external events (approvals, webhooks) - `ctx.invoke()` – Invoke another Lambda function and wait for the result - `ctx.invokeAsync()` – Start a concurrent Lambda function invocation +- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log +- `ctx.runInChildContextAsync()` – Start a concurrent child context ## Quick Start @@ -189,6 +191,30 @@ var result = ctx.invoke("invoke-function", ``` +### runInChildContext() – Isolated Execution Contexts + +Child contexts run an isolated stream of work with their own operation counter and checkpoint log. They support the full range of durable operations — `step`, `wait`, `invoke`, `createCallback`, and nested child contexts. + +```java +// Sync: blocks until the child context completes +var result = ctx.runInChildContext("validate-order", String.class, child -> { + var data = child.step("fetch", String.class, () -> fetchData()); + child.wait(Duration.ofMinutes(5)); + return child.step("validate", String.class, () -> validate(data)); +}); + +// Async: returns a DurableFuture for concurrent execution +var futureA = ctx.runInChildContextAsync("branch-a", String.class, child -> { + return child.step("work-a", String.class, () -> doWorkA()); +}); +var futureB = ctx.runInChildContextAsync("branch-b", String.class, child -> { + return child.step("work-b", String.class, () -> doWorkB()); +}); + +// Wait for all child contexts to complete +var results = DurableFuture.allOf(futureA, futureB); +``` + ## Step Configuration Configure step behavior with `StepConfig`: @@ -396,9 +422,10 @@ DurableExecutionException - General durable exception │ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure. 
│ ├── InvokeTimedoutException - Chained invocation timed out. Handle the error or propagate failure. │ └── InvokeStoppedException - Chained invocation stopped. Handle the error or propagate failure. - └── CallbackException - General callback exception - ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure - └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure + ├── CallbackException - General callback exception + │ ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure + │ └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure + └── ChildContextFailedException - Child context failed and the original exception could not be reconstructed ``` ```java diff --git a/docs/adr/004-child-context-execution.md b/docs/adr/004-child-context-execution.md new file mode 100644 index 000000000..2a0a3eb04 --- /dev/null +++ b/docs/adr/004-child-context-execution.md @@ -0,0 +1,98 @@ +# ADR-004: Child Context Execution (`runInChildContext`) + +**Status:** Accepted +**Date:** 2026-02-16 + +## Context + +The TypeScript and Python durable execution SDKs support child contexts via `OperationType.CONTEXT`, enabling isolated sub-workflows with independent operation counters and checkpoint logs. The Java SDK needs the same capability to support fan-out/fan-in, parallel processing branches, and hierarchical workflow composition. + +```java +var futureA = ctx.runInChildContextAsync("branch-a", String.class, child -> { + child.step("validate", Void.class, () -> validate(order)); + child.wait(Duration.ofMinutes(5)); + return child.step("charge", String.class, () -> charge(order)); +}); +var futureB = ctx.runInChildContextAsync("branch-b", String.class, child -> { ... 
}); +var results = DurableFuture.allOf(futureA, futureB); +``` + +## Decision + +### Child context as a CONTEXT operation + +A child context is a `CONTEXT` operation in the checkpoint log with a three-phase lifecycle: + +1. **START** (fire-and-forget) — marks the child context as in-progress +2. Inner operations checkpoint with `parentId` set to the child context's operation ID +3. **SUCCEED** or **FAIL** (blocking) — finalizes the child context + +``` +Op ID | Parent ID | Type | Action | Payload +------|-----------|---------|---------|-------- +3 | null | CONTEXT | START | — +3-1 | 3 | STEP | START | — +3-1 | 3 | STEP | SUCCEED | "result" +3 | null | CONTEXT | SUCCEED | "final result" +``` + +### Operation ID prefixing + +Inner operation IDs are prefixed with the parent context's operation ID using `-` as separator (e.g., `"3-1"`, `"3-2"`). This matches the JavaScript SDK's `stepPrefix` convention and ensures global uniqueness — the backend validates type consistency by operation ID alone. + +- Root context: `"1"`, `"2"`, `"3"` +- Child context `"1"`: `"1-1"`, `"1-2"`, `"1-3"` +- Nested child context `"1-2"`: `"1-2-1"`, `"1-2-2"` + +### Per-context replay state + +A global `executionMode` doesn't work for child contexts — a child may be replaying while the parent is already executing. Each `DurableContext` tracks its own replay state via an `isReplaying` field, initialized by checking `ExecutionManager.hasOperationsForContext(contextId)`. + +### Thread model + +Child context user code runs in a separate thread (same pattern as `StepOperation`): +- `registerActiveThread` before the executor runs (on parent thread) +- `setCurrentContext` inside the executor thread +- `deregisterActiveThread` in the finally block +- `SuspendExecutionException` caught in finally (suspension already signaled) + +### Large result handling + +Results < 256KB are checkpointed directly. 
Results ≥ 256KB trigger the `ReplayChildren` flow: +- SUCCEED checkpoint with empty payload + `ContextOptions { replayChildren: true }` +- On replay, child context re-executes; inner operations replay from cache +- No new SUCCEED checkpoint during reconstruction + +### Replay behavior + +| Cached status | Behavior | +|---------------|----------| +| SUCCEEDED | Return cached result | +| SUCCEEDED + `replayChildren=true` | Re-execute child to reconstruct large result | +| FAILED | Re-throw cached error | +| STARTED | Re-execute (interrupted mid-flight) | + +## Alternatives Considered + +### Flatten child operations into root checkpoint log +**Rejected:** Breaks operation ID uniqueness. A CONTEXT op with ID `"1"` and an inner STEP with ID `"1"` (different `parentId`) would trigger `InvalidParameterValueException` from the backend. + +### Global replay state with context tracking +**Rejected:** Adds complexity to `ExecutionManager` for something that's naturally per-context. The TypeScript SDK uses per-entity replay state for the same reason. + +## Consequences + +**Positive:** +- Aligns with TypeScript and Python SDK implementations +- Enables fan-out/fan-in, parallel branches, hierarchical workflows +- Clean separation: each child context is self-contained +- Nested child contexts chain naturally via ID prefixing + +**Negative:** +- More threads to coordinate +- Per-context replay state adds complexity vs. 
global mode + +**Deferred:** +- Orphan detection in `CheckpointBatcher` +- `summaryGenerator` for large-result observability +- Higher-level `map`/`parallel` combinators (different `OperationSubType` values, same `CONTEXT` operation type) diff --git a/examples/src/main/java/com/amazonaws/lambda/durable/examples/ChildContextExample.java b/examples/src/main/java/com/amazonaws/lambda/durable/examples/ChildContextExample.java new file mode 100644 index 000000000..0d1280411 --- /dev/null +++ b/examples/src/main/java/com/amazonaws/lambda/durable/examples/ChildContextExample.java @@ -0,0 +1,74 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.examples; + +import com.amazonaws.lambda.durable.DurableContext; +import com.amazonaws.lambda.durable.DurableFuture; +import com.amazonaws.lambda.durable.DurableHandler; +import java.time.Duration; + +/** + * Example demonstrating child context workflows with the Durable Execution SDK. + * + *

<p>This handler runs three concurrent child contexts using {@code runInChildContextAsync}: + * + * <ol> + *   <li>Order validation — performs a step then suspends via {@code wait()} before completing + *   <li>Inventory check — performs a step then suspends via {@code wait()} before completing + *   <li>Shipping estimate — nests another child context inside it to demonstrate hierarchical contexts + * </ol> + * + * <p>
All three child contexts run concurrently. Results are collected with {@link DurableFuture#allOf} and combined + * into a summary string. + */ +public class ChildContextExample extends DurableHandler { + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + var name = input.getName(); + context.getLogger().info("Starting child context workflow for {}", name); + + // Child context 1: Order validation — step + wait + step + var orderFuture = context.runInChildContextAsync("order-validation", String.class, child -> { + var prepared = child.step("prepare-order", String.class, () -> "Order for " + name); + context.getLogger().info("Order prepared, waiting for validation"); + + child.wait("validation-delay", Duration.ofSeconds(5)); + + return child.step("validate-order", String.class, () -> prepared + " [validated]"); + }); + + // Child context 2: Inventory check — step + wait + step + var inventoryFuture = context.runInChildContextAsync("inventory-check", String.class, child -> { + var stock = child.step("check-stock", String.class, () -> "Stock available for " + name); + context.getLogger().info("Stock checked, waiting for confirmation"); + + child.wait("confirmation-delay", Duration.ofSeconds(3)); + + return child.step("confirm-inventory", String.class, () -> stock + " [confirmed]"); + }); + + // Child context 3: Shipping estimate — nests a child context inside it + var shippingFuture = context.runInChildContextAsync("shipping-estimate", String.class, child -> { + var baseRate = child.step("calculate-base-rate", String.class, () -> "Base rate for " + name); + + // Nested child context: calculate regional adjustment + var adjustment = child.runInChildContext( + "regional-adjustment", + String.class, + nested -> nested.step("lookup-region", String.class, () -> baseRate + " + regional adjustment")); + + return child.step("finalize-shipping", String.class, () -> adjustment + " [shipping ready]"); + }); + + // Collect all results 
using allOf + context.getLogger().info("Waiting for all child contexts to complete"); + var results = DurableFuture.allOf(orderFuture, inventoryFuture, shippingFuture); + + // Combine into summary + var summary = String.join(" | ", results); + context.getLogger().info("All child contexts complete: {}", summary); + + return summary; + } +} diff --git a/examples/src/test/java/com/amazonaws/lambda/durable/examples/ChildContextExampleTest.java b/examples/src/test/java/com/amazonaws/lambda/durable/examples/ChildContextExampleTest.java new file mode 100644 index 000000000..c7ecdd6dd --- /dev/null +++ b/examples/src/test/java/com/amazonaws/lambda/durable/examples/ChildContextExampleTest.java @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.examples; + +import static org.junit.jupiter.api.Assertions.*; + +import com.amazonaws.lambda.durable.model.ExecutionStatus; +import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner; +import org.junit.jupiter.api.Test; + +class ChildContextExampleTest { + + @Test + void testChildContextExampleRunsToCompletion() { + var handler = new ChildContextExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var input = new GreetingRequest("Alice"); + var result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals( + "Order for Alice [validated] | Stock available for Alice [confirmed] | Base rate for Alice + regional adjustment [shipping ready]", + result.getResult(String.class)); + } + + @Test + void testChildContextExampleSuspendsOnFirstRun() { + var handler = new ChildContextExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var input = new GreetingRequest("Bob"); + + // First run should suspend due to wait operations inside child contexts + var result = runner.run(input); + 
assertEquals(ExecutionStatus.PENDING, result.getStatus()); + } + + @Test + void testChildContextExampleReplay() { + var handler = new ChildContextExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var input = new GreetingRequest("Alice"); + + // First full execution + var result1 = runner.runUntilComplete(input); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + + // Replay — should return cached results + var result2 = runner.run(input); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals(result1.getResult(String.class), result2.getResult(String.class)); + } +} diff --git a/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java index d7d66ca25..c7d520e52 100644 --- a/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -353,6 +353,22 @@ void testCallbackExampleWithTimeout() { assertEquals(OperationStatus.TIMED_OUT, approvalOp.getStatus()); } + @Test + void testChildContextExample() { + var runner = CloudDurableTestRunner.create(arn("child-context-example"), GreetingRequest.class, String.class); + var result = runner.run(new GreetingRequest("Alice")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals( + "Order for Alice [validated] | Stock available for Alice [confirmed] | Base rate for Alice + regional adjustment [shipping ready]", + result.getResult(String.class)); + + // Verify child context operations were tracked + assertNotNull(runner.getOperation("order-validation")); + assertNotNull(runner.getOperation("inventory-check")); + assertNotNull(runner.getOperation("shipping-estimate")); + } + @Test void testManyAsyncStepsExample() { var runner = CloudDurableTestRunner.create( diff 
--git a/examples/template.yaml b/examples/template.yaml index a96e84a45..2adafb45a 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -350,6 +350,31 @@ Resources: DockerContext: ../ DockerTag: durable-examples + ChildContextExampleFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + FunctionName: !Join + - '' + - - 'child-context-example' + - !Ref FunctionNameSuffix + ImageConfig: + Command: ["com.amazonaws.lambda.durable.examples.ChildContextExample::handleRequest"] + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 7 + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:child-context-example${FunctionNameSuffix}" + Metadata: + Dockerfile: !Ref DockerFile + DockerContext: ../ + DockerTag: durable-examples + Outputs: SimpleStepExampleFunction: Description: Simple Step Example Function ARN @@ -454,3 +479,11 @@ Outputs: ManyAsyncStepsExampleFunctionName: Description: Many Async Steps Example Function Name Value: !Ref ManyAsyncStepsExampleFunction + + ChildContextExampleFunction: + Description: Child Context Example Function ARN + Value: !GetAtt ChildContextExampleFunction.Arn + + ChildContextExampleFunctionName: + Description: Child Context Example Function Name + Value: !Ref ChildContextExampleFunction diff --git a/sdk-integration-tests/src/test/java/com/amazonaws/lambda/durable/ChildContextIntegrationTest.java b/sdk-integration-tests/src/test/java/com/amazonaws/lambda/durable/ChildContextIntegrationTest.java new file mode 100644 index 000000000..6780ed19a --- /dev/null +++ b/sdk-integration-tests/src/test/java/com/amazonaws/lambda/durable/ChildContextIntegrationTest.java @@ -0,0 +1,501 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import com.amazonaws.lambda.durable.model.ExecutionStatus; +import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.OperationType; + +/** Integration tests for child context behavior. */ +class ChildContextIntegrationTest { + + /** + * A child context that completes successfully SHALL produce the same result on replay without re-executing the user + * function. + */ + @Test + void childContextResultSurvivesReplay() { + var childExecutionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("compute", String.class, child -> { + childExecutionCount.incrementAndGet(); + return child.step("work", String.class, () -> "result-" + input); + }); + }); + + // First run - executes child context + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("result-test", result.getResult(String.class)); + assertEquals(1, childExecutionCount.get()); + + // Second run - replays, should return cached result without re-executing + result = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("result-test", result.getResult(String.class)); + assertEquals(1, childExecutionCount.get(), "Child function should not re-execute on replay"); + } + + /** + * A child context that fails with a reconstructable exception SHALL preserve the exception type, message, and error + * details through the checkpoint-and-replay cycle. 
+ */ + @Test + void childContextExceptionPreservedOnReplay() { + var childExecutionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("failing", String.class, child -> { + childExecutionCount.incrementAndGet(); + throw new IllegalArgumentException("bad input: " + input); + }); + }); + + // First run - child context fails + var result = runner.run("test"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + assertEquals(1, childExecutionCount.get()); + + // Second run - replays, should throw same exception without re-executing + result = runner.run("test"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + assertTrue(result.getError().isPresent()); + var error = result.getError().get(); + assertEquals("java.lang.IllegalArgumentException", error.errorType()); + assertEquals("bad input: test", error.errorMessage()); + assertEquals(1, childExecutionCount.get(), "Child function should not re-execute on failed replay"); + } + + /** Operations checkpointed from within a child context SHALL have the child context's ID as their parentId. */ + @Test + void operationsInChildContextHaveCorrectParentId() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("child-ctx", String.class, child -> { + var step1 = child.step("inner-step", String.class, () -> "step-result"); + return step1; + }); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("step-result", result.getResult(String.class)); + + // Verify the inner step has the child context's operation ID as parentId + var innerStep = result.getOperation("inner-step"); + assertNotNull(innerStep, "Inner step should exist"); + } + + /** Each child context SHALL maintain its own operation counter. 
*/ + @Test + void childContextsHaveIndependentOperationCounters() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + var r1 = ctx.runInChildContext("child-a", String.class, child -> { + return child.step("step-a", String.class, () -> "a-result"); + }); + var r2 = ctx.runInChildContext("child-b", String.class, child -> { + return child.step("step-b", String.class, () -> "b-result"); + }); + return r1 + "+" + r2; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("a-result+b-result", result.getResult(String.class)); + + // Both child contexts should have completed successfully + var stepA = result.getOperation("step-a"); + var stepB = result.getOperation("step-b"); + assertNotNull(stepA); + assertNotNull(stepB); + } + + /** Two child contexts with operations that have the same local IDs SHALL NOT interfere with each other. */ + @Test + void parallelChildContextsWithSameLocalIdsDoNotInterfere() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + // Both child contexts will have a step with local operation ID "1" + var futureA = ctx.runInChildContextAsync("ctx-a", String.class, child -> { + return child.step("work", String.class, () -> "result-a"); + }); + var futureB = ctx.runInChildContextAsync("ctx-b", String.class, child -> { + return child.step("work", String.class, () -> "result-b"); + }); + return futureA.get() + "+" + futureB.get(); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("result-a+result-b", result.getResult(String.class)); + } + + /** Each concurrently running async child context SHALL complete with its own correct result. 
*/ + @Test + void multipleAsyncChildContextsReturnCorrectResults() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + var f1 = ctx.runInChildContextAsync("async-1", String.class, child -> { + return child.step("s1", String.class, () -> "one"); + }); + var f2 = ctx.runInChildContextAsync("async-2", String.class, child -> { + return child.step("s2", String.class, () -> "two"); + }); + var f3 = ctx.runInChildContextAsync("async-3", String.class, child -> { + return child.step("s3", String.class, () -> "three"); + }); + return f1.get() + "," + f2.get() + "," + f3.get(); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("one,two,three", result.getResult(String.class)); + } + + /** The results returned by DurableFuture.allOf() SHALL be in the same order as the input futures. */ + @Test + void allOfReturnsResultsInOrder() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + var f1 = ctx.runInChildContextAsync("first", String.class, child -> { + return child.step("s1", String.class, () -> "alpha"); + }); + var f2 = ctx.runInChildContextAsync("second", String.class, child -> { + return child.step("s2", String.class, () -> "beta"); + }); + var f3 = ctx.runInChildContextAsync("third", String.class, child -> { + return child.step("s3", String.class, () -> "gamma"); + }); + + var results = DurableFuture.allOf(f1, f2, f3); + return String.join(",", results); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("alpha,beta,gamma", result.getResult(String.class)); + } + + /** + * A wait() inside a child context SHALL suspend the execution. After the wait completes, the child context SHALL + * resume and complete with the correct result. 
+ */ + @Test + void waitInsideChildContextSuspendsAndResumes() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("workflow", String.class, child -> { + child.step("before-wait", Void.class, () -> null); + child.wait(Duration.ofSeconds(10)); + return child.step("after-wait", String.class, () -> "done"); + }); + }); + runner.withSkipTime(true); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("done", result.getResult(String.class)); + } + + /** + * A wait() inside a child context SHALL cause the execution to return PENDING. After advancing time and re-running, + * the execution SHALL complete successfully. + */ + @Test + void waitInsideChildContextReturnsPendingThenCompletes() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("workflow", String.class, child -> { + child.step("before-wait", Void.class, () -> null); + child.wait(Duration.ofSeconds(10)); + return child.step("after-wait", String.class, () -> "done"); + }); + }); + runner.withSkipTime(false); + + // First run - should suspend at the wait + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance time so the wait completes + runner.advanceTime(); + + // Second run - should complete + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("done", result2.getResult(String.class)); + } + + /** + * When two concurrent child contexts each contain a wait(), the execution SHALL return PENDING. After advancing + * time and re-running, both child contexts SHALL resume and complete with correct results. 
+ */ + @Test + void twoAsyncChildContextsBothWaitSuspendAndResume() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + var f1 = ctx.runInChildContextAsync("child-a", String.class, child -> { + child.step("a-before", Void.class, () -> null); + child.wait(Duration.ofSeconds(5)); + return child.step("a-after", String.class, () -> "a-done"); + }); + var f2 = ctx.runInChildContextAsync("child-b", String.class, child -> { + child.step("b-before", Void.class, () -> null); + child.wait(Duration.ofSeconds(10)); + return child.step("b-after", String.class, () -> "b-done"); + }); + return f1.get() + "+" + f2.get(); + }); + + runner.withSkipTime(false); + + // First run - both child contexts should suspend at their waits + // TODO: Using run() + runUntilComplete() instead of manual run/advanceTime/run due to a + // thread coordination race condition that causes flakiness on slow CI workers. + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Now let runUntilComplete handle the rest (with skipTime so waits auto-advance) + runner.withSkipTime(true); + var finalResult = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, finalResult.getStatus()); + assertEquals("a-done+b-done", finalResult.getResult(String.class)); + } + + /** + * When one async child context contains a long wait and another is actively processing, the execution SHALL NOT + * suspend until the busy child finishes its work. After the busy child completes, the execution suspends (PENDING) + * because the waiting child's wait is still outstanding. After advancing time, both complete. 
+ */ + @Test + void oneChildWaitsWhileOtherKeepsProcessingSuspendsAfterWorkDone() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + var waiting = ctx.runInChildContextAsync("waiter", String.class, child -> { + child.wait(Duration.ofSeconds(30)); + return child.step("w-after", String.class, () -> "waited"); + }); + var busy = ctx.runInChildContextAsync("busy", String.class, child -> { + return child.step("slow-work", String.class, () -> { + try { + Thread.sleep(200); // Simulate real work keeping the thread active + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return "done-working"; + }); + }); + return busy.get() + "|" + waiting.get(); + }); + runner.withSkipTime(false); + + // First run: busy child completes its work, but waiter's wait is still outstanding → PENDING + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // The busy child's step should have been checkpointed before suspension + var busyStep = result.getOperation("slow-work"); + assertNotNull(busyStep, "Busy child's step should have completed before suspension"); + + // Advance time so the wait completes + runner.advanceTime(); + + // Second run: both children complete + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("done-working|waited", result2.getResult(String.class)); + } + + /** + * A child context with a result ≥256KB SHALL trigger the ReplayChildren flow. On replay, the child context SHALL be + * re-executed to reconstruct the result. 
+ */ + @Test + void largeResultTriggersReplayChildrenAndReconstructsCorrectly() { + var childExecutionCount = new AtomicInteger(0); + + // Generate a string larger than 256KB + var largePayload = "x".repeat(256 * 1024 + 100); + + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("large-result", String.class, child -> { + childExecutionCount.incrementAndGet(); + return child.step("produce", String.class, () -> largePayload); + }); + }); + + // First run - executes child context, triggers ReplayChildren + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(largePayload, result.getResult(String.class)); + assertEquals(1, childExecutionCount.get()); + + // Second run - replays with ReplayChildren, re-executes child to reconstruct + result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(largePayload, result.getResult(String.class)); + // Child function IS re-executed for ReplayChildren (to reconstruct the large result) + assertTrue(childExecutionCount.get() >= 2, "Child should re-execute for ReplayChildren reconstruction"); + } + + // ===== Edge Case Tests ===== + + /** + * A child context created within another child context SHALL have its own independent operation counter and correct + * parentId propagation. 
+ */ + @Test + void nestedChildContextsWithIndependentCountersAndCorrectParentId() { + var outerChildCount = new AtomicInteger(0); + var innerChildCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("outer-child", String.class, outerChild -> { + outerChildCount.incrementAndGet(); + var outerStep = outerChild.step("outer-step", String.class, () -> "outer"); + + var innerResult = outerChild.runInChildContext("inner-child", String.class, innerChild -> { + innerChildCount.incrementAndGet(); + return innerChild.step("inner-step", String.class, () -> "inner"); + }); + + return outerStep + "+" + innerResult; + }); + }); + + // First run - executes both nested child contexts + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("outer+inner", result.getResult(String.class)); + assertEquals(1, outerChildCount.get()); + assertEquals(1, innerChildCount.get()); + + // Replay - should return cached results without re-executing + result = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("outer+inner", result.getResult(String.class)); + assertEquals(1, outerChildCount.get(), "Outer child should not re-execute on replay"); + assertEquals(1, innerChildCount.get(), "Inner child should not re-execute on replay"); + + // Verify both steps exist (independent counters — both have local ID "1" in their respective contexts) + var outerStep = result.getOperation("outer-step"); + var innerStep = result.getOperation("inner-step"); + assertNotNull(outerStep, "Outer step should exist"); + assertNotNull(innerStep, "Inner step should exist"); + } + + /** + * When a child context is replayed but the current code uses a different operation name at the same position, the + * execution SHALL fail with a non-deterministic execution error. 
+ */ + @Test + void nonDeterministicReplayDetectionForChildContext() { + var callCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + var count = callCount.incrementAndGet(); + if (count == 1) { + // First execution: create child context with name "original-name" + return ctx.runInChildContext("original-name", String.class, child -> { + return child.step("work", String.class, () -> "result"); + }); + } else { + // Second execution: use a different name at the same operation position + // This should trigger NonDeterministicExecutionException + return ctx.runInChildContext("different-name", String.class, child -> { + return child.step("work", String.class, () -> "result"); + }); + } + }); + + // First run succeeds + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("result", result.getResult(String.class)); + + // Second run with different name should fail with non-deterministic error + result = runner.run("test"); + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + assertTrue(result.getError().isPresent()); + var error = result.getError().get(); + assertTrue( + error.errorType().contains("NonDeterministicExecutionException"), + "Expected NonDeterministicExecutionException, got: " + error.errorType()); + assertTrue( + error.errorMessage().contains("name mismatch"), + "Expected name mismatch message, got: " + error.errorMessage()); + } + + /** + * A child context whose function returns a value immediately without performing any durable operations SHALL + * complete successfully and replay correctly. 
+ */ + @Test + void emptyChildContextReturnsImmediately() { + var childExecutionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + return ctx.runInChildContext("empty", String.class, child -> { + childExecutionCount.incrementAndGet(); + return "immediate-result"; + }); + }); + + // First run - child context returns immediately + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("immediate-result", result.getResult(String.class)); + assertEquals(1, childExecutionCount.get()); + + // Replay - should return cached result without re-executing + result = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("immediate-result", result.getResult(String.class)); + assertEquals(1, childExecutionCount.get(), "Empty child should not re-execute on replay"); + } + + /** + * Operations within a child context SHALL use the child context's own operation counter, producing IDs independent + * of the parent context. Multiple operations within a single child context should get sequential IDs. 
+ */ + @Test + void stepAndInvokeWithinChildContextUseChildOperationCounter() { + var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> { + // Parent context: operation 1 is a step + var parentStep = ctx.step("parent-step", String.class, () -> "parent"); + + // Parent context: operation 2 is a child context + var childResult = ctx.runInChildContext("child-ctx", String.class, child -> { + // Child context: operations 1, 2, 3 are steps (independent counter) + var s1 = child.step("child-step-1", String.class, () -> "c1"); + var s2 = child.step("child-step-2", String.class, () -> "c2"); + var s3 = child.step("child-step-3", String.class, () -> "c3"); + return s1 + "," + s2 + "," + s3; + }); + + // Parent context: operation 3 is another step (counter continues from parent) + var afterStep = ctx.step("after-step", String.class, () -> "after"); + + return parentStep + "|" + childResult + "|" + afterStep; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("parent|c1,c2,c3|after", result.getResult(String.class)); + + // Verify all operations exist and completed + assertNotNull(result.getOperation("parent-step"), "Parent step should exist"); + assertNotNull(result.getOperation("child-step-1"), "Child step 1 should exist"); + assertNotNull(result.getOperation("child-step-2"), "Child step 2 should exist"); + assertNotNull(result.getOperation("child-step-3"), "Child step 3 should exist"); + assertNotNull(result.getOperation("after-step"), "After step should exist"); + + // Verify child context operation exists + var childCtxOp = result.getOperation("child-ctx"); + assertNotNull(childCtxOp, "Child context operation should exist"); + assertEquals(OperationType.CONTEXT, childCtxOp.getType()); + + // Replay should produce the same result + var replayResult = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, replayResult.getStatus()); + 
assertEquals("parent|c1,c2,c3|after", replayResult.getResult(String.class)); + } +} diff --git a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/EventProcessor.java b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/EventProcessor.java index e5f1c6182..33b10a18f 100644 --- a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/EventProcessor.java +++ b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/EventProcessor.java @@ -25,6 +25,7 @@ Event processUpdate(OperationUpdate update, Operation operation) { case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation); case EXECUTION -> buildExecutionEvent(builder, update); case CALLBACK -> buildCallbackEvent(builder, update); + case CONTEXT -> buildContextEvent(builder, update); default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type()); }; } @@ -157,6 +158,15 @@ private Event buildCallbackEvent(Event.Builder builder, OperationUpdate update) }; } + private Event buildContextEvent(Event.Builder builder, OperationUpdate update) { + return switch (update.action()) { + case START -> builder.eventType(EventType.CONTEXT_STARTED).build(); + case SUCCEED -> builder.eventType(EventType.CONTEXT_SUCCEEDED).build(); + case FAIL -> builder.eventType(EventType.CONTEXT_FAILED).build(); + default -> throw new IllegalArgumentException("Unsupported context action: " + update.action()); + }; + } + private RetryDetails buildRetryDetails(Operation operation) { if (operation == null || operation.stepDetails() == null) { return RetryDetails.builder().currentAttempt(1).build(); diff --git a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/HistoryEventProcessor.java b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/HistoryEventProcessor.java index fd3f1aade..2f2474ba7 100644 --- a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/HistoryEventProcessor.java +++ 
b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/HistoryEventProcessor.java @@ -9,6 +9,7 @@ import java.util.List; import software.amazon.awssdk.services.lambda.model.CallbackDetails; import software.amazon.awssdk.services.lambda.model.ChainedInvokeDetails; +import software.amazon.awssdk.services.lambda.model.ContextDetails; import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Event; import software.amazon.awssdk.services.lambda.model.Operation; @@ -142,8 +143,26 @@ public TestResult processEvents(List events, Class outputType) // Unknown event type - log and ignore gracefully } - case CONTEXT_STARTED, CONTEXT_SUCCEEDED, CONTEXT_FAILED -> { - throw new UnsupportedOperationException("Context operations currently not supported"); + case CONTEXT_STARTED -> { + if (operationId != null) { + operations.putIfAbsent( + operationId, + createContextOperation(operationId, event.name(), OperationStatus.STARTED, event)); + } + } + case CONTEXT_SUCCEEDED -> { + if (operationId != null) { + operations.put( + operationId, + createContextOperation(operationId, event.name(), OperationStatus.SUCCEEDED, event)); + } + } + case CONTEXT_FAILED -> { + if (operationId != null) { + operations.put( + operationId, + createContextOperation(operationId, event.name(), OperationStatus.FAILED, event)); + } } case CHAINED_INVOKE_STARTED, @@ -292,4 +311,28 @@ private Operation createInvokeOperation(String id, Event event) { .chainedInvokeDetails(builder.build()) .build(); } + + private Operation createContextOperation(String id, String name, OperationStatus status, Event event) { + var builder = ContextDetails.builder(); + + if (event.contextSucceededDetails() != null) { + var details = event.contextSucceededDetails(); + if (details.result() != null && details.result().payload() != null) { + builder.result(details.result().payload()); + } + } else if (event.contextFailedDetails() != null) { + var details = 
event.contextFailedDetails(); + if (details.error() != null && details.error().payload() != null) { + builder.error(details.error().payload()); + } + } + + return Operation.builder() + .id(id) + .name(name) + .status(status) + .type(OperationType.CONTEXT) + .contextDetails(builder.build()) + .build(); + } } diff --git a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalMemoryExecutionClient.java b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalMemoryExecutionClient.java index bfc825129..75cb655a8 100644 --- a/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalMemoryExecutionClient.java +++ b/sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalMemoryExecutionClient.java @@ -65,7 +65,7 @@ public List getEventsForOperation(String operationId) { /** Advance all operations (simulates time passing for retries/waits). */ public void advanceReadyOperations() { - operations.replaceAll((id, op) -> { + operations.replaceAll((key, op) -> { if (op.status() == OperationStatus.PENDING) { return op.toBuilder().status(OperationStatus.READY).build(); } @@ -74,7 +74,7 @@ public void advanceReadyOperations() { op.toBuilder().status(OperationStatus.SUCCEEDED).build(); // Generate WaitSucceeded event var update = OperationUpdate.builder() - .id(id) + .id(op.id()) .name(op.name()) .type(OperationType.WAIT) .action(OperationAction.SUCCEED) @@ -113,7 +113,7 @@ public void completeChainedInvoke(String name, OperationResult result) { .build(); var event = eventProcessor.processUpdate(update, newOp); allEvents.add(event); - operations.put(op.id(), newOp); + operations.put(compositeKey(op.parentId(), op.id()), newOp); } } @@ -150,7 +150,7 @@ public void resetCheckpointToStarted(String stepName) { throw new IllegalStateException("Operation not found: " + stepName); } var startedOp = op.toBuilder().status(OperationStatus.STARTED).build(); - operations.put(op.id(), startedOp); + operations.put(compositeKey(op.parentId(), 
op.id()), startedOp); } /** Simulate fire-and-forget checkpoint loss by removing the operation entirely */ @@ -159,22 +159,28 @@ public void simulateFireAndForgetCheckpointLoss(String stepName) { if (op == null) { throw new IllegalStateException("Operation not found: " + stepName); } - operations.remove(op.id()); + operations.remove(compositeKey(op.parentId(), op.id())); } private void applyUpdate(OperationUpdate update) { var operation = toOperation(update); - operations.put(update.id(), operation); + var key = compositeKey(update.parentId(), update.id()); + operations.put(key, operation); var event = eventProcessor.processUpdate(update, operation); allEvents.add(event); } + private static String compositeKey(String parentId, String operationId) { + return (parentId != null ? parentId : "") + ":" + operationId; + } + private Operation toOperation(OperationUpdate update) { var builder = Operation.builder() .id(update.id()) .name(update.name()) .type(update.type()) + .parentId(update.parentId()) .status(deriveStatus(update.action())); switch (update.type()) { @@ -183,7 +189,7 @@ private Operation toOperation(OperationUpdate update) { case CALLBACK -> builder.callbackDetails(buildCallbackDetails(update)); case EXECUTION -> {} // No details needed for EXECUTION operations case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(update)); - case CONTEXT -> throw new UnsupportedOperationException("CONTEXT not supported"); + case CONTEXT -> builder.contextDetails(buildContextDetails(update)); case UNKNOWN_TO_SDK_VERSION -> throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported"); } @@ -201,6 +207,17 @@ private ChainedInvokeDetails buildChainedInvokeDetails(OperationUpdate update) { .build(); } + private ContextDetails buildContextDetails(OperationUpdate update) { + var detailsBuilder = ContextDetails.builder().result(update.payload()).error(update.error()); + + if (update.contextOptions() != null + && 
Boolean.TRUE.equals(update.contextOptions().replayChildren())) { + detailsBuilder.replayChildren(true); + } + + return detailsBuilder.build(); + } + private WaitDetails buildWaitDetails(OperationUpdate update) { if (update.waitOptions() == null) return null; @@ -209,7 +226,8 @@ private WaitDetails buildWaitDetails(OperationUpdate update) { } private StepDetails buildStepDetails(OperationUpdate update) { - var existingOp = operations.get(update.id()); + var key = compositeKey(update.parentId(), update.id()); + var existingOp = operations.get(key); var existing = existingOp != null ? existingOp.stepDetails() : null; var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder(); @@ -227,7 +245,8 @@ private StepDetails buildStepDetails(OperationUpdate update) { } private CallbackDetails buildCallbackDetails(OperationUpdate update) { - var existingOp = operations.get(update.id()); + var key = compositeKey(update.parentId(), update.id()); + var existingOp = operations.get(key); var existing = existingOp != null ? existingOp.callbackDetails() : null; // Preserve existing callbackId, or generate new one on START @@ -259,7 +278,7 @@ public void completeCallback(String callbackId, String result) { .status(OperationStatus.SUCCEEDED) .callbackDetails(op.callbackDetails().toBuilder().result(result).build()) .build(); - operations.put(op.id(), updated); + operations.put(compositeKey(op.parentId(), op.id()), updated); } /** Simulate external system failing callback. */ @@ -272,7 +291,7 @@ public void failCallback(String callbackId, ErrorObject error) { .status(OperationStatus.FAILED) .callbackDetails(op.callbackDetails().toBuilder().error(error).build()) .build(); - operations.put(op.id(), updated); + operations.put(compositeKey(op.parentId(), op.id()), updated); } /** Simulate callback timeout. 
*/ @@ -282,7 +301,7 @@ public void timeoutCallback(String callbackId) { throw new IllegalStateException("Callback not found: " + callbackId); } var updated = op.toBuilder().status(OperationStatus.TIMED_OUT).build(); - operations.put(op.id(), updated); + operations.put(compositeKey(op.parentId(), op.id()), updated); } private Operation findOperationByCallbackId(String callbackId) { diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java b/sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java index e4ee71231..6e6305d78 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java @@ -6,6 +6,7 @@ import com.amazonaws.lambda.durable.execution.ThreadType; import com.amazonaws.lambda.durable.logging.DurableLogger; import com.amazonaws.lambda.durable.operation.CallbackOperation; +import com.amazonaws.lambda.durable.operation.ChildContextOperation; import com.amazonaws.lambda.durable.operation.InvokeOperation; import com.amazonaws.lambda.durable.operation.StepOperation; import com.amazonaws.lambda.durable.operation.WaitOperation; @@ -14,6 +15,7 @@ import java.time.Duration; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Supplier; import org.slf4j.LoggerFactory; @@ -26,14 +28,19 @@ public class DurableContext { private final AtomicInteger operationCounter; private final DurableLogger logger; private final ExecutionContext executionContext; + private final String contextId; + private boolean isReplaying; - DurableContext( + /** Shared initialization — sets all fields but performs no thread registration. 
*/ + private DurableContext( ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) { this.executionManager = executionManager; this.durableConfig = durableConfig; this.lambdaContext = lambdaContext; + this.contextId = contextId; this.operationCounter = new AtomicInteger(0); this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn()); + this.isReplaying = executionManager.hasOperationsForContext(contextId); var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null; this.logger = new DurableLogger( @@ -41,14 +48,41 @@ public class DurableContext { executionManager, requestId, durableConfig.getLoggerConfig().suppressReplayLogs()); + } - // Register root context thread as active - executionManager.registerActiveThread(contextId, ThreadType.CONTEXT); - executionManager.setCurrentContext(contextId, ThreadType.CONTEXT); + /** + * Creates a root context and registers the current thread for execution coordination. + * + *

<p>The context itself always has a null contextId (making it a root context). The thread is registered with the + * ExecutionManager using the default {@link #ROOT_CONTEXT} identifier. + * + * @param executionManager the execution manager + * @param durableConfig the durable configuration + * @param lambdaContext the Lambda context + * @return a new root DurableContext + */ + static DurableContext createRootContext( + ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) { + var ctx = new DurableContext(executionManager, durableConfig, lambdaContext, null); + executionManager.registerActiveThread(ROOT_CONTEXT, ThreadType.CONTEXT); + executionManager.setCurrentContext(ROOT_CONTEXT, ThreadType.CONTEXT); + return ctx; } - DurableContext(ExecutionManager executionManager, DurableConfig config, Context lambdaContext) { - this(executionManager, config, lambdaContext, ROOT_CONTEXT); + /** + * Creates a child context without registering the current thread. Thread registration is handled by + * ChildContextOperation, which registers on the parent thread before the executor runs and sets the context on the + * child thread inside the executor.
+ * + * @param executionManager the execution manager + * @param durableConfig the durable configuration + * @param lambdaContext the Lambda context + * @param contextId the child context's ID (the CONTEXT operation's operation ID) + * @return a new DurableContext for the child context + */ + public static DurableContext createChildContext( + ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) { + return new DurableContext(executionManager, durableConfig, lambdaContext, contextId); } // ========== step methods ========== @@ -94,7 +128,7 @@ public DurableFuture stepAsync(String name, TypeToken typeToken, Suppl // Create and start step operation with TypeToken var operation = new StepOperation<>( - operationId, name, func, typeToken, config, executionManager, logger, durableConfig); + operationId, name, func, typeToken, config, executionManager, logger, durableConfig, contextId); operation.execute(); // Start the step (returns immediately) @@ -112,7 +146,7 @@ public Void wait(String waitName, Duration duration) { var operationId = nextOperationId(); // Create and start wait operation - var operation = new WaitOperation(operationId, waitName, duration, executionManager); + var operation = new WaitOperation(operationId, waitName, duration, executionManager, contextId); operation.execute(); // Checkpoint the wait return operation.get(); // Block (will throw SuspendExecutionException if needed) @@ -181,8 +215,8 @@ public DurableFuture invokeAsync( var operationId = nextOperationId(); // Create and start invoke operation - var operation = - new InvokeOperation<>(operationId, name, functionName, payload, typeToken, config, executionManager); + var operation = new InvokeOperation<>( + operationId, name, functionName, payload, typeToken, config, executionManager, contextId); operation.execute(); // checkpoint the invoke operation return operation; // Block (will throw SuspendExecutionException if needed) @@ -208,12 +242,47 
@@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> typ } var operationId = nextOperationId(); - var operation = new CallbackOperation<>(operationId, name, typeToken, config, executionManager); + var operation = new CallbackOperation<>(operationId, name, typeToken, config, executionManager, contextId); operation.execute(); return operation; } + // ========== runInChildContext methods ========== + + public <T> T runInChildContext(String name, Class<T> resultType, Function<DurableContext, T> func) { + return runInChildContextAsync(name, TypeToken.get(resultType), func).get(); + } + + public <T> T runInChildContext(String name, TypeToken<T> typeToken, Function<DurableContext, T> func) { + return runInChildContextAsync(name, typeToken, func).get(); + } + + public <T> DurableFuture<T> runInChildContextAsync( + String name, Class<T> resultType, Function<DurableContext, T> func) { + return runInChildContextAsync(name, TypeToken.get(resultType), func); + } + + public <T> DurableFuture<T> runInChildContextAsync( + String name, TypeToken<T> typeToken, Function<DurableContext, T> func) { + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + var operationId = nextOperationId(); + + var operation = new ChildContextOperation<>( + operationId, + name, + func, + typeToken, + durableConfig.getSerDes(), + executionManager, + durableConfig, + lambdaContext, + contextId); + + operation.execute(); + return operation; + } + // =============== accessors ================ public Context getLambdaContext() { @@ -239,8 +308,30 @@ public ExecutionContext getExecutionContext() { // ============= internal utilities =============== - /** Get the next operationId (latest operationId + 1) */ + /** Gets the context ID for this context. Null for root context, set for child contexts. */ + String getContextId() { + return contextId; + } + + /** Returns whether this context is currently in replay mode. */ + boolean isReplaying() { + return isReplaying; + } + + /** + * Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
+ */ + void setExecutionMode() { + this.isReplaying = false; + } + + /** + * Get the next operationId. For root contexts, returns sequential IDs like "1", "2", "3". For child contexts, + * prefixes with the contextId to ensure global uniqueness, e.g. "1-1", "1-2" for operations inside child context + * "1". This matches the JavaScript SDK's stepPrefix convention and prevents ID collisions in checkpoint batches. + */ private String nextOperationId() { - return String.valueOf(operationCounter.incrementAndGet()); + var counter = String.valueOf(operationCounter.incrementAndGet()); + return contextId != null ? contextId + "-" + counter : counter; } } diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java b/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java index bb04a3db0..00a0f9e52 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java @@ -65,7 +65,7 @@ public static DurableExecutionOutput execute( var userInput = extractUserInput(executionManager.getExecutionOperation(), config.getSerDes(), inputType); // Create context in the executor thread so it detects the correct thread name - var context = new DurableContext(executionManager, config, lambdaContext); + var context = DurableContext.createRootContext(executionManager, config, lambdaContext); return handler.apply(userInput, context); }, config.getExecutorService()); // Get executor from config for running user code diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/exception/ChildContextFailedException.java b/sdk/src/main/java/com/amazonaws/lambda/durable/exception/ChildContextFailedException.java new file mode 100644 index 000000000..3a0b5ebc0 --- /dev/null +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/exception/ChildContextFailedException.java @@ -0,0 +1,26 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.exception; + +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.awssdk.services.lambda.model.Operation; + +/** Exception thrown when a child context fails and the original exception cannot be reconstructed. */ +public class ChildContextFailedException extends DurableOperationException { + public ChildContextFailedException(Operation operation) { + super(operation, getError(operation), formatMessage(getError(operation))); + } + + private static ErrorObject getError(Operation operation) { + return operation.contextDetails() != null ? operation.contextDetails().error() : null; + } + + private static String formatMessage(ErrorObject errorObject) { + if (errorObject == null) { + return "Child context failed without an error"; + } + return String.format( + "Child context failed with error of type %s. Message: %s", + errorObject.errorType(), errorObject.errorMessage()); + } +} diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java index 03a845bff..8f1187960 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -36,6 +37,9 @@ *

<p>This is the single entry point for all execution coordination. Internal coordination (polling, checkpointing) uses * a dedicated SDK thread pool, while user-defined operations run on a customer-configured executor. * + *

Operations are keyed by their globally unique operation ID. Child context operations use prefixed IDs (e.g., + * "1-1", "1-2") to avoid collisions with root-level operations. + * * @see InternalExecutor */ public class ExecutionManager { @@ -107,19 +111,17 @@ private void onCheckpointComplete(List newOperations) { } /** - * Gets an operation by ID and updates replay state. Transitions from REPLAY to EXECUTION mode if the operation is - * not found or is not in a terminal state (still in progress). + * Gets an operation by its globally unique operationId, and updates replay state. Transitions from REPLAY to + * EXECUTION mode if the operation is not found or is not in a terminal state (still in progress). * - * @param operationId the operation ID to get + * @param operationId the globally unique operation ID (e.g., "1" for root, "1-1" for child context) * @return the existing operation, or null if not found (first execution) */ public Operation getOperationAndUpdateReplayState(String operationId) { var existing = operationStorage.get(operationId); - if (executionMode.get() == ExecutionMode.REPLAY) { - if (existing == null || !isTerminalStatus(existing.status())) { - if (executionMode.compareAndSet(ExecutionMode.REPLAY, ExecutionMode.EXECUTION)) { - logger.debug("Transitioned to EXECUTION mode at operation '{}'", operationId); - } + if (executionMode.get() == ExecutionMode.REPLAY && (existing == null || !isTerminalStatus(existing.status()))) { + if (executionMode.compareAndSet(ExecutionMode.REPLAY, ExecutionMode.EXECUTION)) { + logger.debug("Transitioned to EXECUTION mode at operation '{}'", operationId); } } return existing; @@ -129,6 +131,17 @@ public Operation getExecutionOperation() { return operationStorage.get(executionOperationId); } + /** + * Checks whether there are any cached operations for the given parent context ID. 
Used to initialize per-context + * replay state — a context starts in replay mode if the ExecutionManager has cached operations belonging to it. + * + * @param parentId the context ID to check (null for root context) + * @return true if at least one operation exists with the given parentId + */ + public boolean hasOperationsForContext(String parentId) { + return operationStorage.values().stream().anyMatch(op -> Objects.equals(op.parentId(), parentId)); + } + // ===== Thread Coordination ===== /** * Registers a thread as active without setting the thread local OperationContext. Use this when registration must diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/model/OperationSubType.java b/sdk/src/main/java/com/amazonaws/lambda/durable/model/OperationSubType.java new file mode 100644 index 000000000..c300ae6d3 --- /dev/null +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/model/OperationSubType.java @@ -0,0 +1,31 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.model; + +/** + * Fine-grained classification of durable operations beyond the basic operation types. + * + *

Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the + * {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs. + */ +public enum OperationSubType { + RUN_IN_CHILD_CONTEXT("RunInChildContext"), + MAP("Map"), + PARALLEL("Parallel"); + + private final String value; + + OperationSubType(String value) { + this.value = value; + } + + /** Returns the wire-format string value sent in checkpoint updates. */ + public String getValue() { + return value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java index 8aa1c2e56..443638e13 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java @@ -46,6 +46,7 @@ public abstract class BaseDurableOperation implements DurableFuture { private final String operationId; private final String name; + private final String parentId; private final OperationType operationType; private final ExecutionManager executionManager; private final TypeToken resultTypeToken; @@ -58,9 +59,11 @@ protected BaseDurableOperation( OperationType operationType, TypeToken resultTypeToken, SerDes resultSerDes, - ExecutionManager executionManager) { + ExecutionManager executionManager, + String parentId) { this.operationId = operationId; this.name = name; + this.parentId = parentId; this.operationType = operationType; this.executionManager = executionManager; this.resultTypeToken = resultTypeToken; @@ -72,6 +75,17 @@ protected BaseDurableOperation( executionManager.registerOperation(this); } + /** Convenience constructor for root-context operations where parentId is null. 
*/ + public BaseDurableOperation( + String operationId, + String name, + OperationType operationType, + TypeToken resultTypeToken, + SerDes resultSerDes, + ExecutionManager executionManager) { + this(operationId, name, operationType, resultTypeToken, resultSerDes, executionManager, null); + } + /** Gets the unique identifier for this operation. */ public String getOperationId() { return operationId; @@ -82,6 +96,11 @@ public String getName() { return name; } + /** Gets the parent context ID. Null for root-context operations, set for child context operations. */ + protected String getParentId() { + return parentId; + } + /** Gets the operation type */ public OperationType getType() { return operationType; @@ -92,7 +111,7 @@ public OperationType getType() { /** * Gets the Operation from ExecutionManager and update the replay state from REPLAY to EXECUTE if operation is not - * found + * found. Operation IDs are globally unique (prefixed for child contexts), so no parentId is needed for lookups. 
* * @return the operation if found, otherwise null */ @@ -220,11 +239,10 @@ protected void sendOperationUpdate(OperationUpdate.Builder builder) { } protected CompletableFuture sendOperationUpdateAsync(OperationUpdate.Builder builder) { - // todo: add parentId when we support operations in child context return executionManager.sendOperationUpdate(builder.id(operationId) .name(name) .type(operationType) - .parentId(null) + .parentId(parentId) .build()); } diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/CallbackOperation.java b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/CallbackOperation.java index a10034589..5509b2166 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/CallbackOperation.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/CallbackOperation.java @@ -29,11 +29,22 @@ public CallbackOperation( String name, TypeToken resultTypeToken, CallbackConfig config, - ExecutionManager executionManager) { - super(operationId, name, OperationType.CALLBACK, resultTypeToken, config.serDes(), executionManager); + ExecutionManager executionManager, + String parentId) { + super(operationId, name, OperationType.CALLBACK, resultTypeToken, config.serDes(), executionManager, parentId); this.config = config; } + /** Convenience constructor for root-context operations where parentId is null. 
*/ + public CallbackOperation( + String operationId, + String name, + TypeToken resultTypeToken, + CallbackConfig config, + ExecutionManager executionManager) { + this(operationId, name, resultTypeToken, config, executionManager, null); + } + public String callbackId() { return callbackId; } diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/ChildContextOperation.java new file mode 100644 index 000000000..d3363d1c6 --- /dev/null +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/ChildContextOperation.java @@ -0,0 +1,204 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.operation; + +import static com.amazonaws.lambda.durable.model.OperationSubType.RUN_IN_CHILD_CONTEXT; + +import com.amazonaws.lambda.durable.DurableConfig; +import com.amazonaws.lambda.durable.DurableContext; +import com.amazonaws.lambda.durable.TypeToken; +import com.amazonaws.lambda.durable.exception.ChildContextFailedException; +import com.amazonaws.lambda.durable.exception.DurableOperationException; +import com.amazonaws.lambda.durable.exception.UnrecoverableDurableExecutionException; +import com.amazonaws.lambda.durable.execution.ExecutionManager; +import com.amazonaws.lambda.durable.execution.SuspendExecutionException; +import com.amazonaws.lambda.durable.execution.ThreadType; +import com.amazonaws.lambda.durable.serde.SerDes; +import com.amazonaws.lambda.durable.util.ExceptionHelper; +import com.amazonaws.services.lambda.runtime.Context; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import software.amazon.awssdk.services.lambda.model.ContextOptions; +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.awssdk.services.lambda.model.OperationAction; +import 
software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.awssdk.services.lambda.model.OperationUpdate; + +/** + * Manages the lifecycle of a child execution context. + * + *
<p>
A child context runs a user function in a separate thread with its own operation counter and checkpoint log. + * Operations within the child context use the child's context ID as their parentId. + */ +public class ChildContextOperation<T> extends BaseDurableOperation<T> { + + private static final int LARGE_RESULT_THRESHOLD = 256 * 1024; + + private final Function<DurableContext, T> function; + private final DurableConfig durableConfig; + private final Context lambdaContext; + private final ExecutionManager executionManager; + private final ExecutorService userExecutor; + private boolean replayChildContext; + private T reconstructedResult; + + public ChildContextOperation( + String operationId, + String name, + Function<DurableContext, T> function, + TypeToken<T> resultTypeToken, + SerDes resultSerDes, + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String parentId) { + super(operationId, name, OperationType.CONTEXT, resultTypeToken, resultSerDes, executionManager, parentId); + this.function = function; + this.durableConfig = durableConfig; + this.lambdaContext = lambdaContext; + this.executionManager = executionManager; + this.userExecutor = durableConfig.getExecutorService(); + } + + @Override + public void execute() { + var existing = getOperation(); + + if (existing != null) { + validateReplay(existing); + switch (existing.status()) { + case SUCCEEDED -> { + if (existing.contextDetails() != null + && Boolean.TRUE.equals(existing.contextDetails().replayChildren())) { + // Large result: re-execute child context to reconstruct result + replayChildContext = true; + executeChildContext(); + } else { + markAlreadyCompleted(); + } + } + case FAILED -> markAlreadyCompleted(); + case STARTED -> executeChildContext(); + default -> + terminateExecutionWithIllegalDurableOperationException( + "Unexpected child context status: " + existing.status()); + } + } else { + // First execution: fire-and-forget START checkpoint, then run + sendOperationUpdateAsync( + 
OperationUpdate.builder().action(OperationAction.START).subType(RUN_IN_CHILD_CONTEXT.getValue())); + executeChildContext(); + } + } + + private void executeChildContext() { + // The operationId is already globally unique (prefixed by parent context path via + // DurableContext.nextOperationId), so we use it directly as the contextId. + // E.g., root child context "1", nested child context "1-2", deeply nested "1-2-1". + var contextId = getOperationId(); + + // Thread registration is intentionally split across two threads: + // 1. registerActiveThread on the PARENT thread — ensures the child is tracked before the + // parent can deregister and trigger suspension (race prevention). + // 2. setCurrentContext on the CHILD thread — sets the ThreadLocal so operations inside + // the child context know which context they belong to. + // registerActiveThread is idempotent (no-op if already registered). + registerActiveThread(contextId, ThreadType.CONTEXT); + + userExecutor.execute(() -> { + setCurrentContext(contextId, ThreadType.CONTEXT); + try { + var childContext = + DurableContext.createChildContext(executionManager, durableConfig, lambdaContext, contextId); + + T result = function.apply(childContext); + + if (replayChildContext) { + // Replaying a SUCCEEDED child with replayChildren=true — skip checkpointing. + // Advance the phaser so get() doesn't block waiting for a checkpoint response. 
+ this.reconstructedResult = result; + markAlreadyCompleted(); + return; + } + + checkpointSuccess(result); + } catch (Throwable e) { + handleChildContextFailure(e); + } finally { + try { + deregisterActiveThreadAndUnsetCurrentContext(contextId); + } catch (SuspendExecutionException e) { + // Expected when this is the last active thread — suspension already signaled + } + } + }); + } + + private void checkpointSuccess(T result) { + var serialized = serializeResult(result); + var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8); + + if (serializedBytes.length < LARGE_RESULT_THRESHOLD) { + sendOperationUpdate(OperationUpdate.builder() + .action(OperationAction.SUCCEED) + .subType(RUN_IN_CHILD_CONTEXT.getValue()) + .payload(serialized)); + } else { + // Large result: checkpoint with empty payload + ReplayChildren flag. + // Store the result so get() can return it directly without deserializing the empty payload. + this.reconstructedResult = result; + sendOperationUpdate(OperationUpdate.builder() + .action(OperationAction.SUCCEED) + .subType(RUN_IN_CHILD_CONTEXT.getValue()) + .payload("") + .contextOptions( + ContextOptions.builder().replayChildren(true).build())); + } + } + + private void handleChildContextFailure(Throwable exception) { + exception = ExceptionHelper.unwrapCompletableFuture(exception); + if (exception instanceof UnrecoverableDurableExecutionException) { + terminateExecution((UnrecoverableDurableExecutionException) exception); + } + + final ErrorObject errorObject; + if (exception instanceof DurableOperationException opEx) { + errorObject = opEx.getErrorObject(); + } else { + errorObject = serializeException(exception); + } + + sendOperationUpdate(OperationUpdate.builder() + .action(OperationAction.FAIL) + .subType(RUN_IN_CHILD_CONTEXT.getValue()) + .error(errorObject)); + } + + @Override + public T get() { + var op = waitForOperationCompletion(); + + if (op.status() == OperationStatus.SUCCEEDED) { + if (reconstructedResult != null) { + 
return reconstructedResult; + } + var contextDetails = op.contextDetails(); + var result = (contextDetails != null) ? contextDetails.result() : null; + return deserializeResult(result); + } else { + var contextDetails = op.contextDetails(); + var errorObject = (contextDetails != null) ? contextDetails.error() : null; + + // Attempt to reconstruct and throw the original exception + Throwable original = deserializeException(errorObject); + if (original != null) { + ExceptionHelper.sneakyThrow(original); + } + // Fallback: wrap in ChildContextFailedException + throw new ChildContextFailedException(op); + } + } +} diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/InvokeOperation.java b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/InvokeOperation.java index 327c34bb5..64fa83fe2 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/InvokeOperation.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/InvokeOperation.java @@ -28,8 +28,16 @@ public InvokeOperation( U payload, TypeToken resultTypeToken, InvokeConfig config, - ExecutionManager executionManager) { - super(operationId, name, OperationType.CHAINED_INVOKE, resultTypeToken, config.serDes(), executionManager); + ExecutionManager executionManager, + String parentId) { + super( + operationId, + name, + OperationType.CHAINED_INVOKE, + resultTypeToken, + config.serDes(), + executionManager, + parentId); this.functionName = functionName; this.payload = payload; @@ -37,6 +45,18 @@ public InvokeOperation( this.payloadSerDes = config.payloadSerDes() != null ? config.payloadSerDes() : config.serDes(); } + /** Convenience constructor for root-context operations where parentId is null. 
*/ + public InvokeOperation( + String operationId, + String name, + String functionName, + U payload, + TypeToken resultTypeToken, + InvokeConfig config, + ExecutionManager executionManager) { + this(operationId, name, functionName, payload, resultTypeToken, config, executionManager, null); + } + /** Starts the operation. Returns immediately after starting background work or checkpointing. Does not block. */ @Override public void execute() { diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java index ea91f31a7..973bd65d3 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java @@ -44,8 +44,9 @@ public StepOperation( StepConfig config, ExecutionManager executionManager, DurableLogger durableLogger, - DurableConfig durableConfig) { - super(operationId, name, OperationType.STEP, resultTypeToken, config.serDes(), executionManager); + DurableConfig durableConfig, + String parentId) { + super(operationId, name, OperationType.STEP, resultTypeToken, config.serDes(), executionManager, parentId); this.function = function; this.config = config; @@ -53,6 +54,28 @@ public StepOperation( this.userExecutor = durableConfig.getExecutorService(); } + /** Convenience constructor for root-context operations where parentId is null. 
*/ + public StepOperation( + String operationId, + String name, + Supplier function, + TypeToken resultTypeToken, + StepConfig config, + ExecutionManager executionManager, + DurableLogger durableLogger, + DurableConfig durableConfig) { + this( + operationId, + name, + function, + resultTypeToken, + config, + executionManager, + durableLogger, + durableConfig, + null); + } + @Override public void execute() { var existing = getOperation(); @@ -98,7 +121,6 @@ private CompletableFuture pollReadyAndExecuteStepLogic(int attempt) { } private void executeStepLogic(int attempt) { - // TODO: Modify this logic when child contexts are introduced such that the child context id is in this key var stepThreadId = getOperationId() + "-step"; // Register step thread as active BEFORE executor runs (prevents suspension when handler deregisters) diff --git a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/WaitOperation.java b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/WaitOperation.java index 86b9301aa..cc2ae9b4c 100644 --- a/sdk/src/main/java/com/amazonaws/lambda/durable/operation/WaitOperation.java +++ b/sdk/src/main/java/com/amazonaws/lambda/durable/operation/WaitOperation.java @@ -24,12 +24,25 @@ public class WaitOperation extends BaseDurableOperation { private final Duration duration; - public WaitOperation(String operationId, String name, Duration duration, ExecutionManager executionManager) { - super(operationId, name, OperationType.WAIT, TypeToken.get(Void.class), NOOP_SER_DES, executionManager); + public WaitOperation( + String operationId, String name, Duration duration, ExecutionManager executionManager, String parentId) { + super( + operationId, + name, + OperationType.WAIT, + TypeToken.get(Void.class), + NOOP_SER_DES, + executionManager, + parentId); ParameterValidator.validateDuration(duration, "Wait duration"); this.duration = duration; } + /** Convenience constructor for root-context operations where parentId is null. 
*/ + public WaitOperation(String operationId, String name, Duration duration, ExecutionManager executionManager) { + this(operationId, name, duration, executionManager, null); + } + @Override public void execute() { var existing = getOperation(); diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java index dfd71115a..acde71f0a 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java @@ -34,7 +34,8 @@ private DurableContext createTestContext(List initialOperations) { "test-token", initialExecutionState, DurableConfig.builder().withDurableExecutionClient(client).build()); - return new DurableContext(executionManager, DurableConfig.builder().build(), null); + return DurableContext.createRootContext( + executionManager, DurableConfig.builder().build(), null); } @Test diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/ReplayValidationTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/ReplayValidationTest.java index c40129f30..775cd971c 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/ReplayValidationTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/ReplayValidationTest.java @@ -36,7 +36,8 @@ private DurableContext createTestContext(List initialOperations) { "test-token", initialExecutionState, DurableConfig.builder().withDurableExecutionClient(client).build()); - return new DurableContext(executionManager, DurableConfig.builder().build(), null); + return DurableContext.createRootContext( + executionManager, DurableConfig.builder().build(), null); } @Test diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/ChildContextOperationTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/ChildContextOperationTest.java new file mode 100644 index 000000000..c63212eda --- /dev/null +++ 
b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/ChildContextOperationTest.java @@ -0,0 +1,250 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazonaws.lambda.durable.operation; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.amazonaws.lambda.durable.DurableConfig; +import com.amazonaws.lambda.durable.TypeToken; +import com.amazonaws.lambda.durable.exception.ChildContextFailedException; +import com.amazonaws.lambda.durable.exception.NonDeterministicExecutionException; +import com.amazonaws.lambda.durable.execution.ExecutionManager; +import com.amazonaws.lambda.durable.execution.OperationContext; +import com.amazonaws.lambda.durable.execution.ThreadType; +import com.amazonaws.lambda.durable.serde.JacksonSerDes; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.ContextDetails; +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; + +/** Unit tests for ChildContextOperation. 
*/ +class ChildContextOperationTest { + + private static final JacksonSerDes SERDES = new JacksonSerDes(); + + private ExecutionManager createMockExecutionManager() { + var executionManager = mock(ExecutionManager.class); + when(executionManager.getCurrentContext()).thenReturn(new OperationContext("Root", ThreadType.CONTEXT)); + return executionManager; + } + + private DurableConfig createConfig() { + return DurableConfig.builder() + .withExecutorService(Executors.newCachedThreadPool()) + .build(); + } + + private ChildContextOperation createOperation( + ExecutionManager executionManager, + java.util.function.Function func) { + return new ChildContextOperation<>( + "1", + "test-context", + func, + TypeToken.get(String.class), + SERDES, + executionManager, + createConfig(), + null, + null); + } + + // ===== SUCCEEDED replay ===== + + /** SUCCEEDED replay returns cached result without re-executing the function. */ + @Test + void replaySucceededReturnsCachedResult() { + var executionManager = createMockExecutionManager(); + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("test-context") + .type(OperationType.CONTEXT) + .status(OperationStatus.SUCCEEDED) + .contextDetails(ContextDetails.builder() + .result("\"cached-value\"") + .build()) + .build()); + + var functionCalled = new AtomicBoolean(false); + var operation = createOperation(executionManager, ctx -> { + functionCalled.set(true); + return "should-not-execute"; + }); + + operation.execute(); + var result = operation.get(); + + assertEquals("cached-value", result); + assertFalse(functionCalled.get(), "Function should not be called during SUCCEEDED replay"); + } + + // ===== FAILED replay ===== + + /** FAILED replay throws the original exception without re-executing. 
*/ + @Test + void replayFailedThrowsOriginalException() { + var executionManager = createMockExecutionManager(); + var originalException = new IllegalArgumentException("bad input"); + var stackTrace = List.of("com.example.Test|method|Test.java|42"); + + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("test-context") + .type(OperationType.CONTEXT) + .status(OperationStatus.FAILED) + .contextDetails(ContextDetails.builder() + .error(ErrorObject.builder() + .errorType("java.lang.IllegalArgumentException") + .errorMessage("bad input") + .errorData(SERDES.serialize(originalException)) + .stackTrace(stackTrace) + .build()) + .build()) + .build()); + + var functionCalled = new AtomicBoolean(false); + var operation = createOperation(executionManager, ctx -> { + functionCalled.set(true); + return "should-not-execute"; + }); + + operation.execute(); + + var thrown = assertThrows(IllegalArgumentException.class, operation::get); + assertEquals("bad input", thrown.getMessage()); + assertFalse(functionCalled.get(), "Function should not be called during FAILED replay"); + } + + /** FAILED replay falls back to ChildContextFailedException when original cannot be reconstructed. 
*/ + @Test + void replayFailedFallsBackToChildContextFailedException() { + var executionManager = createMockExecutionManager(); + + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("test-context") + .type(OperationType.CONTEXT) + .status(OperationStatus.FAILED) + .contextDetails(ContextDetails.builder() + .error(ErrorObject.builder() + .errorType("com.nonexistent.SomeException") + .errorMessage("unknown error") + .stackTrace(List.of("com.example.Test|method|Test.java|1")) + .build()) + .build()) + .build()); + + var operation = createOperation(executionManager, ctx -> "unused"); + operation.execute(); + + var thrown = assertThrows(ChildContextFailedException.class, operation::get); + assertTrue(thrown.getMessage().contains("com.nonexistent.SomeException")); + assertTrue(thrown.getMessage().contains("unknown error")); + } + + // ===== Replay STARTED ===== + + /** STARTED replay re-executes the child context (interrupted mid-execution). */ + @Test + void replayStartedReExecutesChildContext() throws Exception { + var executionManager = createMockExecutionManager(); + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("test-context") + .type(OperationType.CONTEXT) + .status(OperationStatus.STARTED) + .build()); + // hasOperationsForContext for the child context ID "1" + when(executionManager.hasOperationsForContext("1")).thenReturn(false); + + var functionCalled = new AtomicBoolean(false); + var operation = createOperation(executionManager, ctx -> { + functionCalled.set(true); + return "re-executed"; + }); + + operation.execute(); + + // Give the executor thread time to run + Thread.sleep(100); + assertTrue(functionCalled.get(), "Function should be re-executed for STARTED replay"); + } + + // ===== ReplayChildren path ===== + + /** SUCCEEDED with replayChildren=true re-executes to reconstruct the result. 
*/ + @Test + void replayChildrenReExecutesToReconstructResult() throws Exception { + var executionManager = createMockExecutionManager(); + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("test-context") + .type(OperationType.CONTEXT) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().replayChildren(true).build()) + .build()); + when(executionManager.hasOperationsForContext("1")).thenReturn(false); + + var functionCalled = new AtomicBoolean(false); + var operation = createOperation(executionManager, ctx -> { + functionCalled.set(true); + return "reconstructed-value"; + }); + + operation.execute(); + + // Give the executor thread time to run + Thread.sleep(100); + assertTrue(functionCalled.get(), "Function should be re-executed for replayChildren path"); + } + + // ===== Non-deterministic detection ===== + + /** Type mismatch during replay terminates execution. */ + @Test + void replayWithTypeMismatchTerminatesExecution() { + var executionManager = createMockExecutionManager(); + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("test-context") + .type(OperationType.STEP) // Wrong type — should be CONTEXT + .status(OperationStatus.SUCCEEDED) + .build()); + + var operation = createOperation(executionManager, ctx -> "unused"); + + assertThrows(NonDeterministicExecutionException.class, operation::execute); + } + + /** Name mismatch during replay terminates execution. 
*/ + @Test + void replayWithNameMismatchTerminatesExecution() { + var executionManager = createMockExecutionManager(); + when(executionManager.getOperationAndUpdateReplayState("1")) + .thenReturn(Operation.builder() + .id("1") + .name("different-name") // Wrong name + .type(OperationType.CONTEXT) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"value\"").build()) + .build()); + + var operation = createOperation(executionManager, ctx -> "unused"); + + assertThrows(NonDeterministicExecutionException.class, operation::execute); + } +} diff --git a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java index 520d77859..2e750d258 100644 --- a/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java +++ b/sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java @@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when;