Skip to content

Commit e2964f0

Browse files
caohy1988 authored and claude committed
fix: add fork-safety and auto-create analytics views to BQ plugin
Add PID tracking to detect post-fork broken gRPC channels (#4636) and automatically create per-event-type BigQuery views that unnest JSON columns into typed, queryable columns (#4639). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1206add commit e2964f0

File tree

2 files changed

+383
-0
lines changed

2 files changed

+383
-0
lines changed

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import json
2828
import logging
2929
import mimetypes
30+
import os
3031
import random
3132
import time
3233
from types import MappingProxyType
@@ -498,6 +499,9 @@ class BigQueryLoggerConfig:
498499
# dropped or altered). Safe to leave enabled; a version label on the
499500
# table ensures the diff runs at most once per schema version.
500501
auto_schema_upgrade: bool = True
502+
# Automatically create per-event-type BigQuery views that unnest
503+
# JSON columns into typed, queryable columns.
504+
create_views: bool = True
501505

502506

503507
# ==============================================================================
@@ -1581,6 +1585,115 @@ def _get_events_schema() -> list[bigquery.SchemaField]:
15811585
]
15821586

15831587

# ==============================================================================
# ANALYTICS VIEW DEFINITIONS
# ==============================================================================

# Columns shared by every per-event-type view, emitted before any
# event-specific extractions.
_VIEW_COMMON_COLUMNS = (
    "timestamp",
    "event_type",
    "agent",
    "session_id",
    "invocation_id",
    "user_id",
    "trace_id",
    "span_id",
    "parent_span_id",
    "status",
    "error_message",
    "is_truncated",
)

# Event-specific column extractions, keyed by event type. Each value is a
# list of ``"SQL_EXPR AS alias"`` strings appended after the common columns
# in the view SELECT. An empty list means the view exposes only the common
# columns.
_EVENT_VIEW_DEFS: dict[str, list[str]] = {
    "USER_MESSAGE_RECEIVED": [],
    "LLM_REQUEST": [
        "JSON_VALUE(attributes, '$.model') AS model",
        "content AS request_content",
        "JSON_QUERY(attributes, '$.llm_config') AS llm_config",
        "JSON_QUERY(attributes, '$.tools') AS tools",
    ],
    "LLM_RESPONSE": [
        "JSON_QUERY(content, '$.response') AS response",
        (
            "CAST(JSON_VALUE(content, '$.usage.prompt') AS INT64)"
            " AS usage_prompt_tokens"
        ),
        (
            "CAST(JSON_VALUE(content, '$.usage.completion') AS INT64)"
            " AS usage_completion_tokens"
        ),
        "CAST(JSON_VALUE(content, '$.usage.total') AS INT64) AS usage_total_tokens",
        "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms",
        "CAST(JSON_VALUE(latency_ms, '$.time_to_first_token_ms') AS INT64) AS ttft_ms",
        "JSON_VALUE(attributes, '$.model_version') AS model_version",
        "JSON_QUERY(attributes, '$.usage_metadata') AS usage_metadata",
    ],
    "LLM_ERROR": [
        "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms",
    ],
    "TOOL_STARTING": [
        "JSON_VALUE(content, '$.tool') AS tool_name",
        "JSON_QUERY(content, '$.args') AS tool_args",
        "JSON_VALUE(content, '$.tool_origin') AS tool_origin",
    ],
    "TOOL_COMPLETED": [
        "JSON_VALUE(content, '$.tool') AS tool_name",
        "JSON_QUERY(content, '$.result') AS tool_result",
        "JSON_VALUE(content, '$.tool_origin') AS tool_origin",
        "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms",
    ],
    "TOOL_ERROR": [
        "JSON_VALUE(content, '$.tool') AS tool_name",
        "JSON_QUERY(content, '$.args') AS tool_args",
        "JSON_VALUE(content, '$.tool_origin') AS tool_origin",
        "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms",
    ],
    "AGENT_STARTING": [
        "JSON_VALUE(content, '$.text_summary') AS agent_instruction",
    ],
    "AGENT_COMPLETED": [
        "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms",
    ],
    "INVOCATION_STARTING": [],
    "INVOCATION_COMPLETED": [],
    "STATE_DELTA": [
        "JSON_QUERY(attributes, '$.state_delta') AS state_delta",
    ],
    "HITL_CREDENTIAL_REQUEST": [
        "JSON_VALUE(content, '$.tool') AS tool_name",
        "JSON_QUERY(content, '$.args') AS tool_args",
    ],
    "HITL_CONFIRMATION_REQUEST": [
        "JSON_VALUE(content, '$.tool') AS tool_name",
        "JSON_QUERY(content, '$.args') AS tool_args",
    ],
    "HITL_INPUT_REQUEST": [
        "JSON_VALUE(content, '$.tool') AS tool_name",
        "JSON_QUERY(content, '$.args') AS tool_args",
    ],
}

