Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7321,10 +7321,9 @@
"Error":{"shape":"EventError"}
}
},
"InvokeDetails":{
"ChainedInvokeDetails":{
"type":"structure",
"members":{
"DurableExecutionArn":{"shape":"DurableExecutionArn"},
"Result":{"shape":"OperationPayload"},
"Error":{"shape":"ErrorObject"}
}
Expand All @@ -7343,12 +7342,11 @@
"RESPONSE_STREAM"
]
},
"InvokeOptions":{
"ChainedInvokeOptions":{
"type":"structure",
"members":{
"FunctionName":{"shape":"FunctionName"},
"FunctionQualifier":{"shape":"Version"},
"DurableExecutionName":{"shape":"DurableExecutionName"}
"TimeoutSeconds":{"shape":"DurationSeconds"}
}
},
"InvokeResponseStreamUpdate":{
Expand Down Expand Up @@ -9332,7 +9330,7 @@
"StepDetails":{"shape":"StepDetails"},
"WaitDetails":{"shape":"WaitDetails"},
"CallbackDetails":{"shape":"CallbackDetails"},
"InvokeDetails":{"shape":"InvokeDetails"}
"ChainedInvokeDetails":{"shape":"ChainedInvokeDetails"}
}
},
"OperationAction":{
Expand Down Expand Up @@ -9387,7 +9385,7 @@
"STEP",
"WAIT",
"CALLBACK",
"INVOKE"
"CHAINED_INVOKE"
]
},
"OperationUpdate":{
Expand All @@ -9405,7 +9403,7 @@
"StepOptions":{"shape":"StepOptions"},
"WaitOptions":{"shape":"WaitOptions"},
"CallbackOptions":{"shape":"CallbackOptions"},
"InvokeOptions":{"shape":"InvokeOptions"}
"ChainedInvokeOptions":{"shape":"ChainedInvokeOptions"}
}
},
"OperationUpdates":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7320,10 +7320,9 @@
"Error":{"shape":"EventError"}
}
},
"InvokeDetails":{
"ChainedInvokeDetails":{
"type":"structure",
"members":{
"DurableExecutionArn":{"shape":"DurableExecutionArn"},
"Result":{"shape":"OperationPayload"},
"Error":{"shape":"ErrorObject"}
}
Expand All @@ -7342,12 +7341,11 @@
"RESPONSE_STREAM"
]
},
"InvokeOptions":{
"ChainedInvokeOptions":{
"type":"structure",
"members":{
"FunctionName":{"shape":"FunctionName"},
"FunctionQualifier":{"shape":"Version"},
"DurableExecutionName":{"shape":"DurableExecutionName"}
"TimeoutSeconds":{"shape":"DurationSeconds"}
}
},
"InvokeResponseStreamUpdate":{
Expand Down Expand Up @@ -9331,7 +9329,7 @@
"StepDetails":{"shape":"StepDetails"},
"WaitDetails":{"shape":"WaitDetails"},
"CallbackDetails":{"shape":"CallbackDetails"},
"InvokeDetails":{"shape":"InvokeDetails"}
"ChainedInvokeDetails":{"shape":"ChainedInvokeDetails"}
}
},
"OperationAction":{
Expand Down Expand Up @@ -9386,7 +9384,7 @@
"STEP",
"WAIT",
"CALLBACK",
"INVOKE"
"CHAINED_INVOKE"
]
},
"OperationUpdate":{
Expand All @@ -9404,7 +9402,7 @@
"StepOptions":{"shape":"StepOptions"},
"WaitOptions":{"shape":"WaitOptions"},
"CallbackOptions":{"shape":"CallbackOptions"},
"InvokeOptions":{"shape":"InvokeOptions"}
"ChainedInvokeOptions":{"shape":"ChainedInvokeOptions"}
}
},
"OperationUpdates":{
Expand Down
99 changes: 60 additions & 39 deletions src/aws_durable_execution_sdk_python/lambda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

logger = logging.getLogger(__name__)

type ReplayChildren = bool
type OperationPayload = str
type TimeoutSeconds = int


# region model
class OperationAction(Enum):
Expand Down Expand Up @@ -49,7 +53,24 @@ class OperationType(Enum):
STEP = "STEP"
WAIT = "WAIT"
CALLBACK = "CALLBACK"
INVOKE = "INVOKE"
CHAINED_INVOKE = "CHAINED_INVOKE"


class CallbackTimeoutType(Enum):
TIMEOUT = "Callback.Timeout"
HEARTBEAT = "Callback.Heartbeat"


class ChainedInvokeFailedToStartType(Enum):
FAILED_TO_START = "ChainedInvoke.FailedToStart"


class ChainedInvokeTimeoutType(Enum):
TIMEOUT = "ChainedInvoke.Timeout"


class ChainedInvokeStopType(Enum):
STOPPED = "ChainedInvoke.Stopped"


class OperationSubType(Enum):
Expand Down Expand Up @@ -77,8 +98,8 @@ def from_dict(cls, data: MutableMapping[str, Any]) -> ExecutionDetails:

@dataclass(frozen=True)
class ContextDetails:
replay_children: bool = False
result: str | None = None
replay_children: ReplayChildren = False
result: OperationPayload | None = None
error: ErrorObject | None = None

@classmethod
Expand Down Expand Up @@ -150,7 +171,7 @@ def to_callable_runtime_error(self) -> CallableRuntimeError:
class StepDetails:
attempt: int = 0
next_attempt_timestamp: datetime.datetime | None = None
result: str | None = None
result: OperationPayload | None = None
error: ErrorObject | None = None

@classmethod
Expand Down Expand Up @@ -190,16 +211,14 @@ def from_dict(cls, data: MutableMapping[str, Any]) -> CallbackDetails:


@dataclass(frozen=True)
class InvokeDetails:
durable_execution_arn: str
class ChainedInvokeDetails:
result: str | None = None
error: ErrorObject | None = None

@classmethod
def from_dict(cls, data: MutableMapping[str, Any]) -> InvokeDetails:
def from_dict(cls, data: MutableMapping[str, Any]) -> ChainedInvokeDetails:
error_raw = data.get("Error")
return cls(
durable_execution_arn=data["DurableExecutionArn"],
result=data.get("Result"),
error=ErrorObject.from_dict(error_raw) if error_raw else None,
)
Expand Down Expand Up @@ -233,7 +252,7 @@ def to_dict(self) -> MutableMapping[str, Any]:

@dataclass(frozen=True)
class CallbackOptions:
timeout_seconds: int = 0
timeout_seconds: TimeoutSeconds = 0
heartbeat_timeout_seconds: int = 0

@classmethod
Expand All @@ -251,26 +270,28 @@ def to_dict(self) -> MutableMapping[str, Any]:


@dataclass(frozen=True)
class InvokeOptions:
class ChainedInvokeOptions:
function_name: str
timeout_seconds: int = 0
timeout_seconds: TimeoutSeconds = 0

@classmethod
def from_dict(cls, data: MutableMapping[str, Any]) -> InvokeOptions:
def from_dict(cls, data: MutableMapping[str, Any]) -> ChainedInvokeOptions:
return cls(
function_name=data["FunctionName"],
timeout_seconds=data.get("TimeoutSeconds", 0),
)

def to_dict(self) -> MutableMapping[str, Any]:
result: MutableMapping[str, Any] = {"FunctionName": self.function_name}
result["TimeoutSeconds"] = self.timeout_seconds
result: MutableMapping[str, Any] = {
"FunctionName": self.function_name,
"TimeoutSeconds": self.timeout_seconds,
}
return result


@dataclass(frozen=True)
class ContextOptions:
replay_children: bool = False
replay_children: ReplayChildren = False

@classmethod
def from_dict(cls, data: MutableMapping[str, Any]) -> ContextOptions:
Expand Down Expand Up @@ -299,7 +320,7 @@ class OperationUpdate:
step_options: StepOptions | None = None
wait_options: WaitOptions | None = None
callback_options: CallbackOptions | None = None
invoke_options: InvokeOptions | None = None
chained_invoke_options: ChainedInvokeOptions | None = None

def to_dict(self) -> MutableMapping[str, Any]:
result: MutableMapping[str, Any] = {
Expand All @@ -326,8 +347,8 @@ def to_dict(self) -> MutableMapping[str, Any]:
result["WaitOptions"] = self.wait_options.to_dict()
if self.callback_options:
result["CallbackOptions"] = self.callback_options.to_dict()
if self.invoke_options:
result["InvokeOptions"] = self.invoke_options.to_dict()
if self.chained_invoke_options:
result["ChainedInvokeOptions"] = self.chained_invoke_options.to_dict()

return result

Expand All @@ -352,9 +373,9 @@ def from_dict(cls, data: MutableMapping[str, Any]) -> OperationUpdate:
if callback_data := data.get("CallbackOptions"):
callback_options = CallbackOptions.from_dict(callback_data)

invoke_options = None
if invoke_data := data.get("InvokeOptions"):
invoke_options = InvokeOptions.from_dict(invoke_data)
chained_invoke_options = None
if invoke_data := data.get("ChainedInvokeOptions"):
chained_invoke_options = ChainedInvokeOptions.from_dict(invoke_data)

return cls(
operation_id=data["Id"],
Expand All @@ -369,7 +390,7 @@ def from_dict(cls, data: MutableMapping[str, Any]) -> OperationUpdate:
step_options=step_options,
wait_options=wait_options,
callback_options=callback_options,
invoke_options=invoke_options,
chained_invoke_options=chained_invoke_options,
)

@classmethod
Expand Down Expand Up @@ -537,18 +558,18 @@ def create_invoke_start(
cls,
identifier: OperationIdentifier,
payload: str,
invoke_options: InvokeOptions,
chained_invoke_options: ChainedInvokeOptions,
) -> OperationUpdate:
"""Create an instance of OperationUpdate for type: INVOKE, action: START."""
return cls(
operation_id=identifier.operation_id,
parent_id=identifier.parent_id,
operation_type=OperationType.INVOKE,
operation_type=OperationType.CHAINED_INVOKE,
sub_type=OperationSubType.INVOKE,
action=OperationAction.START,
name=identifier.name,
payload=payload,
invoke_options=invoke_options,
chained_invoke_options=chained_invoke_options,
)

# endregion invoke
Expand Down Expand Up @@ -657,7 +678,7 @@ class Operation:
step_details: StepDetails | None = None
wait_details: WaitDetails | None = None
callback_details: CallbackDetails | None = None
invoke_details: InvokeDetails | None = None
chained_invoke_details: ChainedInvokeDetails | None = None

@classmethod
def from_dict(cls, data: MutableMapping[str, Any]) -> Operation:
Expand Down Expand Up @@ -696,9 +717,11 @@ def from_dict(cls, data: MutableMapping[str, Any]) -> Operation:
if callback_details_input := data.get("CallbackDetails"):
callback_details = CallbackDetails.from_dict(callback_details_input)

invoke_details = None
if invoke_details_input := data.get("InvokeDetails"):
invoke_details = InvokeDetails.from_dict(invoke_details_input)
chained_invoke_details = None
if chained_invoke_details := data.get("chained_invoke_details"):
chained_invoke_details = ChainedInvokeDetails.from_dict(
chained_invoke_details
)

return cls(
operation_id=data["Id"],
Expand All @@ -714,7 +737,7 @@ def from_dict(cls, data: MutableMapping[str, Any]) -> Operation:
step_details=step_details,
wait_details=wait_details,
callback_details=callback_details,
invoke_details=invoke_details,
chained_invoke_details=chained_invoke_details,
)

def to_dict(self) -> MutableMapping[str, Any]:
Expand Down Expand Up @@ -763,15 +786,13 @@ def to_dict(self) -> MutableMapping[str, Any]:
if self.callback_details.error:
callback_dict["Error"] = self.callback_details.error.to_dict()
result["CallbackDetails"] = callback_dict
if self.invoke_details:
invoke_dict: MutableMapping[str, Any] = {
"DurableExecutionArn": self.invoke_details.durable_execution_arn
}
if self.invoke_details.result:
invoke_dict["Result"] = self.invoke_details.result
if self.invoke_details.error:
invoke_dict["Error"] = self.invoke_details.error.to_dict()
result["InvokeDetails"] = invoke_dict
if self.chained_invoke_details:
invoke_dict: MutableMapping[str, Any] = {}
if self.chained_invoke_details.result:
invoke_dict["Result"] = self.chained_invoke_details.result
if self.chained_invoke_details.error:
invoke_dict["Error"] = self.chained_invoke_details.error.to_dict()
result["ChainedInvokeDetails"] = invoke_dict
return result


Expand Down
10 changes: 5 additions & 5 deletions src/aws_durable_execution_sdk_python/operation/invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
FatalError,
)
from aws_durable_execution_sdk_python.lambda_service import (
InvokeOptions,
ChainedInvokeOptions,
OperationUpdate,
)
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
Expand Down Expand Up @@ -50,12 +50,12 @@ def invoke_handler(
# Return persisted result - no need to check for errors in successful operations
if (
checkpointed_result.operation
and checkpointed_result.operation.invoke_details
and checkpointed_result.operation.invoke_details.result
and checkpointed_result.operation.chained_invoke_details
and checkpointed_result.operation.chained_invoke_details.result
):
return deserialize(
serdes=config.serdes_result,
data=checkpointed_result.operation.invoke_details.result,
data=checkpointed_result.operation.chained_invoke_details.result,
operation_id=operation_identifier.operation_id,
durable_execution_arn=state.durable_execution_arn,
)
Expand Down Expand Up @@ -85,7 +85,7 @@ def invoke_handler(
start_operation: OperationUpdate = OperationUpdate.create_invoke_start(
identifier=operation_identifier,
payload=serialized_payload,
invoke_options=InvokeOptions(
chained_invoke_options=ChainedInvokeOptions(
function_name=function_name, timeout_seconds=config.timeout_seconds
),
)
Expand Down
4 changes: 2 additions & 2 deletions src/aws_durable_execution_sdk_python/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def create_from_operation(cls, operation: Operation) -> CheckpointedResult:
result = callback_details.result if callback_details else None
error = callback_details.error if callback_details else None

case OperationType.INVOKE:
invoke_details = operation.invoke_details
case OperationType.CHAINED_INVOKE:
invoke_details = operation.chained_invoke_details
result = invoke_details.result if invoke_details else None
error = invoke_details.error if invoke_details else None

Expand Down
Loading