From f3a14cb3f2e43cc5bf897c16cb87deecf3473a2b Mon Sep 17 00:00:00 2001 From: Quinn Sinclair Date: Thu, 23 Oct 2025 23:29:24 +0000 Subject: [PATCH] fix: Migrate datetime from float to datetime object - Replace float for timestamps with actual datetime objects as those are given back by boto3 --- .../concurrency.py | 26 +++---- .../exceptions.py | 18 ++--- .../operation/wait.py | 4 +- tests/concurrency_test.py | 67 +++++++++++-------- tests/exceptions_test.py | 47 ++++++++----- 5 files changed, 93 insertions(+), 69 deletions(-) diff --git a/src/aws_durable_execution_sdk_python/concurrency.py b/src/aws_durable_execution_sdk_python/concurrency.py index b7ddc24..6ad4cf4 100644 --- a/src/aws_durable_execution_sdk_python/concurrency.py +++ b/src/aws_durable_execution_sdk_python/concurrency.py @@ -5,11 +5,11 @@ import heapq import logging import threading -import time from abc import ABC, abstractmethod from collections import Counter from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass +from datetime import UTC, datetime from enum import Enum from typing import TYPE_CHECKING, Generic, Self, TypeVar @@ -258,7 +258,7 @@ def __init__(self, executable: Executable[CallableType]): self.executable = executable self._status = BranchStatus.PENDING self._future: Future | None = None - self._suspend_until: float | None = None + self._suspend_until: datetime | None = None self._result: ResultType = None # type: ignore[assignment] self._is_result_set: bool = False self._error: Exception | None = None @@ -293,7 +293,7 @@ def error(self) -> Exception: return self._error @property - def suspend_until(self) -> float | None: + def suspend_until(self) -> datetime | None: """Get suspend timestamp.""" return self._suspend_until @@ -308,7 +308,7 @@ def can_resume(self) -> bool: return self._status is BranchStatus.SUSPENDED or ( self._status is BranchStatus.SUSPENDED_WITH_TIMEOUT and self._suspend_until is not None - and time.time() >= self._suspend_until + and datetime.now(UTC) >= self._suspend_until ) @property @@ -333,7 +333,7 @@ def suspend(self) -> None: self._status = BranchStatus.SUSPENDED self._suspend_until = None - def suspend_with_timeout(self, timestamp: float) -> None: + def suspend_with_timeout(self, timestamp: datetime) -> None: """Transition to SUSPENDED_WITH_TIMEOUT state.""" self._status = BranchStatus.SUSPENDED_WITH_TIMEOUT self._suspend_until = timestamp @@ -507,11 +507,11 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.shutdown() def schedule_resume( - self, exe_state: ExecutableWithState, resume_time: float + self, exe_state: ExecutableWithState, resume_time: datetime ) -> None: """Schedule a task to resume at the specified time.""" with self._lock: - heapq.heappush(self._pending_resumes, (resume_time, exe_state)) + heapq.heappush(self._pending_resumes, (resume_time.timestamp(), exe_state)) def shutdown(self) -> None: """Shutdown the timer thread and cancel all pending resumes.""" @@ -534,7 +534,7 @@ def _timer_loop(self) -> None: self._shutdown.wait(timeout=0.1) continue - current_time = time.time() + current_time = datetime.now(UTC).timestamp() if current_time >= next_resume_time: # Time to resume with self._lock: @@ -675,7 +675,7 @@ def on_done(future: Future) -> None: def should_execution_suspend(self) -> SuspendResult: """Check if execution should suspend.""" - earliest_timestamp: float = float("inf") + earliest_timestamp: datetime | None = None indefinite_suspend_task: ( ExecutableWithState[CallableType, ResultType] | None ) = None @@ -685,16 +685,16 @@ def should_execution_suspend(self) -> SuspendResult: # Exit here! Still have tasks that can make progress, don't suspend. return SuspendResult.do_not_suspend() if exe_state.status is BranchStatus.SUSPENDED_WITH_TIMEOUT: - if ( - exe_state.suspend_until - and exe_state.suspend_until < earliest_timestamp + if exe_state.suspend_until and ( + earliest_timestamp is None + or exe_state.suspend_until < earliest_timestamp ): earliest_timestamp = exe_state.suspend_until elif exe_state.status is BranchStatus.SUSPENDED: indefinite_suspend_task = exe_state # All tasks are in final states and at least one of them is a suspend. - if earliest_timestamp != float("inf"): + if earliest_timestamp is not None: return SuspendResult.suspend( TimedSuspendExecution( "All concurrent work complete or suspended pending retry.", diff --git a/src/aws_durable_execution_sdk_python/exceptions.py b/src/aws_durable_execution_sdk_python/exceptions.py index 46f04e9..0a2bca8 100644 --- a/src/aws_durable_execution_sdk_python/exceptions.py +++ b/src/aws_durable_execution_sdk_python/exceptions.py @@ -5,12 +5,8 @@ from __future__ import annotations -import time from dataclasses import dataclass -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - import datetime +from datetime import UTC, datetime, timedelta class DurableExecutionsError(Exception): @@ -75,10 +71,10 @@ class TimedSuspendExecution(SuspendExecution): This is a specialized form of SuspendExecution that includes a scheduled resume time. Attributes: - scheduled_timestamp (float): Unix timestamp in seconds at which to resume. + scheduled_timestamp (datetime): DateTime at which to resume. """ - def __init__(self, message: str, scheduled_timestamp: float): + def __init__(self, message: str, scheduled_timestamp: datetime): super().__init__(message) self.scheduled_timestamp = scheduled_timestamp @@ -97,23 +93,23 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution: >>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30) >>> # Will suspend for 30 seconds from now """ - resume_time = time.time() + delay_seconds + resume_time = datetime.now(UTC) + timedelta(seconds=delay_seconds) return cls(message, scheduled_timestamp=resume_time) @classmethod def from_datetime( - cls, message: str, datetime_timestamp: datetime.datetime + cls, message: str, datetime_timestamp: datetime ) -> TimedSuspendExecution: """Create a timed suspension with the delay calculated from now. Args: message: Descriptive message for the suspension - datetime_timestamp: Unix datetime timestamp in seconds at which to resume + datetime_timestamp: DateTime timestamp at which to resume Returns: TimedSuspendExecution: Instance with calculated resume time """ - return cls(message, scheduled_timestamp=datetime_timestamp.timestamp()) + return cls(message, scheduled_timestamp=datetime_timestamp) class OrderedLockError(DurableExecutionsError): diff --git a/src/aws_durable_execution_sdk_python/operation/wait.py b/src/aws_durable_execution_sdk_python/operation/wait.py index 9890e33..6e14c2a 100644 --- a/src/aws_durable_execution_sdk_python/operation/wait.py +++ b/src/aws_durable_execution_sdk_python/operation/wait.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -import time +from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING from aws_durable_execution_sdk_python.exceptions import TimedSuspendExecution @@ -48,6 +48,6 @@ def wait_handler( state.create_checkpoint(operation_update=operation) # Calculate when to resume - resume_time = time.time() + seconds + resume_time = datetime.now(UTC) + timedelta(seconds=seconds) msg = f"Wait for {seconds} seconds" raise TimedSuspendExecution(msg, scheduled_timestamp=resume_time) diff --git a/tests/concurrency_test.py b/tests/concurrency_test.py index d3d090b..189cc39 100644 --- a/tests/concurrency_test.py +++ b/tests/concurrency_test.py @@ -3,6 +3,7 @@ import threading import time from concurrent.futures import Future +from datetime import UTC, datetime, timedelta from unittest.mock import Mock, patch import pytest @@ -607,12 +608,12 @@ def test_executable_with_state_can_resume(): assert exe_state.can_resume # Suspended with timeout in future - future_time = time.time() + 10 + future_time = datetime.now(UTC) + timedelta(seconds=10) exe_state.suspend_with_timeout(future_time) assert not exe_state.can_resume # Suspended with timeout in past - past_time = time.time() - 10 + past_time = datetime.now(UTC) - timedelta(seconds=10) exe_state.suspend_with_timeout(past_time) assert exe_state.can_resume @@ -655,7 +656,7 @@ def test_executable_with_state_suspend_with_timeout(): """Test ExecutableWithState suspend_with_timeout method.""" executable = Executable(index=1, func=lambda: "test") exe_state = ExecutableWithState(executable) - timestamp = time.time() + 5 + timestamp = datetime.now(UTC) + timedelta(seconds=5) exe_state.suspend_with_timeout(timestamp) assert exe_state.status == BranchStatus.SUSPENDED_WITH_TIMEOUT @@ -930,8 +931,8 @@ def test_timer_scheduler_double_check_resume_queue(): exe_state2 = ExecutableWithState(Executable(1, lambda: "test")) # Schedule two tasks with different times to avoid comparison issues - past_time1 = time.time() - 2 - past_time2 = time.time() - 1 + past_time1 = datetime.now(UTC) - timedelta(seconds=2) + past_time2 = datetime.now(UTC) - timedelta(seconds=1) scheduler.schedule_resume(exe_state1, past_time1) scheduler.schedule_resume(exe_state2, past_time2) @@ -968,7 +969,9 @@ def execute_item(self, child_context, executable): exe_state = ExecutableWithState(executables[0]) future = Mock() - future.result.side_effect = TimedSuspendExecution("test message", time.time() + 1) + future.result.side_effect = TimedSuspendExecution( + "test message", datetime.now(UTC) + timedelta(seconds=1) + ) scheduler = Mock() scheduler.schedule_resume = Mock() @@ -1192,7 +1195,9 @@ def test_single_task_suspend_bubbles_up(): class TestExecutor(ConcurrentExecutor): def execute_item(self, child_context, executable): msg = "test" - raise TimedSuspendExecution(msg, time.time() + 1) # Future time + raise TimedSuspendExecution( + msg, datetime.now(UTC) + timedelta(seconds=1) + ) # Future time executables = [Executable(0, lambda: "test")] completion_config = CompletionConfig( @@ -1235,7 +1240,9 @@ def execute_item(self, child_context, executable): if executable.index == 0: # Task A self.task_a_suspended.set() msg = "test" - raise TimedSuspendExecution(msg, time.time() + 1) # Future time + raise TimedSuspendExecution( + msg, datetime.now(UTC) + timedelta(seconds=1) + ) # Future time # Task B # Wait for Task A to suspend first self.task_a_suspended.wait(timeout=2.0) @@ -1281,7 +1288,9 @@ def __init__(self, *args, **kwargs): def execute_item(self, child_context, executable): self.call_count += 1 msg = "test" - raise TimedSuspendExecution(msg, time.time() + 10) # Future time + raise TimedSuspendExecution( + msg, datetime.now(UTC) + timedelta(seconds=10) + ) # Future time executables = [Executable(0, lambda: "test")] completion_config = CompletionConfig( @@ -1339,11 +1348,15 @@ def execute_item(self, child_context, executable): if call_count == 1: # First call: immediate resubmit (past timestamp) msg = "immediate" - raise TimedSuspendExecution(msg, time.time() - 1) + raise TimedSuspendExecution( + msg, datetime.now(UTC) - timedelta(seconds=1) + ) if call_count == 2: # Second call: short delay resubmit msg = "short_delay" - raise TimedSuspendExecution(msg, time.time() + 0.2) + raise TimedSuspendExecution( + msg, datetime.now(UTC) + timedelta(seconds=0.2) + ) # Third call: complete successfully result = "result_B" self.task_b_can_complete.set() @@ -1401,7 +1414,7 @@ def test_timer_scheduler_double_check_condition(): exe_state.suspend() # Make it resumable # Schedule a task with past time - past_time = time.time() - 1 + past_time = datetime.now(UTC) - timedelta(seconds=1) scheduler.schedule_resume(exe_state, past_time) # Give scheduler time to process and hit the double-check condition @@ -1437,7 +1450,7 @@ def execute_item(self, child_context, executable): # Create executable with state in SUSPENDED_WITH_TIMEOUT exe_state = ExecutableWithState(executables[0]) - future_time = time.time() + 10 + future_time = datetime.now(UTC) + timedelta(seconds=10) exe_state.suspend_with_timeout(future_time) executor.executables_with_state = [exe_state] @@ -1539,7 +1552,7 @@ def test_timer_scheduler_can_resume_false(): exe_state.complete("done") # Schedule with past time - past_time = time.time() - 1 + past_time = datetime.now(UTC) - timedelta(seconds=1) scheduler.schedule_resume(exe_state, past_time) # Give scheduler time to process @@ -1577,7 +1590,7 @@ def execute_item(self, child_context, executable): exe_state1 = ExecutableWithState(executables[0]) exe_state2 = ExecutableWithState(executables[1]) - future_time = time.time() + 5 + future_time = datetime.now(UTC) + timedelta(seconds=5) exe_state1.suspend_with_timeout(future_time) exe_state2.suspend() # Indefinite @@ -1618,8 +1631,8 @@ def execute_item(self, child_context, executable): exe_state1 = ExecutableWithState(executables[0]) exe_state2 = ExecutableWithState(executables[1]) - later_time = time.time() + 10 - earlier_time = time.time() + 5 + later_time = datetime.now(UTC) + timedelta(seconds=10) + earlier_time = datetime.now(UTC) + timedelta(seconds=5) exe_state1.suspend_with_timeout(later_time) exe_state2.suspend_with_timeout(earlier_time) @@ -1646,14 +1659,14 @@ def test_timer_scheduler_double_check_condition_race(): exe_state2.suspend() # Schedule first task with past time - past_time = time.time() - 1 + past_time = datetime.now(UTC) - timedelta(seconds=1) scheduler.schedule_resume(exe_state1, past_time) # Brief delay to let timer thread see the first task time.sleep(0.05) # Schedule second task with even more past time (will be heap[0]) - very_past_time = time.time() - 2 + very_past_time = datetime.now(UTC) - timedelta(seconds=2) scheduler.schedule_resume(exe_state2, very_past_time) # Wait for processing @@ -1695,9 +1708,9 @@ def execute_item(self, child_context, executable): exe_state2 = ExecutableWithState(executables[1]) exe_state3 = ExecutableWithState(executables[2]) - time1 = time.time() + 10 - time2 = time.time() + 5 # Earliest - time3 = time.time() + 15 + time1 = datetime.now(UTC) + timedelta(seconds=10) + time2 = datetime.now(UTC) + timedelta(seconds=5) # Earliest + time3 = datetime.now(UTC) + timedelta(seconds=15) exe_state1.suspend_with_timeout(time1) exe_state2.suspend_with_timeout(time2) @@ -1762,7 +1775,7 @@ def test_timer_scheduler_cannot_resume_branch(): exe_state.complete("done") # Schedule with past time - past_time = time.time() - 1 + past_time = datetime.now(UTC) - timedelta(seconds=1) scheduler.schedule_resume(exe_state, past_time) # Wait for processing @@ -2057,7 +2070,7 @@ def execute_item(self, child_context, executable): # Create executable with SUSPENDED_WITH_TIMEOUT status exe_state = ExecutableWithState(executables[0]) - future_time = time.time() + 10 + future_time = datetime.now(UTC) + timedelta(seconds=10) exe_state.suspend_with_timeout(future_time) executor.executables_with_state = [exe_state] @@ -2119,7 +2132,7 @@ def execute_item(self, child_context, executable): exe_states[4].suspend() # SUSPENDED_WITH_TIMEOUT - exe_states[5].suspend_with_timeout(time.time() + 10) + exe_states[5].suspend_with_timeout(datetime.now(UTC) + timedelta(seconds=10)) executor.executables_with_state = exe_states @@ -2284,7 +2297,7 @@ def execute_item(self, child_context, executable): exe_states[2].suspend() # SUSPENDED_WITH_TIMEOUT - exe_states[3].suspend_with_timeout(time.time() + 5) + exe_states[3].suspend_with_timeout(datetime.now(UTC) + timedelta(seconds=5)) executor.executables_with_state = exe_states @@ -2336,7 +2349,7 @@ def test_timer_scheduler_future_time_condition_false(): exe_state.suspend() # Schedule with future time so condition will be False - future_time = time.time() + 10 + future_time = datetime.now(UTC) + timedelta(seconds=10) scheduler.schedule_resume(exe_state, future_time) # Wait briefly for timer thread to check and find condition False diff --git a/tests/exceptions_test.py b/tests/exceptions_test.py index 32d2ea6..ed61348 100644 --- a/tests/exceptions_test.py +++ b/tests/exceptions_test.py @@ -1,6 +1,6 @@ """Unit tests for exceptions module.""" -import time +from datetime import UTC, datetime, timedelta from unittest.mock import patch import pytest @@ -130,7 +130,7 @@ def test_callable_runtime_error_serializable_details_frozen(): def test_timed_suspend_execution(): """Test TimedSuspendExecution exception.""" - scheduled_time = 1234567890.0 + scheduled_time = datetime.now(UTC) error = TimedSuspendExecution("timed suspend", scheduled_time) assert str(error) == "timed suspend" assert error.scheduled_timestamp == scheduled_time @@ -143,12 +143,16 @@ def test_timed_suspend_execution_from_delay(): message = "Waiting for callback" delay_seconds = 30 - # Mock time.time() to get predictable results - with patch("time.time", return_value=1000.0): + # Mock datetime.now() to get predictable results + mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) + with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: + mock_datetime.now.return_value = mock_now + mock_datetime.side_effect = datetime error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - assert error.scheduled_timestamp == 1030.0 # 1000.0 + 30 + expected_time = mock_now + timedelta(seconds=30) + assert error.scheduled_timestamp == expected_time assert isinstance(error, TimedSuspendExecution) assert isinstance(error, SuspendExecution) @@ -158,11 +162,14 @@ def test_timed_suspend_execution_from_delay_zero_delay(): message = "Immediate suspension" delay_seconds = 0 - with patch("time.time", return_value=500.0): + mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) + with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: + mock_datetime.now.return_value = mock_now + mock_datetime.side_effect = datetime error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - assert error.scheduled_timestamp == 500.0 # 500.0 + 0 + assert error.scheduled_timestamp == mock_now # no delay added assert isinstance(error, TimedSuspendExecution) @@ -171,11 +178,15 @@ def test_timed_suspend_execution_from_delay_negative_delay(): message = "Past suspension" delay_seconds = -10 - with patch("time.time", return_value=100.0): + mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) + with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: + mock_datetime.now.return_value = mock_now + mock_datetime.side_effect = datetime error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - assert error.scheduled_timestamp == 90.0 # 100.0 + (-10) + expected_time = mock_now + timedelta(seconds=-10) + assert error.scheduled_timestamp == expected_time assert isinstance(error, TimedSuspendExecution) @@ -184,11 +195,15 @@ def test_timed_suspend_execution_from_delay_large_delay(): message = "Long suspension" delay_seconds = 3600 # 1 hour - with patch("time.time", return_value=0.0): + mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) + with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: + mock_datetime.now.return_value = mock_now + mock_datetime.side_effect = datetime error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - assert error.scheduled_timestamp == 3600.0 # 0.0 + 3600 + expected_time = mock_now + timedelta(seconds=3600) + assert error.scheduled_timestamp == expected_time assert isinstance(error, TimedSuspendExecution) @@ -197,15 +212,15 @@ def test_timed_suspend_execution_from_delay_calculation_accuracy(): message = "Accurate timing test" delay_seconds = 42 - # Test with actual time.time() to ensure the calculation works in real scenarios - before_time = time.time() + # Test with actual datetime.now() to ensure the calculation works in real scenarios + before_time = datetime.now(UTC) error = TimedSuspendExecution.from_delay(message, delay_seconds) - after_time = time.time() + after_time = datetime.now(UTC) # The scheduled timestamp should be within a reasonable range # (accounting for the small time difference between calls) - expected_min = before_time + delay_seconds - expected_max = after_time + delay_seconds + expected_min = before_time + timedelta(seconds=delay_seconds) + expected_max = after_time + timedelta(seconds=delay_seconds) assert expected_min <= error.scheduled_timestamp <= expected_max assert str(error) == message