Skip to content

Commit 140bab8

Browse files
caohy1988claude
andcommitted
feat(bq-plugin): Add schema auto-upgrade, tool provenance, and HITL tracing
Implements three enhancements for the BigQuery Agent Analytics plugin: 1. Schema Auto-Upgrade: Opt-in config (`auto_schema_upgrade`) that automatically adds missing columns to existing BQ tables. Uses table labels for schema version governance. 2. Tool Provenance: Adds `tool_origin` field to TOOL_* event content, distinguishing LOCAL, MCP, SUB_AGENT, TRANSFER_AGENT, and UNKNOWN origins via isinstance() checks on the tool object. 3. HITL Tracing: Emits additional HITL-specific events (HITL_CONFIRMATION_REQUEST, HITL_CREDENTIAL_REQUEST, HITL_INPUT_REQUEST + _COMPLETED variants) alongside normal TOOL_* events when adk_request_* tools are invoked. Closes #4554 (plugin items 3, 4, 5) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f27a9cf commit 140bab8

File tree

2 files changed

+408
-11
lines changed

2 files changed

+408
-11
lines changed

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 154 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,24 @@
7171
"google.adk.plugins.bigquery_agent_analytics", __version__
7272
)
7373

74+
# Bumped when the schema changes. Used as a table label for
75+
# governance and to decide whether auto-upgrade should run.
76+
_SCHEMA_VERSION = "3"
77+
_SCHEMA_VERSION_LABEL_KEY = "adk_schema_version"
78+
79+
# Human-in-the-loop (HITL) tool names that receive additional
80+
# dedicated event types alongside the normal TOOL_* events.
81+
_HITL_TOOL_NAMES = frozenset({
82+
"adk_request_credential",
83+
"adk_request_confirmation",
84+
"adk_request_input",
85+
})
86+
_HITL_EVENT_MAP = MappingProxyType({
87+
"adk_request_credential": "HITL_CREDENTIAL_REQUEST",
88+
"adk_request_confirmation": "HITL_CONFIRMATION_REQUEST",
89+
"adk_request_input": "HITL_INPUT_REQUEST",
90+
})
91+
7492

