Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ public void suspendExecution() {
throw ex;
}

/**
* returns {@code true} if the execution is terminated exceptionally (with a {@link SuspendExecutionException} or an
* unrecoverable error).
*/
public boolean isExecutionCompletedExceptionally() {
return executionExceptionFuture.isCompletedExceptionally();
}

private void stopAllOperations(Exception cause) {
registeredOperations.values().forEach(op -> op.getCompletionFuture().completeExceptionally(cause));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
try {
runnable.run();
} catch (Throwable throwable) {
// Operations always wrap the user's function and handles all possible exceptions except for
// SuspendExecutionException.
if (!executionManager.isExecutionCompletedExceptionally()
&& !(throwable instanceof SuspendExecutionException)) {
logger.error("An unhandled exception is thrown from user function: ", throwable);
throw terminateExecutionWithIllegalDurableOperationException(
"An unhandled exception is thrown from user function: " + throwable);
}
} finally {
if (contextId != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.operation;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected void replay(Operation existing) {
}
}
// Step is pending retry - Start polling for PENDING -> READY transition
case PENDING -> pollReadyAndExecuteStepLogic(existing, attempt);
case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt);
// Execute with current attempt
case READY -> executeStepLogic(attempt);
default ->
Expand All @@ -83,9 +84,8 @@ protected void replay(Operation existing) {
}
}

private CompletableFuture<Void> pollReadyAndExecuteStepLogic(Operation existing, int attempt) {
var nextAttemptInstant = existing.stepDetails().nextAttemptTimestamp();
return pollForOperationUpdates(nextAttemptInstant)
private void pollReadyAndExecuteStepLogic(Instant nextAttemptInstant, int attempt) {
pollForOperationUpdates(nextAttemptInstant)
.thenCompose(op -> op.status() == OperationStatus.READY
? CompletableFuture.completedFuture(op)
: pollForOperationUpdates(nextAttemptInstant))
Expand Down Expand Up @@ -163,19 +163,19 @@ private void handleStepFailure(Throwable exception, int attempt) {

if (isRetryable && retryDecision.shouldRetry()) {
// Send RETRY
var retryDelayInSeconds = Math.toIntExact(retryDecision.delay().toSeconds());
var retryUpdate = OperationUpdate.builder()
.action(OperationAction.RETRY)
.error(errorObject)
.stepOptions(StepOptions.builder()
// RetryDecisions always produce integer number of seconds greater or equals to
// 1 (no sub-second numbers)
.nextAttemptDelaySeconds(
Math.toIntExact(retryDecision.delay().toSeconds()))
.nextAttemptDelaySeconds(retryDelayInSeconds)
.build());
sendOperationUpdate(retryUpdate);

// Poll for READY status and then execute the step again
pollReadyAndExecuteStepLogic(getOperation(), attempt + 1);
pollReadyAndExecuteStepLogic(Instant.now().plusSeconds(retryDelayInSeconds), attempt + 1);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Retry OperationAction, the backend doesn't immediately return an operation with the nextAttemptTimestamp, which is while we get null pointer for it here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any tests that needs to be updated?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing tests are able to cover this with the change in handling user function exceptions.

} else {
// Send FAIL - retries exhausted
var failUpdate =
Expand Down
Loading