Skip to content

Commit 4b677e7

Browse files
GWealecopybara-github
authored andcommitted
feat: Extract and merge EventActions from A2A metadata
This change enables the conversion of ADK EventActions, serialized within A2A object metadata, back into ADK Event objects. It includes logic to parse JSON-encoded metadata values and to merge EventActions from multiple sources within an A2A Task Close #3968 Co-authored-by: George Weale <gweale@google.com> PiperOrigin-RevId: 886927688
1 parent 0f4c807 commit 4b677e7

File tree

2 files changed

+332
-9
lines changed

2 files changed

+332
-9
lines changed

src/google/adk/a2a/converters/to_adk_event.py

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
from collections.abc import Callable
18+
import json
1819
import logging
1920
from typing import Any
2021
from typing import List
@@ -28,9 +29,11 @@
2829
from a2a.types import TaskState
2930
from a2a.types import TaskStatusUpdateEvent
3031
from google.genai import types as genai_types
32+
from pydantic import ValidationError
3133

3234
from ...agents.invocation_context import InvocationContext
3335
from ...events.event import Event
36+
from ...events.event_actions import EventActions
3437
from ..experimental import a2a_experimental
3538
from .part_converter import A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY
3639
from .part_converter import A2APartToGenAIPartConverter
@@ -171,11 +174,15 @@ def _create_event(
171174
output_parts: List[genai_types.Part],
172175
invocation_context: Optional[InvocationContext],
173176
author: Optional[str],
177+
actions: Optional[EventActions] = None,
174178
long_running_function_ids: Optional[set[str]] = None,
175179
partial: bool = False,
176180
) -> Optional[Event]:
177181
"""Creates an ADK event from parts and metadata."""
178-
if not output_parts:
182+
event_actions = actions or EventActions()
183+
if not output_parts and not event_actions.model_dump(
184+
exclude_none=True, exclude_defaults=True
185+
):
179186
return None
180187

