Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions agent_sdks/python/src/a2ui/core/parser/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ def __init__(self, catalog: "A2uiCatalog" = None):
# (surfaceId, cid) -> hash of content for change detection
self._yielded_contents: Dict[Any, str] = {}

self._root_id: Optional[str] = None # The root component ID for the layout tree
self._root_ids: Dict[str, str] = {} # The root component IDs mapped per surface
self._default_root_id: Optional[str] = None # Base default root ID for the protocol
self._unbound_root_id: Optional[str] = (
None # Temporary holding variable for when root arrives before surfaceId
)
self._surface_id: Optional[str] = None # The active surface ID tracking the context
self._msg_types: List[str] = [] # Running list of message types seen in the block

Expand Down Expand Up @@ -143,14 +147,30 @@ def surface_id(self) -> Optional[str]:
@surface_id.setter
def surface_id(self, value: Optional[str]):
self._surface_id = value
if value is not None and self._unbound_root_id is not None:
self._root_ids[value] = self._unbound_root_id
self._unbound_root_id = None

@property
def root_id(self) -> Optional[str]:
return self._root_id
if self._surface_id:
return self._root_ids.get(self._surface_id, self._default_root_id)
# Return unbound root ID if explicitly sniffed, otherwise use protocol default
return (
self._unbound_root_id
if self._unbound_root_id is not None
else self._default_root_id
)

@root_id.setter
def root_id(self, value: Optional[str]):
self._root_id = value
if self._surface_id:
if value is not None:
self._root_ids[self._surface_id] = value
else:
self._root_ids.pop(self._surface_id, None)
else:
self._unbound_root_id = value