# DDL template for one analytics view. ``CREATE OR REPLACE`` keeps the
# statement idempotent so views can be (re-)created on every startup.
_VIEW_SQL_TEMPLATE = """\
CREATE OR REPLACE VIEW `{project}.{dataset}.{view_name}` AS
SELECT
  {columns}
FROM
  `{project}.{dataset}.{table}`
WHERE
  event_type = '{event_type}'
"""
15841697
# ==============================================================================
15851698
# MAIN PLUGIN
15861699
# ==============================================================================
@@ -1660,6 +1773,7 @@ def __init__(
16601773
self.parser: Optional[HybridContentParser] = None
16611774
self._schema = None
16621775
self.arrow_schema = None
1776+
self._init_pid = os.getpid()
16631777

16641778
def _cleanup_stale_loop_states(self) -> None:
16651779
"""Removes entries for event loops that have been closed."""
@@ -1912,6 +2026,8 @@ def _ensure_schema_exists(self) -> None:
19122026
existing_table = self.client.get_table(self.full_table_id)
19132027
if self.config.auto_schema_upgrade:
19142028
self._maybe_upgrade_schema(existing_table)
2029+
if self.config.create_views:
2030+
self._create_analytics_views()
19152031
except cloud_exceptions.NotFound:
19162032
logger.info("Table %s not found, creating table.", self.full_table_id)
19172033
tbl = bigquery.Table(self.full_table_id, schema=self._schema)
@@ -1932,6 +2048,8 @@ def _ensure_schema_exists(self) -> None:
19322048
e,
19332049
exc_info=True,
19342050
)
2051+
if self.config.create_views:
2052+
self._create_analytics_views()
19352053
except Exception as e:
19362054
logger.error(
19372055
"Error checking for table %s: %s",
@@ -1980,6 +2098,44 @@ def _maybe_upgrade_schema(self, existing_table: bigquery.Table) -> None:
19802098
exc_info=True,
19812099
)
19822100

def _create_analytics_views(self) -> None:
  """Creates per-event-type BigQuery views (idempotent).

  Each view filters the events table by ``event_type`` and
  extracts JSON columns into typed, queryable columns. Uses
  ``CREATE OR REPLACE VIEW`` so it is safe to call repeatedly.
  Errors are logged but never raised.
  """
  # Hoisted out of the loop: the common-column list is loop-invariant.
  common_columns = list(_VIEW_COMMON_COLUMNS)
  for event_type, extra_cols in _EVENT_VIEW_DEFS.items():
    view_name = "v_" + event_type.lower()
    # SQL construction lives inside the try block so that *no*
    # exception — from string building or from the query — can
    # escape, upholding the "never raised" contract above.
    try:
      columns = ",\n  ".join(common_columns + extra_cols)
      sql = _VIEW_SQL_TEMPLATE.format(
          project=self.project_id,
          dataset=self.dataset_id,
          view_name=view_name,
          columns=columns,
          table=self.table_id,
          event_type=event_type,
      )
      # Blocking call; callers are expected to run this off the event
      # loop (see create_analytics_views).
      self.client.query(sql).result()
    except Exception as e:
      logger.error(
          "Failed to create view %s: %s",
          view_name,
          e,
          exc_info=True,
      )
2130+
async def create_analytics_views(self) -> None:
  """Public async helper to (re-)create all analytics views.

  Useful when views need to be refreshed explicitly, for example
  after a schema upgrade. Delegates the blocking work to the
  plugin's executor so the event loop is not blocked.
  """
  await asyncio.get_running_loop().run_in_executor(
      self._executor, self._create_analytics_views
  )
2138+
19832139
async def shutdown(self, timeout: float | None = None) -> None:
19842140
"""Shuts down the plugin and releases resources.
19852141
@@ -2032,12 +2188,33 @@ def __getstate__(self):
20322188
state["parser"] = None
20332189
state["_started"] = False
20342190
state["_is_shutting_down"] = False
2191+
state["_init_pid"] = 0
20352192
return state
20362193

20372194
def __setstate__(self, state):
  """Custom unpickling to restore state.

  Transient runtime fields are reset by ``__getstate__`` before
  pickling; presumably they are rebuilt lazily on next use — verify
  against ``_ensure_started``.
  """
  self.__dict__.update(state)
20402197

2198+
def _reset_runtime_state(self) -> None:
  """Resets all runtime state after a fork.

  gRPC channels and asyncio locks are not safe to use after
  ``os.fork()``. Clearing them here forces the next call to
  ``_ensure_started()`` to re-initialize everything in the child
  process. Pure-data fields like ``_schema`` and ``arrow_schema``
  are kept because they are safe across fork.
  """
  # Connection / execution machinery: must be rebuilt in the child.
  self.client = None
  self._executor = None
  self.offloader = None
  self.parser = None
  # Async primitives and per-loop bookkeeping tied to the parent
  # process's event loops.
  self._setup_lock = None
  self._loop_state_by_loop = {}
  self._write_stream_name = None
  # Lifecycle flags: force _ensure_started() to run from scratch.
  self._started = False
  self._is_shutting_down = False
  # Record the child's PID so the fork check does not fire again.
  self._init_pid = os.getpid()
2217+
20412218
async def __aenter__(self) -> BigQueryAgentAnalyticsPlugin:
  """Async context-manager entry: ensures the plugin is started."""
  await self._ensure_started()
  return self
@@ -2047,6 +2224,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
20472224

20482225
async def _ensure_started(self, **kwargs) -> None:
20492226
"""Ensures that the plugin is started and initialized."""
2227+
if os.getpid() != self._init_pid:
2228+
self._reset_runtime_state()
20502229
if not self._started:
20512230
# Kept original lock name as it was not explicitly changed.
20522231
if self._setup_lock is None:

0 commit comments

Comments
 (0)