181188
event = Event(
@@ -186,19 +193,89 @@ def _create_event(
186193
),
187194
author=author or "a2a agent",
188195
branch=invocation_context.branch if invocation_context else None,
196+
actions=event_actions,
189197
long_running_tool_ids=(
190198
long_running_function_ids if long_running_function_ids else None
191199
),
192-
content=genai_types.Content(
193-
role="model",
194-
parts=output_parts,
200+
content=(
201+
genai_types.Content(
202+
role="model",
203+
parts=output_parts,
204+
)
205+
if output_parts
206+
else None
195207
),
196208
partial=partial,
197209
)
198210

199211
return event
200212

201213

214+
def _parse_adk_metadata_value(value: Any) -> Any:
215+
"""Parses ADK metadata values serialized through A2A."""
216+
if not isinstance(value, str):
217+
return value
218+
219+
try:
220+
return json.loads(value)
221+
except json.JSONDecodeError:
222+
return value
223+
224+
225+
def _extract_event_actions(
226+
metadata: Optional[dict[str, Any]],
227+
) -> EventActions:
228+
"""Extracts ADK event actions from A2A metadata."""
229+
if not metadata:
230+
return EventActions()
231+
232+
raw_actions = metadata.get(_get_adk_metadata_key("actions"))
233+
if raw_actions is None:
234+
return EventActions()
235+
236+
parsed_actions = _parse_adk_metadata_value(raw_actions)
237+
if not isinstance(parsed_actions, dict):
238+
logger.warning(
239+
"Ignoring invalid ADK actions metadata of type %s",
240+
type(parsed_actions).__name__,
241+
)
242+
return EventActions()
243+
244+
try:
245+
return EventActions.model_validate(parsed_actions)
246+
except ValidationError as error:
247+
logger.warning("Ignoring invalid ADK actions metadata: %s", error)
248+
return EventActions()
249+
250+
251+
def _merge_top_level_dicts(
252+
base: dict[str, Any], new_values: dict[str, Any]
253+
) -> dict[str, Any]:
254+
"""Merges dictionaries while preserving top-level overwrite semantics."""
255+
merged = dict(base)
256+
for key, value in new_values.items():
257+
if (
258+
key in merged
259+
and isinstance(merged[key], dict)
260+
and isinstance(value, dict)
261+
):
262+
merged[key] = {**merged[key], **value}
263+
else:
264+
merged[key] = value
265+
return merged
266+
267+
268+
def _merge_event_actions(
269+
existing_actions: EventActions, new_actions: EventActions
270+
) -> EventActions:
271+
"""Merges action metadata from multiple A2A sources."""
272+
merged_actions_data = _merge_top_level_dicts(
273+
existing_actions.model_dump(exclude_none=True, by_alias=True),
274+
new_actions.model_dump(exclude_none=True, by_alias=True),
275+
)
276+
return EventActions.model_validate(merged_actions_data)
277+
278+
202279
@a2a_experimental
203280
def convert_a2a_task_to_event(
204281
a2a_task: Task,
@@ -226,19 +303,28 @@ def convert_a2a_task_to_event(
226303
raise ValueError("A2A task cannot be None")
227304

228305
try:
306+
event_actions = EventActions()
229307
output_parts = []
230308
long_running_function_ids = set()
231309
if a2a_task.artifacts:
232310
artifact_parts = [
233311
part for artifact in a2a_task.artifacts for part in artifact.parts
234312
]
313+
for artifact in a2a_task.artifacts:
314+
event_actions = _merge_event_actions(
315+
event_actions, _extract_event_actions(artifact.metadata)
316+
)
235317
output_parts, _ = _convert_a2a_parts_to_adk_parts(
236318
artifact_parts, part_converter
237319
)
238320
if (
239321
a2a_task.status.message
240322
and a2a_task.status.state == TaskState.input_required
241323
):
324+
event_actions = _merge_event_actions(
325+
event_actions,
326+
_extract_event_actions(a2a_task.status.message.metadata),
327+
)
242328
parts, ids = _convert_a2a_parts_to_adk_parts(
243329
a2a_task.status.message.parts, part_converter
244330
)
@@ -249,6 +335,7 @@ def convert_a2a_task_to_event(
249335
output_parts,
250336
invocation_context,
251337
author,
338+
event_actions,
252339
long_running_function_ids,
253340
)
254341

@@ -288,7 +375,12 @@ def convert_a2a_message_to_event(
288375
output_parts, _ = _convert_a2a_parts_to_adk_parts(
289376
a2a_message.parts, part_converter
290377
)
291-
return _create_event(output_parts, invocation_context, author)
378+
return _create_event(
379+
output_parts,
380+
invocation_context,
381+
author,
382+
_extract_event_actions(a2a_message.metadata),
383+
)
292384

293385
except Exception as e:
294386
logger.error("Failed to convert A2A message to event: %s", e)
@@ -319,7 +411,11 @@ def convert_a2a_status_update_to_event(
319411
try:
320412
output_parts = []
321413
long_running_function_ids = set()
414+
event_actions = EventActions()
322415
if a2a_status_update.status.message:
416+
event_actions = _extract_event_actions(
417+
a2a_status_update.status.message.metadata
418+
)
323419
parts, ids = _convert_a2a_parts_to_adk_parts(
324420
a2a_status_update.status.message.parts, part_converter
325421
)
@@ -330,6 +426,7 @@ def convert_a2a_status_update_to_event(
330426
output_parts,
331427
invocation_context,
332428
author,
429+
event_actions,
333430
long_running_function_ids,
334431
)
335432
except Exception as e:
@@ -367,6 +464,7 @@ def convert_a2a_artifact_update_to_event(
367464
output_parts,
368465
invocation_context,
369466
author,
467+
_extract_event_actions(a2a_artifact_update.artifact.metadata),
370468
partial=not a2a_artifact_update.last_chunk,
371469
)
372470
except Exception as e:

0 commit comments

Comments
 (0)