generated from amazon-archives/__template_Apache-2.0
Add waitForCondition Operation #195
Open
nvasiu wants to merge 2 commits into main from wait_for_condition
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,280 @@ | ||
| # Design: waitForCondition for Durable Execution Java SDK | ||
|
|
||
| ## Overview | ||
|
|
||
| This design adds a `waitForCondition` operation to the Java Durable Execution SDK. The operation periodically checks a user-supplied condition function, using a configurable wait strategy to determine polling intervals and termination. It follows the same checkpoint-and-replay model as existing operations (`step`, `wait`, `invoke`) and mirrors the JavaScript SDK's `waitForCondition` implementation. | ||
|
|
||
| ## Architecture | ||
|
|
||
| ### How it works | ||
|
|
||
| `waitForCondition` is implemented as a specialized step operation that uses the RETRY checkpoint action for polling iterations: | ||
|
|
||
| 1. User calls `ctx.waitForCondition(name, resultType, checkFunc, config)` | ||
| 2. A `WaitForConditionOperation` is created with a unique operation ID | ||
| 3. On first execution: | ||
| - Checkpoint START with subtype `WAIT_FOR_CONDITION` | ||
| - Execute the check function with `initialState` and a `StepContext` | ||
| - Call the wait strategy with the new state and attempt number | ||
| - If `stopPolling()`: checkpoint SUCCEED with the final state, return it | ||
| - If `continuePolling(delay)`: checkpoint RETRY with the state and delay, poll for READY, then loop | ||
| - If check function throws: checkpoint FAIL, propagate the error | ||
| 4. On replay: | ||
| - SUCCEEDED: return cached result (skip re-execution) | ||
| - FAILED: re-throw cached error | ||
| - PENDING: wait for READY transition, then resume polling | ||
| - STARTED/READY: resume execution from current attempt and state | ||
|
|
||
| This matches the JS SDK's behavior where each polling iteration is a RETRY on the same STEP operation. | ||
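The first-execution flow in step 3 can be sketched as a plain in-memory loop. This is only an illustration under stated assumptions: `CheckLoopSketch`, its `Decision` record, and the string checkpoint log are hypothetical stand-ins, and the sketch does not suspend between polls the way the real operation would while waiting for READY.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

// Hypothetical, in-memory sketch of the first-execution flow; not the SDK's implementation.
class CheckLoopSketch {

    // Decision returned by the strategy: a null delay means "stop polling".
    record Decision(Duration delay) {
        boolean stop() {
            return delay == null;
        }
    }

    // Appends checkpoint actions to the given list in order and returns the final state.
    static <T> T run(T initialState,
                     UnaryOperator<T> check,
                     BiFunction<T, Integer, Decision> strategy,
                     List<String> checkpoints) {
        checkpoints.add("START");
        T state = initialState;
        for (int attempt = 1; ; attempt++) { // attempts are 1-based
            try {
                state = check.apply(state);
                Decision decision = strategy.apply(state, attempt);
                if (decision.stop()) {
                    checkpoints.add("SUCCEED");
                    return state;
                }
                checkpoints.add("RETRY(" + decision.delay().toSeconds() + "s)");
            } catch (RuntimeException e) {
                // Covers failures in both the check function and the strategy.
                checkpoints.add("FAIL");
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Integer result = run(0, s -> s + 1,
                (s, attempt) -> new Decision(s < 3 ? Duration.ofSeconds(1) : null), log);
        System.out.println(result + " " + log);
    }
}
```

Counting from 0 to 3 with a one-second delay records START, two RETRY entries, and a final SUCCEED, which is the same shape as one STEP operation retried once per polling iteration.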
|
|
||
| ### New Classes | ||
|
|
||
| ``` | ||
| sdk/src/main/java/software/amazon/lambda/durable/ | ||
| ├── WaitForConditionConfig.java # Config builder (waitStrategy, initialState, serDes) | ||
| ├── WaitForConditionWaitStrategy.java # Functional interface: (T state, int attempt) → WaitForConditionDecision | ||
| ├── WaitForConditionDecision.java # Sealed result: continuePolling(Duration) | stopPolling() | ||
| ├── WaitStrategies.java # Factory with builder for common patterns | ||
| ├── operation/ | ||
| │ └── WaitForConditionOperation.java # Operation implementation | ||
| ├── model/ | ||
| │ └── OperationSubType.java # Add WAIT_FOR_CONDITION enum value | ||
| ``` | ||
|
|
||
| ### Class Diagram | ||
|
|
||
| ``` | ||
| DurableContext | ||
| ├── waitForCondition(name, Class<T>, checkFunc, config) → T | ||
| ├── waitForCondition(name, TypeToken<T>, checkFunc, config) → T | ||
| ├── waitForConditionAsync(name, Class<T>, checkFunc, config) → DurableFuture<T> | ||
| └── waitForConditionAsync(name, TypeToken<T>, checkFunc, config) → DurableFuture<T> | ||
| │ | ||
| ▼ | ||
| WaitForConditionOperation<T> extends BaseDurableOperation<T> | ||
| ├── start() → checkpoint START, execute check loop | ||
| ├── replay(existing) → handle SUCCEEDED/FAILED/PENDING/STARTED/READY | ||
| ├── get() → block, deserialize result or throw | ||
| └── executeCheckLoop(currentState, attempt) | ||
| │ | ||
| ├── calls checkFunc(state, stepContext) → newState | ||
| ├── calls waitStrategy.evaluate(newState, attempt) → WaitForConditionDecision | ||
| │ ├── stopPolling() → checkpoint SUCCEED | ||
| │ └── continuePolling(delay) → checkpoint RETRY, poll, loop | ||
| └── on error → checkpoint FAIL | ||
| ``` | ||
|
|
||
| ## Detailed Design | ||
|
|
||
| ### WaitForConditionWaitStrategy<T> (Functional Interface) | ||
|
|
||
| ```java | ||
| @FunctionalInterface | ||
| public interface WaitForConditionWaitStrategy<T> { | ||
| WaitForConditionDecision evaluate(T state, int attempt); | ||
| } | ||
| ``` | ||
|
|
||
| - `state`: the current state returned by the check function | ||
| - `attempt`: 1-based attempt number (first check is attempt 1) | ||
| - Returns a `WaitForConditionDecision` indicating whether to continue or stop | ||
|
|
||
| ### WaitForConditionDecision | ||
|
|
||
| ```java | ||
| public sealed interface WaitForConditionDecision { | ||
| record ContinuePolling(Duration delay) implements WaitForConditionDecision {} | ||
| record StopPolling() implements WaitForConditionDecision {} | ||
|
|
||
| static WaitForConditionDecision continuePolling(Duration delay) { | ||
| return new ContinuePolling(delay); | ||
| } | ||
|
|
||
| static WaitForConditionDecision stopPolling() { | ||
| return new StopPolling(); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| Uses Java sealed interfaces for type safety. The `delay` in `ContinuePolling` must be >= 1 second (enforced at construction). | ||
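The listing above does not show where the 1-second minimum is enforced. One possible way, sketched here as an assumption rather than the PR's actual code, is a compact constructor on the `ContinuePolling` record. The choice of `IllegalArgumentException` and the `DecisionSketch.describe` helper are hypothetical.

```java
import java.time.Duration;
import java.util.Objects;

// Sketch only: mirrors the design's type names, with validation added as an assumption.
sealed interface WaitForConditionDecision {
    record ContinuePolling(Duration delay) implements WaitForConditionDecision {
        // Compact constructor: runs on every construction, including via the factory method.
        public ContinuePolling {
            Objects.requireNonNull(delay, "delay");
            if (delay.compareTo(Duration.ofSeconds(1)) < 0) {
                throw new IllegalArgumentException("delay must be at least 1 second: " + delay);
            }
        }
    }

    record StopPolling() implements WaitForConditionDecision {}

    static WaitForConditionDecision continuePolling(Duration delay) {
        return new ContinuePolling(delay);
    }

    static WaitForConditionDecision stopPolling() {
        return new StopPolling();
    }
}

class DecisionSketch {
    // Hypothetical helper showing how a caller can branch on the sealed type.
    static String describe(WaitForConditionDecision decision) {
        if (decision instanceof WaitForConditionDecision.ContinuePolling c) {
            return "retry in " + c.delay().toSeconds() + "s";
        }
        return "stop";
    }

    public static void main(String[] args) {
        System.out.println(describe(WaitForConditionDecision.continuePolling(Duration.ofSeconds(5))));
        System.out.println(describe(WaitForConditionDecision.stopPolling()));
    }
}
```

Because the check lives in the record itself, a custom strategy that builds `ContinuePolling` directly is validated the same way as one that goes through `continuePolling(...)`.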
|
|
||
| ### WaitStrategies (Factory) | ||
|
|
||
| ```java | ||
| public final class WaitStrategies { | ||
| public static <T> Builder<T> builder(Predicate<T> shouldContinuePolling) { ... } | ||
|
|
||
| public static class Builder<T> { | ||
| // Defaults match JS SDK | ||
| private int maxAttempts = 60; | ||
| private Duration initialDelay = Duration.ofSeconds(5); | ||
| private Duration maxDelay = Duration.ofSeconds(300); | ||
| private double backoffRate = 1.5; | ||
| private JitterStrategy jitter = JitterStrategy.FULL; | ||
|
|
||
| public Builder<T> maxAttempts(int maxAttempts) { ... } | ||
| public Builder<T> initialDelay(Duration initialDelay) { ... } | ||
| public Builder<T> maxDelay(Duration maxDelay) { ... } | ||
| public Builder<T> backoffRate(double backoffRate) { ... } | ||
| public Builder<T> jitter(JitterStrategy jitter) { ... } | ||
| public WaitForConditionWaitStrategy<T> build() { ... } | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| The built strategy: | ||
| 1. Calls `shouldContinuePolling.test(state)` — if false, returns `stopPolling()` | ||
| 2. Checks `attempt >= maxAttempts` — if true, throws `WaitForConditionException` | ||
| 3. Calculates delay: `min(initialDelay * backoffRate^(attempt-1), maxDelay)` | ||
| 4. Applies jitter using the existing `JitterStrategy` enum | ||
| 5. Ensures delay >= 1 second, rounds to nearest integer second | ||
| 6. Returns `continuePolling(delay)` | ||
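Steps 3 and 5 can be worked through with jitter disabled. The sketch below is a minimal illustration, assuming delays are whole seconds; `BackoffSketch` and `delayFor` are hypothetical names, and the max-attempts check from step 2 is left out.

```java
import java.time.Duration;

// Sketch of steps 3 and 5 with jitter disabled; names are hypothetical.
class BackoffSketch {
    static Duration delayFor(int attempt, Duration initialDelay, double backoffRate, Duration maxDelay) {
        // Step 3: exponential growth from the initial delay, capped at maxDelay.
        double base = initialDelay.toSeconds() * Math.pow(backoffRate, attempt - 1);
        double capped = Math.min(base, maxDelay.toSeconds());
        // Step 5: round to the nearest whole second, never below 1 second.
        long seconds = Math.max(1, Math.round(capped));
        return Duration.ofSeconds(seconds);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 4; attempt++) {
            Duration d = delayFor(attempt, Duration.ofSeconds(5), 1.5, Duration.ofSeconds(300));
            System.out.println("attempt " + attempt + " -> " + d.toSeconds() + "s");
        }
    }
}
```

With the builder defaults (5 s initial delay, rate 1.5, 300 s cap), attempts 1 to 4 give 5 s, 8 s, 11 s, and 17 s; the raw values are 5, 7.5, 11.25, and 16.875 before rounding.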
|
|
||
| ### WaitForConditionConfig<T> | ||
|
|
||
| ```java | ||
| public class WaitForConditionConfig<T> { | ||
| private final WaitForConditionWaitStrategy<T> waitStrategy; | ||
| private final T initialState; | ||
| private final SerDes serDes; // nullable, falls back to DurableConfig default | ||
|
|
||
| public static <T> Builder<T> builder(WaitForConditionWaitStrategy<T> waitStrategy, T initialState) { ... } | ||
|
|
||
| public static class Builder<T> { | ||
| public Builder<T> serDes(SerDes serDes) { ... } | ||
| public WaitForConditionConfig<T> build() { ... } | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| `waitStrategy` and `initialState` are required parameters of the `builder(...)` factory method (not optional setters), so they cannot be omitted. The builder must still reject `null` for either argument, as Property 4 requires. | ||
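A required-argument builder of this kind might look like the sketch below. It is a generic stand-in, not the PR's `WaitForConditionConfig`: the class name `RequiredArgsConfig`, the type parameter `S` for the strategy, and the `String` placeholder for the optional SerDes are all assumptions made to keep the example self-contained.

```java
import java.util.Objects;

// Hypothetical stand-in for WaitForConditionConfig; S is the strategy type, T the state type.
final class RequiredArgsConfig<S, T> {
    private final S waitStrategy;
    private final T initialState;
    private final String serDesName; // optional; null means "use the default"

    private RequiredArgsConfig(Builder<S, T> builder) {
        this.waitStrategy = builder.waitStrategy;
        this.initialState = builder.initialState;
        this.serDesName = builder.serDesName;
    }

    // Required values are factory parameters, so a config can never be built without them.
    static <S, T> Builder<S, T> builder(S waitStrategy, T initialState) {
        return new Builder<>(
                Objects.requireNonNull(waitStrategy, "waitStrategy"),
                Objects.requireNonNull(initialState, "initialState"));
    }

    S waitStrategy() { return waitStrategy; }
    T initialState() { return initialState; }
    String serDesName() { return serDesName; }

    static final class Builder<S, T> {
        private final S waitStrategy;
        private final T initialState;
        private String serDesName;

        private Builder(S waitStrategy, T initialState) {
            this.waitStrategy = waitStrategy;
            this.initialState = initialState;
        }

        // Optional setting keeps the usual fluent setter form.
        Builder<S, T> serDesName(String serDesName) {
            this.serDesName = serDesName;
            return this;
        }

        RequiredArgsConfig<S, T> build() {
            return new RequiredArgsConfig<>(this);
        }
    }

    public static void main(String[] args) {
        RequiredArgsConfig<String, Integer> config = RequiredArgsConfig.builder("strategy", 0).build();
        System.out.println(config.waitStrategy() + " " + config.initialState());
    }
}
```

Rejecting `null` in the factory method rather than in `build()` surfaces the mistake at the call site that caused it.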
|
|
||
| ### WaitForConditionOperation<T> | ||
|
|
||
| Extends `BaseDurableOperation<T>`. Key behaviors: | ||
|
|
||
| - **start()**: Begins the check loop from `initialState` at attempt 1 (attempt numbers are 1-based) | ||
| - **replay(existing)**: Handles all operation statuses (SUCCEEDED, FAILED, PENDING, STARTED, READY) | ||
| - **executeCheckLoop(state, attempt)**: Core polling logic | ||
| - Creates a `StepContext` for the check function | ||
| - Executes check function in the user executor (same pattern as `StepOperation`) | ||
| - Serializes/deserializes state through SerDes (round-trip, matching JS SDK) | ||
| - Calls wait strategy with deserialized state | ||
| - Checkpoints RETRY with `NextAttemptDelaySeconds` or SUCCEED/FAIL | ||
| - **get()**: Blocks on completion, deserializes result or throws exception | ||
|
|
||
| All checkpoint updates use `OperationType.STEP` and `OperationSubType.WAIT_FOR_CONDITION`. | ||
|
|
||
| ### DurableContext API Methods | ||
|
|
||
| ```java | ||
| // Sync methods (block until condition met) | ||
| public <T> T waitForCondition(String name, Class<T> resultType, | ||
| BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) | ||
|
|
||
| public <T> T waitForCondition(String name, TypeToken<T> typeToken, | ||
| BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) | ||
|
|
||
| // Async methods (return DurableFuture immediately) | ||
| public <T> DurableFuture<T> waitForConditionAsync(String name, Class<T> resultType, | ||
| BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) | ||
|
|
||
| public <T> DurableFuture<T> waitForConditionAsync(String name, TypeToken<T> typeToken, | ||
| BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) | ||
| ``` | ||
|
|
||
| The check function receives the current state as a parameter, as in the JS SDK's `(state: T, context) => Promise<T>`, so its Java type is `BiFunction<T, StepContext, T>`, not `Function<StepContext, T>`. The operation passes in the current state on each iteration; the `StepContext` provides logging and attempt info. The same signature applies to all four methods. | ||
|
|
||
| Corrected signature: | ||
|
|
||
| ```java | ||
| public <T> DurableFuture<T> waitForConditionAsync(String name, TypeToken<T> typeToken, | ||
| BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) | ||
| ``` | ||
|
|
||
| ### OperationSubType Addition | ||
|
|
||
| ```java | ||
| public enum OperationSubType { | ||
| RUN_IN_CHILD_CONTEXT("RunInChildContext"), | ||
| MAP("Map"), | ||
| PARALLEL("Parallel"), | ||
| WAIT_FOR_CALLBACK("WaitForCallback"), | ||
| WAIT_FOR_CONDITION("WaitForCondition"); // NEW | ||
| ... | ||
| } | ||
| ``` | ||
|
|
||
| ### Error Handling | ||
|
|
||
| - **Check function throws**: Checkpoint FAIL with serialized error, wrap in `WaitForConditionException` | ||
| - **Max attempts exceeded**: `WaitStrategies`-built strategy throws `WaitForConditionException("waitForCondition exceeded maximum attempts (N)")` | ||
| - **Custom strategy throws**: Propagated as-is (checkpoint FAIL) | ||
| - **SerDes failure**: Wrapped in `SerDesException` (existing pattern) | ||
|
|
||
| A new `WaitForConditionException` extends `DurableOperationException` for domain-specific errors. | ||
|
|
||
| ### Exception Class | ||
|
|
||
| ```java | ||
| public class WaitForConditionException extends DurableOperationException { | ||
| public WaitForConditionException(String message) { ... } | ||
| public WaitForConditionException(Operation operation) { ... } | ||
| } | ||
| ``` | ||
|
|
||
| ## Testing Strategy | ||
|
|
||
| ### Unit Tests (sdk/src/test/) | ||
| - `WaitForConditionDecisionTest`: verify `continuePolling`/`stopPolling` factory methods | ||
| - `WaitStrategiesTest`: verify builder defaults, exponential backoff, jitter, max attempts | ||
| - `WaitForConditionConfigTest`: verify builder validation | ||
| - `WaitForConditionOperationTest`: verify start, replay, error handling | ||
|
|
||
| ### Integration Tests (sdk-integration-tests/) | ||
| - `WaitForConditionIntegrationTest`: end-to-end with `LocalDurableTestRunner`, verify replay across invocations | ||
|
|
||
| ### Example Tests (examples/) | ||
| - `WaitForConditionExample`: demonstrates polling with `WaitStrategies` factory | ||
| - `WaitForConditionExampleTest`: verifies example with `LocalDurableTestRunner` | ||
|
|
||
| ### Testing Framework | ||
| - JUnit 5 for all tests | ||
| - jqwik for property-based tests if it is already in the project's test dependencies; otherwise JUnit 5 parameterized tests with random generators | ||
|
|
||
| ## Correctness Properties | ||
|
|
||
| ### Property 1: WaitForConditionWaitStrategy contract — stopPolling terminates | ||
| For any state `s` of type `T` and any attempt number `n >= 1`, if `waitStrategy.evaluate(s, n)` returns `StopPolling`, then `waitForCondition` completes with `s` as the result. | ||
|
|
||
| **Validates: Requirements 1.5, 2.1** | ||
|
|
||
| ### Property 2: WaitStrategies factory — exponential backoff calculation | ||
| For any `initialDelay d`, `backoffRate r >= 1`, `maxDelay m >= d`, and attempt `n >= 1` with jitter=NONE, the delay equals `min(d * r^(n-1), m)` rounded to the nearest integer second, with a minimum of 1 second. | ||
|
|
||
| **Validates: Requirements 2.3, 2.4** | ||
|
|
||
| ### Property 3: WaitStrategies factory — max attempts enforcement | ||
| For any `maxAttempts N >= 1` and any state where `shouldContinuePolling` returns true, calling the strategy with `attempt >= N` must throw `WaitForConditionException`. | ||
|
|
||
| **Validates: Requirements 2.5** | ||
|
|
||
| ### Property 4: WaitForConditionConfig — required fields validation | ||
| Building a `WaitForConditionConfig` without a `waitStrategy` or with a null `initialState` must always throw an exception, regardless of other configuration. | ||
|
|
||
| **Validates: Requirements 3.2** | ||
|
|
||
| ### Property 5: WaitForConditionWaitStrategy receives correct state and attempt | ||
| For any sequence of check function invocations, the wait strategy always receives the state returned by the most recent check function call and the correct 1-based attempt number. | ||
|
|
||
| **Validates: Requirements 1.3, 2.1** | ||
|
|
||
| ### Property 6: Operation name validation | ||
| For any string that violates `ParameterValidator.validateOperationName` rules, calling `waitForCondition` or `waitForConditionAsync` with that name must throw. | ||
|
|
||
| **Validates: Requirements 5.4** | ||
|
|
||
| ### Property 7: Jitter bounds | ||
| For any delay `d` and jitter strategy: NONE produces exactly `d`, FULL produces a value in `[0, d]` (clamped to min 1s), HALF produces a value in `[d/2, d]`. | ||
|
|
||
| **Validates: Requirements 2.3** | ||
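The bounds in Property 7 can be illustrated with a small sketch. The `Jitter` enum below is a local stand-in, since the real `JitterStrategy` API is not shown in this design, and the uniform-random formulas for FULL and HALF are assumptions chosen to satisfy the stated ranges.

```java
import java.util.Random;

class JitterSketch {
    // Local stand-in for the SDK's JitterStrategy enum; the real API is not shown in this design.
    enum Jitter { NONE, FULL, HALF }

    // Returns the jittered delay in seconds, before clamping and rounding.
    static double apply(Jitter jitter, double delaySeconds, Random random) {
        double r = random.nextDouble(); // uniform in [0, 1)
        return switch (jitter) {
            case NONE -> delaySeconds;                               // exactly d
            case FULL -> r * delaySeconds;                           // within [0, d]
            case HALF -> delaySeconds / 2 + r * (delaySeconds / 2);  // within [d/2, d]
        };
    }

    // FULL can land below one second, so the final delay is rounded and clamped to 1s.
    static long toWholeSeconds(double jitteredSeconds) {
        return Math.max(1, Math.round(jitteredSeconds));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (Jitter jitter : Jitter.values()) {
            double jittered = apply(jitter, 10.0, random);
            System.out.println(jitter + " -> " + toWholeSeconds(jittered) + "s");
        }
    }
}
```

Passing the `Random` in as a parameter lets a property-based test seed it and assert the bounds over many samples.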
32 changes: 32 additions & 0 deletions
examples/src/main/java/software/amazon/lambda/durable/examples/WaitForConditionExample.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| package software.amazon.lambda.durable.examples; | ||
|
|
||
| import java.time.Duration; | ||
| import software.amazon.lambda.durable.DurableContext; | ||
| import software.amazon.lambda.durable.DurableHandler; | ||
| import software.amazon.lambda.durable.WaitForConditionConfig; | ||
| import software.amazon.lambda.durable.WaitStrategies; | ||
| import software.amazon.lambda.durable.retry.JitterStrategy; | ||
|
|
||
| /** | ||
| * Example demonstrating the waitForCondition operation. | ||
| * | ||
| * <p>Polls a counter until it reaches the length of the input name, then returns the final count. | ||
| */ | ||
| public class WaitForConditionExample extends DurableHandler<GreetingRequest, Integer> { | ||
|
|
||
| @Override | ||
| public Integer handleRequest(GreetingRequest input, DurableContext context) { | ||
| var targetCount = input.getName().length(); | ||
|
|
||
| var strategy = WaitStrategies.<Integer>builder(state -> state < targetCount) | ||
| .initialDelay(Duration.ofSeconds(1)) | ||
| .jitter(JitterStrategy.NONE) | ||
| .build(); | ||
|
|
||
| var config = WaitForConditionConfig.<Integer>builder(strategy, 0).build(); | ||
|
|
||
| return context.waitForCondition("count-to-name-length", Integer.class, (state, stepCtx) -> state + 1, config); | ||
| } | ||
| } |
23 changes: 23 additions & 0 deletions
...es/src/test/java/software/amazon/lambda/durable/examples/WaitForConditionExampleTest.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| package software.amazon.lambda.durable.examples; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
| import software.amazon.lambda.durable.model.ExecutionStatus; | ||
| import software.amazon.lambda.durable.testing.LocalDurableTestRunner; | ||
|
|
||
| class WaitForConditionExampleTest { | ||
|
|
||
| @Test | ||
| void testWaitForConditionExample() { | ||
| var handler = new WaitForConditionExample(); | ||
| var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); | ||
|
|
||
| var result = runner.runUntilComplete(new GreetingRequest("Alice")); | ||
|
|
||
| assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); | ||
| assertEquals(5, result.getResult(Integer.class)); | ||
| } | ||
| } |
Are they optional? If not, shouldn't they be parameters of the `waitForCondition` method?