Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/aws_durable_execution_sdk_python/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

import time
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
import datetime


class DurableExecutionsError(Exception):
Expand Down Expand Up @@ -96,6 +100,21 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
resume_time = time.time() + delay_seconds
return cls(message, scheduled_timestamp=resume_time)

@classmethod
def from_datetime(
    cls, message: str, datetime_timestamp: datetime.datetime
) -> TimedSuspendExecution:
    """Create a timed suspension that resumes at an absolute point in time.

    Unlike from_delay, nothing is computed relative to the current time: the
    given datetime is converted directly to a POSIX timestamp.

    Args:
        message: Descriptive message for the suspension
        datetime_timestamp: Moment at which to resume. It is converted with
            ``datetime.timestamp()``, which interprets a naive value as
            local time.

    Returns:
        TimedSuspendExecution: Instance whose scheduled timestamp is the
            POSIX timestamp (seconds since the epoch) of datetime_timestamp
    """
    resume_time = datetime_timestamp.timestamp()
    return cls(message, scheduled_timestamp=resume_time)


class OrderedLockError(DurableExecutionsError):
"""An error from OrderedLock.
Expand Down
4 changes: 1 addition & 3 deletions src/aws_durable_execution_sdk_python/lambda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ def to_callable_runtime_error(self) -> CallableRuntimeError:
@dataclass(frozen=True)
class StepDetails:
attempt: int = 0
next_attempt_timestamp: str | None = (
None # TODO: confirm type, depending on how serialized
)
next_attempt_timestamp: datetime.datetime | None = None
result: str | None = None
error: ErrorObject | None = None

Expand Down
23 changes: 1 addition & 22 deletions src/aws_durable_execution_sdk_python/operation/invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,15 @@
from aws_durable_execution_sdk_python.config import InvokeConfig
from aws_durable_execution_sdk_python.exceptions import (
FatalError,
SuspendExecution,
TimedSuspendExecution,
)
from aws_durable_execution_sdk_python.lambda_service import (
InvokeOptions,
OperationUpdate,
)
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
from aws_durable_execution_sdk_python.suspend import suspend_with_optional_timeout

if TYPE_CHECKING:
from typing import NoReturn

from aws_durable_execution_sdk_python.identifier import OperationIdentifier
from aws_durable_execution_sdk_python.state import ExecutionState

Expand Down Expand Up @@ -108,21 +105,3 @@ def invoke_handler(
# This line should never be reached since suspend_with_optional_timeout always raises
msg = "suspend_with_optional_timeout should have raised an exception, but did not."
raise FatalError(msg) from None


def suspend_with_optional_timeout(
    msg: str, timeout_seconds: int | None = None
) -> NoReturn:
    """Suspend the execution, with a resume delay when one is supplied.

    This function never returns; it always raises one of the exceptions below.

    Args:
        msg: Descriptive message for the suspension
        timeout_seconds: Number of seconds to stay suspended; None or a
            non-positive value suspends without a scheduled resume

    Raises:
        SuspendExecution: When timeout_seconds is None or <= 0
        TimedSuspendExecution: When timeout_seconds > 0
    """
    # Guard clause: no usable timeout means an indefinite suspension.
    if not timeout_seconds or not timeout_seconds > 0:
        raise SuspendExecution(msg)
    raise TimedSuspendExecution.from_delay(msg, timeout_seconds)
36 changes: 22 additions & 14 deletions src/aws_durable_execution_sdk_python/operation/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

import logging
import time
from typing import TYPE_CHECKING, TypeVar

from aws_durable_execution_sdk_python.config import (
Expand All @@ -14,12 +13,18 @@
from aws_durable_execution_sdk_python.exceptions import (
FatalError,
StepInterruptedError,
TimedSuspendExecution,
)
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
from aws_durable_execution_sdk_python.lambda_service import (
ErrorObject,
OperationUpdate,
)
from aws_durable_execution_sdk_python.logger import Logger, LogInfo
from aws_durable_execution_sdk_python.retries import RetryPresets
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
from aws_durable_execution_sdk_python.suspend import (
suspend_with_optional_timeout,
suspend_with_optional_timestamp,
)
from aws_durable_execution_sdk_python.types import StepContext

if TYPE_CHECKING:
Expand Down Expand Up @@ -52,7 +57,9 @@ def step_handler(
if not config:
config = StepConfig()

checkpointed_result = state.get_checkpoint_result(operation_identifier.operation_id)
checkpointed_result: CheckpointedResult = state.get_checkpoint_result(
operation_identifier.operation_id
)
if checkpointed_result.is_succeeded():
logger.debug(
"Step already completed, skipping execution for id: %s, name: %s",
Expand All @@ -73,6 +80,13 @@ def step_handler(
# have to throw the exact same error on replay as the checkpointed failure
checkpointed_result.raise_callable_error()

if checkpointed_result.is_pending():
scheduled_timestamp = checkpointed_result.get_next_attempt_timestamp()
suspend_with_optional_timestamp(
msg=f"Retry scheduled for {operation_identifier.name or operation_identifier.operation_id} will retry at timestamp {scheduled_timestamp}",
datetime_timestamp=scheduled_timestamp,
)

if checkpointed_result.is_started():
# step was previously interrupted
if config.step_semantics is StepSemantics.AT_MOST_ONCE_PER_RETRY:
Expand Down Expand Up @@ -193,7 +207,10 @@ def retry_handler(

state.create_checkpoint(operation_update=retry_operation)

_suspend(operation_identifier, retry_decision)
suspend_with_optional_timeout(
msg=f"Retry scheduled for {operation_identifier.operation_id} in {retry_decision.delay_seconds} seconds",
timeout_seconds=retry_decision.delay_seconds,
)

# no retry
fail_operation: OperationUpdate = OperationUpdate.create_step_fail(
Expand All @@ -206,12 +223,3 @@ def retry_handler(
raise error

raise error_object.to_callable_runtime_error()


def _suspend(operation_identifier: OperationIdentifier, retry_decision: RetryDecision):
    """Suspend the execution until the retry delay has elapsed.

    Args:
        operation_identifier: Identifies the operation being retried; its
            operation_id is included in the suspension message
        retry_decision: Supplies delay_seconds, the wait before the retry

    Raises:
        TimedSuspendExecution: Always, scheduled delay_seconds after the
            current time
    """
    delay = retry_decision.delay_seconds
    resume_at = time.time() + delay
    raise TimedSuspendExecution(
        f"Retry scheduled for {operation_identifier.operation_id} in {delay} seconds",
        scheduled_timestamp=resume_at,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from __future__ import annotations

import logging
import time
from typing import TYPE_CHECKING, TypeVar

from aws_durable_execution_sdk_python.exceptions import (
FatalError,
TimedSuspendExecution,
)
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
from aws_durable_execution_sdk_python.lambda_service import (
ErrorObject,
OperationUpdate,
)
from aws_durable_execution_sdk_python.logger import LogInfo
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
from aws_durable_execution_sdk_python.suspend import (
suspend_with_optional_timeout,
suspend_with_optional_timestamp,
)
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext

if TYPE_CHECKING:
Expand All @@ -24,7 +29,10 @@
)
from aws_durable_execution_sdk_python.identifier import OperationIdentifier
from aws_durable_execution_sdk_python.logger import Logger
from aws_durable_execution_sdk_python.state import ExecutionState
from aws_durable_execution_sdk_python.state import (
CheckpointedResult,
ExecutionState,
)


T = TypeVar("T")
Expand All @@ -49,7 +57,9 @@ def wait_for_condition_handler(
operation_identifier.name,
)

checkpointed_result = state.get_checkpoint_result(operation_identifier.operation_id)
checkpointed_result: CheckpointedResult = state.get_checkpoint_result(
operation_identifier.operation_id
)

# Check if already completed
if checkpointed_result.is_succeeded():
Expand All @@ -70,6 +80,13 @@ def wait_for_condition_handler(
if checkpointed_result.is_failed():
checkpointed_result.raise_callable_error()

if checkpointed_result.is_pending():
scheduled_timestamp = checkpointed_result.get_next_attempt_timestamp()
suspend_with_optional_timestamp(
msg=f"wait_for_condition {operation_identifier.name or operation_identifier.operation_id} will retry at timestamp {scheduled_timestamp}",
datetime_timestamp=scheduled_timestamp,
)

attempt: int = 1
if checkpointed_result.is_started_or_ready():
# This is a retry - get state from previous checkpoint
Expand Down Expand Up @@ -164,7 +181,10 @@ def wait_for_condition_handler(

state.create_checkpoint(operation_update=retry_operation)

_suspend_execution(operation_identifier, decision)
suspend_with_optional_timeout(
msg=f"wait_for_condition {operation_identifier.name or operation_identifier.operation_id} will retry in {decision.delay_seconds} seconds",
timeout_seconds=decision.delay_seconds,
)

except Exception as e:
# Mark as failed - waitForCondition doesn't have its own retry logic for errors
Expand All @@ -184,14 +204,3 @@ def wait_for_condition_handler(

msg: str = "wait_for_condition should never reach this point"
raise FatalError(msg)


def _suspend_execution(
    operation_identifier: OperationIdentifier, decision: WaitForConditionDecision
) -> None:
    """Suspend the execution until the next wait_for_condition check is due.

    Args:
        operation_identifier: Identifies the operation; its name (or, when the
            name is falsy, its operation_id) is included in the message
        decision: Supplies delay_seconds; a falsy delay is treated as 0 when
            computing the resume time

    Raises:
        TimedSuspendExecution: Always, scheduled delay_seconds after the
            current time
    """
    delay = decision.delay_seconds
    resume_at = time.time() + (delay or 0)
    label = operation_identifier.name or operation_identifier.operation_id
    raise TimedSuspendExecution(
        f"wait_for_condition {label} will retry in {delay} seconds",
        scheduled_timestamp=resume_at,
    )
13 changes: 13 additions & 0 deletions src/aws_durable_execution_sdk_python/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from aws_durable_execution_sdk_python.threading import OrderedLock

if TYPE_CHECKING:
import datetime
from collections.abc import MutableMapping


Expand Down Expand Up @@ -106,6 +107,13 @@ def is_started_or_ready(self) -> bool:
return False
return op.status in (OperationStatus.STARTED, OperationStatus.READY)

def is_pending(self) -> bool:
    """Report whether the checkpointed operation has PENDING status.

    Returns:
        bool: False when no operation is recorded, otherwise whether the
            operation's status is OperationStatus.PENDING
    """
    current = self.operation
    return bool(current) and current.status is OperationStatus.PENDING

def is_timed_out(self) -> bool:
"""Return True if the checkpointed operation is TIMED_OUT."""
op = self.operation
Expand All @@ -126,6 +134,11 @@ def raise_callable_error(self) -> None:

raise self.error.to_callable_runtime_error()

def get_next_attempt_timestamp(self) -> datetime.datetime | None:
if self.operation and self.operation.step_details:
return self.operation.step_details.next_attempt_timestamp
return None


# Shared instance, so a new one does not need to be created for each not-found check.
CHECKPOINT_NOT_FOUND = CheckpointedResult.create_not_found()
Expand Down
46 changes: 46 additions & 0 deletions src/aws_durable_execution_sdk_python/suspend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import datetime

from aws_durable_execution_sdk_python.exceptions import (
SuspendExecution,
TimedSuspendExecution,
)


def suspend_with_optional_timestamp(
    msg: str, datetime_timestamp: datetime.datetime | None = None
) -> None:
    """Suspend execution, resuming at datetime_timestamp when it is in the future.

    This function never returns normally; it always raises one of the
    exceptions listed below.

    Args:
        msg: Descriptive message for the suspension
        datetime_timestamp: Point in time to suspend until, or None to suspend
            without a scheduled resume. A naive value is interpreted as local
            time, matching the ``datetime.timestamp()`` conversion that
            TimedSuspendExecution.from_datetime is given.

    Raises:
        TimedSuspendExecution: When datetime_timestamp is in the future
        SuspendExecution: When datetime_timestamp is None or not in the future
    """
    # TODO: confirm with backend about the behaviour of 0 time suspend
    if datetime_timestamp is not None:
        now = datetime.datetime.now(tz=datetime.UTC)
        # Compare POSIX timestamps instead of the datetime objects themselves:
        # ordering a naive datetime against an aware one raises TypeError,
        # whereas timestamp() is defined for both kinds.
        if datetime_timestamp.timestamp() > now.timestamp():
            raise TimedSuspendExecution.from_datetime(msg, datetime_timestamp)
    msg = f"Invalid timestamp {datetime_timestamp}, suspending without retry timestamp, original operation: [{msg}]"
    raise SuspendExecution(msg)


def suspend_with_optional_timeout(msg: str, timeout_seconds: int | None = None) -> None:
    """Suspend the execution, with a resume delay when one is supplied.

    This function never returns normally; it always raises one of the
    exceptions listed below.

    Args:
        msg: Descriptive message for the suspension
        timeout_seconds: Number of seconds to stay suspended; None or a
            non-positive value suspends without a scheduled resume

    Raises:
        SuspendExecution: When timeout_seconds is None or <= 0
        TimedSuspendExecution: When timeout_seconds > 0
    """
    # TODO: confirm with backend about the behaviour of 0 time suspend
    if not timeout_seconds or not timeout_seconds > 0:
        # No usable delay: suspend without a retry timestamp and say why.
        msg = f"Invalid timeout seconds {timeout_seconds}, suspending without retry timestamp, original operation: [{msg}]"
        raise SuspendExecution(msg)
    raise TimedSuspendExecution.from_delay(msg, timeout_seconds)
Loading