From 984f49a361918f0958455da65c8ebd2b9d4fc355 Mon Sep 17 00:00:00 2001 From: Quinn Sinclair Date: Fri, 24 Oct 2025 21:09:56 +0000 Subject: [PATCH] Revert "fix: Migrate datetime from float to datetime object" This reverts commit 11715212e04cdf88f4e8a571c1bbb1594d88db23. --- .../concurrency.py | 26 +++---- .../exceptions.py | 18 +++-- .../operation/wait.py | 4 +- tests/concurrency_test.py | 67 ++++++++----------- tests/exceptions_test.py | 47 +++++-------- 5 files changed, 69 insertions(+), 93 deletions(-) diff --git a/src/aws_durable_execution_sdk_python/concurrency.py b/src/aws_durable_execution_sdk_python/concurrency.py index 6ad4cf4..b7ddc24 100644 --- a/src/aws_durable_execution_sdk_python/concurrency.py +++ b/src/aws_durable_execution_sdk_python/concurrency.py @@ -5,11 +5,11 @@ import heapq import logging import threading +import time from abc import ABC, abstractmethod from collections import Counter from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass -from datetime import UTC, datetime from enum import Enum from typing import TYPE_CHECKING, Generic, Self, TypeVar @@ -258,7 +258,7 @@ def __init__(self, executable: Executable[CallableType]): self.executable = executable self._status = BranchStatus.PENDING self._future: Future | None = None - self._suspend_until: datetime | None = None + self._suspend_until: float | None = None self._result: ResultType = None # type: ignore[assignment] self._is_result_set: bool = False self._error: Exception | None = None @@ -293,7 +293,7 @@ def error(self) -> Exception: return self._error @property - def suspend_until(self) -> datetime | None: + def suspend_until(self) -> float | None: """Get suspend timestamp.""" return self._suspend_until @@ -308,7 +308,7 @@ def can_resume(self) -> bool: return self._status is BranchStatus.SUSPENDED or ( self._status is BranchStatus.SUSPENDED_WITH_TIMEOUT and self._suspend_until is not None - and datetime.now(UTC) >= self._suspend_until + and time.time() >= self._suspend_until ) @property @@ -333,7 +333,7 @@ def suspend(self) -> None: self._status = BranchStatus.SUSPENDED self._suspend_until = None - def suspend_with_timeout(self, timestamp: datetime) -> None: + def suspend_with_timeout(self, timestamp: float) -> None: """Transition to SUSPENDED_WITH_TIMEOUT state.""" self._status = BranchStatus.SUSPENDED_WITH_TIMEOUT self._suspend_until = timestamp @@ -507,11 +507,11 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.shutdown() def schedule_resume( - self, exe_state: ExecutableWithState, resume_time: datetime + self, exe_state: ExecutableWithState, resume_time: float ) -> None: """Schedule a task to resume at the specified time.""" with self._lock: - heapq.heappush(self._pending_resumes, (resume_time.timestamp(), exe_state)) + heapq.heappush(self._pending_resumes, (resume_time, exe_state)) def shutdown(self) -> None: """Shutdown the timer thread and cancel all pending resumes.""" @@ -534,7 +534,7 @@ def _timer_loop(self) -> None: self._shutdown.wait(timeout=0.1) continue - current_time = datetime.now(UTC).timestamp() + current_time = time.time() if current_time >= next_resume_time: # Time to resume with self._lock: @@ -675,7 +675,7 @@ def on_done(future: Future) -> None: def should_execution_suspend(self) -> SuspendResult: """Check if execution should suspend.""" - earliest_timestamp: datetime | None = None + earliest_timestamp: float = float("inf") indefinite_suspend_task: ( ExecutableWithState[CallableType, ResultType] | None ) = None @@ -685,16 +685,16 @@ def should_execution_suspend(self) -> SuspendResult: # Exit here! Still have tasks that can make progress, don't suspend. return SuspendResult.do_not_suspend() if exe_state.status is BranchStatus.SUSPENDED_WITH_TIMEOUT: - if exe_state.suspend_until and ( - earliest_timestamp is None - or exe_state.suspend_until < earliest_timestamp + if ( + exe_state.suspend_until + and exe_state.suspend_until < earliest_timestamp ): earliest_timestamp = exe_state.suspend_until elif exe_state.status is BranchStatus.SUSPENDED: indefinite_suspend_task = exe_state # All tasks are in final states and at least one of them is a suspend. - if earliest_timestamp is not None: + if earliest_timestamp != float("inf"): return SuspendResult.suspend( TimedSuspendExecution( "All concurrent work complete or suspended pending retry.", diff --git a/src/aws_durable_execution_sdk_python/exceptions.py b/src/aws_durable_execution_sdk_python/exceptions.py index caed701..b9657af 100644 --- a/src/aws_durable_execution_sdk_python/exceptions.py +++ b/src/aws_durable_execution_sdk_python/exceptions.py @@ -5,9 +5,13 @@ from __future__ import annotations +import time from dataclasses import dataclass -from datetime import UTC, datetime, timedelta from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import datetime class TerminationReason(Enum): @@ -146,10 +150,10 @@ class TimedSuspendExecution(SuspendExecution): This is a specialized form of SuspendExecution that includes a scheduled resume time. Attributes: - scheduled_timestamp (datetime): DateTime at which to resume. + scheduled_timestamp (float): Unix timestamp in seconds at which to resume. """ - def __init__(self, message: str, scheduled_timestamp: datetime): + def __init__(self, message: str, scheduled_timestamp: float): super().__init__(message) self.scheduled_timestamp = scheduled_timestamp @@ -168,23 +172,23 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution: >>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30) >>> # Will suspend for 30 seconds from now """ - resume_time = datetime.now(UTC) + timedelta(seconds=delay_seconds) + resume_time = time.time() + delay_seconds return cls(message, scheduled_timestamp=resume_time) @classmethod def from_datetime( - cls, message: str, datetime_timestamp: datetime + cls, message: str, datetime_timestamp: datetime.datetime ) -> TimedSuspendExecution: """Create a timed suspension with the delay calculated from now. Args: message: Descriptive message for the suspension - datetime_timestamp: DateTime timestamp at which to resume + datetime_timestamp: Unix datetime timestamp in seconds at which to resume Returns: TimedSuspendExecution: Instance with calculated resume time """ - return cls(message, scheduled_timestamp=datetime_timestamp) + return cls(message, scheduled_timestamp=datetime_timestamp.timestamp()) class OrderedLockError(DurableExecutionsError): diff --git a/src/aws_durable_execution_sdk_python/operation/wait.py b/src/aws_durable_execution_sdk_python/operation/wait.py index 6e14c2a..9890e33 100644 --- a/src/aws_durable_execution_sdk_python/operation/wait.py +++ b/src/aws_durable_execution_sdk_python/operation/wait.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from datetime import UTC, datetime, timedelta +import time from typing import TYPE_CHECKING from aws_durable_execution_sdk_python.exceptions import TimedSuspendExecution @@ -48,6 +48,6 @@ def wait_handler( state.create_checkpoint(operation_update=operation) # Calculate when to resume - resume_time = datetime.now(UTC) + timedelta(seconds=seconds) + resume_time = time.time() + seconds msg = f"Wait for {seconds} seconds" raise TimedSuspendExecution(msg, scheduled_timestamp=resume_time) diff --git a/tests/concurrency_test.py b/tests/concurrency_test.py index 189cc39..d3d090b 100644 --- a/tests/concurrency_test.py +++ b/tests/concurrency_test.py @@ -3,7 +3,6 @@ import threading import time from concurrent.futures import Future -from datetime import UTC, datetime, timedelta from unittest.mock import Mock, patch import pytest @@ -608,12 +607,12 @@ def test_executable_with_state_can_resume(): assert exe_state.can_resume # Suspended with timeout in future - future_time = datetime.now(UTC) + timedelta(seconds=10) + future_time = time.time() + 10 exe_state.suspend_with_timeout(future_time) assert not exe_state.can_resume # Suspended with timeout in past - past_time = datetime.now(UTC) - timedelta(seconds=10) + past_time = time.time() - 10 exe_state.suspend_with_timeout(past_time) assert exe_state.can_resume @@ -656,7 +655,7 @@ def test_executable_with_state_suspend_with_timeout(): """Test ExecutableWithState suspend_with_timeout method.""" executable = Executable(index=1, func=lambda: "test") exe_state = ExecutableWithState(executable) - timestamp = datetime.now(UTC) + timedelta(seconds=5) + timestamp = time.time() + 5 exe_state.suspend_with_timeout(timestamp) assert exe_state.status == BranchStatus.SUSPENDED_WITH_TIMEOUT @@ -931,8 +930,8 @@ def test_timer_scheduler_double_check_resume_queue(): exe_state2 = ExecutableWithState(Executable(1, lambda: "test")) # Schedule two tasks with different times to avoid comparison issues - past_time1 = datetime.now(UTC) - timedelta(seconds=2) - past_time2 = datetime.now(UTC) - timedelta(seconds=1) + past_time1 = time.time() - 2 + past_time2 = time.time() - 1 scheduler.schedule_resume(exe_state1, past_time1) scheduler.schedule_resume(exe_state2, past_time2) @@ -969,9 +968,7 @@ def execute_item(self, child_context, executable): exe_state = ExecutableWithState(executables[0]) future = Mock() - future.result.side_effect = TimedSuspendExecution( - "test message", datetime.now(UTC) + timedelta(seconds=1) - ) + future.result.side_effect = TimedSuspendExecution("test message", time.time() + 1) scheduler = Mock() scheduler.schedule_resume = Mock() @@ -1195,9 +1192,7 @@ def test_single_task_suspend_bubbles_up(): class TestExecutor(ConcurrentExecutor): def execute_item(self, child_context, executable): msg = "test" - raise TimedSuspendExecution( - msg, datetime.now(UTC) + timedelta(seconds=1) - ) # Future time + raise TimedSuspendExecution(msg, time.time() + 1) # Future time executables = [Executable(0, lambda: "test")] completion_config = CompletionConfig( @@ -1240,9 +1235,7 @@ def execute_item(self, child_context, executable): if executable.index == 0: # Task A self.task_a_suspended.set() msg = "test" - raise TimedSuspendExecution( - msg, datetime.now(UTC) + timedelta(seconds=1) - ) # Future time + raise TimedSuspendExecution(msg, time.time() + 1) # Future time # Task B # Wait for Task A to suspend first self.task_a_suspended.wait(timeout=2.0) @@ -1288,9 +1281,7 @@ def __init__(self, *args, **kwargs): def execute_item(self, child_context, executable): self.call_count += 1 msg = "test" - raise TimedSuspendExecution( - msg, datetime.now(UTC) + timedelta(seconds=10) - ) # Future time + raise TimedSuspendExecution(msg, time.time() + 10) # Future time executables = [Executable(0, lambda: "test")] completion_config = CompletionConfig( @@ -1348,15 +1339,11 @@ def execute_item(self, child_context, executable): if call_count == 1: # First call: immediate resubmit (past timestamp) msg = "immediate" - raise TimedSuspendExecution( - msg, datetime.now(UTC) - timedelta(seconds=1) - ) + raise TimedSuspendExecution(msg, time.time() - 1) if call_count == 2: # Second call: short delay resubmit msg = "short_delay" - raise TimedSuspendExecution( - msg, datetime.now(UTC) + timedelta(seconds=0.2) - ) + raise TimedSuspendExecution(msg, time.time() + 0.2) # Third call: complete successfully result = "result_B" self.task_b_can_complete.set() @@ -1414,7 +1401,7 @@ def test_timer_scheduler_double_check_condition(): exe_state.suspend() # Make it resumable # Schedule a task with past time - past_time = datetime.now(UTC) - timedelta(seconds=1) + past_time = time.time() - 1 scheduler.schedule_resume(exe_state, past_time) # Give scheduler time to process and hit the double-check condition @@ -1450,7 +1437,7 @@ def execute_item(self, child_context, executable): # Create executable with state in SUSPENDED_WITH_TIMEOUT exe_state = ExecutableWithState(executables[0]) - future_time = datetime.now(UTC) + timedelta(seconds=10) + future_time = time.time() + 10 exe_state.suspend_with_timeout(future_time) executor.executables_with_state = [exe_state] @@ -1552,7 +1539,7 @@ def test_timer_scheduler_can_resume_false(): exe_state.complete("done") # Schedule with past time - past_time = datetime.now(UTC) - timedelta(seconds=1) + past_time = time.time() - 1 scheduler.schedule_resume(exe_state, past_time) # Give scheduler time to process @@ -1590,7 +1577,7 @@ def execute_item(self, child_context, executable): exe_state1 = ExecutableWithState(executables[0]) exe_state2 = ExecutableWithState(executables[1]) - future_time = datetime.now(UTC) + timedelta(seconds=5) + future_time = time.time() + 5 exe_state1.suspend_with_timeout(future_time) exe_state2.suspend() # Indefinite @@ -1631,8 +1618,8 @@ def execute_item(self, child_context, executable): exe_state1 = ExecutableWithState(executables[0]) exe_state2 = ExecutableWithState(executables[1]) - later_time = datetime.now(UTC) + timedelta(seconds=10) - earlier_time = datetime.now(UTC) + timedelta(seconds=5) + later_time = time.time() + 10 + earlier_time = time.time() + 5 exe_state1.suspend_with_timeout(later_time) exe_state2.suspend_with_timeout(earlier_time) @@ -1659,14 +1646,14 @@ def test_timer_scheduler_double_check_condition_race(): exe_state2.suspend() # Schedule first task with past time - past_time = datetime.now(UTC) - timedelta(seconds=1) + past_time = time.time() - 1 scheduler.schedule_resume(exe_state1, past_time) # Brief delay to let timer thread see the first task time.sleep(0.05) # Schedule second task with even more past time (will be heap[0]) - very_past_time = datetime.now(UTC) - timedelta(seconds=2) + very_past_time = time.time() - 2 scheduler.schedule_resume(exe_state2, very_past_time) # Wait for processing @@ -1708,9 +1695,9 @@ def execute_item(self, child_context, executable): exe_state2 = ExecutableWithState(executables[1]) exe_state3 = ExecutableWithState(executables[2]) - time1 = datetime.now(UTC) + timedelta(seconds=10) - time2 = datetime.now(UTC) + timedelta(seconds=5) # Earliest - time3 = datetime.now(UTC) + timedelta(seconds=15) + time1 = time.time() + 10 + time2 = time.time() + 5 # Earliest + time3 = time.time() + 15 exe_state1.suspend_with_timeout(time1) exe_state2.suspend_with_timeout(time2) @@ -1775,7 +1762,7 @@ def test_timer_scheduler_cannot_resume_branch(): exe_state.complete("done") # Schedule with past time - past_time = datetime.now(UTC) - timedelta(seconds=1) + past_time = time.time() - 1 scheduler.schedule_resume(exe_state, past_time) # Wait for processing @@ -2070,7 +2057,7 @@ def execute_item(self, child_context, executable): # Create executable with SUSPENDED_WITH_TIMEOUT status exe_state = ExecutableWithState(executables[0]) - future_time = datetime.now(UTC) + timedelta(seconds=10) + future_time = time.time() + 10 exe_state.suspend_with_timeout(future_time) executor.executables_with_state = [exe_state] @@ -2132,7 +2119,7 @@ def execute_item(self, child_context, executable): exe_states[4].suspend() # SUSPENDED_WITH_TIMEOUT - exe_states[5].suspend_with_timeout(datetime.now(UTC) + timedelta(seconds=10)) + exe_states[5].suspend_with_timeout(time.time() + 10) executor.executables_with_state = exe_states @@ -2297,7 +2284,7 @@ def execute_item(self, child_context, executable): exe_states[2].suspend() # SUSPENDED_WITH_TIMEOUT - exe_states[3].suspend_with_timeout(datetime.now(UTC) + timedelta(seconds=5)) + exe_states[3].suspend_with_timeout(time.time() + 5) executor.executables_with_state = exe_states @@ -2349,7 +2336,7 @@ def test_timer_scheduler_future_time_condition_false(): exe_state.suspend() # Schedule with future time so condition will be False - future_time = datetime.now(UTC) + timedelta(seconds=10) + future_time = time.time() + 10 scheduler.schedule_resume(exe_state, future_time) # Wait briefly for timer thread to check and find condition False diff --git a/tests/exceptions_test.py b/tests/exceptions_test.py index b70a1a5..ac425ac 100644 --- a/tests/exceptions_test.py +++ b/tests/exceptions_test.py @@ -1,6 +1,6 @@ """Unit tests for exceptions module.""" -from datetime import UTC, datetime, timedelta +import time from unittest.mock import patch import pytest @@ -138,7 +138,7 @@ def test_callable_runtime_error_serializable_details_frozen(): def test_timed_suspend_execution(): """Test TimedSuspendExecution exception.""" - scheduled_time = datetime.now(UTC) + scheduled_time = 1234567890.0 error = TimedSuspendExecution("timed suspend", scheduled_time) assert str(error) == "timed suspend" assert error.scheduled_timestamp == scheduled_time @@ -151,16 +151,12 @@ def test_timed_suspend_execution_from_delay(): message = "Waiting for callback" delay_seconds = 30 - # Mock datetime.now() to get predictable results - mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) - with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: - mock_datetime.now.return_value = mock_now - mock_datetime.side_effect = datetime + # Mock time.time() to get predictable results + with patch("time.time", return_value=1000.0): error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - expected_time = mock_now + timedelta(seconds=30) - assert error.scheduled_timestamp == expected_time + assert error.scheduled_timestamp == 1030.0 # 1000.0 + 30 assert isinstance(error, TimedSuspendExecution) assert isinstance(error, SuspendExecution) @@ -170,14 +166,11 @@ def test_timed_suspend_execution_from_delay_zero_delay(): message = "Immediate suspension" delay_seconds = 0 - mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) - with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: - mock_datetime.now.return_value = mock_now - mock_datetime.side_effect = datetime + with patch("time.time", return_value=500.0): error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - assert error.scheduled_timestamp == mock_now # no delay added + assert error.scheduled_timestamp == 500.0 # 500.0 + 0 assert isinstance(error, TimedSuspendExecution) @@ -186,15 +179,11 @@ def test_timed_suspend_execution_from_delay_negative_delay(): message = "Past suspension" delay_seconds = -10 - mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) - with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: - mock_datetime.now.return_value = mock_now - mock_datetime.side_effect = datetime + with patch("time.time", return_value=100.0): error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - expected_time = mock_now + timedelta(seconds=-10) - assert error.scheduled_timestamp == expected_time + assert error.scheduled_timestamp == 90.0 # 100.0 + (-10) assert isinstance(error, TimedSuspendExecution) @@ -203,15 +192,11 @@ def test_timed_suspend_execution_from_delay_large_delay(): message = "Long suspension" delay_seconds = 3600 # 1 hour - mock_now = datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC) - with patch("aws_durable_execution_sdk_python.exceptions.datetime") as mock_datetime: - mock_datetime.now.return_value = mock_now - mock_datetime.side_effect = datetime + with patch("time.time", return_value=0.0): error = TimedSuspendExecution.from_delay(message, delay_seconds) assert str(error) == message - expected_time = mock_now + timedelta(seconds=3600) - assert error.scheduled_timestamp == expected_time + assert error.scheduled_timestamp == 3600.0 # 0.0 + 3600 assert isinstance(error, TimedSuspendExecution) @@ -220,15 +205,15 @@ def test_timed_suspend_execution_from_delay_calculation_accuracy(): message = "Accurate timing test" delay_seconds = 42 - # Test with actual datetime.now() to ensure the calculation works in real scenarios - before_time = datetime.now(UTC) + # Test with actual time.time() to ensure the calculation works in real scenarios + before_time = time.time() error = TimedSuspendExecution.from_delay(message, delay_seconds) - after_time = datetime.now(UTC) + after_time = time.time() # The scheduled timestamp should be within a reasonable range # (accounting for the small time difference between calls) - expected_min = before_time + timedelta(seconds=delay_seconds) - expected_max = after_time + timedelta(seconds=delay_seconds) + expected_min = before_time + delay_seconds + expected_max = after_time + delay_seconds assert expected_min <= error.scheduled_timestamp <= expected_max assert str(error) == message