Skip to content

fix(sdk): pass item_serdes in factory_method#123

Merged
1 commit merged intomainfrom
item-serdes
Nov 6, 2025
Merged

fix(sdk): pass item_serdes in factory_method#123
1 commit merged intomainfrom
item-serdes

Conversation

@ghost
Copy link
Copy Markdown

@ghost ghost commented Nov 6, 2025

  • feat: Batch Result serialization
  • Fixup: pass item_serdes in factory_method

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@ghost ghost requested review from wangyb-A and yaythomas as code owners November 6, 2025 04:38
@ghost ghost changed the title Fixup: pass item_serdes in factory_method Fix(sdk): pass item_serdes in factory_method Nov 6, 2025
@ghost ghost closed this Nov 6, 2025
@ghost ghost reopened this Nov 6, 2025
@ghost ghost force-pushed the item-serdes branch from 8d966a5 to 012b130 Compare November 6, 2025 04:44
@ghost ghost changed the title Fix(sdk): pass item_serdes in factory_method fix(sdk): pass item_serdes in factory_method Nov 6, 2025
@ghost ghost closed this Nov 6, 2025
@ghost ghost reopened this Nov 6, 2025
@ghost ghost force-pushed the item-serdes branch from 012b130 to e2565f2 Compare November 6, 2025 04:58
Comment on lines +768 to +804
"""Test map serializes items with item_serdes or fallback."""
mock_serialize.return_value = '"serialized"'

parent_checkpoint = Mock()
parent_checkpoint.is_succeeded.return_value = False
parent_checkpoint.is_failed.return_value = False
parent_checkpoint.is_started.return_value = False
parent_checkpoint.is_existent.return_value = True
parent_checkpoint.is_replay_children.return_value = False

child_checkpoint = Mock()
child_checkpoint.is_succeeded.return_value = False
child_checkpoint.is_failed.return_value = False
child_checkpoint.is_started.return_value = False
child_checkpoint.is_existent.return_value = True
child_checkpoint.is_replay_children.return_value = False

def get_checkpoint(op_id):
return child_checkpoint if op_id.startswith("child-") else parent_checkpoint

mock_state = Mock()
mock_state.durable_execution_arn = "arn:test"
mock_state.get_checkpoint_result = Mock(side_effect=get_checkpoint)
mock_state.create_checkpoint = Mock()

context_map = {}

def create_id(self, i):
ctx_id = id(self)
if ctx_id not in context_map:
context_map[ctx_id] = []
context_map[ctx_id].append(i)
return (
"parent"
if len(context_map) == 1 and len(context_map[ctx_id]) == 1
else f"child-{i}"
)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting up the state to allow us to properly reach children and execute them.

Comment on lines +815 to +820
assert mock_serialize.call_args_list[0][1]["serdes"] is expected
assert mock_serialize.call_args_list[0][1]["operation_id"] == "child-0"
assert mock_serialize.call_args_list[1][1]["serdes"] is expected
assert mock_serialize.call_args_list[1][1]["operation_id"] == "child-1"
assert mock_serialize.call_args_list[2][1]["serdes"] is batch_serdes
assert mock_serialize.call_args_list[2][1]["operation_id"] == "parent"
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test that we are actually calling serialize.

Map and Parallel produce trees of operations. We expect to see N children first, and then 1 operation for the parent.

Comment on lines +833 to +874
"""Test map deserializes items with item_serdes or fallback."""
mock_deserialize.return_value = "deserialized"

parent_checkpoint = Mock()
parent_checkpoint.is_succeeded.return_value = False
parent_checkpoint.is_failed.return_value = False
parent_checkpoint.is_existent.return_value = False

child_checkpoint = Mock()
child_checkpoint.is_succeeded.return_value = True
child_checkpoint.is_failed.return_value = False
child_checkpoint.is_replay_children.return_value = False
child_checkpoint.result = '"cached"'

def get_checkpoint(op_id):
return child_checkpoint if op_id.startswith("child-") else parent_checkpoint

