diff --git a/src/aws_durable_execution_sdk_python/operation/invoke.py b/src/aws_durable_execution_sdk_python/operation/invoke.py index fa1ff78..597f192 100644 --- a/src/aws_durable_execution_sdk_python/operation/invoke.py +++ b/src/aws_durable_execution_sdk_python/operation/invoke.py @@ -61,7 +61,11 @@ def invoke_handler( ) return None # type: ignore - if checkpointed_result.is_failed() or checkpointed_result.is_timed_out(): + if ( + checkpointed_result.is_failed() + or checkpointed_result.is_timed_out() + or checkpointed_result.is_stopped() + ): # Operation failed, throw the exact same error on replay as the checkpointed failure checkpointed_result.raise_callable_error() diff --git a/src/aws_durable_execution_sdk_python/state.py b/src/aws_durable_execution_sdk_python/state.py index d529b1d..ec32789 100644 --- a/src/aws_durable_execution_sdk_python/state.py +++ b/src/aws_durable_execution_sdk_python/state.py @@ -93,6 +93,14 @@ def is_failed(self) -> bool: return op.status is OperationStatus.FAILED + def is_stopped(self) -> bool: + """Return True if the checkpointed operation is STOPPED""" + op = self.operation + if not op: + return False + + return op.status is OperationStatus.STOPPED + def is_started(self) -> bool: """Return True if the checkpointed operation is STARTED.""" op = self.operation diff --git a/tests/operation/invoke_test.py b/tests/operation/invoke_test.py index 738b2dd..40b9e74 100644 --- a/tests/operation/invoke_test.py +++ b/tests/operation/invoke_test.py @@ -107,7 +107,10 @@ def test_invoke_handler_already_succeeded_no_chained_invoke_details(): assert result is None -def test_invoke_handler_already_failed(): +@pytest.mark.parametrize( + "kind", [OperationStatus.FAILED, OperationStatus.STOPPED, OperationStatus.TIMED_OUT] +) +def test_invoke_handler_already_terminated(kind: OperationStatus): """Test invoke_handler when operation already failed.""" mock_state = Mock(spec=ExecutionState) mock_state.durable_execution_arn = "test_arn" @@ -118,7 +121,7 @@ def test_invoke_handler_already_failed(): operation = Operation( operation_id="invoke4", operation_type=OperationType.CHAINED_INVOKE, - status=OperationStatus.FAILED, + status=kind, chained_invoke_details=ChainedInvokeDetails(error=error), ) mock_result = CheckpointedResult.create_from_operation(operation)