diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java index 466ed8fc0..3c84b43fa 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java @@ -293,6 +293,14 @@ public void suspendExecution() { throw ex; } + /** + * Returns {@code true} if the execution has terminated exceptionally (with a {@link SuspendExecutionException} or an + * unrecoverable error). + */ + public boolean isExecutionCompletedExceptionally() { + return executionExceptionFuture.isCompletedExceptionally(); + } + private void stopAllOperations(Exception cause) { registeredOperations.values().forEach(op -> op.getCompletionFuture().completeExceptionally(cause)); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index a5026517e..3acd00bc1 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -229,6 +229,15 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType)); try { runnable.run(); + } catch (Throwable throwable) { + // Operations always wrap the user's function and handle all possible exceptions except for + // SuspendExecutionException, so any other throwable reaching this handler is unexpected. 
+ if (!executionManager.isExecutionCompletedExceptionally() + && !(throwable instanceof SuspendExecutionException)) { + logger.error("An unhandled exception is thrown from user function: ", throwable); + throw terminateExecutionWithIllegalDurableOperationException( + "An unhandled exception is thrown from user function: " + throwable); + } } finally { if (contextId != null) { try { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 5283e0d51..b7d72d34b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.operation; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import software.amazon.awssdk.services.lambda.model.ErrorObject; @@ -74,7 +75,7 @@ protected void replay(Operation existing) { } } // Step is pending retry - Start polling for PENDING -> READY transition - case PENDING -> pollReadyAndExecuteStepLogic(existing, attempt); + case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt); // Execute with current attempt case READY -> executeStepLogic(attempt); default -> @@ -83,9 +84,8 @@ protected void replay(Operation existing) { } } - private CompletableFuture pollReadyAndExecuteStepLogic(Operation existing, int attempt) { - var nextAttemptInstant = existing.stepDetails().nextAttemptTimestamp(); - return pollForOperationUpdates(nextAttemptInstant) + private void pollReadyAndExecuteStepLogic(Instant nextAttemptInstant, int attempt) { + pollForOperationUpdates(nextAttemptInstant) .thenCompose(op -> op.status() == OperationStatus.READY ? 
CompletableFuture.completedFuture(op) : pollForOperationUpdates(nextAttemptInstant)) @@ -163,19 +163,19 @@ private void handleStepFailure(Throwable exception, int attempt) { if (isRetryable && retryDecision.shouldRetry()) { // Send RETRY + var retryDelayInSeconds = Math.toIntExact(retryDecision.delay().toSeconds()); var retryUpdate = OperationUpdate.builder() .action(OperationAction.RETRY) .error(errorObject) .stepOptions(StepOptions.builder() // RetryDecisions always produce integer number of seconds greater or equals to // 1 (no sub-second numbers) - .nextAttemptDelaySeconds( - Math.toIntExact(retryDecision.delay().toSeconds())) + .nextAttemptDelaySeconds(retryDelayInSeconds) .build()); sendOperationUpdate(retryUpdate); // Poll for READY status and then execute the step again - pollReadyAndExecuteStepLogic(getOperation(), attempt + 1); + pollReadyAndExecuteStepLogic(Instant.now().plusSeconds(retryDelayInSeconds), attempt + 1); } else { // Send FAIL - retries exhausted var failUpdate =