From 08df377954305178e026d4197ba7943ab76b7747 Mon Sep 17 00:00:00 2001 From: Quinn Sinclair Date: Tue, 21 Oct 2025 00:02:21 +0000 Subject: [PATCH] parity: resolve impossible-to-succeed TODO - Remove impossible-to-succeed early termination logic and match TypeScript implementation - Refactor ExecutionCounters with separate should_continue() and is_complete() methods mimicking the typescript implementation. - Add documentation explaining TypeScript parity decisions - Update completion logic to properly handle ALL_COMPLETED with failures when all tasks finish Resolves TODO at line 416 by implementing TypeScript-compatible completion behavior where operations continue until normal completion criteria are met rather than terminating early on impossible-to-succeed conditions. --- .../concurrency.py | 132 +++-- tests/concurrency_test.py | 486 +++++++++++++++++- 2 files changed, 572 insertions(+), 46 deletions(-) diff --git a/src/aws_durable_execution_sdk_python/concurrency.py b/src/aws_durable_execution_sdk_python/concurrency.py index e102c92..fb43c03 100644 --- a/src/aws_durable_execution_sdk_python/concurrency.py +++ b/src/aws_durable_execution_sdk_python/concurrency.py @@ -386,25 +386,63 @@ def fail_task(self) -> None: with self._lock: self.failure_count += 1 - def should_complete(self) -> bool: - """Check if execution should complete.""" + def should_continue(self) -> bool: + """ + Check if we should continue starting new tasks (based on failure tolerance). + Matches TypeScript shouldContinue() logic. + """ with self._lock: - # Success condition - if self.success_count >= self.min_successful: - return True + # If no completion config, only continue if no failures + if ( + self.tolerated_failure_count is None + and self.tolerated_failure_percentage is None + ): + return self.failure_count == 0 - # Failure conditions - if self._is_failure_condition_reached( - tolerated_count=self.tolerated_failure_count, - tolerated_percentage=self.tolerated_failure_percentage, - failure_count=self.failure_count, + # Check failure count tolerance + if ( + self.tolerated_failure_count is not None + and self.failure_count > self.tolerated_failure_count ): - return True + return False - # Impossible to succeed condition - # TODO: should this keep running? TS doesn't currently handle this either. - remaining_tasks = self.total_tasks - self.success_count - self.failure_count - return self.success_count + remaining_tasks < self.min_successful + # Check failure percentage tolerance + if self.tolerated_failure_percentage is not None and self.total_tasks > 0: + failure_percentage = (self.failure_count / self.total_tasks) * 100 + if failure_percentage > self.tolerated_failure_percentage: + return False + + return True + + def is_complete(self) -> bool: + """ + Check if execution should complete (based on completion criteria). + Matches TypeScript isComplete() logic. + """ + with self._lock: + completed_count = self.success_count + self.failure_count + + # All tasks completed + if completed_count == self.total_tasks: + # Complete if no failure tolerance OR no failures OR min successful reached + return ( + ( + self.tolerated_failure_count is None + and self.tolerated_failure_percentage is None + ) + or self.failure_count == 0 + or self.success_count >= self.min_successful + ) + + # when we breach min successful, we've completed + return self.success_count >= self.min_successful + + def should_complete(self) -> bool: + """ + Check if execution should complete. + Combines TypeScript shouldContinue() and isComplete() logic. + """ + return self.is_complete() or not self.should_continue() def is_all_completed(self) -> bool: """True if all tasks completed successfully.""" @@ -690,40 +728,46 @@ def _on_task_complete( self._completion_event.set() def _create_result(self) -> BatchResult[ResultType]: - """Build the final BatchResult.""" - batch_items: list[BatchItem[ResultType]] = [] - completed_branches: list[ExecutableWithState] = [] - failed_branches: list[ExecutableWithState] = [] + """ + Build the final BatchResult. + When this function executes, we've terminated the upper/parent context for whatever reason. + It follows that our items can be only in 3 states, Completed, Failed and Started (in all of the possible forms). + We tag each branch based on its observed value at the time of completion of the parent / upper context, and pass the + results to BatchResult. + + Any inference wrt completion reason is left up to BatchResult, keeping the logic inference isolated. + """ + batch_items: list[BatchItem[ResultType]] = [] for executable in self.executables_with_state: - if executable.status is BranchStatus.COMPLETED: - completed_branches.append(executable) - batch_items.append( - BatchItem( - executable.index, BatchItemStatus.SUCCEEDED, executable.result + match executable.status: + case BranchStatus.COMPLETED: + batch_items.append( + BatchItem( + executable.index, + BatchItemStatus.SUCCEEDED, + executable.result, + ) ) - ) - elif executable.status is BranchStatus.FAILED: - failed_branches.append(executable) - batch_items.append( - BatchItem( - executable.index, - BatchItemStatus.FAILED, - error=ErrorObject.from_exception(executable.error), + case BranchStatus.FAILED: + batch_items.append( + BatchItem( + executable.index, + BatchItemStatus.FAILED, + error=ErrorObject.from_exception(executable.error), + ) + ) + case ( + BranchStatus.PENDING + | BranchStatus.RUNNING + | BranchStatus.SUSPENDED + | BranchStatus.SUSPENDED_WITH_TIMEOUT + ): + batch_items.append( + BatchItem(executable.index, BatchItemStatus.STARTED) ) - ) - - completion_reason: CompletionReason = ( - CompletionReason.ALL_COMPLETED - if self.counters.is_all_completed() - else ( - CompletionReason.MIN_SUCCESSFUL_REACHED - if self.counters.is_min_successful_reached() - else CompletionReason.FAILURE_TOLERANCE_EXCEEDED - ) - ) - return BatchResult(batch_items, completion_reason) + return BatchResult.from_items(batch_items, self.completion_config) def _execute_item_in_child_context( self, diff --git a/tests/concurrency_test.py b/tests/concurrency_test.py index c024749..d3d090b 100644 --- a/tests/concurrency_test.py +++ b/tests/concurrency_test.py @@ -1094,7 +1094,10 @@ def mock_run_in_child_context(func, name, config): assert len(result.all) == 2 assert result.all[0].status == BatchItemStatus.SUCCEEDED assert result.all[1].status == BatchItemStatus.FAILED - assert result.completion_reason == CompletionReason.MIN_SUCCESSFUL_REACHED + # WHEN all items complete, THEN completion reason is ALL_COMPLETED. + # we don't consider thresholds and limits. + # https://github.com/aws/aws-durable-execution-sdk-js/blob/ff8b72ef888dd47a840f36d4eb0ee84dd3b55a30/packages/aws-durable-execution-sdk-js/src/handlers/concurrent-execution-handler/concurrent-execution-handler.test.ts#L630-L655 + assert result.completion_reason == CompletionReason.ALL_COMPLETED def test_concurrent_executor_execute_item_in_child_context(): @@ -1177,7 +1180,10 @@ def mock_run_in_child_context(func, name, config): return func(Mock()) result = executor.execute(execution_state, mock_run_in_child_context) - assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED + # WHEN all items complete, THEN completion reason is ALL_COMPLETED. + # we don't consider thresholds and limits. + # https://github.com/aws/aws-durable-execution-sdk-js/blob/ff8b72ef888dd47a840f36d4eb0ee84dd3b55a30/packages/aws-durable-execution-sdk-js/src/handlers/concurrent-execution-handler/concurrent-execution-handler.test.ts#L630-L655 + assert result.completion_reason == CompletionReason.ALL_COMPLETED def test_single_task_suspend_bubbles_up(): @@ -1845,6 +1851,482 @@ def mock_run_in_child_context(func, name, config): executor.execute(execution_state, mock_run_in_child_context) +# Tests for _create_result method match statement branches +def test_create_result_completed_branch(): + """Test _create_result with COMPLETED status branch.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [Executable(0, lambda: "test")] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executable with COMPLETED status + exe_state = ExecutableWithState(executables[0]) + exe_state.complete("test_result") + executor.executables_with_state = [exe_state] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 1 + assert result.all[0].status == BatchItemStatus.SUCCEEDED + assert result.all[0].result == "test_result" + assert result.all[0].error is None + assert result.all[0].index == 0 + + +def test_create_result_failed_branch(): + """Test _create_result with FAILED status branch.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [Executable(0, lambda: "test")] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executable with FAILED status + exe_state = ExecutableWithState(executables[0]) + test_error = ValueError("Test error message") + exe_state.fail(test_error) + executor.executables_with_state = [exe_state] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 1 + assert result.all[0].status == BatchItemStatus.FAILED + assert result.all[0].result is None + assert result.all[0].error is not None + assert result.all[0].error.message == "Test error message" + assert result.all[0].error.type == "ValueError" + assert result.all[0].index == 0 + + +def test_create_result_pending_branch(): + """Test _create_result with PENDING status branch.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [Executable(0, lambda: "test")] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executable with PENDING status (default state) + exe_state = ExecutableWithState(executables[0]) + # PENDING is the default state, no need to change it + executor.executables_with_state = [exe_state] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 1 + assert result.all[0].status == BatchItemStatus.STARTED + assert result.all[0].result is None + assert result.all[0].error is None + assert result.all[0].index == 0 + # By default, if we've terminated the reasoning is failure tolerance exceeded + # according to the spec + assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED + + +def test_create_result_running_branch(): + """Test _create_result with RUNNING status branch.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [Executable(0, lambda: "test")] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executable with RUNNING status + exe_state = ExecutableWithState(executables[0]) + future = Future() + exe_state.run(future) + executor.executables_with_state = [exe_state] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 1 + assert result.all[0].status == BatchItemStatus.STARTED + assert result.all[0].result is None + assert result.all[0].error is None + assert result.all[0].index == 0 + # By default, if we've terminated the reasoning is failure tolerance exceeded + # according to the spec + assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED + + +def test_create_result_suspended_branch(): + """Test _create_result with SUSPENDED status branch.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [Executable(0, lambda: "test")] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executable with SUSPENDED status + exe_state = ExecutableWithState(executables[0]) + exe_state.suspend() + executor.executables_with_state = [exe_state] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 1 + assert result.all[0].status == BatchItemStatus.STARTED + assert result.all[0].result is None + assert result.all[0].error is None + assert result.all[0].index == 0 + # By default, if we've terminated the reasoning is failure tolerance exceeded + # according to the spec + assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED + + +def test_create_result_suspended_with_timeout_branch(): + """Test _create_result with SUSPENDED_WITH_TIMEOUT status branch.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [Executable(0, lambda: "test")] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executable with SUSPENDED_WITH_TIMEOUT status + exe_state = ExecutableWithState(executables[0]) + future_time = time.time() + 10 + exe_state.suspend_with_timeout(future_time) + executor.executables_with_state = [exe_state] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 1 + assert result.all[0].status == BatchItemStatus.STARTED + assert result.all[0].result is None + assert result.all[0].error is None + assert result.all[0].index == 0 + # By default, if we've terminated the reasoning is failure tolerance exceeded + # according to the spec + assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED + + +def test_create_result_mixed_statuses(): + """Test _create_result with mixed executable statuses covering all branches.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [ + Executable(0, lambda: "test0"), # Will be COMPLETED + Executable(1, lambda: "test1"), # Will be FAILED + Executable(2, lambda: "test2"), # Will be PENDING + Executable(3, lambda: "test3"), # Will be RUNNING + Executable(4, lambda: "test4"), # Will be SUSPENDED + Executable(5, lambda: "test5"), # Will be SUSPENDED_WITH_TIMEOUT + ] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=6, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executables with different statuses + exe_states = [ExecutableWithState(exe) for exe in executables] + + # COMPLETED + exe_states[0].complete("completed_result") + + # FAILED + exe_states[1].fail(RuntimeError("Test failure")) + + # PENDING (default state, no change needed) + + # RUNNING + future = Future() + exe_states[3].run(future) + + # SUSPENDED + exe_states[4].suspend() + + # SUSPENDED_WITH_TIMEOUT + exe_states[5].suspend_with_timeout(time.time() + 10) + + executor.executables_with_state = exe_states + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 6 + + # Check COMPLETED -> SUCCEEDED + assert result.all[0].status == BatchItemStatus.SUCCEEDED + assert result.all[0].result == "completed_result" + assert result.all[0].error is None + + # Check FAILED -> FAILED + assert result.all[1].status == BatchItemStatus.FAILED + assert result.all[1].result is None + assert result.all[1].error is not None + assert result.all[1].error.message == "Test failure" + + # Check PENDING -> STARTED + assert result.all[2].status == BatchItemStatus.STARTED + assert result.all[2].result is None + assert result.all[2].error is None + + # Check RUNNING -> STARTED + assert result.all[3].status == BatchItemStatus.STARTED + assert result.all[3].result is None + assert result.all[3].error is None + + # Check SUSPENDED -> STARTED + assert result.all[4].status == BatchItemStatus.STARTED + assert result.all[4].result is None + assert result.all[4].error is None + + # Check SUSPENDED_WITH_TIMEOUT -> STARTED + assert result.all[5].status == BatchItemStatus.STARTED + assert result.all[5].result is None + assert result.all[5].error is None + + # we've a min succ set to 1. + assert result.completion_reason == CompletionReason.MIN_SUCCESSFUL_REACHED + + +def test_create_result_multiple_completed(): + """Test _create_result with multiple COMPLETED executables.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [ + Executable(0, lambda: "test0"), + Executable(1, lambda: "test1"), + Executable(2, lambda: "test2"), + ] + completion_config = CompletionConfig(min_successful=3) + + executor = TestExecutor( + executables=executables, + max_concurrency=3, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create all executables with COMPLETED status + exe_states = [ExecutableWithState(exe) for exe in executables] + exe_states[0].complete("result_0") + exe_states[1].complete("result_1") + exe_states[2].complete("result_2") + + executor.executables_with_state = exe_states + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 3 + assert all(item.status == BatchItemStatus.SUCCEEDED for item in result.all) + assert result.all[0].result == "result_0" + assert result.all[1].result == "result_1" + assert result.all[2].result == "result_2" + assert result.completion_reason == CompletionReason.ALL_COMPLETED + + +def test_create_result_multiple_failed(): + """Test _create_result with multiple FAILED executables.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [ + Executable(0, lambda: "test0"), + Executable(1, lambda: "test1"), + Executable(2, lambda: "test2"), + ] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=3, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create all executables with FAILED status + exe_states = [ExecutableWithState(exe) for exe in executables] + exe_states[0].fail(ValueError("Error 0")) + exe_states[1].fail(RuntimeError("Error 1")) + exe_states[2].fail(TypeError("Error 2")) + + executor.executables_with_state = exe_states + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 3 + assert all(item.status == BatchItemStatus.FAILED for item in result.all) + assert result.all[0].error.message == "Error 0" + assert result.all[1].error.message == "Error 1" + assert result.all[2].error.message == "Error 2" + assert result.completion_reason == CompletionReason.ALL_COMPLETED + + +def test_create_result_multiple_started_states(): + """Test _create_result with multiple executables in STARTED states.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [ + Executable(0, lambda: "test0"), # PENDING + Executable(1, lambda: "test1"), # RUNNING + Executable(2, lambda: "test2"), # SUSPENDED + Executable(3, lambda: "test3"), # SUSPENDED_WITH_TIMEOUT + ] + completion_config = CompletionConfig(min_successful=1) + + executor = TestExecutor( + executables=executables, + max_concurrency=4, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + # Create executables with different STARTED states + exe_states = [ExecutableWithState(exe) for exe in executables] + + # PENDING (default state) + + # RUNNING + future = Future() + exe_states[1].run(future) + + # SUSPENDED + exe_states[2].suspend() + + # SUSPENDED_WITH_TIMEOUT + exe_states[3].suspend_with_timeout(time.time() + 5) + + executor.executables_with_state = exe_states + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 4 + assert all(item.status == BatchItemStatus.STARTED for item in result.all) + assert all(item.result is None for item in result.all) + assert all(item.error is None for item in result.all) + # With completion config min_successful=1 and no completed items, + # this should be FAILURE_TOLERANCE_EXCEEDED + assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED + + +def test_create_result_empty_executables(): + """Test _create_result with no executables.""" + + class TestExecutor(ConcurrentExecutor): + def execute_item(self, child_context, executable): + return f"result_{executable.index}" + + executables = [] + completion_config = CompletionConfig(min_successful=0) + + executor = TestExecutor( + executables=executables, + max_concurrency=1, + completion_config=completion_config, + sub_type_top="TOP", + sub_type_iteration="ITER", + name_prefix="test_", + serdes=None, + ) + + executor.executables_with_state = [] + + result = executor._create_result() # noqa: SLF001 + + assert len(result.all) == 0 + assert result.completion_reason == CompletionReason.ALL_COMPLETED + + def test_timer_scheduler_future_time_condition_false(): """Test TimerScheduler when scheduled time is in future (434->433 branch).""" callback = Mock()