Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import software.amazon.lambda.durable.config.CallbackConfig;
import software.amazon.lambda.durable.exception.CallbackFailedException;
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.serde.JacksonSerDes;
import software.amazon.lambda.durable.serde.SerDes;
Expand Down Expand Up @@ -271,6 +272,9 @@ void waitForCallbackCallbackFailed() {
fail();
return "should not reach here";
} catch (Exception e) {
if (e instanceof SuspendExecutionException) {
throw e;
}
assertInstanceOf(CallbackFailedException.class, e);
throw e;
}
Expand Down Expand Up @@ -306,6 +310,9 @@ void waitForCallbackCallbackTimeout() {
fail();
return "should not reach here";
} catch (Exception e) {
if (e instanceof SuspendExecutionException) {
throw e;
}
assertInstanceOf(CallbackTimeoutException.class, e);
throw e;
}
Expand Down Expand Up @@ -333,6 +340,9 @@ void waitForCallbackCallbackFailedWithUserException() {
throw new IllegalArgumentException(errorMessage);
});
} catch (Exception e) {
if (e instanceof SuspendExecutionException) {
throw e;
}
assertInstanceOf(IllegalArgumentException.class, e);
assertEquals(errorMessage, e.getMessage());
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,20 @@ private StepDetails buildStepDetails(OperationUpdate update) {
var existing = existingOp != null ? existingOp.stepDetails() : null;

var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder();
var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1;

if (update.action() == OperationAction.RETRY || update.action() == OperationAction.FAIL) {
var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1;
if (update.action() == OperationAction.FAIL) {
detailsBuilder.attempt(attempt).error(update.error());
}

if (update.action() == OperationAction.RETRY) {
detailsBuilder
.attempt(attempt)
.error(update.error())
.nextAttemptTimestamp(
Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds()));
}

if (update.payload() != null) {
detailsBuilder.result(update.payload());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class ExecutionManager implements AutoCloseable {
private final Operation executionOp;
private final String durableExecutionArn;
private final AtomicReference<ExecutionMode> executionMode;
private final DurableConfig durableConfig;

// ===== Thread Coordination =====
private final Map<String, BaseDurableOperation> registeredOperations = Collections.synchronizedMap(new HashMap<>());
Expand All @@ -65,6 +68,7 @@ public class ExecutionManager implements AutoCloseable {
private final CheckpointManager checkpointManager;

public ExecutionManager(DurableExecutionInput input, DurableConfig config) {
durableConfig = config;
this.durableExecutionArn = input.durableExecutionArn();

// Create checkpoint batcher for internal coordination
Expand Down Expand Up @@ -276,9 +280,41 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId,
/** Shutdown the checkpoint batcher. */
@Override
public void close() {
validateRunningThreads();

checkpointManager.shutdown();
}

private void validateRunningThreads() {
// This will detect stuck user thread and thread leaks in the thread pool
for (BaseDurableOperation op : registeredOperations.values()) {
var userHandlerFuture = op.getRunningUserHandler();
if (userHandlerFuture != null && !userHandlerFuture.isDone()) {
// Some user threads can still be running because
// the operations that run them have never been waiting for and the execution has completed.
logger.info("Waiting for operation to complete before shutting down: {}", op.getOperationId());
try {
userHandlerFuture.get();
} catch (InterruptedException | CancellationException e) {
// if the user handler is stuck
throw new IllegalStateException(
"Stuck running user handler when shutting down: " + op.getOperationId());
} catch (Exception e) {
// ok if the future completed exceptionally
}
}
}

// double check if the thread pool is empty
if (durableConfig.getExecutorService() instanceof ThreadPoolExecutor threadPoolExecutor) {
var threadCount = threadPoolExecutor.getActiveCount();
// This may or may not be a problem because getActiveCount doesn't return an accurate number
if (threadCount > 0) {
logger.warn("{} active threads in user executor pool when shutting down", threadCount);
}
}
}

/** Returns {@code true} if the given status represents a terminal (final) operation state. */
public static boolean isTerminalStatus(OperationStatus status) {
return status == OperationStatus.SUCCEEDED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,8 @@ protected void validateReplay(Operation checkpointed) {
getOperationId(), checkpointed.subType(), getSubType())));
}
}

public CompletableFuture<Void> getRunningUserHandler() {
return runningUserHandler.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ protected void replay(Operation existing) {
}
}
// Step is pending retry - Start polling for PENDING -> READY transition
case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt);
case PENDING -> {
if (existing.stepDetails() != null && existing.stepDetails().nextAttemptTimestamp() != null) {
pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt);
} else {
throw terminateExecutionWithIllegalDurableOperationException(
"Unexpected PENDING step without nextAttemptTimestamp: " + getOperationId());
}
}
// Execute with current attempt
case READY -> executeStepLogic(attempt);
default ->
Expand Down
Loading