mock_state = Mock()
mock_state.durable_execution_arn = "arn:test"
mock_state.get_checkpoint_result = Mock(side_effect=get_checkpoint)
mock_state.create_checkpoint = Mock()

context_map = {}

def create_id(self, i):
ctx_id = id(self)
if ctx_id not in context_map:
context_map[ctx_id] = []
context_map[ctx_id].append(i)
return (
"parent"
if len(context_map) == 1 and len(context_map[ctx_id]) == 1
else f"child-{i}"
)

with patch.object(DurableContext, "_create_step_id_for_logical_step", create_id):
context = DurableContext(state=mock_state)
context.map(
["a", "b"],
lambda ctx, item, idx, items: item,
config=MapConfig(serdes=batch_serdes, item_serdes=item_serdes),
)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Context and state setup.

Comment on lines +875 to +880

expected = item_serdes or batch_serdes
assert mock_deserialize.call_args_list[0][1]["serdes"] is expected
assert mock_deserialize.call_args_list[0][1]["operation_id"] == "child-0"
assert mock_deserialize.call_args_list[1][1]["serdes"] is expected
assert mock_deserialize.call_args_list[1][1]["operation_id"] == "child-1"
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify that we are calling deserialize on children.

Comment on lines +747 to +792
@patch("aws_durable_execution_sdk_python.operation.child.serialize")
def test_parallel_item_serialize(mock_serialize, item_serdes, batch_serdes):
"""Test parallel serializes branches with item_serdes or fallback."""
mock_serialize.return_value = '"serialized"'

parent_checkpoint = Mock()
parent_checkpoint.is_succeeded.return_value = False
parent_checkpoint.is_failed.return_value = False
parent_checkpoint.is_started.return_value = False
parent_checkpoint.is_existent.return_value = True
parent_checkpoint.is_replay_children.return_value = False

child_checkpoint = Mock()
child_checkpoint.is_succeeded.return_value = False
child_checkpoint.is_failed.return_value = False
child_checkpoint.is_started.return_value = False
child_checkpoint.is_existent.return_value = True
child_checkpoint.is_replay_children.return_value = False

def get_checkpoint(op_id):
return child_checkpoint if op_id.startswith("child-") else parent_checkpoint

mock_state = Mock()
mock_state.durable_execution_arn = "arn:test"
mock_state.get_checkpoint_result = Mock(side_effect=get_checkpoint)
mock_state.create_checkpoint = Mock()

context_map = {}

def create_id(self, i):
ctx_id = id(self)
if ctx_id not in context_map:
context_map[ctx_id] = []
context_map[ctx_id].append(i)
return (
"parent"
if len(context_map) == 1 and len(context_map[ctx_id]) == 1
else f"child-{i}"
)

with patch.object(DurableContext, "_create_step_id_for_logical_step", create_id):
context = DurableContext(state=mock_state)
context.parallel(
[lambda ctx: "a", lambda ctx: "b"],
config=ParallelConfig(serdes=batch_serdes, item_serdes=item_serdes),
)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setup the state.

Comment on lines +795 to +800
assert mock_serialize.call_args_list[0][1]["serdes"] is expected
assert mock_serialize.call_args_list[0][1]["operation_id"] == "child-0"
assert mock_serialize.call_args_list[1][1]["serdes"] is expected
assert mock_serialize.call_args_list[1][1]["operation_id"] == "child-1"
assert mock_serialize.call_args_list[2][1]["serdes"] is batch_serdes
assert mock_serialize.call_args_list[2][1]["operation_id"] == "parent"
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify that we are seeing children and parent in correct ordering here as we are executing recursively.

1. Pass item_serdes to executor factory methods.
2. Add tests to verify fallback and default behaviour.
@ghost ghost force-pushed the item-serdes branch from e2565f2 to e4eca42 Compare November 6, 2025 05:17
@ghost ghost merged commit 3ec77a3 into main Nov 6, 2025
8 of 9 checks passed
@wangyb-A wangyb-A deleted the item-serdes branch December 9, 2025 22:29
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants