diff --git a/agent_sdks/python/src/a2ui/core/parser/streaming.py b/agent_sdks/python/src/a2ui/core/parser/streaming.py index 5a23ffe49..1840536d9 100644 --- a/agent_sdks/python/src/a2ui/core/parser/streaming.py +++ b/agent_sdks/python/src/a2ui/core/parser/streaming.py @@ -105,7 +105,11 @@ def __init__(self, catalog: "A2uiCatalog" = None): # (surfaceId, cid) -> hash of content for change detection self._yielded_contents: Dict[Any, str] = {} - self._root_id: Optional[str] = None # The root component ID for the layout tree + self._root_ids: Dict[str, str] = {} # The root component IDs mapped per surface + self._default_root_id: Optional[str] = None # Base default root ID for the protocol + self._unbound_root_id: Optional[str] = ( + None # Temporary holding variable for when root arrives before surfaceId + ) self._surface_id: Optional[str] = None # The active surface ID tracking the context self._msg_types: List[str] = [] # Running list of message types seen in the block @@ -143,14 +147,30 @@ def surface_id(self) -> Optional[str]: @surface_id.setter def surface_id(self, value: Optional[str]): self._surface_id = value + if value is not None and self._unbound_root_id is not None: + self._root_ids[value] = self._unbound_root_id + self._unbound_root_id = None @property def root_id(self) -> Optional[str]: - return self._root_id + if self._surface_id: + return self._root_ids.get(self._surface_id, self._default_root_id) + # Return unbound root ID if explicitly sniffed, otherwise use protocol default + return ( + self._unbound_root_id + if self._unbound_root_id is not None + else self._default_root_id + ) @root_id.setter def root_id(self, value: Optional[str]): - self._root_id = value + if self._surface_id: + if value is not None: + self._root_ids[self._surface_id] = value + else: + self._root_ids.pop(self._surface_id, None) + else: + self._unbound_root_id = value @property def msg_types(self) -> List[str]: @@ -205,7 +225,7 @@ def _yield_messages( if self._validator: try: self._validator.validate( - m, root_id=self._root_id, strict_integrity=strict_integrity + m, root_id=self.root_id, strict_integrity=strict_integrity ) except ValueError as e: if strict_integrity: @@ -835,7 +855,7 @@ def yield_reachable( raise_on_orphans: If True, uses strict topology analysis to catch loops. """ active_msg_type = self._get_active_msg_type_for_components() - if not self._root_id or not active_msg_type: + if not self.root_id or not active_msg_type: return # Buffer components until we have a beginRendering or createSurface for a known surface. @@ -850,13 +870,13 @@ def yield_reachable( # Analyze topology of current seen components components_to_analyze = list(self._seen_components.values()) - if check_root and self._root_id not in self._seen_components: + if check_root and self.root_id not in self._seen_components: raise ValueError( - f"No root component (id='{self._root_id}') found in {active_msg_type}" + f"No root component (id='{self.root_id}') found in {active_msg_type}" ) reachable_ids = analyze_topology( - self._root_id, + self.root_id, components_to_analyze, self._ref_fields_map, raise_on_orphans=raise_on_orphans, @@ -867,7 +887,7 @@ def yield_reachable( if check_root and not available_reachable: raise ValueError( - f"No root component (id='{self._root_id}') found in {active_msg_type}" + f"No root component (id='{self.root_id}') found in {active_msg_type}" ) # 1. Process placeholders and partial children diff --git a/agent_sdks/python/src/a2ui/core/parser/streaming_v08.py b/agent_sdks/python/src/a2ui/core/parser/streaming_v08.py index 6678b77dc..311247470 100644 --- a/agent_sdks/python/src/a2ui/core/parser/streaming_v08.py +++ b/agent_sdks/python/src/a2ui/core/parser/streaming_v08.py @@ -27,7 +27,6 @@ class A2uiStreamParserV08(A2uiStreamParser): def __init__(self, catalog=None): super().__init__(catalog=catalog) - self._root_id = None # v0.8 root is determined by beginRendering self._yielded_begin_rendering_surfaces: Set[str] = set() @property @@ -65,15 +64,22 @@ def _data_model_msg_type(self) -> str: def _sniff_metadata(self): """Sniffs for v0.8 metadata in the json_buffer.""" - if not self.surface_id: - match = re.search(r'"surfaceId"\s*:\s*"([^"]+)"', self._json_buffer) - if match: - self.surface_id = match.group(1) - - if not self.root_id: - match = re.search(r'"root"\s*:\s*"([^"]+)"', self._json_buffer) - if match: - self.root_id = match.group(1) + + def get_latest_value(key: str) -> Optional[str]: + idx = len(self._json_buffer) + while True: + idx = self._json_buffer.rfind(f'"{key}"', 0, idx) + if idx == -1: + return None + match = re.match(rf'"{key}"\s*:\s*"([^"]+)"', self._json_buffer[idx:]) + if match: + return match.group(1) + + self.surface_id = get_latest_value('surfaceId') + + parsed_root = get_latest_value('root') + if parsed_root is not None: + self.root_id = parsed_root if f'"{MSG_TYPE_BEGIN_RENDERING}":' in self._json_buffer: self.add_msg_type(MSG_TYPE_BEGIN_RENDERING) diff --git a/agent_sdks/python/src/a2ui/core/parser/streaming_v09.py b/agent_sdks/python/src/a2ui/core/parser/streaming_v09.py index 5284c0136..264adebfb 100644 --- a/agent_sdks/python/src/a2ui/core/parser/streaming_v09.py +++ b/agent_sdks/python/src/a2ui/core/parser/streaming_v09.py @@ -28,7 +28,7 @@ class A2uiStreamParserV09(A2uiStreamParser): def __init__(self, catalog=None): super().__init__(catalog=catalog) # v0.9 default root is "root" - self.root_id = DEFAULT_ROOT_ID + self._default_root_id = DEFAULT_ROOT_ID @property def _placeholder_component(self) -> Dict[str, Any]: @@ -56,15 +56,22 @@ def is_protocol_msg(self, obj: Dict[str, Any]) -> bool: def _sniff_metadata(self): """Sniffs for v0.9 metadata in the json_buffer.""" - if not self.surface_id: - match = re.search(r'"surfaceId"\s*:\s*"([^"]+)"', self._json_buffer) - if match: - self.surface_id = match.group(1) - - if not self.root_id or self.root_id == DEFAULT_ROOT_ID: - match = re.search(r'"root"\s*:\s*"([^"]+)"', self._json_buffer) - if match: - self.root_id = match.group(1) + + def get_latest_value(key: str) -> Optional[str]: + idx = len(self._json_buffer) + while True: + idx = self._json_buffer.rfind(f'"{key}"', 0, idx) + if idx == -1: + return None + match = re.match(rf'"{key}"\s*:\s*"([^"]+)"', self._json_buffer[idx:]) + if match: + return match.group(1) + + self.surface_id = get_latest_value('surfaceId') + + parsed_root = get_latest_value('root') + if parsed_root is not None: + self.root_id = parsed_root if f'"{MSG_TYPE_CREATE_SURFACE}":' in self._json_buffer: self.add_msg_type(MSG_TYPE_CREATE_SURFACE) diff --git a/agent_sdks/python/tests/core/parser/test_streaming_v08.py b/agent_sdks/python/tests/core/parser/test_streaming_v08.py index 51cfed5bc..1cd912897 100644 --- a/agent_sdks/python/tests/core/parser/test_streaming_v08.py +++ b/agent_sdks/python/tests/core/parser/test_streaming_v08.py @@ -1965,3 +1965,100 @@ def test_sniff_partial_component_enforces_required_fields(mock_catalog): } }] assertResponseContainsMessages(response_parts, expected) + + +def test_multiple_concurrent_surfaces(mock_catalog): + """Verifies that the parser can handle multiple surfaces simultaneously.""" + parser = A2uiStreamParser(catalog=mock_catalog) + + # Send A2UI block opening bracket + parser.process_chunk(f"{A2UI_OPEN_TAG}[") + + # 1. Establish root for surface 1 + parser.process_chunk( + '{"beginRendering": {"surfaceId": "surface1", "root": "root1"}},' + ) + + # 2. Establish root for surface 2 + parser.process_chunk( + '{"beginRendering": {"surfaceId": "surface2", "root": "root2"}},' + ) + + # 3. Stream components for surface 1 in chunks + chunk_s1_a = ( + '{"surfaceUpdate": {"surfaceId": "surface1", "components": [' + '{"id": "root1", "component": {"Card": {"child": "c1"}}}, ' + ) + response_parts = parser.process_chunk(chunk_s1_a) + expected_s1_a = [{ + "surfaceUpdate": { + "surfaceId": "surface1", + "components": [ + { + "component": parser._placeholder_component["component"], + "id": "loading_c1", + }, + { + "id": "root1", + "component": {"Card": {"child": "loading_c1"}}, + }, + ], + } + }] + assertResponseContainsMessages(response_parts, expected_s1_a) + + chunk_s1_b = '{"id": "c1", "component": {"Text": {"text": "hello s1"}}}]}}, ' + response_parts = parser.process_chunk(chunk_s1_b) + + expected_s1_b = [{ + "surfaceUpdate": { + "surfaceId": "surface1", + "components": [ + { + "id": "c1", + "component": { + "Text": { + "text": "hello s1", + } + }, + }, + { + "id": "root1", + "component": {"Card": {"child": "c1"}}, + }, + ], + } + }] + assertResponseContainsMessages(response_parts, expected_s1_b) + + # 4. Stream components for surface 2 + chunk_s2 = ( + '{"surfaceUpdate": {"surfaceId": "surface2", "components": [' + '{"id": "root2", "component": {"Card": {"child": "c2"}}}, ' + '{"id": "c2", "component": {"Text": {"text": "hello s2"}}}]}}' + ) + response_parts = parser.process_chunk(chunk_s2) + + expected_s2 = [{ + "surfaceUpdate": { + "surfaceId": "surface2", + "components": [ + { + "id": "c2", + "component": { + "Text": { + "text": "hello s2", + } + }, + }, + { + "id": "root2", + "component": {"Card": {"child": "c2"}}, + }, + ], + } + }] + assertResponseContainsMessages(response_parts, expected_s2) + + # Send A2UI block closing bracket + parser.process_chunk(f"]{A2UI_CLOSE_TAG}") diff --git a/agent_sdks/python/tests/core/parser/test_streaming_v09.py b/agent_sdks/python/tests/core/parser/test_streaming_v09.py index 6255467e3..88b40d753 100644 --- a/agent_sdks/python/tests/core/parser/test_streaming_v09.py +++ b/agent_sdks/python/tests/core/parser/test_streaming_v09.py @@ -2032,3 +2032,111 @@ def test_sniff_partial_component_enforces_required_fields(mock_catalog): }, }] assertResponseContainsMessages(response_parts, expected) + + +def test_multiple_concurrent_surfaces(mock_catalog): + """Verifies that the parser can handle multiple surfaces simultaneously.""" + parser = A2uiStreamParser(catalog=mock_catalog) + + # Send A2UI block opening bracket + parser.process_chunk(f"{A2UI_OPEN_TAG}[") + + # 1. Establish root for surface 1 + response_parts = parser.process_chunk( + '{"version": "v0.9", "createSurface": {"surfaceId": "surface1", "catalogId":' + ' "test_catalog"}},' + ) + expected_cs1 = [{ + "version": "v0.9", + "createSurface": {"catalogId": "test_catalog", "surfaceId": "surface1"}, + }] + assertResponseContainsMessages(response_parts, expected_cs1) + + # 2. Establish root for surface 2 + response_parts = parser.process_chunk( + '{"version": "v0.9", "createSurface": {"surfaceId": "surface2", "catalogId":' + ' "test_catalog"}},' + ) + expected_cs2 = [{ + "version": "v0.9", + "createSurface": {"catalogId": "test_catalog", "surfaceId": "surface2"}, + }] + assertResponseContainsMessages(response_parts, expected_cs2) + + # 3. Stream components for surface 1 in chunks + chunk_s1_a = ( + '{"version": "v0.9", "updateComponents": {"surfaceId": "surface1", "root":' + ' "root1", "components": [{"id": "root1", "component": "Card", "child": "c1"}, ' + ) + response_parts = parser.process_chunk(chunk_s1_a) + expected_s1_a = [{ + "version": "v0.9", + "updateComponents": { + "surfaceId": "surface1", + "root": "root1", + "components": [ + { + **parser._placeholder_component, + "id": "loading_c1", + }, + {"id": "root1", "component": "Card", "child": "loading_c1"}, + ], + }, + }] + assertResponseContainsMessages(response_parts, expected_s1_a) + + chunk_s1_b = '{"id": "c1", "component": "Text", "text": "hello s1"}]}}, ' + response_parts = parser.process_chunk(chunk_s1_b) + + expected_s1_b = [{ + "version": "v0.9", + "updateComponents": { + "surfaceId": "surface1", + "root": "root1", + "components": [ + { + "id": "c1", + "component": "Text", + "text": "hello s1", + }, + { + "id": "root1", + "component": "Card", + "child": "c1", + }, + ], + }, + }] + assertResponseContainsMessages(response_parts, expected_s1_b) + + # 4. Stream components for surface 2 + chunk_s2 = ( + '{"version": "v0.9", "updateComponents": {"surfaceId": "surface2", "root":' + ' "root2", "components": [{"id": "root2", "component": "Card", "child": "c2"},' + ' {"id": "c2", "component": "Text", "text": "hello s2"}]}}' + ) + response_parts = parser.process_chunk(chunk_s2) + + expected_s2 = [{ + "version": "v0.9", + "updateComponents": { + "surfaceId": "surface2", + "root": "root2", + "components": [ + { + "id": "c2", + "component": "Text", + "text": "hello s2", + }, + { + "id": "root2", + "component": "Card", + "child": "c2", + }, + ], + }, + }] + assertResponseContainsMessages(response_parts, expected_s2) + + # Send A2UI block closing bracket + parser.process_chunk(f"]{A2UI_CLOSE_TAG}")