@property
def msg_types(self) -> List[str]:
Expand Down Expand Up @@ -205,7 +225,7 @@ def _yield_messages(
if self._validator:
try:
self._validator.validate(
m, root_id=self._root_id, strict_integrity=strict_integrity
m, root_id=self.root_id, strict_integrity=strict_integrity
)
except ValueError as e:
if strict_integrity:
Expand Down Expand Up @@ -835,7 +855,7 @@ def yield_reachable(
raise_on_orphans: If True, uses strict topology analysis to catch loops.
"""
active_msg_type = self._get_active_msg_type_for_components()
if not self._root_id or not active_msg_type:
if not self.root_id or not active_msg_type:
return

# Buffer components until we have a beginRendering or createSurface for a known surface.
Expand All @@ -850,13 +870,13 @@ def yield_reachable(
# Analyze topology of current seen components
components_to_analyze = list(self._seen_components.values())

if check_root and self._root_id not in self._seen_components:
if check_root and self.root_id not in self._seen_components:
raise ValueError(
f"No root component (id='{self._root_id}') found in {active_msg_type}"
f"No root component (id='{self.root_id}') found in {active_msg_type}"
)

reachable_ids = analyze_topology(
self._root_id,
self.root_id,
components_to_analyze,
self._ref_fields_map,
raise_on_orphans=raise_on_orphans,
Expand All @@ -867,7 +887,7 @@ def yield_reachable(

if check_root and not available_reachable:
raise ValueError(
f"No root component (id='{self._root_id}') found in {active_msg_type}"
f"No root component (id='{self.root_id}') found in {active_msg_type}"
)

# 1. Process placeholders and partial children
Expand Down
26 changes: 16 additions & 10 deletions agent_sdks/python/src/a2ui/core/parser/streaming_v08.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class A2uiStreamParserV08(A2uiStreamParser):

def __init__(self, catalog=None):
super().__init__(catalog=catalog)
self._root_id = None # v0.8 root is determined by beginRendering
self._yielded_begin_rendering_surfaces: Set[str] = set()

@property
Expand Down Expand Up @@ -65,15 +64,22 @@ def _data_model_msg_type(self) -> str:

def _sniff_metadata(self):
"""Sniffs for v0.8 metadata in the json_buffer."""
if not self.surface_id:
match = re.search(r'"surfaceId"\s*:\s*"([^"]+)"', self._json_buffer)
if match:
self.surface_id = match.group(1)

if not self.root_id:
match = re.search(r'"root"\s*:\s*"([^"]+)"', self._json_buffer)
if match:
self.root_id = match.group(1)

def get_latest_value(key: str) -> Optional[str]:
idx = len(self._json_buffer)
while True:
idx = self._json_buffer.rfind(f'"{key}"', 0, idx)
if idx == -1:
return None
match = re.match(rf'"{key}"\s*:\s*"([^"]+)"', self._json_buffer[idx:])
if match:
return match.group(1)

self.surface_id = get_latest_value('surfaceId')

parsed_root = get_latest_value('root')
if parsed_root is not None:
self.root_id = parsed_root

if f'"{MSG_TYPE_BEGIN_RENDERING}":' in self._json_buffer:
self.add_msg_type(MSG_TYPE_BEGIN_RENDERING)
Expand Down
27 changes: 17 additions & 10 deletions agent_sdks/python/src/a2ui/core/parser/streaming_v09.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class A2uiStreamParserV09(A2uiStreamParser):
def __init__(self, catalog=None):
super().__init__(catalog=catalog)
# v0.9 default root is "root"
self.root_id = DEFAULT_ROOT_ID
self._default_root_id = DEFAULT_ROOT_ID

@property
def _placeholder_component(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -56,15 +56,22 @@ def is_protocol_msg(self, obj: Dict[str, Any]) -> bool:

def _sniff_metadata(self):
"""Sniffs for v0.9 metadata in the json_buffer."""
if not self.surface_id:
match = re.search(r'"surfaceId"\s*:\s*"([^"]+)"', self._json_buffer)
if match:
self.surface_id = match.group(1)

if not self.root_id or self.root_id == DEFAULT_ROOT_ID:
match = re.search(r'"root"\s*:\s*"([^"]+)"', self._json_buffer)
if match:
self.root_id = match.group(1)

def get_latest_value(key: str) -> Optional[str]:
idx = len(self._json_buffer)
while True:
idx = self._json_buffer.rfind(f'"{key}"', 0, idx)
if idx == -1:
return None
match = re.match(rf'"{key}"\s*:\s*"([^"]+)"', self._json_buffer[idx:])
if match:
return match.group(1)

self.surface_id = get_latest_value('surfaceId')

parsed_root = get_latest_value('root')
if parsed_root is not None:
self.root_id = parsed_root

if f'"{MSG_TYPE_CREATE_SURFACE}":' in self._json_buffer:
self.add_msg_type(MSG_TYPE_CREATE_SURFACE)
Expand Down
97 changes: 97 additions & 0 deletions agent_sdks/python/tests/core/parser/test_streaming_v08.py
Original file line number Diff line number Diff line change
Expand Up @@ -1965,3 +1965,100 @@ def test_sniff_partial_component_enforces_required_fields(mock_catalog):
}
}]
assertResponseContainsMessages(response_parts, expected)


def test_multiple_concurrent_surfaces(mock_catalog):
"""Verifies that the parser can handle multiple surfaces simultaneously."""
parser = A2uiStreamParser(catalog=mock_catalog)

# Send A2UI block opening bracket
parser.process_chunk(f"{A2UI_OPEN_TAG}[")

# 1. Establish root for surface 1
parser.process_chunk(
'{"beginRendering": {"surfaceId": "surface1", "root": "root1"}},'
)

# 2. Establish root for surface 2
parser.process_chunk(
'{"beginRendering": {"surfaceId": "surface2", "root": "root2"}},'
)

# 3. Stream components for surface 1 in chunks
chunk_s1_a = (
'{"surfaceUpdate": {"surfaceId": "surface1", "components": ['
'{"id": "root1", "component": {"Card": {"child": "c1"}}}, '
)
response_parts = parser.process_chunk(chunk_s1_a)
expected_s1_a = [{
"surfaceUpdate": {
"surfaceId": "surface1",
"components": [
{
"component": parser._placeholder_component["component"],
"id": "loading_c1",
},
{
"id": "root1",
"component": {"Card": {"child": "loading_c1"}},
},
],
}
}]
assertResponseContainsMessages(response_parts, expected_s1_a)

chunk_s1_b = '{"id": "c1", "component": {"Text": {"text": "hello s1"}}}]}}, '
response_parts = parser.process_chunk(chunk_s1_b)

expected_s1_b = [{
"surfaceUpdate": {
"surfaceId": "surface1",
"components": [
{
"id": "c1",
"component": {
"Text": {
"text": "hello s1",
}
},
},
{
"id": "root1",
"component": {"Card": {"child": "c1"}},
},
],
}
}]
assertResponseContainsMessages(response_parts, expected_s1_b)

# 4. Stream components for surface 2
chunk_s2 = (
'{"surfaceUpdate": {"surfaceId": "surface2", "components": ['
'{"id": "root2", "component": {"Card": {"child": "c2"}}}, '
'{"id": "c2", "component": {"Text": {"text": "hello s2"}}}]}}'
)
response_parts = parser.process_chunk(chunk_s2)

expected_s2 = [{
"surfaceUpdate": {
"surfaceId": "surface2",
"components": [
{
"id": "c2",
"component": {
"Text": {
"text": "hello s2",
}
},
},
{
"id": "root2",
"component": {"Card": {"child": "c2"}},
},
],
}
}]
assertResponseContainsMessages(response_parts, expected_s2)

# Send A2UI block closing bracket
parser.process_chunk(f"]{A2UI_CLOSE_TAG}")
108 changes: 108 additions & 0 deletions agent_sdks/python/tests/core/parser/test_streaming_v09.py
Original file line number Diff line number Diff line change
Expand Up @@ -2032,3 +2032,111 @@ def test_sniff_partial_component_enforces_required_fields(mock_catalog):
},
}]
assertResponseContainsMessages(response_parts, expected)


def test_multiple_concurrent_surfaces(mock_catalog):
"""Verifies that the parser can handle multiple surfaces simultaneously."""
parser = A2uiStreamParser(catalog=mock_catalog)

# Send A2UI block opening bracket
parser.process_chunk(f"{A2UI_OPEN_TAG}[")

# 1. Establish root for surface 1
response_parts = parser.process_chunk(
'{"version": "v0.9", "createSurface": {"surfaceId": "surface1", "catalogId":'
' "test_catalog"}},'
)
expected_cs1 = [{
"version": "v0.9",
"createSurface": {"catalogId": "test_catalog", "surfaceId": "surface1"},
}]
assertResponseContainsMessages(response_parts, expected_cs1)

# 2. Establish root for surface 2
response_parts = parser.process_chunk(
'{"version": "v0.9", "createSurface": {"surfaceId": "surface2", "catalogId":'
' "test_catalog"}},'
)
expected_cs2 = [{
"version": "v0.9",
"createSurface": {"catalogId": "test_catalog", "surfaceId": "surface2"},
}]
assertResponseContainsMessages(response_parts, expected_cs2)

# 3. Stream components for surface 1 in chunks
chunk_s1_a = (
'{"version": "v0.9", "updateComponents": {"surfaceId": "surface1", "root":'
' "root1", "components": [{"id": "root1", "component": "Card", "child": "c1"}, '
)
response_parts = parser.process_chunk(chunk_s1_a)
expected_s1_a = [{
"version": "v0.9",
"updateComponents": {
"surfaceId": "surface1",
"root": "root1",
"components": [
{
**parser._placeholder_component,
"id": "loading_c1",
},
{"id": "root1", "component": "Card", "child": "loading_c1"},
],
},
}]
assertResponseContainsMessages(response_parts, expected_s1_a)

chunk_s1_b = '{"id": "c1", "component": "Text", "text": "hello s1"}]}}, '
response_parts = parser.process_chunk(chunk_s1_b)

expected_s1_b = [{
"version": "v0.9",
"updateComponents": {
"surfaceId": "surface1",
"root": "root1",
"components": [
{
"id": "c1",
"component": "Text",
"text": "hello s1",
},
{
"id": "root1",
"component": "Card",
"child": "c1",
},
],
},
}]
assertResponseContainsMessages(response_parts, expected_s1_b)

# 4. Stream components for surface 2
chunk_s2 = (
'{"version": "v0.9", "updateComponents": {"surfaceId": "surface2", "root":'
' "root2", "components": [{"id": "root2", "component": "Card", "child": "c2"},'
' {"id": "c2", "component": "Text", "text": "hello s2"}]}}'
)
response_parts = parser.process_chunk(chunk_s2)

expected_s2 = [{
"version": "v0.9",
"updateComponents": {
"surfaceId": "surface2",
"root": "root2",
"components": [
{
"id": "c2",
"component": "Text",
"text": "hello s2",
},
{
"id": "root2",
"component": "Card",
"child": "c2",
},
],
},
}]
assertResponseContainsMessages(response_parts, expected_s2)

# Send A2UI block closing bracket
parser.process_chunk(f"]{A2UI_CLOSE_TAG}")
Loading