Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/aws_durable_execution_sdk_python/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import heapq
import logging
import threading
import time
from abc import ABC, abstractmethod
from collections import Counter
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import TYPE_CHECKING, Generic, Self, TypeVar

Expand Down Expand Up @@ -258,7 +258,7 @@ def __init__(self, executable: Executable[CallableType]):
self.executable = executable
self._status = BranchStatus.PENDING
self._future: Future | None = None
self._suspend_until: float | None = None
self._suspend_until: datetime | None = None
self._result: ResultType = None # type: ignore[assignment]
self._is_result_set: bool = False
self._error: Exception | None = None
Expand Down Expand Up @@ -293,7 +293,7 @@ def error(self) -> Exception:
return self._error

@property
def suspend_until(self) -> float | None:
def suspend_until(self) -> datetime | None:
"""Get suspend timestamp."""
return self._suspend_until

Expand All @@ -308,7 +308,7 @@ def can_resume(self) -> bool:
return self._status is BranchStatus.SUSPENDED or (
self._status is BranchStatus.SUSPENDED_WITH_TIMEOUT
and self._suspend_until is not None
and time.time() >= self._suspend_until
and datetime.now(UTC) >= self._suspend_until
)

@property
Expand All @@ -333,7 +333,7 @@ def suspend(self) -> None:
self._status = BranchStatus.SUSPENDED
self._suspend_until = None

def suspend_with_timeout(self, timestamp: float) -> None:
def suspend_with_timeout(self, timestamp: datetime) -> None:
"""Transition to SUSPENDED_WITH_TIMEOUT state."""
self._status = BranchStatus.SUSPENDED_WITH_TIMEOUT
self._suspend_until = timestamp
Expand Down Expand Up @@ -507,11 +507,11 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.shutdown()

def schedule_resume(
self, exe_state: ExecutableWithState, resume_time: float
self, exe_state: ExecutableWithState, resume_time: datetime
) -> None:
"""Schedule a task to resume at the specified time."""
with self._lock:
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
heapq.heappush(self._pending_resumes, (resume_time.timestamp(), exe_state))

def shutdown(self) -> None:
"""Shutdown the timer thread and cancel all pending resumes."""
Expand All @@ -534,7 +534,7 @@ def _timer_loop(self) -> None:
self._shutdown.wait(timeout=0.1)
continue

current_time = time.time()
current_time = datetime.now(UTC).timestamp()
if current_time >= next_resume_time:
# Time to resume
with self._lock:
Expand Down Expand Up @@ -675,7 +675,7 @@ def on_done(future: Future) -> None:

def should_execution_suspend(self) -> SuspendResult:
"""Check if execution should suspend."""
earliest_timestamp: float = float("inf")
earliest_timestamp: datetime | None = None
indefinite_suspend_task: (
ExecutableWithState[CallableType, ResultType] | None
) = None
Expand All @@ -685,16 +685,16 @@ def should_execution_suspend(self) -> SuspendResult:
# Exit here! Still have tasks that can make progress, don't suspend.
return SuspendResult.do_not_suspend()
if exe_state.status is BranchStatus.SUSPENDED_WITH_TIMEOUT:
if (
exe_state.suspend_until
and exe_state.suspend_until < earliest_timestamp
if exe_state.suspend_until and (
earliest_timestamp is None
or exe_state.suspend_until < earliest_timestamp
):
earliest_timestamp = exe_state.suspend_until
elif exe_state.status is BranchStatus.SUSPENDED:
indefinite_suspend_task = exe_state

# All tasks are in final states and at least one of them is a suspend.
if earliest_timestamp != float("inf"):
if earliest_timestamp is not None:
return SuspendResult.suspend(
TimedSuspendExecution(
"All concurrent work complete or suspended pending retry.",
Expand Down
18 changes: 7 additions & 11 deletions src/aws_durable_execution_sdk_python/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

from __future__ import annotations

import time
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
import datetime
from datetime import UTC, datetime, timedelta


class DurableExecutionsError(Exception):
Expand Down Expand Up @@ -75,10 +71,10 @@ class TimedSuspendExecution(SuspendExecution):
This is a specialized form of SuspendExecution that includes a scheduled resume time.

Attributes:
scheduled_timestamp (float): Unix timestamp in seconds at which to resume.
scheduled_timestamp (datetime): DateTime at which to resume.
"""

def __init__(self, message: str, scheduled_timestamp: float):
def __init__(self, message: str, scheduled_timestamp: datetime):
super().__init__(message)
self.scheduled_timestamp = scheduled_timestamp

Expand All @@ -97,23 +93,23 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
>>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30)
>>> # Will suspend for 30 seconds from now
"""
resume_time = time.time() + delay_seconds
resume_time = datetime.now(UTC) + timedelta(seconds=delay_seconds)
return cls(message, scheduled_timestamp=resume_time)

@classmethod
def from_datetime(
cls, message: str, datetime_timestamp: datetime.datetime
cls, message: str, datetime_timestamp: datetime
) -> TimedSuspendExecution:
"""Create a timed suspension with the delay calculated from now.

Args:
message: Descriptive message for the suspension
datetime_timestamp: Unix datetime timestamp in seconds at which to resume
datetime_timestamp: DateTime timestamp at which to resume

Returns:
TimedSuspendExecution: Instance with calculated resume time
"""
return cls(message, scheduled_timestamp=datetime_timestamp.timestamp())
return cls(message, scheduled_timestamp=datetime_timestamp)


class OrderedLockError(DurableExecutionsError):
Expand Down
4 changes: 2 additions & 2 deletions src/aws_durable_execution_sdk_python/operation/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import logging
import time
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING

from aws_durable_execution_sdk_python.exceptions import TimedSuspendExecution
Expand Down Expand Up @@ -48,6 +48,6 @@ def wait_handler(
state.create_checkpoint(operation_update=operation)

# Calculate when to resume
resume_time = time.time() + seconds
resume_time = datetime.now(UTC) + timedelta(seconds=seconds)
msg = f"Wait for {seconds} seconds"
raise TimedSuspendExecution(msg, scheduled_timestamp=resume_time)
Loading
Loading