diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java index b253425e..4bd95806 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java @@ -13,6 +13,7 @@ import software.amazon.lambda.durable.config.CallbackConfig; import software.amazon.lambda.durable.exception.CallbackFailedException; import software.amazon.lambda.durable.exception.CallbackTimeoutException; +import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.model.ExecutionStatus; import software.amazon.lambda.durable.serde.JacksonSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -271,6 +272,9 @@ void waitForCallbackCallbackFailed() { fail(); return "should not reach here"; } catch (Exception e) { + if (e instanceof SuspendExecutionException) { + throw e; + } assertInstanceOf(CallbackFailedException.class, e); throw e; } @@ -306,6 +310,9 @@ void waitForCallbackCallbackTimeout() { fail(); return "should not reach here"; } catch (Exception e) { + if (e instanceof SuspendExecutionException) { + throw e; + } assertInstanceOf(CallbackTimeoutException.class, e); throw e; } @@ -333,6 +340,9 @@ void waitForCallbackCallbackFailedWithUserException() { throw new IllegalArgumentException(errorMessage); }); } catch (Exception e) { + if (e instanceof SuspendExecutionException) { + throw e; + } assertInstanceOf(IllegalArgumentException.class, e); assertEquals(errorMessage, e.getMessage()); throw e; diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalMemoryExecutionClient.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalMemoryExecutionClient.java index 5c1cd8ea..e7e99ab7 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalMemoryExecutionClient.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalMemoryExecutionClient.java @@ -249,12 +249,20 @@ private StepDetails buildStepDetails(OperationUpdate update) { var existing = existingOp != null ? existingOp.stepDetails() : null; var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder(); + var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1; - if (update.action() == OperationAction.RETRY || update.action() == OperationAction.FAIL) { - var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1; + if (update.action() == OperationAction.FAIL) { detailsBuilder.attempt(attempt).error(update.error()); } + if (update.action() == OperationAction.RETRY) { + detailsBuilder + .attempt(attempt) + .error(update.error()) + .nextAttemptTimestamp( + Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds())); + } + if (update.payload() != null) { detailsBuilder.result(update.payload()); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java index f4ac3cf2..92e1ff1c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java @@ -11,7 +11,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -54,6 +56,7 @@ public class ExecutionManager implements AutoCloseable { private final Operation executionOp; private final String durableExecutionArn; private final AtomicReference executionMode; + private final DurableConfig durableConfig; // ===== Thread Coordination ===== private final Map registeredOperations = Collections.synchronizedMap(new HashMap<>()); @@ -65,6 +68,7 @@ public class ExecutionManager implements AutoCloseable { private final CheckpointManager checkpointManager; public ExecutionManager(DurableExecutionInput input, DurableConfig config) { + durableConfig = config; this.durableExecutionArn = input.durableExecutionArn(); // Create checkpoint batcher for internal coordination @@ -276,9 +280,41 @@ public CompletableFuture pollForOperationUpdates(String operationId, /** Shutdown the checkpoint batcher. */ @Override public void close() { + validateRunningThreads(); + checkpointManager.shutdown(); } + private void validateRunningThreads() { + // This will detect stuck user thread and thread leaks in the thread pool + for (BaseDurableOperation op : registeredOperations.values()) { + var userHandlerFuture = op.getRunningUserHandler(); + if (userHandlerFuture != null && !userHandlerFuture.isDone()) { + // Some user threads can still be running because + // the operations that run them have never been waiting for and the execution has completed. + logger.info("Waiting for operation to complete before shutting down: {}", op.getOperationId()); + try { + userHandlerFuture.get(); + } catch (InterruptedException | CancellationException e) { + // if the user handler is stuck + throw new IllegalStateException( + "Stuck running user handler when shutting down: " + op.getOperationId()); + } catch (Exception e) { + // ok if the future completed exceptionally + } + } + } + + // double check if the thread pool is empty + if (durableConfig.getExecutorService() instanceof ThreadPoolExecutor threadPoolExecutor) { + var threadCount = threadPoolExecutor.getActiveCount(); + // This may or may not be a problem because getActiveCount doesn't return an accurate number + if (threadCount > 0) { + logger.warn("{} active threads in user executor pool when shutting down", threadCount); + } + } + } + /** Returns {@code true} if the given status represents a terminal (final) operation state. */ public static boolean isTerminalStatus(OperationStatus status) { return status == OperationStatus.SUCCEEDED diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index 4312a5ab..069977a0 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -405,4 +405,8 @@ protected void validateReplay(Operation checkpointed) { getOperationId(), checkpointed.subType(), getSubType()))); } } + + public CompletableFuture getRunningUserHandler() { + return runningUserHandler.get(); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 26b97f5f..2ee7aa1a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -75,7 +75,14 @@ protected void replay(Operation existing) { } } // Step is pending retry - Start polling for PENDING -> READY transition - case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt); + case PENDING -> { + if (existing.stepDetails() != null && existing.stepDetails().nextAttemptTimestamp() != null) { + pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt); + } else { + throw terminateExecutionWithIllegalDurableOperationException( + "Unexpected PENDING step without nextAttemptTimestamp: " + getOperationId()); + } + } // Execute with current attempt case READY -> executeStepLogic(attempt); default ->