diff --git a/sdk/src/main/java/software/amazon/lambda/durable/CallbackConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/CallbackConfig.java index fa21e118..3cb8d2fd 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/CallbackConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/CallbackConfig.java @@ -41,14 +41,17 @@ public SerDes serDes() { return serDes; } + /** Creates a new builder with default values. */ public static Builder builder() { return new Builder(null, null, null); } + /** Creates a new builder pre-populated with this config's values. */ public Builder toBuilder() { return new Builder(timeout, heartbeatTimeout, serDes); } + /** Builder for {@link CallbackConfig}. */ public static class Builder { private Duration timeout; private Duration heartbeatTimeout; @@ -60,12 +63,24 @@ private Builder(Duration timeout, Duration heartbeatTimeout, SerDes serDes) { this.serDes = serDes; } + /** + * Sets the maximum duration to wait for the callback to complete before timing out. + * + * @param timeout the timeout duration + * @return this builder for method chaining + */ public Builder timeout(Duration timeout) { ParameterValidator.validateOptionalDuration(timeout, "Callback timeout"); this.timeout = timeout; return this; } + /** + * Sets the maximum duration between heartbeats before the callback is considered failed. + * + * @param heartbeatTimeout the heartbeat timeout duration + * @return this builder for method chaining + */ public Builder heartbeatTimeout(Duration heartbeatTimeout) { ParameterValidator.validateOptionalDuration(heartbeatTimeout, "Heartbeat timeout"); this.heartbeatTimeout = heartbeatTimeout; @@ -87,6 +102,7 @@ public Builder serDes(SerDes serDes) { return this; } + /** Builds the {@link CallbackConfig} instance. */ public CallbackConfig build() { return new CallbackConfig(this); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 5ae87280..cf027b8d 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -95,38 +95,102 @@ public StepContext createStepContext(String stepOperationId, String stepOperatio // ========== step methods ========== + /** + * Executes a durable step with the given name and blocks until it completes. + * + *

On first execution, runs {@code func} and checkpoints the result. On replay, returns the cached result without + * re-executing. + * + * @param the result type + * @param name the unique operation name within this context + * @param resultType the result class for deserialization + * @param func the function to execute, receiving a {@link StepContext} + * @return the step result + */ public T step(String name, Class resultType, Function func) { return step(name, TypeToken.get(resultType), func, StepConfig.builder().build()); } + /** + * Executes a durable step with the given name and configuration, blocking until it completes. + * + * @param the result type + * @param name the unique operation name within this context + * @param resultType the result class for deserialization + * @param func the function to execute, receiving a {@link StepContext} + * @param config the step configuration (retry strategy, semantics, custom SerDes) + * @return the step result + */ public T step(String name, Class resultType, Function func, StepConfig config) { // Simply delegate to stepAsync and block on the result return stepAsync(name, resultType, func, config).get(); } + /** + * Executes a durable step using a {@link TypeToken} for generic result types, blocking until it completes. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic types + * @param func the function to execute, receiving a {@link StepContext} + * @return the step result + */ public T step(String name, TypeToken typeToken, Function func) { return step(name, typeToken, func, StepConfig.builder().build()); } + /** + * Executes a durable step using a {@link TypeToken} and configuration, blocking until it completes. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic types + * @param func the function to execute, receiving a {@link StepContext} + * @param config the step configuration (retry strategy, semantics, custom SerDes) + * @return the step result + */ public T step(String name, TypeToken typeToken, Function func, StepConfig config) { // Simply delegate to stepAsync and block on the result return stepAsync(name, typeToken, func, config).get(); } + /** + * Asynchronously executes a durable step, returning a {@link DurableFuture} that can be composed or blocked on. + * + * @param the result type + * @param name the unique operation name within this context + * @param resultType the result class for deserialization + * @param func the function to execute, receiving a {@link StepContext} + * @return a future representing the step result + */ public DurableFuture stepAsync(String name, Class resultType, Function func) { return stepAsync( name, TypeToken.get(resultType), func, StepConfig.builder().build()); } + /** Asynchronously executes a durable step with custom configuration. */ public DurableFuture stepAsync( String name, Class resultType, Function func, StepConfig config) { return stepAsync(name, TypeToken.get(resultType), func, config); } + /** Asynchronously executes a durable step using a {@link TypeToken} for generic result types. */ public DurableFuture stepAsync(String name, TypeToken typeToken, Function func) { return stepAsync(name, typeToken, func, StepConfig.builder().build()); } + /** + * Asynchronously executes a durable step using a {@link TypeToken} and custom configuration. + * + *

This is the core stepAsync implementation. All other step/stepAsync overloads delegate here. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic types + * @param func the function to execute, receiving a {@link StepContext} + * @param config the step configuration (retry strategy, semantics, custom SerDes) + * @return a future representing the step result + */ public DurableFuture stepAsync( String name, TypeToken typeToken, Function func, StepConfig config) { Objects.requireNonNull(config, "config cannot be null"); @@ -148,6 +212,7 @@ public DurableFuture stepAsync( } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public T step(String name, Class resultType, Supplier func) { return stepAsync( name, @@ -158,50 +223,74 @@ public T step(String name, Class resultType, Supplier func) { } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public T step(String name, Class resultType, Supplier func, StepConfig config) { // Simply delegate to stepAsync and block on the result return stepAsync(name, TypeToken.get(resultType), func, config).get(); } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public T step(String name, TypeToken typeToken, Supplier func) { return stepAsync(name, typeToken, func, StepConfig.builder().build()).get(); } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public T step(String name, TypeToken typeToken, Supplier func, StepConfig config) { // Simply delegate to stepAsync and block on the result return stepAsync(name, typeToken, func, config).get(); } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public DurableFuture stepAsync(String name, Class resultType, Supplier func) { return stepAsync( name, TypeToken.get(resultType), func, StepConfig.builder().build()); } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public DurableFuture stepAsync(String name, Class resultType, Supplier func, StepConfig config) { return stepAsync(name, TypeToken.get(resultType), func, config); } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func) { return stepAsync(name, typeToken, func, StepConfig.builder().build()); } /** @deprecated use the variants accepting StepContext instead */ + @Deprecated public DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func, StepConfig config) { return stepAsync(name, typeToken, stepContext -> func.get(), config); } // ========== wait methods ========== + /** + * Suspends execution for the specified duration without consuming compute resources. + * + *

On first execution, checkpoints a wait operation and suspends the Lambda. On replay after the duration has + * elapsed, returns immediately. + * + * @param name the unique operation name within this context + * @param duration the duration to wait + * @return always {@code null} + */ public Void wait(String name, Duration duration) { // Block (will throw SuspendExecutionException if there is no active thread) return waitAsync(name, duration).get(); } + /** + * Asynchronously suspends execution for the specified duration. + * + * @param name the unique operation name within this context + * @param duration the duration to wait + * @return a future that completes when the wait duration has elapsed + */ public DurableFuture waitAsync(String name, Duration duration) { ParameterValidator.validateDuration(duration, "Wait duration"); ParameterValidator.validateOperationName(name); @@ -218,6 +307,20 @@ public DurableFuture waitAsync(String name, Duration duration) { // ========== chained invoke methods ========== + /** + * Invokes another Lambda function by name and blocks until the result is available. + * + *

On first execution, checkpoints a chained invoke operation that triggers the target function. On replay, + * returns the cached result without re-invoking. + * + * @param the result type + * @param the payload type + * @param name the unique operation name within this context + * @param functionName the ARN or name of the Lambda function to invoke + * @param payload the input payload to send to the target function + * @param resultType the result class for deserialization + * @return the invocation result + */ public T invoke(String name, String functionName, U payload, Class resultType) { return invokeAsync( name, @@ -228,11 +331,13 @@ public T invoke(String name, String functionName, U payload, Class res .get(); } + /** Invokes another Lambda function with custom configuration, blocking until the result is available. */ public T invoke(String name, String functionName, U payload, Class resultType, InvokeConfig config) { return invokeAsync(name, functionName, payload, TypeToken.get(resultType), config) .get(); } + /** Invokes another Lambda function using a {@link TypeToken} for generic result types, blocking until complete. */ public T invoke(String name, String functionName, U payload, TypeToken typeToken) { return invokeAsync( name, @@ -243,15 +348,18 @@ public T invoke(String name, String functionName, U payload, TypeToken .get(); } + /** Invokes another Lambda function using a {@link TypeToken} and custom configuration, blocking until complete. */ public T invoke(String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config) { return invokeAsync(name, functionName, payload, typeToken, config).get(); } + /** Asynchronously invokes another Lambda function with custom configuration. */ public DurableFuture invokeAsync( String name, String functionName, U payload, Class resultType, InvokeConfig config) { return invokeAsync(name, functionName, payload, TypeToken.get(resultType), config); } + /** Asynchronously invokes another Lambda function, returning a {@link DurableFuture}. */ public DurableFuture invokeAsync(String name, String functionName, U payload, Class resultType) { return invokeAsync( name, @@ -261,11 +369,26 @@ public DurableFuture invokeAsync(String name, String functionName, U p InvokeConfig.builder().build()); } + /** Asynchronously invokes another Lambda function using a {@link TypeToken} for generic result types. */ public DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType) { return invokeAsync( name, functionName, payload, resultType, InvokeConfig.builder().build()); } + /** + * Asynchronously invokes another Lambda function using a {@link TypeToken} and custom configuration. + * + *

This is the core invokeAsync implementation. All other invoke/invokeAsync overloads delegate here. + * + * @param the result type + * @param the payload type + * @param name the unique operation name within this context + * @param functionName the ARN or name of the Lambda function to invoke + * @param payload the input payload to send to the target function + * @param typeToken the type token for deserialization of generic result types + * @param config the invoke configuration (custom SerDes for result and payload) + * @return a future representing the invocation result + */ public DurableFuture invokeAsync( String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config) { Objects.requireNonNull(config, "config cannot be null"); @@ -297,19 +420,34 @@ public DurableFuture invokeAsync( // ========== createCallback methods ========== + /** Creates a callback with custom configuration. */ public DurableCallbackFuture createCallback(String name, Class resultType, CallbackConfig config) { return createCallback(name, TypeToken.get(resultType), config); } + /** Creates a callback using a {@link TypeToken} for generic result types. */ public DurableCallbackFuture createCallback(String name, TypeToken typeToken) { return createCallback(name, typeToken, CallbackConfig.builder().build()); } + /** Creates a callback with default configuration. */ public DurableCallbackFuture createCallback(String name, Class resultType) { return createCallback( name, TypeToken.get(resultType), CallbackConfig.builder().build()); } + /** + * Creates a callback operation that suspends execution until an external system completes it. + * + *

This is the core createCallback implementation. Returns a {@link DurableCallbackFuture} containing a callback + * ID that external systems use to report completion via the Lambda Durable API. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic result types + * @param config the callback configuration (custom SerDes) + * @return a future containing the callback ID and eventual result + */ public DurableCallbackFuture createCallback(String name, TypeToken typeToken, CallbackConfig config) { ParameterValidator.validateOperationName(name); if (config.serDes() == null) { @@ -326,19 +464,36 @@ public DurableCallbackFuture createCallback(String name, TypeToken typ // ========== runInChildContext methods ========== + /** + * Runs a function in a child context, blocking until it completes. + * + *

Child contexts provide isolated operation ID namespaces, allowing nested workflows to be composed without ID + * collisions. On replay, the child context's operations are replayed independently. + * + * @param the result type + * @param name the unique operation name within this context + * @param resultType the result class for deserialization + * @param func the function to execute, receiving a child {@link DurableContext} + * @return the child context result + */ public T runInChildContext(String name, Class resultType, Function func) { return runInChildContextAsync(name, TypeToken.get(resultType), func).get(); } + /** + * Runs a function in a child context using a {@link TypeToken} for generic result types, blocking until complete. + */ public T runInChildContext(String name, TypeToken typeToken, Function func) { return runInChildContextAsync(name, typeToken, func).get(); } + /** Asynchronously runs a function in a child context, returning a {@link DurableFuture}. */ public DurableFuture runInChildContextAsync( String name, Class resultType, Function func) { return runInChildContextAsync(name, TypeToken.get(resultType), func); } + /** Asynchronously runs a function in a child context using a {@link TypeToken} for generic result types. */ public DurableFuture runInChildContextAsync( String name, TypeToken typeToken, Function func) { return runInChildContextAsync(name, typeToken, func, OperationSubType.RUN_IN_CHILD_CONTEXT); @@ -362,6 +517,19 @@ private DurableFuture runInChildContextAsync( } // ========= waitForCallback methods ============= + + /** + * Executes a submitter function and waits for an external callback, blocking until the callback completes. + * + *

Combines a step (to run the submitter) and a callback (to receive the external result) in a child context. The + * submitter receives a callback ID that external systems use to report completion. + * + * @param the result type + * @param name the unique operation name within this context + * @param resultType the result class for deserialization + * @param func the submitter function, receiving the callback ID and a {@link StepContext} + * @return the callback result + */ public T waitForCallback(String name, Class resultType, BiConsumer func) { return waitForCallbackAsync( name, @@ -371,12 +539,14 @@ public T waitForCallback(String name, Class resultType, BiConsumer T waitForCallback(String name, TypeToken typeToken, BiConsumer func) { return waitForCallbackAsync( name, typeToken, func, WaitForCallbackConfig.builder().build()) .get(); } + /** Executes a submitter and waits for an external callback with custom configuration, blocking until complete. */ public T waitForCallback( String name, Class resultType, @@ -386,6 +556,7 @@ public T waitForCallback( .get(); } + /** Executes a submitter and waits for an external callback using a {@link TypeToken} and custom configuration. */ public T waitForCallback( String name, TypeToken typeToken, @@ -395,6 +566,7 @@ public T waitForCallback( .get(); } + /** Asynchronously executes a submitter and waits for an external callback. */ public DurableFuture waitForCallbackAsync( String name, Class resultType, BiConsumer func) { return waitForCallbackAsync( @@ -404,12 +576,14 @@ public DurableFuture waitForCallbackAsync( WaitForCallbackConfig.builder().build()); } + /** Asynchronously executes a submitter and waits for an external callback using a {@link TypeToken}. */ public DurableFuture waitForCallbackAsync( String name, TypeToken typeToken, BiConsumer func) { return waitForCallbackAsync( name, typeToken, func, WaitForCallbackConfig.builder().build()); } + /** Asynchronously executes a submitter and waits for an external callback with custom configuration. */ public DurableFuture waitForCallbackAsync( String name, Class resultType, @@ -418,6 +592,21 @@ public DurableFuture waitForCallbackAsync( return waitForCallbackAsync(name, TypeToken.get(resultType), func, waitForCallbackConfig); } + /** + * Asynchronously executes a submitter and waits for an external callback using a {@link TypeToken} and custom + * configuration. + * + *

This is the core waitForCallbackAsync implementation. All other waitForCallback/waitForCallbackAsync overloads + * delegate here. Internally creates a child context containing a callback operation and a step that runs the + * submitter function. + * + * @param the result type + * @param name the unique operation name within this context + * @param typeToken the type token for deserialization of generic result types + * @param func the submitter function, receiving the callback ID and a {@link StepContext} + * @param waitForCallbackConfig the configuration for both the callback and submitter step + * @return a future representing the callback result + */ public DurableFuture waitForCallbackAsync( String name, TypeToken typeToken, diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index d0d70687..1562a3e8 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -55,6 +55,14 @@ public abstract class BaseDurableOperation implements DurableFuture { protected final CompletableFuture completionFuture; private final DurableContext durableContext; + /** + * Constructs a new durable operation. + * + * @param operationIdentifier the unique identifier for this operation + * @param resultTypeToken the type token for deserializing the result + * @param resultSerDes the serializer/deserializer for the result + * @param durableContext the parent context this operation belongs to + */ protected BaseDurableOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, @@ -92,12 +100,15 @@ protected DurableContext getContext() { return durableContext; } - /** Gets the operation type */ + /** Gets the operation type. */ public OperationType getType() { return operationIdentifier.operationType(); } - /** Starts the operation, processes the operation updates from backend. Does not block. */ + /** + * Starts the operation by checking for an existing checkpoint. If a checkpoint exists, validates and replays it; + * otherwise starts fresh execution. + */ public void execute() { var existing = getOperation(); @@ -109,10 +120,14 @@ public void execute() { } } - /** Starts the operation. */ + /** Starts the operation on first execution (no existing checkpoint). */ protected abstract void start(); - /** Replays the operation. */ + /** + * Replays the operation from an existing checkpoint. + * + * @param existing the checkpointed operation state + */ protected abstract void replay(Operation existing); /** @@ -151,12 +166,17 @@ private void validateCurrentThreadType() { } } - /** Checks if this operation is completed */ + /** Returns true if this operation has completed (successfully or exceptionally). */ protected boolean isOperationCompleted() { return completionFuture.isDone(); } - /** Waits for the operation to complete and suspends the execution if no active thread is running */ + /** + * Waits for the operation to complete. Deregisters the current thread to allow Lambda suspension if the operation + * is still in progress, then re-registers when the operation completes. + * + * @return the completed operation + */ protected Operation waitForOperationCompletion() { validateCurrentThreadType(); @@ -198,7 +218,12 @@ protected Operation waitForOperationCompletion() { return op; } - /** Receives operation updates from ExecutionManager and updates the internal state of the operation */ + /** + * Receives operation updates from ExecutionManager. Completes the internal future when the operation reaches a + * terminal status, unblocking any threads waiting on this operation. + * + * @param operation the updated operation state + */ public void onCheckpointComplete(Operation operation) { if (ExecutionManager.isTerminalStatus(operation.status())) { // This method handles only terminal status updates. Override this method if a DurableOperation needs to @@ -229,39 +254,63 @@ protected void markAlreadyCompleted() { } } - // terminate the execution + /** + * Terminates the execution with the given exception. + * + * @param exception the unrecoverable exception + * @return never returns normally; always throws + */ protected T terminateExecution(UnrecoverableDurableExecutionException exception) { executionManager.terminateExecution(exception); // Exception is already thrown from above. Keep the throw statement below to make tests happy throw exception; } + /** + * Terminates the execution with an {@link IllegalDurableOperationException}. + * + * @param message the error message + * @return never returns normally; always throws + */ protected T terminateExecutionWithIllegalDurableOperationException(String message) { return terminateExecution(new IllegalDurableOperationException(message)); } - // advanced thread and context control + /** + * Registers a thread as active in the execution manager. + * + * @param threadId the thread identifier to register + */ protected void registerActiveThread(String threadId) { executionManager.registerActiveThread(threadId); } + /** Returns the current thread's context from the execution manager. */ protected ThreadContext getCurrentThreadContext() { return executionManager.getCurrentThreadContext(); } - // polling and checkpointing + /** Polls the backend for updates to this operation. */ protected CompletableFuture pollForOperationUpdates() { return executionManager.pollForOperationUpdates(getOperationId()); } + /** + * Polls the backend for updates to this operation after the specified delay. + * + * @param delay the delay before polling + * @return a future that completes with the updated operation + */ protected CompletableFuture pollForOperationUpdates(Duration delay) { return executionManager.pollForOperationUpdates(getOperationId(), delay); } + /** Sends an operation update synchronously (blocks until the update is acknowledged). */ protected void sendOperationUpdate(OperationUpdate.Builder builder) { sendOperationUpdateAsync(builder).join(); } + /** Sends an operation update asynchronously. */ protected CompletableFuture sendOperationUpdateAsync(OperationUpdate.Builder builder) { return executionManager.sendOperationUpdate(builder.id(getOperationId()) .name(getName()) @@ -270,7 +319,13 @@ protected CompletableFuture sendOperationUpdateAsync(OperationUpdate.Build .build()); } - // serialization/deserialization utilities + /** + * Deserializes a result string into the operation's result type. + * + * @param result the serialized result string + * @return the deserialized result + * @throws SerDesException if deserialization fails + */ protected T deserializeResult(String result) { try { return resultSerDes.deserialize(result, resultTypeToken); @@ -283,14 +338,33 @@ protected T deserializeResult(String result) { } } + /** + * Serializes the result to a string. + * + * @param result the result to serialize + * @return the serialized string + */ protected String serializeResult(T result) { return resultSerDes.serialize(result); } + /** + * Serializes a throwable into an {@link ErrorObject} for checkpointing. + * + * @param throwable the exception to serialize + * @return the serialized error object + */ protected ErrorObject serializeException(Throwable throwable) { return ExceptionHelper.buildErrorObject(throwable, resultSerDes); } + /** + * Deserializes an {@link ErrorObject} back into a throwable, reconstructing the original exception type and stack + * trace when possible. Falls back to null if the exception class is not found or deserialization fails. + * + * @param errorObject the serialized error object + * @return the reconstructed throwable, or null if reconstruction is not possible + */ protected Throwable deserializeException(ErrorObject errorObject) { Throwable original = null; if (errorObject == null) {