Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/aws_durable_execution_sdk_python/concurrency/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ def __init__(
self._suspend_exception: SuspendExecution | None = None

# ExecutionCounters will keep track of completion criteria and on-going counters
min_successful = self.completion_config.min_successful or len(self.executables)
# Pass None if min_successful not explicitly set to distinguish from "require all"
min_successful = self.completion_config.min_successful
tolerated_failure_count = self.completion_config.tolerated_failure_count
tolerated_failure_percentage = (
self.completion_config.tolerated_failure_percentage
Expand Down
38 changes: 21 additions & 17 deletions src/aws_durable_execution_sdk_python/concurrency/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,12 @@ class ExecutionCounters:
def __init__(
self,
total_tasks: int,
min_successful: int,
min_successful: int | None,
tolerated_failure_count: int | None,
tolerated_failure_percentage: float | None,
):
self.total_tasks: int = total_tasks
self.min_successful: int = min_successful
self.min_successful: int | None = min_successful
self.tolerated_failure_count: int | None = tolerated_failure_count
self.tolerated_failure_percentage: float | None = tolerated_failure_percentage
self.success_count: int = 0
Expand Down Expand Up @@ -408,25 +408,26 @@ def should_continue(self) -> bool:
def is_complete(self) -> bool:
"""
Check if execution should complete (based on completion criteria).
Matches TypeScript isComplete() logic.

Note: This method only checks completion criteria (all done, or min_successful met).
Failure tolerance is enforced separately by should_continue() and combined in should_complete().
"""
with self._lock:
completed_count = self.success_count + self.failure_count

# All tasks completed
if completed_count == self.total_tasks:
# Complete if no failure tolerance OR no failures OR min successful reached
return (
(
self.tolerated_failure_count is None
and self.tolerated_failure_percentage is None
)
or self.failure_count == 0
or self.success_count >= self.min_successful
)

# when we breach min successful, we've completed
return self.success_count >= self.min_successful
# If min_successful is explicitly set, check if we met it
# Otherwise, complete when all tasks are done
if self.min_successful is not None:
return self.success_count >= self.min_successful
return True

# Early completion: when we breach min_successful (only if explicitly set)
return (
self.min_successful is not None
and self.success_count >= self.min_successful
)

def should_complete(self) -> bool:
"""
Expand All @@ -441,9 +442,12 @@ def is_all_completed(self) -> bool:
return self.success_count == self.total_tasks

def is_min_successful_reached(self) -> bool:
"""True if minimum successful tasks reached."""
"""True if minimum successful task is both set and reached."""
with self._lock:
return self.success_count >= self.min_successful
return (
self.min_successful is not None
and self.success_count >= self.min_successful
)

def is_failure_tolerance_exceeded(self) -> bool:
"""True if failure tolerance was exceeded."""
Expand Down