diff --git a/docs/spec/waitForCondition.md b/docs/spec/waitForCondition.md new file mode 100644 index 00000000..99f0d730 --- /dev/null +++ b/docs/spec/waitForCondition.md @@ -0,0 +1,280 @@ +# Design: waitForCondition for Durable Execution Java SDK + +## Overview + +This design adds a `waitForCondition` operation to the Java Durable Execution SDK. The operation periodically checks a user-supplied condition function, using a configurable wait strategy to determine polling intervals and termination. It follows the same checkpoint-and-replay model as existing operations (`step`, `wait`, `invoke`) and mirrors the JavaScript SDK's `waitForCondition` implementation. + +## Architecture + +### How it works + +`waitForCondition` is implemented as a specialized step operation that uses the RETRY checkpoint action for polling iterations: + +1. User calls `ctx.waitForCondition(name, resultType, checkFunc, config)` +2. A `WaitForConditionOperation` is created with a unique operation ID +3. On first execution: + - Checkpoint START with subtype `WAIT_FOR_CONDITION` + - Execute the check function with `initialState` and a `StepContext` + - Call the wait strategy with the new state and attempt number + - If `stopPolling()`: checkpoint SUCCEED with the final state, return it + - If `continuePolling(delay)`: checkpoint RETRY with the state and delay, poll for READY, then loop + - If check function throws: checkpoint FAIL, propagate the error +4. On replay: + - SUCCEEDED: return cached result (skip re-execution) + - FAILED: re-throw cached error + - PENDING: wait for READY transition, then resume polling + - STARTED/READY: resume execution from current attempt and state + +This matches the JS SDK's behavior where each polling iteration is a RETRY on the same STEP operation. 
+ +### New Classes + +``` +sdk/src/main/java/software/amazon/lambda/durable/ +├── WaitForConditionConfig.java # Config builder (waitStrategy, initialState, serDes) +├── WaitForConditionWaitStrategy.java # Functional interface: (T state, int attempt) → WaitForConditionDecision +├── WaitForConditionDecision.java # Sealed result: continuePolling(Duration) | stopPolling() +├── WaitStrategies.java # Factory with builder for common patterns +├── operation/ +│ └── WaitForConditionOperation.java # Operation implementation +├── model/ +│ └── OperationSubType.java # Add WAIT_FOR_CONDITION enum value +``` + +### Class Diagram + +``` +DurableContext + ├── waitForCondition(name, Class, checkFunc, config) → T + ├── waitForCondition(name, TypeToken, checkFunc, config) → T + ├── waitForConditionAsync(name, Class, checkFunc, config) → DurableFuture + └── waitForConditionAsync(name, TypeToken, checkFunc, config) → DurableFuture + │ + ▼ +WaitForConditionOperation extends BaseDurableOperation + ├── start() → checkpoint START, execute check loop + ├── replay(existing) → handle SUCCEEDED/FAILED/PENDING/STARTED/READY + ├── get() → block, deserialize result or throw + └── executeCheckLoop(currentState, attempt) + │ + ├── calls checkFunc(state, stepContext) → newState + ├── calls waitStrategy.evaluate(newState, attempt) → WaitForConditionDecision + │ ├── stopPolling() → checkpoint SUCCEED + │ └── continuePolling(delay) → checkpoint RETRY, poll, loop + └── on error → checkpoint FAIL +``` + +## Detailed Design + +### WaitForConditionWaitStrategy (Functional Interface) + +```java +@FunctionalInterface +public interface WaitForConditionWaitStrategy { + WaitForConditionDecision evaluate(T state, int attempt); +} +``` + +- `state`: the current state returned by the check function +- `attempt`: 1-based attempt number (first check is attempt 1) +- Returns a `WaitForConditionDecision` indicating whether to continue or stop + +### WaitForConditionDecision + +```java +public sealed interface 
WaitForConditionDecision { + record ContinuePolling(Duration delay) implements WaitForConditionDecision {} + record StopPolling() implements WaitForConditionDecision {} + + static WaitForConditionDecision continuePolling(Duration delay) { + return new ContinuePolling(delay); + } + + static WaitForConditionDecision stopPolling() { + return new StopPolling(); + } +} +``` + +Uses Java sealed interfaces for type safety. The `delay` in `ContinuePolling` must be >= 1 second (enforced at construction). + +### WaitStrategies (Factory) + +```java +public final class WaitStrategies { + public static Builder builder(Predicate shouldContinuePolling) { ... } + + public static class Builder { + // Defaults match JS SDK + private int maxAttempts = 60; + private Duration initialDelay = Duration.ofSeconds(5); + private Duration maxDelay = Duration.ofSeconds(300); + private double backoffRate = 1.5; + private JitterStrategy jitter = JitterStrategy.FULL; + + public Builder maxAttempts(int maxAttempts) { ... } + public Builder initialDelay(Duration initialDelay) { ... } + public Builder maxDelay(Duration maxDelay) { ... } + public Builder backoffRate(double backoffRate) { ... } + public Builder jitter(JitterStrategy jitter) { ... } + public WaitForConditionWaitStrategy build() { ... } + } +} +``` + +The built strategy: +1. Calls `shouldContinuePolling.test(state)` — if false, returns `stopPolling()` +2. Checks `attempt >= maxAttempts` — if true, throws `WaitForConditionException` +3. Calculates delay: `min(initialDelay * backoffRate^(attempt-1), maxDelay)` +4. Applies jitter using the existing `JitterStrategy` enum +5. Ensures delay >= 1 second, rounds to nearest integer second +6. 
Returns `continuePolling(delay)` + +### WaitForConditionConfig + +```java +public class WaitForConditionConfig<T> { + private final WaitForConditionWaitStrategy<T> waitStrategy; + private final T initialState; + private final SerDes serDes; // nullable, falls back to DurableConfig default + + public static <T> Builder<T> builder(WaitForConditionWaitStrategy<T> waitStrategy, T initialState) { ... } + + public static class Builder<T> { + public Builder<T> serDes(SerDes serDes) { ... } + public WaitForConditionConfig<T> build() { ... } + } +} +``` + +`waitStrategy` and `initialState` are required parameters of the `builder(...)` factory (not optional setters), and the factory rejects `null` for either one, so a built config never holds a null strategy or a null initial state. + +### WaitForConditionOperation + +Extends `BaseDurableOperation<T>`. Key behaviors: + +- **start()**: Begins the check loop from `initialState` at internal attempt index 0 (the wait strategy is called with the 1-based attempt number, so it sees attempt 1) +- **replay(existing)**: Handles all operation statuses (SUCCEEDED, FAILED, PENDING, STARTED, READY) +- **executeCheckLoop(state, attempt)**: Core polling logic + - Creates a `StepContext` for the check function + - Executes check function in the user executor (same pattern as `StepOperation`) + - Serializes/deserializes state through SerDes (round-trip, matching JS SDK) + - Calls wait strategy with deserialized state + - Checkpoints RETRY with `NextAttemptDelaySeconds` or SUCCEED/FAIL +- **get()**: Blocks on completion, deserializes result or throws exception + +All checkpoint updates use `OperationType.STEP` and `OperationSubType.WAIT_FOR_CONDITION`. 
+ +### DurableContext API Methods + +```java +// Sync methods (block until condition met) +public <T> T waitForCondition(String name, Class<T> resultType, + BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) + +public <T> T waitForCondition(String name, TypeToken<T> typeToken, + BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) + +// Async methods (return DurableFuture immediately) +public <T> DurableFuture<T> waitForConditionAsync(String name, Class<T> resultType, + BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) + +public <T> DurableFuture<T> waitForConditionAsync(String name, TypeToken<T> typeToken, + BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) +``` + +The check function is a `BiFunction<T, StepContext, T>`, not a `Function<StepContext, T>`: it receives the current state together with a `StepContext` (which provides logging and attempt info) and returns the new state. This mirrors the JS SDK, whose check function has the shape `(state: T, context) => Promise<T>`. + +For reference, the signature of the `TypeToken` async overload, to which the other three overloads delegate: + +```java +public <T> DurableFuture<T> waitForConditionAsync(String name, TypeToken<T> typeToken, + BiFunction<T, StepContext, T> checkFunc, WaitForConditionConfig<T> config) +``` + +### OperationSubType Addition + +```java +public enum OperationSubType { + RUN_IN_CHILD_CONTEXT("RunInChildContext"), + MAP("Map"), + PARALLEL("Parallel"), + WAIT_FOR_CALLBACK("WaitForCallback"), + WAIT_FOR_CONDITION("WaitForCondition"); // NEW + ... 
+} +``` + +### Error Handling + +- **Check function throws**: Checkpoint FAIL with serialized error, wrap in `WaitForConditionException` +- **Max attempts exceeded**: `WaitStrategies`-built strategy throws `WaitForConditionException("waitForCondition exceeded maximum attempts (N)")` +- **Custom strategy throws**: Propagated as-is (checkpoint FAIL) +- **SerDes failure**: Wrapped in `SerDesException` (existing pattern) + +A new `WaitForConditionException` extends `DurableOperationException` for domain-specific errors. + +### Exception Class + +```java +public class WaitForConditionException extends DurableOperationException { + public WaitForConditionException(String message) { ... } + public WaitForConditionException(Operation operation) { ... } +} +``` + +## Testing Strategy + +### Unit Tests (sdk/src/test/) +- `WaitForConditionDecisionTest`: verify `continuePolling`/`stopPolling` factory methods +- `WaitStrategiesTest`: verify builder defaults, exponential backoff, jitter, max attempts +- `WaitForConditionConfigTest`: verify builder validation +- `WaitForConditionOperationTest`: verify start, replay, error handling + +### Integration Tests (sdk-integration-tests/) +- `WaitForConditionIntegrationTest`: end-to-end with `LocalDurableTestRunner`, verify replay across invocations + +### Example Tests (examples/) +- `WaitForConditionExample`: demonstrates polling with `WaitStrategies` factory +- `WaitForConditionExampleTest`: verifies example with `LocalDurableTestRunner` + +### Testing Framework +- JUnit 5 for all tests +- jqwik for property-based tests (already available in the project's test dependencies — if not, we'll use JUnit 5 parameterized tests with random generators) + +## Correctness Properties + +### Property 1: WaitForConditionWaitStrategy contract — stopPolling terminates +For any state `s` of type `T` and any attempt number `n >= 1`, if `waitStrategy.evaluate(s, n)` returns `StopPolling`, then `waitForCondition` completes with `s` as the result. 
+ +**Validates: Requirements 1.5, 2.1** + +### Property 2: WaitStrategies factory — exponential backoff calculation +For any `initialDelay d`, `backoffRate r >= 1`, `maxDelay m >= d`, and attempt `n >= 1` with jitter=NONE, the delay equals `min(d * r^(n-1), m)` rounded to the nearest integer second, with a minimum of 1 second. + +**Validates: Requirements 2.3, 2.4** + +### Property 3: WaitStrategies factory — max attempts enforcement +For any `maxAttempts N >= 1` and any state where `shouldContinuePolling` returns true, calling the strategy with `attempt >= N` must throw `WaitForConditionException`. + +**Validates: Requirements 2.5** + +### Property 4: WaitForConditionConfig — required fields validation +Building a `WaitForConditionConfig` without a `waitStrategy` or with a null `initialState` must always throw an exception, regardless of other configuration. + +**Validates: Requirements 3.2** + +### Property 5: WaitForConditionWaitStrategy receives correct state and attempt +For any sequence of check function invocations, the wait strategy always receives the state returned by the most recent check function call and the correct 1-based attempt number. + +**Validates: Requirements 1.3, 2.1** + +### Property 6: Operation name validation +For any string that violates `ParameterValidator.validateOperationName` rules, calling `waitForCondition` or `waitForConditionAsync` with that name must throw. + +**Validates: Requirements 5.4** + +### Property 7: Jitter bounds +For any delay `d` and jitter strategy: NONE produces exactly `d`, FULL produces a value in `[0, d]` (clamped to min 1s), HALF produces a value in `[d/2, d]`. 
+ +**Validates: Requirements 2.3** diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 32c9d794..1f9e110a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -11,6 +11,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import org.slf4j.LoggerFactory; @@ -22,6 +23,7 @@ import software.amazon.lambda.durable.operation.ChildContextOperation; import software.amazon.lambda.durable.operation.InvokeOperation; import software.amazon.lambda.durable.operation.StepOperation; +import software.amazon.lambda.durable.operation.WaitForConditionOperation; import software.amazon.lambda.durable.operation.WaitOperation; import software.amazon.lambda.durable.validation.ParameterValidator; @@ -438,6 +440,56 @@ public DurableFuture waitForCallbackAsync( OperationSubType.WAIT_FOR_CALLBACK); } + // ========== waitForCondition methods ========== + + public T waitForCondition( + String name, + Class resultType, + BiFunction checkFunc, + WaitForConditionConfig config) { + return waitForConditionAsync(name, TypeToken.get(resultType), checkFunc, config) + .get(); + } + + public T waitForCondition( + String name, + TypeToken typeToken, + BiFunction checkFunc, + WaitForConditionConfig config) { + return waitForConditionAsync(name, typeToken, checkFunc, config).get(); + } + + public DurableFuture waitForConditionAsync( + String name, + Class resultType, + BiFunction checkFunc, + WaitForConditionConfig config) { + return waitForConditionAsync(name, TypeToken.get(resultType), checkFunc, config); + } + + public DurableFuture waitForConditionAsync( + String name, + TypeToken typeToken, + BiFunction checkFunc, + 
WaitForConditionConfig config) { + Objects.requireNonNull(config, "config cannot be null"); + Objects.requireNonNull(typeToken, "typeToken cannot be null"); + ParameterValidator.validateOperationName(name); + + if (config.serDes() == null) { + config = WaitForConditionConfig.builder(config.waitStrategy(), config.initialState()) + .serDes(getDurableConfig().getSerDes()) + .build(); + } + var operationId = nextOperationId(); + + var operation = new WaitForConditionOperation<>(operationId, name, checkFunc, typeToken, config, this); + + operation.execute(); + + return operation; + } + // =============== accessors ================ /** * Returns a logger with execution context information for replay-aware logging. diff --git a/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionConfig.java new file mode 100644 index 00000000..f035cd5a --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionConfig.java @@ -0,0 +1,87 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import software.amazon.lambda.durable.serde.SerDes; + +/** + * Configuration for {@code waitForCondition} operations. + * + *

Bundles the wait strategy, initial state, and optional SerDes for a waitForCondition call. Use + * {@link #builder(WaitForConditionWaitStrategy, Object)} to create instances. + * + * @param the type of state being polled + */ +public class WaitForConditionConfig { + private final WaitForConditionWaitStrategy waitStrategy; + private final T initialState; + private final SerDes serDes; + + private WaitForConditionConfig(Builder builder) { + this.waitStrategy = builder.waitStrategy; + this.initialState = builder.initialState; + this.serDes = builder.serDes; + } + + /** Returns the wait strategy that controls polling behavior. */ + public WaitForConditionWaitStrategy waitStrategy() { + return waitStrategy; + } + + /** Returns the initial state passed to the first check function invocation. */ + public T initialState() { + return initialState; + } + + /** Returns the custom serializer, or null if not specified (uses default SerDes). */ + public SerDes serDes() { + return serDes; + } + + /** + * Creates a new builder with the required wait strategy and initial state. 
+ * + * @param waitStrategy the strategy controlling polling intervals and termination + * @param initialState the initial state for the first check invocation + * @param the type of state being polled + * @return a new builder instance + * @throws IllegalArgumentException if waitStrategy or initialState is null + */ + public static Builder builder(WaitForConditionWaitStrategy waitStrategy, T initialState) { + if (waitStrategy == null) { + throw new IllegalArgumentException("waitStrategy must not be null"); + } + if (initialState == null) { + throw new IllegalArgumentException("initialState must not be null"); + } + return new Builder<>(waitStrategy, initialState); + } + + public static class Builder { + private final WaitForConditionWaitStrategy waitStrategy; + private final T initialState; + private SerDes serDes; + + private Builder(WaitForConditionWaitStrategy waitStrategy, T initialState) { + this.waitStrategy = waitStrategy; + this.initialState = initialState; + } + + /** + * Sets a custom serializer for the waitForCondition operation. + * + *

If not specified, the operation will use the default SerDes configured for the handler. + * + * @param serDes the custom serializer to use, or null to use the default + * @return this builder for method chaining + */ + public Builder serDes(SerDes serDes) { + this.serDes = serDes; + return this; + } + + public WaitForConditionConfig build() { + return new WaitForConditionConfig<>(this); + } + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionDecision.java b/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionDecision.java new file mode 100644 index 00000000..88cf304f --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionDecision.java @@ -0,0 +1,39 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import java.time.Duration; + +/** + * Represents the decision made by a {@link WaitForConditionWaitStrategy} after evaluating the current state. + * + *

/**
 * Represents the decision made by a {@link WaitForConditionWaitStrategy} after evaluating the current state.
 *
 * <p>If {@code shouldContinue} is true, {@code delay} specifies how long to wait before the next poll. If
 * {@code shouldContinue} is false, polling stops and the current state becomes the result.
 *
 * @param shouldContinue true if polling should continue, false if the condition has been met
 * @param delay the duration to wait before the next poll (null when shouldContinue is false)
 */
public record WaitForConditionDecision(boolean shouldContinue, Duration delay) {

    /** Smallest delay accepted for a continue-polling decision. */
    private static final Duration MIN_DELAY = Duration.ofSeconds(1);

    /**
     * Validates the documented contract: a continue-polling decision needs a delay of at least 1 second.
     *
     * <p>The check lives in the canonical constructor so it cannot be bypassed by calling {@code new} directly
     * instead of {@link #continuePolling(Duration)}. A stop-polling decision is not constrained.
     *
     * @throws IllegalArgumentException if {@code shouldContinue} is true and {@code delay} is null or shorter than
     *     1 second
     */
    public WaitForConditionDecision {
        if (shouldContinue) {
            if (delay == null) {
                throw new IllegalArgumentException("delay cannot be null");
            }
            if (delay.compareTo(MIN_DELAY) < 0) {
                throw new IllegalArgumentException("delay must be at least 1 second, got: " + delay);
            }
        }
    }

    /**
     * Creates a decision to continue polling after the specified delay.
     *
     * @param delay the duration to wait before the next poll, must be at least 1 second
     * @return a continue-polling decision
     * @throws IllegalArgumentException if {@code delay} is null or shorter than 1 second
     */
    public static WaitForConditionDecision continuePolling(Duration delay) {
        return new WaitForConditionDecision(true, delay);
    }

    /**
     * Creates a decision to stop polling.
     *
     * @return a stop-polling decision whose {@code delay} is null
     */
    public static WaitForConditionDecision stopPolling() {
        return new WaitForConditionDecision(false, null);
    }
}

Implementations evaluate the current state and attempt number to decide whether to continue polling (with a + * specified delay) or stop polling because the condition has been met. + * + * @param the type of state being polled + * @see WaitForConditionDecision + * @see WaitStrategies + */ +@FunctionalInterface +public interface WaitForConditionWaitStrategy { + + /** + * Evaluates the current state and attempt number to decide whether to continue or stop polling. + * + * @param state the current state returned by the check function + * @param attempt the 1-based attempt number (first check is attempt 1) + * @return a {@link WaitForConditionDecision} indicating whether to continue or stop polling + */ + WaitForConditionDecision evaluate(T state, int attempt); +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/WaitStrategies.java b/sdk/src/main/java/software/amazon/lambda/durable/WaitStrategies.java new file mode 100644 index 00000000..27afc462 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/WaitStrategies.java @@ -0,0 +1,124 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import java.time.Duration; +import java.util.function.Predicate; +import software.amazon.lambda.durable.exception.WaitForConditionException; +import software.amazon.lambda.durable.retry.JitterStrategy; +import software.amazon.lambda.durable.validation.ParameterValidator; + +/** + * Factory class for creating common {@link WaitForConditionWaitStrategy} implementations. + * + *

Provides a builder for creating strategies with exponential backoff, jitter, and max attempts. Default values + * match the JavaScript SDK: maxAttempts=60, initialDelay=5s, maxDelay=300s, backoffRate=1.5, jitter=FULL. + */ +public final class WaitStrategies { + + private WaitStrategies() { + // Utility class - prevent instantiation + } + + /** + * Creates a builder for a wait strategy that uses a predicate to determine when to stop polling. + * + * @param shouldContinuePolling predicate that returns true if polling should continue + * @param the type of state being polled + * @return a new builder + */ + public static Builder builder(Predicate shouldContinuePolling) { + if (shouldContinuePolling == null) { + throw new IllegalArgumentException("shouldContinuePolling cannot be null"); + } + return new Builder<>(shouldContinuePolling); + } + + public static class Builder { + private final Predicate shouldContinuePolling; // Function to determine if polling should continue + private int maxAttempts = 60; // Maximum number of attempts + private Duration initialDelay = Duration.ofSeconds(5); // Initial delay before first retry + private Duration maxDelay = Duration.ofSeconds(300); // Maximum delay between retries + private double backoffRate = 1.5; // Multiplier for each subsequent retry + private JitterStrategy jitter = JitterStrategy.FULL; // Jitter strategy to apply to retry delays + + private Builder(Predicate shouldContinuePolling) { + this.shouldContinuePolling = shouldContinuePolling; + } + + public Builder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public Builder initialDelay(Duration initialDelay) { + this.initialDelay = initialDelay; + return this; + } + + public Builder maxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + return this; + } + + public Builder backoffRate(double backoffRate) { + this.backoffRate = backoffRate; + return this; + } + + public Builder jitter(JitterStrategy jitter) { + this.jitter = 
jitter; + return this; + } + + /** + * Builds the wait strategy. + * + * @return a {@link WaitForConditionWaitStrategy} implementing exponential backoff with jitter + */ + public WaitForConditionWaitStrategy build() { + ParameterValidator.validatePositiveInteger(maxAttempts, "maxAttempts"); + ParameterValidator.validateDuration(initialDelay, "initialDelay"); + ParameterValidator.validateDuration(maxDelay, "maxDelay"); + if (backoffRate < 1.0) { + throw new IllegalArgumentException("backoffRate must be >= 1.0, got: " + backoffRate); + } + if (jitter == null) { + throw new IllegalArgumentException("jitter cannot be null"); + } + + return (state, attempt) -> { + // Check if condition is met + if (!shouldContinuePolling.test(state)) { + return WaitForConditionDecision.stopPolling(); + } + + // Check if we've exceeded max attempts + if (attempt >= maxAttempts) { + throw new WaitForConditionException( + "waitForCondition exceeded maximum attempts (" + maxAttempts + ")"); + } + + // Calculate delay with exponential backoff + var initialDelaySeconds = initialDelay.toSeconds(); + var maxDelaySeconds = maxDelay.toSeconds(); + + double baseDelay = Math.min(initialDelaySeconds * Math.pow(backoffRate, attempt - 1), maxDelaySeconds); + + // Apply jitter + double delayWithJitter = + switch (jitter) { + case NONE -> baseDelay; + case FULL -> Math.random() * baseDelay; // Random between 0 and delay + case HALF -> + baseDelay / 2 + Math.random() * (baseDelay / 2); // Random between delay/2 and delay + }; + + // Ensure delay is an integer >= 1 + long finalDelaySeconds = Math.max(1, Math.round(delayWithJitter)); + + return WaitForConditionDecision.continuePolling(Duration.ofSeconds(finalDelaySeconds)); + }; + } + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/exception/WaitForConditionException.java b/sdk/src/main/java/software/amazon/lambda/durable/exception/WaitForConditionException.java new file mode 100644 index 00000000..bff29bf3 --- /dev/null +++ 
b/sdk/src/main/java/software/amazon/lambda/durable/exception/WaitForConditionException.java @@ -0,0 +1,24 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.exception; + +import software.amazon.awssdk.services.lambda.model.Operation; + +/** + * Exception thrown when a {@code waitForCondition} operation fails. + * + *

This can occur when the maximum number of polling attempts is exceeded, or when the check function throws an + * error. + */ +public class WaitForConditionException extends DurableOperationException { + + /** Creates an exception from a message only; the two leading superclass arguments are passed as null. */ public WaitForConditionException(String message) { + super(null, null, message); + } + + /** Creates an exception from an operation, forwarding its step-details error when step details are present and null otherwise. */ public WaitForConditionException(Operation operation) { + super( + operation, + operation.stepDetails() != null ? operation.stepDetails().error() : null); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java index 9e778ef0..4578a1fc 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java @@ -12,7 +12,8 @@ public enum OperationSubType { RUN_IN_CHILD_CONTEXT("RunInChildContext"), MAP("Map"), PARALLEL("Parallel"), - WAIT_FOR_CALLBACK("WaitForCallback"); + WAIT_FOR_CALLBACK("WaitForCallback"), + WAIT_FOR_CONDITION("WaitForCondition"); private final String value; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 95c710de..dfbeeacf 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -210,6 +210,7 @@ public T get() { case MAP -> throw new ChildContextFailedException(op); case PARALLEL -> throw new ChildContextFailedException(op); case RUN_IN_CHILD_CONTEXT -> throw new ChildContextFailedException(op); + case WAIT_FOR_CONDITION -> throw new ChildContextFailedException(op); }; } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java 
new file mode 100644 index 00000000..d62595b0 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -0,0 +1,215 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.operation; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationAction; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.awssdk.services.lambda.model.OperationUpdate; +import software.amazon.awssdk.services.lambda.model.StepOptions; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.StepContext; +import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.WaitForConditionConfig; +import software.amazon.lambda.durable.WaitForConditionDecision; +import software.amazon.lambda.durable.exception.DurableOperationException; +import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; +import software.amazon.lambda.durable.exception.WaitForConditionException; +import software.amazon.lambda.durable.execution.SuspendExecutionException; +import software.amazon.lambda.durable.execution.ThreadContext; +import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.OperationSubType; +import software.amazon.lambda.durable.util.ExceptionHelper; + +/** + * Durable operation that periodically checks a user-supplied condition function, using a configurable wait strategy to + * determine polling intervals and termination. + * + *

Uses {@link OperationType#STEP} with {@link OperationSubType#WAIT_FOR_CONDITION} subtype. Each polling iteration
 * is checkpointed as a RETRY on the same STEP operation.
 *
 * @param <T> the type of state being polled
 */
public class WaitForConditionOperation<T> extends BaseDurableOperation<T> {

    // NOTE(review): generic type arguments were missing from the reviewed text and are reconstructed from usage
    // (checkFunc.apply(state, stepContext) yields the next state) -- confirm against the compiled source.
    private final BiFunction<T, StepContext, T> checkFunc;
    private final WaitForConditionConfig<T> config;
    private final ExecutorService userExecutor;

    /**
     * Creates the operation. The check function is run on the executor service obtained from the durable
     * context's configuration.
     *
     * @param operationId identifier passed through to the base operation
     * @param name operation name passed through to the base operation
     * @param checkFunc function applied to the current state and a step context to produce the next state
     * @param resultTypeToken type token used by the base operation for the result type
     * @param config supplies the SerDes, the initial state and the wait strategy; must not be null
     * @param durableContext context providing the durable configuration; must not be null
     */
    public WaitForConditionOperation(
            String operationId,
            String name,
            BiFunction<T, StepContext, T> checkFunc,
            TypeToken<T> resultTypeToken,
            WaitForConditionConfig<T> config,
            DurableContext durableContext) {
        super(operationId, name, OperationType.STEP, resultTypeToken, config.serDes(), durableContext);

        this.checkFunc = checkFunc;
        this.config = config;
        this.userExecutor = durableContext.getDurableConfig().getExecutorService();
    }

    /** Starts the first check with the configured initial state and attempt index 0. */
    @Override
    protected void start() {
        executeCheckLogic(config.initialState(), 0);
    }

    /**
     * Dispatches on the status of a previously checkpointed operation: terminal statuses are marked as already
     * completed, PENDING polls for READY before resuming, STARTED/READY resume immediately, and any other status
     * terminates the execution as an illegal operation state.
     *
     * @param existing the checkpointed operation found during replay
     */
    @Override
    protected void replay(Operation existing) {
        switch (existing.status()) {
            case SUCCEEDED, FAILED -> markAlreadyCompleted();
            case PENDING -> pollReadyAndResumeCheckLoop(existing);
            case STARTED, READY -> resumeCheckLoop(existing);
            default ->
                terminateExecutionWithIllegalDurableOperationException(
                        "Unexpected waitForCondition status: " + existing.status());
        }
    }

    /**
     * Waits for the operation to complete and returns the deserialized final state.
     *
     * <p>If the operation did not succeed, the checkpointed error is deserialized and rethrown when possible;
     * otherwise a {@link WaitForConditionException} wrapping the operation is thrown.
     *
     * @return the deserialized result of the succeeded operation
     * @throws WaitForConditionException if the operation did not succeed and no original exception could be
     *     reconstructed (including when no step details or error object were recorded)
     */
    @Override
    public T get() {
        var op = waitForOperationCompletion();
        var stepDetails = op.stepDetails();

        if (op.status() == OperationStatus.SUCCEEDED) {
            var result = (stepDetails != null) ? stepDetails.result() : null;
            return deserializeResult(result);
        }

        // Guard against missing step details / error so we fall back to WaitForConditionException
        // instead of failing with a NullPointerException.
        var errorObject = (stepDetails != null) ? stepDetails.error() : null;
        if (errorObject != null) {
            // Attempt to reconstruct and throw the original exception
            Throwable original = deserializeException(errorObject);
            if (original != null) {
                ExceptionHelper.sneakyThrow(original);
            }
        }
        // Fallback: wrap in WaitForConditionException
        throw new WaitForConditionException(op);
    }

    /**
     * Resumes checking from the attempt and state recorded on {@code existing}. A missing attempt resumes at 0;
     * a missing or undeserializable state resumes from the configured initial state.
     *
     * @param existing the checkpointed operation to resume from
     */
    private void resumeCheckLoop(Operation existing) {
        var stepDetails = existing.stepDetails();
        int attempt = (stepDetails != null && stepDetails.attempt() != null) ? stepDetails.attempt() : 0;
        var checkpointData = (stepDetails != null) ? stepDetails.result() : null;

        T currentState;
        if (checkpointData == null) {
            currentState = config.initialState();
        } else {
            try {
                currentState = deserializeResult(checkpointData);
            } catch (Exception ignored) {
                // Best-effort: an unreadable checkpoint payload restarts polling from the initial state
                // rather than failing the whole operation.
                currentState = config.initialState();
            }
        }
        executeCheckLogic(currentState, attempt);
    }

    /**
     * Polls for the operation to become READY, then resumes the check loop from {@code existing}.
     *
     * @param existing the checkpointed (PENDING) operation whose attempt and state are resumed
     * @return a future that completes after the resumed check has been submitted
     */
    private CompletableFuture<Void> pollReadyAndResumeCheckLoop(Operation existing) {
        return pollForReady().thenRun(() -> resumeCheckLoop(existing));
    }

    /**
     * Polls for an operation update; if that update is not READY, polls one more time.
     *
     * <p>The status of the second update is not inspected.
     *
     * @return a future completing with the first READY update, or with the second update otherwise
     */
    private CompletableFuture<Operation> pollForReady() {
        return pollForOperationUpdates()
                .thenCompose(op -> op.status() == OperationStatus.READY
                        ? CompletableFuture.completedFuture(op)
                        : pollForOperationUpdates());
    }

    /**
     * Submits one check iteration to the user executor.
     *
     * <p>The iteration checkpoints START unless the operation is already STARTED, applies the check function,
     * round-trips the new state through the SerDes, asks the wait strategy (with a 1-based attempt number) for a
     * decision and acts on it. Any {@link Throwable} raised along the way is routed to
     * {@link #handleCheckFailure(Throwable)}.
     *
     * @param currentState state handed to the check function
     * @param attempt 0-based attempt index of this iteration
     */
    private void executeCheckLogic(T currentState, int attempt) {
        var threadId = getThreadId();

        // Register thread as active BEFORE executor runs
        registerActiveThread(threadId);

        // NOTE(review): the returned future is discarded, so anything thrown out of the task
        // (e.g. a rethrown SuspendExecutionException) is not observed here.
        CompletableFuture.runAsync(
                () -> {
                    setCurrentThreadContext(new ThreadContext(threadId, ThreadType.STEP));

                    try (var stepContext = getContext().createStepContext(getOperationId(), getName(), attempt)) {
                        try {
                            // Checkpoint START if not already started
                            var existing = getOperation();
                            if (existing == null || existing.status() != OperationStatus.STARTED) {
                                var startUpdate = OperationUpdate.builder()
                                        .action(OperationAction.START)
                                        .subType(OperationSubType.WAIT_FOR_CONDITION.getValue());
                                sendOperationUpdateAsync(startUpdate);
                            }

                            // Execute check function in user executor
                            T newState = checkFunc.apply(currentState, stepContext);

                            // SerDes round-trip (matching JS SDK behavior)
                            var serializedState = serializeResult(newState);
                            T deserializedState = deserializeResult(serializedState);

                            // Evaluate wait strategy (1-based attempt)
                            var decision = config.waitStrategy().evaluate(deserializedState, attempt + 1);

                            handleDecision(decision, serializedState, attempt);
                        } catch (Throwable e) {
                            handleCheckFailure(e);
                        } finally {
                            try {
                                deregisterActiveThread(threadId);
                            } catch (SuspendExecutionException e) {
                                // Expected when this is the last active thread
                            }
                        }
                    }
                },
                userExecutor);
    }

    /**
     * Acts on the wait strategy's decision for the iteration that just ran.
     *
     * <p>When polling should stop, checkpoints SUCCEED with the serialized state. Otherwise checkpoints RETRY
     * with the serialized state and the delay in whole seconds, polls for READY and then starts the next
     * iteration with {@code attempt + 1}.
     *
     * @param decision the wait strategy's decision
     * @param serializedState the serialized state produced by this iteration
     * @param attempt 0-based attempt index of the iteration that just ran
     */
    private void handleDecision(WaitForConditionDecision decision, String serializedState, int attempt) {
        if (!decision.shouldContinue()) {
            // Condition met -- checkpoint SUCCEED
            var successUpdate = OperationUpdate.builder()
                    .action(OperationAction.SUCCEED)
                    .subType(OperationSubType.WAIT_FOR_CONDITION.getValue())
                    .payload(serializedState);
            sendOperationUpdate(successUpdate);
            return;
        }

        // Checkpoint RETRY with delay. Duration.toSeconds() drops any sub-second part, and
        // Math.toIntExact throws ArithmeticException on overflow (surfacing through the caller's catch).
        var retryUpdate = OperationUpdate.builder()
                .action(OperationAction.RETRY)
                .subType(OperationSubType.WAIT_FOR_CONDITION.getValue())
                .payload(serializedState)
                .stepOptions(StepOptions.builder()
                        .nextAttemptDelaySeconds(
                                Math.toIntExact(decision.delay().toSeconds()))
                        .build());
        sendOperationUpdate(retryUpdate);

        // Poll for READY, then continue the loop.
        // NOTE(review): the chained future is discarded, so a failed poll or a failure while
        // starting the next iteration is not reported anywhere -- confirm this is acceptable.
        pollForReady().thenRun(() -> executeCheckLogic(deserializeResult(serializedState), attempt + 1));
    }

    /**
     * Handles a failure raised during a check iteration.
     *
     * <p>{@link SuspendExecutionException} is rethrown without checkpointing; an
     * {@link UnrecoverableDurableExecutionException} terminates the execution; anything else is checkpointed as
     * FAIL, reusing the error object of a {@link DurableOperationException} or serializing the exception.
     *
     * @param exception the failure, possibly wrapped by a {@link CompletableFuture}
     */
    private void handleCheckFailure(Throwable exception) {
        exception = ExceptionHelper.unwrapCompletableFuture(exception);
        if (exception instanceof SuspendExecutionException) {
            ExceptionHelper.sneakyThrow(exception);
        }
        if (exception instanceof UnrecoverableDurableExecutionException unrecoverable) {
            terminateExecution(unrecoverable);
            // Never checkpoint FAIL for an unrecoverable error, even if terminateExecution returns.
            return;
        }

        final var errorObject = (exception instanceof DurableOperationException durableOpEx)
                ? durableOpEx.getErrorObject()
                : serializeException(exception);

        // Checkpoint FAIL
        var failUpdate = OperationUpdate.builder()
                .action(OperationAction.FAIL)
                .subType(OperationSubType.WAIT_FOR_CONDITION.getValue())
                .error(errorObject);
        sendOperationUpdate(failUpdate);
    }
}