-
Notifications
You must be signed in to change notification settings - Fork 7
Feat/child context #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Feat/child context #100
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
5a779b4
fix: testing
maschnetwork a411a40
feat: introduce OperationKey and refactor ExecutionManager for compos…
maschnetwork c49d8aa
feat(operations): add parentId support for child context operations
maschnetwork 0844880
feat(context): add replay state tracking and refactor context creation
maschnetwork 92b681d
feat: implement ChildContextOperation, ChildContextFailedException, a…
maschnetwork 707646e
feat: add public methods
maschnetwork 7777543
feat: added integration tests
maschnetwork c0c9023
feat: added edge case tests
maschnetwork b035edc
feat: cloud based tests
maschnetwork 490014b
feat: adapted readme and design to match implementation
maschnetwork df73635
feat: remove debug line from testing commit
maschnetwork b8c8bed
fix: naming and null handling
maschnetwork 75fbe8a
fix: use run instead of complete
maschnetwork 49cf1e9
fix: run for PENDING
maschnetwork 75a6da4
feat: simplify operation id
maschnetwork 8a2b3ab
fix: revert test changes for operation id
maschnetwork 3ac3b9a
fix: simplify testing
maschnetwork 842a106
fix: addressed review comments
maschnetwork 06e5233
Merge remote-tracking branch 'origin/main' into child-context
zhongkechen 1a4c8c4
fix: review comments
maschnetwork 765fbd5
fix: debug tests
maschnetwork c1b30a5
fix: used runUntilComplete for slow CI worker
maschnetwork 45ccbff
fix: ci comment
maschnetwork File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| # Design: `runInChildContext` — Isolated Execution Contexts | ||
|
|
||
| ## What is this? | ||
|
|
||
| `runInChildContext` lets developers create isolated sub-workflows inside a durable execution. Each child context gets its own operation counter and checkpoint log, enabling concurrent branches of work that each maintain independent replay state. | ||
|
|
||
| ```java | ||
| // Sync — blocks until child completes | ||
| String result = ctx.runInChildContext("process-order", String.class, child -> { | ||
| child.step("validate", Void.class, () -> validate(order)); | ||
| child.wait(Duration.ofMinutes(5)); | ||
| return child.step("charge", String.class, () -> charge(order)); | ||
| }); | ||
|
|
||
| // Async — run multiple child contexts concurrently | ||
| var futureA = ctx.runInChildContextAsync("branch-a", String.class, child -> { ... }); | ||
| var futureB = ctx.runInChildContextAsync("branch-b", String.class, child -> { ... }); | ||
| var results = DurableFuture.allOf(futureA, futureB); // preserves input order | ||
| ``` | ||
|
|
||
| Child contexts support the full range of durable operations internally: `step`, `wait`, `invoke`, `createCallback`, and nested `runInChildContext`. | ||
|
|
||
| ## Why? | ||
|
|
||
| This aligns the Java SDK with the TypeScript and Python reference implementations, which already support child contexts via `OperationType.CONTEXT`. It enables patterns like fan-out/fan-in, parallel processing branches, and hierarchical workflow composition. | ||
|
|
||
| ## How it works | ||
|
|
||
| ### Checkpoint lifecycle | ||
|
|
||
| A child context is a `CONTEXT` operation in the checkpoint log. Its lifecycle: | ||
|
|
||
| 1. **START** (fire-and-forget) — marks the child context as in-progress | ||
| 2. Inner operations checkpoint with `parentId` set to the child context's operation ID | ||
| 3. **SUCCEED** or **FAIL** (blocking) — finalizes the child context | ||
|
|
||
| ``` | ||
| Op ID | Parent ID | Type | Action | Payload | ||
| ------|-----------|---------|---------|-------- | ||
| 3 | null | CONTEXT | START | — | ||
| 3-1 | 3 | STEP | START | — | ||
| 3-1 | 3 | STEP | SUCCEED | "result" | ||
| 3 | null | CONTEXT | SUCCEED | "final result" | ||
| ``` | ||
|
|
||
| Inner operation IDs are prefixed with the parent context's operation ID using `-` as separator (e.g., `"3-1"`, `"3-2"`). This matches the JavaScript SDK's `stepPrefix` convention and ensures operation IDs are globally unique — the backend validates type consistency by operation ID, so bare sequential IDs inside child contexts would collide with root-level operations. | ||
|
|
||
| For nested child contexts, the prefix chains naturally (e.g., `"3-2-1"` for the first operation inside a nested child context that is operation `"2"` inside parent context `"3"`). | ||
|
|
||
| ### Replay behavior | ||
|
|
||
| | Cached status | Behavior | | ||
| |---------------|----------| | ||
| | SUCCEEDED | Return cached result (no re-execution) | | ||
| | SUCCEEDED + `replayChildren=true` | Re-execute child to reconstruct large result (>256KB); inner ops replay from cache; no new SUCCEED checkpoint | | ||
| | FAILED | Re-throw cached error | | ||
| | STARTED | Re-execute (was interrupted mid-flight) | | ||
|
|
||
| ### Operation ID prefixing | ||
|
|
||
| To ensure global uniqueness, `DurableContext.nextOperationId()` prefixes operation IDs with the context's `parentId` when inside a child context: | ||
|
|
||
| - Root context: IDs are `"1"`, `"2"`, `"3"` (no prefix) | ||
| - Child context `"1"`: IDs are `"1-1"`, `"1-2"`, `"1-3"` | ||
| - Nested child context `"1-2"`: IDs are `"1-2-1"`, `"1-2-2"` | ||
|
|
||
| ```java | ||
| private String nextOperationId() { | ||
| var counter = String.valueOf(operationCounter.incrementAndGet()); | ||
| return parentId != null ? parentId + "-" + counter : counter; | ||
| } | ||
| ``` | ||
|
|
||
| This matches the JavaScript SDK's `_stepPrefix` mechanism. The backend validates type consistency by operation ID alone, so without prefixing, a CONTEXT operation with ID `"1"` and an inner STEP with ID `"1"` (different `parentId`) would trigger an `InvalidParameterValueException`. | ||
|
|
||
| `ExecutionManager` still uses plain `String` keys (the globally unique operation ID) for its internal maps, since prefixed IDs are inherently unique across all contexts. | ||
|
|
||
| ### Thread model | ||
|
|
||
| Child context user code runs in a separate thread (same pattern as `StepOperation`): | ||
| - `registerActiveThread` before the executor runs | ||
| - `setCurrentContext` inside the executor thread | ||
| - `deregisterActiveThread` in the finally block | ||
| - `SuspendExecutionException` caught and swallowed (suspension already signaled via `executionExceptionFuture`) | ||
|
|
||
| ### Per-context replay state | ||
|
|
||
| The current global `executionMode` (REPLAY → EXECUTION) doesn't work for child contexts — a child may be replaying while the parent is already executing. Each `DurableContext` tracks its own replay state independently via an `isReplaying` field, initialized by checking `ExecutionManager.hasOperationsForContext(parentId)`. This matches the TypeScript SDK's per-entity approach. | ||
|
|
||
| The `DurableContext` stores its context identity in a `parentId` field — `null` for the root context, set to the qualified context ID for child contexts. This field is passed directly to operations as their `parentId` when constructing them. | ||
|
|
||
| ## Key changes by file | ||
|
|
||
| | File | Change | | ||
| |------|--------| | ||
| | `ChildContextOperation` (new) | Extends `BaseDurableOperation<T>`. Manages child context lifecycle, thread coordination, large result handling. Uses `getOperationId()` directly as `contextId` (already globally unique via prefixed IDs). | | ||
| | `ChildContextFailedException` (new) | Extends `DurableOperationException`. Wraps the `Operation` object; extracts error from `contextDetails()`. | | ||
| | `DurableContext` | New `runInChildContext`/`runInChildContextAsync` methods. New `createChildContext` factory (skips thread registration). Stores `parentId` field (null for root, contextId for child). Per-context replay tracking via `isReplaying` field. `nextOperationId()` prefixes with `parentId` for child contexts (e.g., `"1-1"`). | | ||
| | `BaseDurableOperation` | New `parentId` constructor parameter. `sendOperationUpdateAsync` uses it instead of hardcoded `null`. Protected `getParentId()` getter. | | ||
| | `ExecutionManager` | All maps (`operations`, `openPhasers`) use plain `String` keys (globally unique operation IDs). `getOperationAndUpdateReplayState` and `startPhaser` take a single `operationId` argument. New `hasOperationsForContext(parentId)` method for per-context replay initialization. | | ||
| | `StepOperation` | Thread ID uses `getOperationId() + "-step"` (operation IDs are globally unique via prefixing). | | ||
| | `LocalMemoryExecutionClient` | Handles `CONTEXT` operations (was `throw UnsupportedOperationException`). Propagates `parentId` for all operation types. | | ||
| | `HistoryEventProcessor` | Handles `CONTEXT_STARTED`, `CONTEXT_SUCCEEDED`, `CONTEXT_FAILED` events (was `throw UnsupportedOperationException`). Builds `ContextDetails` with result/error extraction. | | ||
|
|
||
| ## Large result handling | ||
|
|
||
| Results < 256KB (measured in UTF-8 bytes) are checkpointed directly. Results ≥ 256KB trigger the `ReplayChildren` flow: | ||
| - SUCCEED checkpoint sent with empty payload + `ContextOptions { replayChildren: true }` | ||
| - On replay, child context is re-executed; inner operations replay from cache | ||
| - No new SUCCEED checkpoint is created during reconstruction | ||
|
|
||
| `summaryGenerator` (optional compact summary for observability) is deferred for the initial implementation. | ||
|
|
||
| ## Orphan detection (deferred) | ||
|
|
||
| > **Status: Deferred** — orphan detection is not implemented in this release. The mechanism described below is the intended design for a future release, matching the Python and TypeScript SDKs. | ||
|
|
||
| When a parent CONTEXT completes, in-flight child operations must be prevented from checkpointing stale state. The `CheckpointBatcher` would track completed context IDs and silently skip checkpoints from orphaned operations. This matches both the Python SDK (`_parent_done` set) and TypeScript SDK (`markAncestorFinished`). | ||
|
|
||
| ## Error handling | ||
|
|
||
| `ChildContextFailedException` follows the same pattern as `StepFailedException`: | ||
| - Extends `DurableOperationException`, wrapping the `Operation` object | ||
| - Extracts the `ErrorObject` from `operation.contextDetails().error()` | ||
| - `get()` first attempts to reconstruct and re-throw the original exception | ||
| - Falls back to `ChildContextFailedException` if reconstruction fails | ||
|
|
||
| Exceptions from inner operations propagate up through the child context naturally. | ||
|
|
||
| ## What's deferred | ||
|
|
||
| - Orphan detection in `CheckpointBatcher` — preventing stale checkpoints from in-flight child operations after a parent CONTEXT completes (see "Orphan detection" section above) | ||
| - `summaryGenerator` for large-result observability | ||
| - Higher-level `map`/`parallel` combinators (different `SubType` values, same `CONTEXT` operation type) | ||
74 changes: 74 additions & 0 deletions
74
examples/src/main/java/com/amazonaws/lambda/durable/examples/ChildContextExample.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| package com.amazonaws.lambda.durable.examples; | ||
|
|
||
| import com.amazonaws.lambda.durable.DurableContext; | ||
| import com.amazonaws.lambda.durable.DurableFuture; | ||
| import com.amazonaws.lambda.durable.DurableHandler; | ||
| import java.time.Duration; | ||
|
|
||
| /** | ||
| * Example demonstrating child context workflows with the Durable Execution SDK. | ||
| * | ||
| * <p>This handler runs three concurrent child contexts using {@code runInChildContextAsync}: | ||
| * | ||
| * <ol> | ||
| * <li><b>Order validation</b> — performs a step then suspends via {@code wait()} before completing | ||
| * <li><b>Inventory check</b> — performs a step then suspends via {@code wait()} before completing | ||
| * <li><b>Shipping estimate</b> — nests another child context inside it to demonstrate hierarchical contexts | ||
| * </ol> | ||
| * | ||
| * <p>All three child contexts run concurrently. Results are collected with {@link DurableFuture#allOf} and combined | ||
| * into a summary string. | ||
| */ | ||
| public class ChildContextExample extends DurableHandler<GreetingRequest, String> { | ||
|
|
||
| @Override | ||
| public String handleRequest(GreetingRequest input, DurableContext context) { | ||
| var name = input.getName(); | ||
| context.getLogger().info("Starting child context workflow for {}", name); | ||
|
|
||
| // Child context 1: Order validation — step + wait + step | ||
| var orderFuture = context.runInChildContextAsync("order-validation", String.class, child -> { | ||
| var prepared = child.step("prepare-order", String.class, () -> "Order for " + name); | ||
| context.getLogger().info("Order prepared, waiting for validation"); | ||
|
|
||
| child.wait("validation-delay", Duration.ofSeconds(5)); | ||
|
|
||
| return child.step("validate-order", String.class, () -> prepared + " [validated]"); | ||
| }); | ||
|
|
||
| // Child context 2: Inventory check — step + wait + step | ||
| var inventoryFuture = context.runInChildContextAsync("inventory-check", String.class, child -> { | ||
| var stock = child.step("check-stock", String.class, () -> "Stock available for " + name); | ||
| context.getLogger().info("Stock checked, waiting for confirmation"); | ||
|
|
||
| child.wait("confirmation-delay", Duration.ofSeconds(3)); | ||
|
|
||
| return child.step("confirm-inventory", String.class, () -> stock + " [confirmed]"); | ||
| }); | ||
|
|
||
| // Child context 3: Shipping estimate — nests a child context inside it | ||
| var shippingFuture = context.runInChildContextAsync("shipping-estimate", String.class, child -> { | ||
| var baseRate = child.step("calculate-base-rate", String.class, () -> "Base rate for " + name); | ||
|
|
||
| // Nested child context: calculate regional adjustment | ||
| var adjustment = child.runInChildContext( | ||
| "regional-adjustment", | ||
| String.class, | ||
| nested -> nested.step("lookup-region", String.class, () -> baseRate + " + regional adjustment")); | ||
|
|
||
| return child.step("finalize-shipping", String.class, () -> adjustment + " [shipping ready]"); | ||
| }); | ||
|
|
||
| // Collect all results using allOf | ||
| context.getLogger().info("Waiting for all child contexts to complete"); | ||
| var results = DurableFuture.allOf(orderFuture, inventoryFuture, shippingFuture); | ||
|
|
||
| // Combine into summary | ||
| var summary = String.join(" | ", results); | ||
| context.getLogger().info("All child contexts complete: {}", summary); | ||
|
|
||
| return summary; | ||
| } | ||
| } |
55 changes: 55 additions & 0 deletions
55
examples/src/test/java/com/amazonaws/lambda/durable/examples/ChildContextExampleTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| package com.amazonaws.lambda.durable.examples; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.*; | ||
|
|
||
| import com.amazonaws.lambda.durable.model.ExecutionStatus; | ||
| import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| class ChildContextExampleTest { | ||
|
|
||
| @Test | ||
| void testChildContextExampleRunsToCompletion() { | ||
| var handler = new ChildContextExample(); | ||
| var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); | ||
|
|
||
| var input = new GreetingRequest("Alice"); | ||
| var result = runner.runUntilComplete(input); | ||
|
|
||
| assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); | ||
| assertEquals( | ||
| "Order for Alice [validated] | Stock available for Alice [confirmed] | Base rate for Alice + regional adjustment [shipping ready]", | ||
| result.getResult(String.class)); | ||
| } | ||
|
|
||
| @Test | ||
| void testChildContextExampleSuspendsOnFirstRun() { | ||
| var handler = new ChildContextExample(); | ||
| var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); | ||
|
|
||
| var input = new GreetingRequest("Bob"); | ||
|
|
||
| // First run should suspend due to wait operations inside child contexts | ||
| var result = runner.run(input); | ||
| assertEquals(ExecutionStatus.PENDING, result.getStatus()); | ||
| } | ||
|
|
||
| @Test | ||
| void testChildContextExampleReplay() { | ||
| var handler = new ChildContextExample(); | ||
| var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); | ||
|
|
||
| var input = new GreetingRequest("Alice"); | ||
|
|
||
| // First full execution | ||
| var result1 = runner.runUntilComplete(input); | ||
| assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); | ||
|
|
||
| // Replay — should return cached results | ||
| var result2 = runner.run(input); | ||
| assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); | ||
| assertEquals(result1.getResult(String.class), result2.getResult(String.class)); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.