7593
def _safe_callback(func):
7694
"""Decorator that catches and logs exceptions in plugin callbacks.
@@ -132,6 +150,40 @@ def _format_content(
132150
return " | ".join(parts), truncated
133151

134152

153+
def _get_tool_origin(tool: "BaseTool") -> str:
154+
"""Returns the provenance category of a tool.
155+
156+
Uses lazy imports to avoid circular dependencies.
157+
158+
Args:
159+
tool: The tool instance.
160+
161+
Returns:
162+
One of LOCAL, MCP, SUB_AGENT, TRANSFER_AGENT, or UNKNOWN.
163+
"""
164+
# Import lazily to avoid circular dependencies.
165+
# pylint: disable=g-import-not-at-top
166+
from ..tools.agent_tool import AgentTool # pytype: disable=import-error
167+
from ..tools.function_tool import FunctionTool # pytype: disable=import-error
168+
from ..tools.transfer_to_agent_tool import TransferToAgentTool # pytype: disable=import-error
169+
170+
try:
171+
from ..tools.mcp_tool.mcp_tool import McpTool # pytype: disable=import-error
172+
except ImportError:
173+
McpTool = None
174+
175+
# Order matters: TransferToAgentTool is a subclass of FunctionTool.
176+
if McpTool is not None and isinstance(tool, McpTool):
177+
return "MCP"
178+
if isinstance(tool, TransferToAgentTool):
179+
return "TRANSFER_AGENT"
180+
if isinstance(tool, AgentTool):
181+
return "SUB_AGENT"
182+
if isinstance(tool, FunctionTool):
183+
return "LOCAL"
184+
return "UNKNOWN"
185+
186+
135187
def _recursive_smart_truncate(
136188
obj: Any, max_len: int, seen: Optional[set[int]] = None
137189
) -> tuple[Any, bool]:
@@ -435,6 +487,9 @@ class BigQueryLoggerConfig:
435487
log_session_metadata: bool = True
436488
# Static custom tags (e.g. {"agent_role": "sales"})
437489
custom_tags: dict[str, Any] = field(default_factory=dict)
490+
# If True, automatically add new columns to existing tables
491+
# when the plugin schema evolves. Only additive changes are made.
492+
auto_schema_upgrade: bool = False
438493

439494

440495
# ==============================================================================
@@ -1822,16 +1877,25 @@ def _atexit_cleanup(batch_processor: "BatchProcessor") -> None:
18221877
)
18231878

18241879
def _ensure_schema_exists(self) -> None:
1825-
"""Ensures the BigQuery table exists with the correct schema."""
1880+
"""Ensures the BigQuery table exists with the correct schema.
1881+
1882+
When ``config.auto_schema_upgrade`` is True and the table already
1883+
exists, missing columns are added automatically (additive only).
1884+
A ``adk_schema_version`` label is written for governance.
1885+
"""
18261886
try:
1827-
self.client.get_table(self.full_table_id)
1887+
existing_table = self.client.get_table(self.full_table_id)
1888+
if self.config.auto_schema_upgrade:
1889+
self._maybe_upgrade_schema(existing_table)
18281890
except cloud_exceptions.NotFound:
18291891
logger.info("Table %s not found, creating table.", self.full_table_id)
18301892
tbl = bigquery.Table(self.full_table_id, schema=self._schema)
18311893
tbl.time_partitioning = bigquery.TimePartitioning(
1832-
type_=bigquery.TimePartitioningType.DAY, field="timestamp"
1894+
type_=bigquery.TimePartitioningType.DAY,
1895+
field="timestamp",
18331896
)
18341897
tbl.clustering_fields = self.config.clustering_fields
1898+
tbl.labels = {_SCHEMA_VERSION_LABEL_KEY: _SCHEMA_VERSION}
18351899
try:
18361900
self.client.create_table(tbl)
18371901
except cloud_exceptions.Conflict:
@@ -1851,6 +1915,50 @@ def _ensure_schema_exists(self) -> None:
18511915
exc_info=True,
18521916
)
18531917

1918+
def _maybe_upgrade_schema(self, existing_table: bigquery.Table) -> None:
1919+
"""Adds missing columns to an existing table (additive only).
1920+
1921+
Args:
1922+
existing_table: The current BigQuery table object.
1923+
"""
1924+
stored_version = (existing_table.labels or {}).get(
1925+
_SCHEMA_VERSION_LABEL_KEY
1926+
)
1927+
if stored_version == _SCHEMA_VERSION:
1928+
return
1929+
1930+
existing_names = {f.name for f in existing_table.schema}
1931+
new_fields = [f for f in self._schema if f.name not in existing_names]
1932+
1933+
updated = False
1934+
if new_fields:
1935+
merged = list(existing_table.schema) + new_fields
1936+
existing_table.schema = merged
1937+
updated = True
1938+
logger.info(
1939+
"Auto-upgrading table %s: adding columns %s",
1940+
self.full_table_id,
1941+
[f.name for f in new_fields],
1942+
)
1943+
1944+
# Always stamp the version label so we skip on next run.
1945+
labels = dict(existing_table.labels or {})
1946+
labels[_SCHEMA_VERSION_LABEL_KEY] = _SCHEMA_VERSION
1947+
existing_table.labels = labels
1948+
updated = True
1949+
1950+
if updated:
1951+
try:
1952+
update_fields = ["schema", "labels"]
1953+
self.client.update_table(existing_table, update_fields)
1954+
except Exception as e:
1955+
logger.error(
1956+
"Schema auto-upgrade failed for %s: %s",
1957+
self.full_table_id,
1958+
e,
1959+
exc_info=True,
1960+
)
1961+
18541962
async def shutdown(self, timeout: float | None = None) -> None:
18551963
"""Shuts down the plugin and releases resources.
18561964
@@ -2460,14 +2568,28 @@ async def before_tool_callback(
24602568
args_truncated, is_truncated = _recursive_smart_truncate(
24612569
tool_args, self.config.max_content_length
24622570
)
2463-
content_dict = {"tool": tool.name, "args": args_truncated}
2571+
tool_origin = _get_tool_origin(tool)
2572+
content_dict = {
2573+
"tool": tool.name,
2574+
"args": args_truncated,
2575+
"tool_origin": tool_origin,
2576+
}
24642577
TraceManager.push_span(tool_context, "tool")
24652578
await self._log_event(
24662579
"TOOL_STARTING",
24672580
tool_context,
24682581
raw_content=content_dict,
24692582
is_truncated=is_truncated,
24702583
)
2584+
# Emit additional HITL event alongside the normal TOOL_STARTING.
2585+
hitl_event = _HITL_EVENT_MAP.get(tool.name)
2586+
if hitl_event:
2587+
await self._log_event(
2588+
hitl_event,
2589+
tool_context,
2590+
raw_content=content_dict,
2591+
is_truncated=is_truncated,
2592+
)
24712593

24722594
@_safe_callback
24732595
async def after_tool_callback(
@@ -2489,21 +2611,37 @@ async def after_tool_callback(
24892611
resp_truncated, is_truncated = _recursive_smart_truncate(
24902612
result, self.config.max_content_length
24912613
)
2492-
content_dict = {"tool": tool.name, "result": resp_truncated}
2614+
tool_origin = _get_tool_origin(tool)
2615+
content_dict = {
2616+
"tool": tool.name,
2617+
"result": resp_truncated,
2618+
"tool_origin": tool_origin,
2619+
}
24932620
span_id, duration = TraceManager.pop_span()
24942621
parent_span_id, _ = TraceManager.get_current_span_and_parent()
24952622

2623+
event_data = EventData(
2624+
latency_ms=duration,
2625+
span_id_override=span_id,
2626+
parent_span_id_override=parent_span_id,
2627+
)
24962628
await self._log_event(
24972629
"TOOL_COMPLETED",
24982630
tool_context,
24992631
raw_content=content_dict,
25002632
is_truncated=is_truncated,
2501-
event_data=EventData(
2502-
latency_ms=duration,
2503-
span_id_override=span_id,
2504-
parent_span_id_override=parent_span_id,
2505-
),
2633+
event_data=event_data,
25062634
)
2635+
# Emit additional HITL completion event.
2636+
hitl_event = _HITL_EVENT_MAP.get(tool.name)
2637+
if hitl_event:
2638+
await self._log_event(
2639+
hitl_event + "_COMPLETED",
2640+
tool_context,
2641+
raw_content=content_dict,
2642+
is_truncated=is_truncated,
2643+
event_data=event_data,
2644+
)
25072645

25082646
@_safe_callback
25092647
async def on_tool_error_callback(
@@ -2525,7 +2663,12 @@ async def on_tool_error_callback(
25252663
args_truncated, is_truncated = _recursive_smart_truncate(
25262664
tool_args, self.config.max_content_length
25272665
)
2528-
content_dict = {"tool": tool.name, "args": args_truncated}
2666+
tool_origin = _get_tool_origin(tool)
2667+
content_dict = {
2668+
"tool": tool.name,
2669+
"args": args_truncated,
2670+
"tool_origin": tool_origin,
2671+
}
25292672
_, duration = TraceManager.pop_span()
25302673
await self._log_event(
25312674
"TOOL_ERROR",

0 commit comments

Comments
 (0)