Skip to content

Commit 08df377

Browse files
committed
parity: resolve impossible-to-succeed TODO
- Remove impossible-to-succeed early termination logic and match TypeScript implementation - Refactor ExecutionCounters with separate should_continue() and is_complete() methods mimicking the typescript implementation. - Add documentation explaining TypeScript parity decisions - Update completion logic to properly handle ALL_COMPLETED with failures when all tasks finish Resolves TODO at line 416 by implementing TypeScript-compatible completion behavior where operations continue until normal completion criteria are met rather than terminating early on impossible-to-succeed conditions.
1 parent fc0992a commit 08df377

2 files changed

Lines changed: 572 additions & 46 deletions

File tree

src/aws_durable_execution_sdk_python/concurrency.py

Lines changed: 88 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -386,25 +386,63 @@ def fail_task(self) -> None:
386386
with self._lock:
387387
self.failure_count += 1
388388

389-
def should_complete(self) -> bool:
390-
"""Check if execution should complete."""
389+
def should_continue(self) -> bool:
390+
"""
391+
Check if we should continue starting new tasks (based on failure tolerance).
392+
Matches TypeScript shouldContinue() logic.
393+
"""
391394
with self._lock:
392-
# Success condition
393-
if self.success_count >= self.min_successful:
394-
return True
395+
# If no completion config, only continue if no failures
396+
if (
397+
self.tolerated_failure_count is None
398+
and self.tolerated_failure_percentage is None
399+
):
400+
return self.failure_count == 0
395401

396-
# Failure conditions
397-
if self._is_failure_condition_reached(
398-
tolerated_count=self.tolerated_failure_count,
399-
tolerated_percentage=self.tolerated_failure_percentage,
400-
failure_count=self.failure_count,
402+
# Check failure count tolerance
403+
if (
404+
self.tolerated_failure_count is not None
405+
and self.failure_count > self.tolerated_failure_count
401406
):
402-
return True
407+
return False
403408

404-
# Impossible to succeed condition
405-
# TODO: should this keep running? TS doesn't currently handle this either.
406-
remaining_tasks = self.total_tasks - self.success_count - self.failure_count
407-
return self.success_count + remaining_tasks < self.min_successful
409+
# Check failure percentage tolerance
410+
if self.tolerated_failure_percentage is not None and self.total_tasks > 0:
411+
failure_percentage = (self.failure_count / self.total_tasks) * 100
412+
if failure_percentage > self.tolerated_failure_percentage:
413+
return False
414+
415+
return True
416+
417+
def is_complete(self) -> bool:
418+
"""
419+
Check if execution should complete (based on completion criteria).
420+
Matches TypeScript isComplete() logic.
421+
"""
422+
with self._lock:
423+
completed_count = self.success_count + self.failure_count
424+
425+
# All tasks completed
426+
if completed_count == self.total_tasks:
427+
# Complete if no failure tolerance OR no failures OR min successful reached
428+
return (
429+
(
430+
self.tolerated_failure_count is None
431+
and self.tolerated_failure_percentage is None
432+
)
433+
or self.failure_count == 0
434+
or self.success_count >= self.min_successful
435+
)
436+
437+
# when we breach min successful, we've completed
438+
return self.success_count >= self.min_successful
439+
440+
def should_complete(self) -> bool:
441+
"""
442+
Check if execution should complete.
443+
Combines TypeScript shouldContinue() and isComplete() logic.
444+
"""
445+
return self.is_complete() or not self.should_continue()
408446

409447
def is_all_completed(self) -> bool:
410448
"""True if all tasks completed successfully."""
@@ -690,40 +728,46 @@ def _on_task_complete(
690728
self._completion_event.set()
691729

692730
def _create_result(self) -> BatchResult[ResultType]:
693-
"""Build the final BatchResult."""
694-
batch_items: list[BatchItem[ResultType]] = []
695-
completed_branches: list[ExecutableWithState] = []
696-
failed_branches: list[ExecutableWithState] = []
731+
"""
732+
Build the final BatchResult.
697733
734+
When this function executes, we've terminated the upper/parent context for whatever reason.
735+
It follows that our items can be only in 3 states, Completed, Failed and Started (in all of the possible forms).
736+
We tag each branch based on its observed value at the time of completion of the parent / upper context, and pass the
737+
results to BatchResult.
738+
739+
Any inference wrt completion reason is left up to BatchResult, keeping the logic inference isolated.
740+
"""
741+
batch_items: list[BatchItem[ResultType]] = []
698742
for executable in self.executables_with_state:
699-
if executable.status is BranchStatus.COMPLETED:
700-
completed_branches.append(executable)
701-
batch_items.append(
702-
BatchItem(
703-
executable.index, BatchItemStatus.SUCCEEDED, executable.result
743+
match executable.status:
744+
case BranchStatus.COMPLETED:
745+
batch_items.append(
746+
BatchItem(
747+
executable.index,
748+
BatchItemStatus.SUCCEEDED,
749+
executable.result,
750+
)
704751
)
705-
)
706-
elif executable.status is BranchStatus.FAILED:
707-
failed_branches.append(executable)
708-
batch_items.append(
709-
BatchItem(
710-
executable.index,
711-
BatchItemStatus.FAILED,
712-
error=ErrorObject.from_exception(executable.error),
752+
case BranchStatus.FAILED:
753+
batch_items.append(
754+
BatchItem(
755+
executable.index,
756+
BatchItemStatus.FAILED,
757+
error=ErrorObject.from_exception(executable.error),
758+
)
759+
)
760+
case (
761+
BranchStatus.PENDING
762+
| BranchStatus.RUNNING
763+
| BranchStatus.SUSPENDED
764+
| BranchStatus.SUSPENDED_WITH_TIMEOUT
765+
):
766+
batch_items.append(
767+
BatchItem(executable.index, BatchItemStatus.STARTED)
713768
)
714-
)
715-
716-
completion_reason: CompletionReason = (
717-
CompletionReason.ALL_COMPLETED
718-
if self.counters.is_all_completed()
719-
else (
720-
CompletionReason.MIN_SUCCESSFUL_REACHED
721-
if self.counters.is_min_successful_reached()
722-
else CompletionReason.FAILURE_TOLERANCE_EXCEEDED
723-
)
724-
)
725769

726-
return BatchResult(batch_items, completion_reason)
770+
return BatchResult.from_items(batch_items, self.completion_config)
727771

728772
def _execute_item_in_child_context(
729773
self,

0 commit comments

Comments
 (0)