Skip to content

Commit 8353012

Browse files
author
Alex Wang
committed
feat: Implement json serdes for Operation
- Implement json compatiable serdes for Operation and InvocationInput class
1 parent afd4083 commit 8353012

4 files changed

Lines changed: 1068 additions & 3 deletions

File tree

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ def from_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState:
5959
next_marker=input_dict.get("NextMarker", ""),
6060
)
6161

62+
@staticmethod
63+
def from_json_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState:
64+
operations = []
65+
if input_operations := input_dict.get("Operations"):
66+
operations = [Operation.from_json_dict(op) for op in input_operations]
67+
return InitialExecutionState(
68+
operations=operations,
69+
next_marker=input_dict.get("NextMarker", ""),
70+
)
71+
6272
def get_execution_operation(self) -> Operation | None:
6373
if not self.operations:
6474
# Due to payload size limitations we may have an empty operations list.
@@ -91,6 +101,12 @@ def to_dict(self) -> MutableMapping[str, Any]:
91101
"NextMarker": self.next_marker,
92102
}
93103

104+
def to_json_dict(self) -> MutableMapping[str, Any]:
105+
return {
106+
"Operations": [op.to_json_dict() for op in self.operations],
107+
"NextMarker": self.next_marker,
108+
}
109+
94110

95111
@dataclass(frozen=True)
96112
class DurableExecutionInvocationInput:
@@ -110,13 +126,32 @@ def from_dict(
110126
),
111127
)
112128

129+
@staticmethod
130+
def from_json_dict(
131+
input_dict: MutableMapping[str, Any],
132+
) -> DurableExecutionInvocationInput:
133+
return DurableExecutionInvocationInput(
134+
durable_execution_arn=input_dict["DurableExecutionArn"],
135+
checkpoint_token=input_dict["CheckpointToken"],
136+
initial_execution_state=InitialExecutionState.from_json_dict(
137+
input_dict.get("InitialExecutionState", {})
138+
),
139+
)
140+
113141
def to_dict(self) -> MutableMapping[str, Any]:
114142
return {
115143
"DurableExecutionArn": self.durable_execution_arn,
116144
"CheckpointToken": self.checkpoint_token,
117145
"InitialExecutionState": self.initial_execution_state.to_dict(),
118146
}
119147

148+
def to_json_dict(self) -> MutableMapping[str, Any]:
149+
return {
150+
"DurableExecutionArn": self.durable_execution_arn,
151+
"CheckpointToken": self.checkpoint_token,
152+
"InitialExecutionState": self.initial_execution_state.to_json_dict(),
153+
}
154+
120155

121156
@dataclass(frozen=True)
122157
class DurableExecutionInvocationInputWithClient(DurableExecutionInvocationInput):

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from __future__ import annotations
22

3+
import copy
34
import datetime
45
import logging
56
from dataclasses import dataclass, field
7+
from datetime import UTC
68
from enum import Enum
79
from typing import TYPE_CHECKING, Any, Protocol, TypeAlias
810

@@ -805,9 +807,11 @@ def to_dict(self) -> MutableMapping[str, Any]:
805807
step_dict["Error"] = self.step_details.error.to_dict()
806808
result["StepDetails"] = step_dict
807809
if self.wait_details:
808-
result["WaitDetails"] = {
809-
"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp
810-
}
810+
result["WaitDetails"] = (
811+
{"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp}
812+
if self.wait_details.scheduled_end_timestamp
813+
else {}
814+
)
811815
if self.callback_details:
812816
callback_dict: MutableMapping[str, Any] = {
813817
"CallbackId": self.callback_details.callback_id
@@ -826,6 +830,93 @@ def to_dict(self) -> MutableMapping[str, Any]:
826830
result["ChainedInvokeDetails"] = invoke_dict
827831
return result
828832

833+
@classmethod
834+
def _dt_to_ms(cls, dt: datetime.datetime | None) -> int | None:
835+
"""Convert datetime to millisecond timestamp."""
836+
if not dt:
837+
return None
838+
return int(dt.timestamp() * 1000)
839+
840+
@classmethod
841+
def _ms_to_dt(cls, ms: int | None) -> datetime.datetime | None:
842+
"""Convert millisecond timestamp to datetime."""
843+
if not ms:
844+
return None
845+
return datetime.datetime.fromtimestamp(ms / 1000, tz=UTC)
846+
847+
def to_json_dict(self) -> MutableMapping[str, Any]:
848+
"""Convert the Operation to a JSON-serializable dictionary.
849+
850+
Converts datetime objects to millisecond timestamps for JSON compatibility.
851+
852+
Returns:
853+
A dictionary with JSON-serializable values
854+
"""
855+
# Start with the regular to_dict output
856+
result = self.to_dict()
857+
858+
# Convert datetime objects to millisecond timestamps
859+
if "StartTimestamp" in result and isinstance(
860+
result["StartTimestamp"], datetime.datetime
861+
):
862+
result["StartTimestamp"] = self._dt_to_ms(result["StartTimestamp"])
863+
864+
if "EndTimestamp" in result and isinstance(
865+
result["EndTimestamp"], datetime.datetime
866+
):
867+
result["EndTimestamp"] = self._dt_to_ms(result["EndTimestamp"])
868+
869+
if self.step_details and self.step_details.next_attempt_timestamp:
870+
result["StepDetails"]["NextAttemptTimestamp"] = self._dt_to_ms(
871+
result["StepDetails"].get("NextAttemptTimestamp")
872+
)
873+
874+
if self.wait_details and self.wait_details.scheduled_end_timestamp:
875+
result["WaitDetails"]["ScheduledEndTimestamp"] = self._dt_to_ms(
876+
result["WaitDetails"]["ScheduledEndTimestamp"]
877+
)
878+
879+
return result
880+
881+
@classmethod
882+
def from_json_dict(cls, data: MutableMapping[str, Any]) -> Operation:
883+
"""Create an Operation from a JSON-serializable dictionary.
884+
885+
Converts millisecond timestamps back to datetime objects.
886+
887+
Args:
888+
data: Dictionary with JSON-serializable values (millisecond timestamps)
889+
890+
Returns:
891+
An Operation instance with datetime objects
892+
"""
893+
# Make a copy to avoid modifying the original data
894+
data_copy = copy.deepcopy(data)
895+
896+
# Convert millisecond timestamps back to datetime objects
897+
if "StartTimestamp" in data_copy and isinstance(
898+
data_copy["StartTimestamp"], int
899+
):
900+
data_copy["StartTimestamp"] = cls._ms_to_dt(data_copy["StartTimestamp"])
901+
902+
if "EndTimestamp" in data_copy and isinstance(data_copy["EndTimestamp"], int):
903+
data_copy["EndTimestamp"] = cls._ms_to_dt(data_copy["EndTimestamp"])
904+
905+
step_details = data_copy.get("StepDetails")
906+
if step_details and isinstance(step_details.get("NextAttemptTimestamp"), int):
907+
step_details["NextAttemptTimestamp"] = cls._ms_to_dt(
908+
step_details["NextAttemptTimestamp"]
909+
)
910+
911+
wait_details = data_copy.get("WaitDetails")
912+
if wait_details and isinstance(wait_details.get("ScheduledEndTimestamp"), int):
913+
wait_details["ScheduledEndTimestamp"] = cls._ms_to_dt(
914+
wait_details["ScheduledEndTimestamp"]
915+
)
916+
917+
# Use the existing from_dict method with the converted data
918+
return cls.from_dict(data_copy)
919+
829920

830921
@dataclass(frozen=True)
831922
class CheckpointUpdatedExecutionState:

0 commit comments

Comments
 (0)