Commit 3dd8436

Authored by etirelli, Ambient Code Bot, and claude
fix(runner): framework-agnostic observability naming + MLflow trace IDs (#1283)
<!-- acp:session_id=session-7d8df959-1f85-4738-ba2b-f08569ce8f7b source=#1283 last_action=2026-04-10T19:17:53Z retry_count=1 -->

## Summary

Follow-up to the merged MLflow observability work: **`main` landed the integration before these two commits could ship.** This PR restores them.

### 1. Framework-agnostic tracing names (Langfuse + MLflow)

- Turn trace / span name: **`claude_interaction` → `llm_interaction`**
- Langfuse tags: **`claude-code` → `runner:<RUNNER_TYPE>`** (from `RUNNER_TYPE`)
- Session metrics span: **`Claude Code - Session Metrics` → `Session Metrics`**
- Metrics source metadata: **`claude-code-metrics` → `ambient-runner-metrics`**
- Module docs updated so observability is described as runner-wide, not Claude-only.

**Migration:** Existing Langfuse dashboards, alerts, or saved views that filter on the old trace name or tags need to be updated to the new values.

### 2. MLflow-only: trace IDs for middleware and feedback

When **`OBSERVABILITY_BACKENDS=mlflow`** (no Langfuse turn), the runner now:

- Exposes **`mlflow.get_active_trace_id()`** via **`ObservabilityManager.get_current_trace_id()`**
- Syncs **`last_trace_id`** after **`start_turn`** so AG-UI trace events, corrections, and **`/feedback`** resolution behave like the Langfuse path.

## Testing

- `pytest` (runner): `test_observability`, `test_observability_metrics`, `test_observability_mlflow_integration`, `test_privacy_masking` (101 tests in local run)

## Commits

- `refactor(runner): framework-agnostic observability naming (Gage review)` (cherry-picked)
- `fix(runner): expose MLflow trace ids when Langfuse is off` (cherry-picked)

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Runner type is captured and included in observability metadata and span attributes.
* **Changes**
  * Turn traces renamed vendor‑neutrally to "llm_interaction".
  * Session metrics span renamed to "Session Metrics" and source to "ambient-runner-metrics".
  * Middleware event name changed to "ambient:trace_id" and now reflects Langfuse or MLflow trace id.
* **Bug Fixes**
  * Improved trace correlation between Langfuse and MLflow when one backend is inactive.
* **Documentation**
  * README updated to document turn trace naming and runner type tagging.
* **Tests**
  * Updated and added tests validating tagging, naming, and MLflow/Langfuse trace behavior.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Ambient Code Bot <bot@ambient-code.local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
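The migration note in the summary amounts to a small rename table. A minimal sketch of how a saved filter value could be rewritten, assuming filters are plain strings; `RENAMES`, `migrate_filter_value`, and its `runner_type` parameter are hypothetical names for illustration and are not part of the runner:

```python
# Old -> new observability names, taken from the commit summary.
RENAMES = {
    "claude_interaction": "llm_interaction",
    "Claude Code - Session Metrics": "Session Metrics",
    "claude-code-metrics": "ambient-runner-metrics",
}


def migrate_filter_value(value: str, runner_type: str = "claude-agent-sdk") -> str:
    """Return the post-rename value for a dashboard filter (hypothetical helper)."""
    if value == "claude-code":
        # The static tag became a per-runner tag of the form runner:<RUNNER_TYPE>.
        return f"runner:{runner_type}"
    # Unknown values pass through unchanged.
    return RENAMES.get(value, value)
```

Values that were never renamed are returned as-is, so the helper can be applied to every filter in a saved view without special-casing.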
1 parent 2389565 commit 3dd8436

9 files changed: 172 additions, 63 deletions

components/runners/ambient-runner/README.md

Lines changed: 1 addition & 0 deletions
@@ -139,6 +139,7 @@ The workspace context prompt is built by `build_sdk_system_prompt()` in `prompts
 **Backend selection**

 - `OBSERVABILITY_BACKENDS` - Comma-separated: `langfuse`, `mlflow`, or both (e.g. `langfuse,mlflow`). If unset, defaults to **`langfuse`** only so existing Langfuse behaviour is preserved.
+- Turn traces are named **`llm_interaction`** (vendor-neutral). **`RUNNER_TYPE`** (same values as bridge selection: `claude-agent-sdk`, `gemini-cli`, …) is added to Langfuse tags (`runner:<type>`) and to span metadata for MLflow / Langfuse.

 **MLflow GenAI tracing** (optional extra: `pip install 'ambient-runner[mlflow-observability]'` — pins **`mlflow[kubernetes]>=3.11`** for cluster auth)

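The backend-selection rule in the README hunk (comma-separated value, `langfuse` when unset) could be parsed roughly as follows. This is a sketch, not the runner's actual parser: `parse_backends` is a hypothetical name, and trimming whitespace, lowercasing, and treating an empty string like an unset variable are assumptions.

```python
from typing import List, Optional


def parse_backends(raw: Optional[str]) -> List[str]:
    """Split an OBSERVABILITY_BACKENDS value into backend names (hypothetical helper)."""
    if raw is None or not raw.strip():
        # README: if unset, only langfuse is enabled (empty string is an assumption).
        return ["langfuse"]
    # Assumption: surrounding whitespace and letter case are not significant.
    return [part.strip().lower() for part in raw.split(",") if part.strip()]
```

Typical use would be `parse_backends(os.getenv("OBSERVABILITY_BACKENDS"))`, with `os` imported by the caller.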

components/runners/ambient-runner/ambient_runner/middleware/tracing.py

Lines changed: 4 additions & 3 deletions
@@ -42,8 +42,9 @@ async def tracing_middleware(
         prompt: User prompt (used as input for the first turn trace).

     Yields:
-        The original events plus an ``ambient:langfuse_trace`` ``CustomEvent``
-        once the Langfuse trace ID becomes available.
+        The original events plus an ``ambient:trace_id`` ``CustomEvent``
+        once the trace ID (from Langfuse or MLflow, depending on active
+        backend) becomes available.
     """
     # Fast path: no observability — just pass through
     if obs is None:
@@ -71,7 +72,7 @@ async def tracing_middleware(
                 if trace_id:
                     yield CustomEvent(
                         type=EventType.CUSTOM,
-                        name="ambient:langfuse_trace",
+                        name="ambient:trace_id",
                         value={"traceId": trace_id},
                     )
                     trace_id_emitted = True
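Because the custom event was renamed from `ambient:langfuse_trace` to `ambient:trace_id`, a consumer that has to work with runners on either side of this commit might accept both names. A minimal sketch, assuming events arrive as plain dicts with `type`, `name`, and `value` keys and that the custom event type serializes as the string `"CUSTOM"`; `extract_trace_id` and `TRACE_EVENT_NAMES` are hypothetical names:

```python
from typing import Any, Mapping, Optional

# New name first, legacy name second.
TRACE_EVENT_NAMES = ("ambient:trace_id", "ambient:langfuse_trace")


def extract_trace_id(event: Mapping[str, Any]) -> Optional[str]:
    """Return the traceId carried by a trace custom event, else None."""
    # Assumption: serialized events expose the type as the string "CUSTOM".
    if event.get("type") != "CUSTOM":
        return None
    if event.get("name") not in TRACE_EVENT_NAMES:
        return None
    value = event.get("value")
    if not isinstance(value, Mapping):
        return None
    trace_id = value.get("traceId")
    # Only a non-empty string counts as a usable trace id.
    return trace_id if isinstance(trace_id, str) and trace_id else None
```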

components/runners/ambient-runner/ambient_runner/mlflow_observability.py

Lines changed: 17 additions & 3 deletions
@@ -8,6 +8,11 @@

 logger = logging.getLogger(__name__)

+# Align with observability.TURN_TRACE_NAME / SESSION_METRICS_* (avoid import cycle).
+_TURN_SPAN_NAME = "llm_interaction"
+_SESSION_METRICS_SPAN_NAME = "Session Metrics"
+_SESSION_METRICS_SOURCE = "ambient-runner-metrics"
+

 class MLflowSessionTracer:
     """Mirrors turn/tool boundaries from ObservabilityManager into MLflow spans."""
@@ -22,6 +27,7 @@ def __init__(self, session_id: str, user_id: str, user_name: str) -> None:
         self._turn_gen: Any = None
         self._turn_span: Any = None
         self._tool_ctx: dict[str, tuple[Any, Any]] = {}
+        self._runner_type = ""

     @property
     def enabled(self) -> bool:
@@ -41,6 +47,7 @@ def initialize(
         workflow_branch: str,
         workflow_path: str,
         mask_fn: Callable[[Any], Any] | None,
+        runner_type: str | None = None,
     ) -> bool:
         """Configure tracking URI and experiment. Returns True on success."""
         try:
@@ -88,6 +95,9 @@ def initialize(

             self._namespace = namespace
             self._mask_fn = mask_fn
+            self._runner_type = (
+                runner_type or os.getenv("RUNNER_TYPE", "claude-agent-sdk") or ""
+            ).strip().lower() or "unknown"
             self._enabled = True
             logger.info(
                 "MLflow: session tracing enabled (session_id=%s, experiment=%s)",
@@ -125,12 +135,13 @@ def start_turn(self, model: str, user_input: str | None) -> None:
             text_in = self._apply_mask(text_in)

         gen = mlflow.start_span(
-            name="claude_interaction",
+            name=_TURN_SPAN_NAME,
             span_type=SpanType.CHAIN,
             attributes={
                 "ambient.session_id": self.session_id,
                 "ambient.user_id": self.user_id,
                 "ambient.namespace": self._namespace,
+                "ambient.runner_type": self._runner_type,
                 "llm.model_name": model,
             },
         )
@@ -249,9 +260,12 @@ def emit_session_summary_span(self, metadata: dict[str, Any]) -> None:
             from mlflow.entities import SpanType

             with mlflow.start_span(
-                name="Claude Code - Session Metrics",
+                name=_SESSION_METRICS_SPAN_NAME,
                 span_type=SpanType.CHAIN,
-                attributes={"ambient.source": "claude-code-metrics"},
+                attributes={
+                    "ambient.source": _SESSION_METRICS_SOURCE,
+                    "ambient.runner_type": self._runner_type,
+                },
             ) as span:
                 span.set_inputs(
                     {
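The `runner_type` fallback chain in `initialize()` is compact, so its edge cases are easy to miss. The sketch below restates the same expression as a standalone function with the environment passed in as a mapping, which makes it testable without touching `os.environ`; `resolve_runner_type` and its `env` parameter are hypothetical and do not exist in the runner:

```python
from typing import Mapping, Optional


def resolve_runner_type(explicit: Optional[str], env: Mapping[str, str]) -> str:
    """Same fallback chain as the diff: explicit arg, then RUNNER_TYPE, then a default."""
    # An empty explicit value falls through to the environment lookup.
    raw = explicit or env.get("RUNNER_TYPE", "claude-agent-sdk") or ""
    # Normalise; a blank result becomes the literal "unknown".
    return raw.strip().lower() or "unknown"
```

Under this chain an unset `RUNNER_TYPE` yields `claude-agent-sdk`, while a variable that is set but empty or whitespace-only yields `unknown`.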

components/runners/ambient-runner/ambient_runner/observability.py

Lines changed: 101 additions & 51 deletions
@@ -1,38 +1,44 @@
 """
-Observability manager for Claude Code runner - hybrid Langfuse integration.
-
-Provides Langfuse LLM observability for Claude sessions with trace structure:
-
-1. Turn Traces (top-level generations):
-   - ONE trace per turn (SDK sends multiple AssistantMessages during streaming, but guard prevents duplicates)
-   - Named: "claude_interaction" (turn number stored in metadata)
-   - First AssistantMessage creates trace, subsequent ones ignored until end_turn() clears it
-   - Final trace contains authoritative turn number and usage data from ResultMessage
-   - Canonical format with separate cache token tracking for accurate cost
-   - All traces grouped by session_id via propagate_attributes()
-
-2. Tool Spans (observations within turn traces):
-   - Named: tool_Read, tool_Write, tool_Bash, etc.
-   - Shows tool execution in real-time
-   - NO usage/cost data (prevents inflation from SDK's cumulative metrics)
-   - Child observations of their parent turn trace
+Observability manager for ambient-runner — Langfuse and/or MLflow.
+
+Works across runner backends (Claude Agent SDK, Gemini CLI, etc.). Span names are
+vendor-neutral; ``RUNNER_TYPE`` tags traces for the active bridge. When both
+backends are enabled, the same turn/tool boundaries are mirrored into MLflow.
+
+1. Turn traces (top-level generations):
+   - ONE trace per turn (SDK may send multiple assistant messages during streaming;
+     a guard prevents duplicate traces for the same turn)
+   - Named ``llm_interaction`` (turn number stored in metadata)
+   - First assistant message for a turn creates the trace; later ones are ignored
+     until ``end_turn()`` clears the active turn
+   - Final trace update contains the authoritative turn number and usage from the
+     SDK result message (e.g. ``ResultMessage``)
+   - Canonical usage format with separate cache token fields for accurate cost
+   - Traces grouped by ``session_id`` via ``propagate_attributes()`` (Langfuse);
+     MLflow uses the same session/user tags on spans
+
+2. Tool spans (within the current turn trace):
+   - Named ``tool_<name>`` (e.g. ``tool_Read``, ``tool_Write``, ``tool_Bash``)
+   - Reflect tool execution in real time
+   - NO usage/cost on tool spans (avoids double-counting vs turn-level usage)

 Architecture:
-- Session-based grouping via propagate_attributes() with session_id and user_id
-- Each turn creates ONE independent trace (not nested under session)
-- Langfuse automatically aggregates tokens and costs across all traces with same session_id
-- Filter by session_id, user_id, model, or metadata.turn in Langfuse UI
-- Sessions can be paused/resumed: each turn creates a trace regardless of session lifecycle
-
-Trace Hierarchy:
-claude_interaction (trace - generation, metadata: {turn: 1})
-├── tool_Read (observation - span)
-└── tool_Write (observation - span)
-
-claude_interaction (trace - generation, metadata: {turn: 2})
-└── tool_Bash (observation - span)
-
-Usage Format:
+- Session-based grouping via ``propagate_attributes()`` with ``session_id`` and
+  ``user_id`` (Langfuse); MLflow sets equivalent attributes/tags on spans
+- Each turn is ONE independent trace (not nested under a single parent session trace)
+- Langfuse aggregates tokens/costs across traces sharing ``session_id``; filter by
+  ``session_id``, ``user_id``, model, or ``metadata.turn`` in the Langfuse UI
+- Sessions can be paused/resumed: each turn still gets a trace when it runs
+
+Trace hierarchy (conceptual):
+llm_interaction (generation, metadata: {turn: 1})
+├── tool_Read (observation / span)
+└── tool_Write (observation / span)
+
+llm_interaction (generation, metadata: {turn: 2})
+└── tool_Bash (observation / span)
+
+Usage format (turn-level):
 {
     "input": int,  # Regular input tokens
     "output": int,  # Output tokens
"output": int, # Output tokens
@@ -65,6 +71,18 @@
6571
# Alias for tests and legacy imports
6672
_privacy_masking_function = privacy_mask_message_data
6773

74+
75+
def _runner_type_slug() -> str:
76+
"""Stable label from ``RUNNER_TYPE`` (see ``main.BRIDGE_REGISTRY``)."""
77+
return os.getenv("RUNNER_TYPE", "claude-agent-sdk").strip().lower() or "unknown"
78+
79+
80+
# Langfuse / MLflow turn trace name — not tied to a single vendor SDK.
81+
TURN_TRACE_NAME = "llm_interaction"
82+
SESSION_METRICS_SPAN_NAME = "Session Metrics"
83+
# Metadata ``source`` for session-level metric spans (Langfuse + MLflow).
84+
SESSION_METRICS_SOURCE = "ambient-runner-metrics"
85+
6886
# Canonical token key names used across usage dicts from the Claude Agent SDK.
6987
_TOKEN_KEYS = (
7088
"input_tokens",
@@ -80,7 +98,7 @@ def is_langfuse_enabled() -> bool:
8098

8199

82100
class ObservabilityManager:
83-
"""Manages Langfuse observability for Claude sessions."""
101+
"""Manages Langfuse and/or MLflow observability for agent sessions."""
84102

85103
def __init__(self, session_id: str, user_id: str, user_name: str):
86104
"""Initialize observability manager.
@@ -185,6 +203,7 @@ async def initialize(
185203
workflow_branch=workflow_branch,
186204
workflow_path=workflow_path,
187205
mask_fn=mask_fn,
206+
runner_type=_runner_type_slug(),
188207
)
189208
except Exception as e:
190209
logging.warning(
@@ -261,7 +280,7 @@ def _initialize_langfuse(
261280
"initial_prompt": prompt[:200] if len(prompt) > 200 else prompt,
262281
}
263282

264-
tags = ["claude-code", f"namespace:{namespace}"]
283+
tags = [f"runner:{_runner_type_slug()}", f"namespace:{namespace}"]
265284

266285
if model:
267286
sanitized_model = sanitize_model_name(model)
@@ -351,7 +370,7 @@ def _has_active_turn(self) -> bool:
351370

352371
@staticmethod
353372
def _extract_assistant_text(message: Any) -> str:
354-
"""Extract assistant text from a Claude SDK message (or best-effort without SDK)."""
373+
"""Extract assistant text from an agent SDK message (or best-effort without SDK)."""
355374
try:
356375
from claude_agent_sdk import TextBlock
357376
except ImportError:
@@ -421,10 +440,10 @@ def start_turn(self, model: str, user_input: str | None = None) -> None:
421440
self._current_turn_ctx = (
422441
self.langfuse_client.start_as_current_observation(
423442
as_type="generation",
424-
name="claude_interaction",
443+
name=TURN_TRACE_NAME,
425444
input=input_content,
426445
model=model,
427-
metadata={},
446+
metadata={"runner_type": _runner_type_slug()},
428447
)
429448
)
430449
self._current_turn_generation = self._current_turn_ctx.__enter__()
@@ -445,28 +464,58 @@ def start_turn(self, model: str, user_input: str | None = None) -> None:
                 self._mlflow.start_turn(model, resolved_input)
             except Exception as e:
                 logging.warning("MLflow: start_turn failed: %s", e, exc_info=True)
+            else:
+                # Langfuse sets _last_trace_id when _current_turn_generation exists; for
+                # MLflow-only runs, persist the active MLflow trace id for middleware/feedback.
+                if not self._current_turn_generation:
+                    self._sync_last_trace_id_from_mlflow()
+
+    def _sync_last_trace_id_from_mlflow(self) -> None:
+        """Set _last_trace_id from MLflow when a turn span is active (MLflow-only path)."""
+        if not self.mlflow_tracing_active or self._mlflow is None:
+            return
+        if not self._mlflow.has_active_turn:
+            return
+        try:
+            import mlflow
+
+            tid = mlflow.get_active_trace_id()
+            if tid:
+                self._last_trace_id = tid
+        except Exception as e:
+            logging.debug("MLflow: could not read active trace id: %s", e)

     def get_current_trace_id(self) -> str | None:
         """Get the current turn's trace ID for feedback association.

         Returns:
-            The Langfuse trace ID if a turn is active, None otherwise.
+            Langfuse trace ID when a Langfuse turn is active; otherwise the MLflow
+            active trace ID when MLflow tracing is on and a turn span is open.
         """
-        if not self._current_turn_generation:
-            return None
+        if self._current_turn_generation:
+            try:
+                return getattr(self._current_turn_generation, "trace_id", None)
+            except Exception:
+                return None
+        if (
+            self.mlflow_tracing_active
+            and self._mlflow is not None
+            and self._mlflow.has_active_turn
+        ):
+            try:
+                import mlflow

-        # The generation object has a trace_id attribute
-        try:
-            return getattr(self._current_turn_generation, "trace_id", None)
-        except Exception:
-            return None
+                return mlflow.get_active_trace_id()
+            except Exception:
+                return None
+        return None

     @property
     def last_trace_id(self) -> str | None:
-        """Most recent Langfuse trace ID (persists after turn ends).
+        """Most recent trace ID for the active backends (persists after turn ends).

-        Used by the feedback endpoint to attach scores to the correct trace
-        without requiring the backend to scan the event log.
+        Langfuse or MLflow depending on configuration; used by the feedback endpoint
+        and AG-UI trace events when the runner owns the correlation id.
         """
         return self._last_trace_id

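The lookup order in `get_current_trace_id()` (Langfuse generation first, MLflow active trace second) can be sketched in isolation. The version below injects the MLflow lookup as a callable so it runs without MLflow installed; `resolve_trace_id` and its parameters are hypothetical stand-ins for the manager's internal state, not the actual method:

```python
from typing import Callable, Optional


def resolve_trace_id(
    langfuse_generation: Optional[object],
    mlflow_turn_active: bool,
    get_mlflow_trace_id: Callable[[], Optional[str]],
) -> Optional[str]:
    """Prefer the Langfuse generation's trace_id; otherwise ask MLflow."""
    if langfuse_generation is not None:
        # A Langfuse turn wins even when MLflow is also active.
        return getattr(langfuse_generation, "trace_id", None)
    if mlflow_turn_active:
        try:
            return get_mlflow_trace_id()
        except Exception:
            # Tracing problems must never break the turn itself.
            return None
    return None
```

In the diff the MLflow lookup is `mlflow.get_active_trace_id()`, so that function would be the natural callable to pass. As in the diff, a Langfuse generation without a `trace_id` yields `None` and does not fall through to MLflow.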

@@ -495,7 +544,7 @@ def end_turn(

         Args:
             turn_count: Current turn number (from SDK's authoritative num_turns in ResultMessage)
-            message: AssistantMessage from Claude SDK
+            message: Assistant message from the active agent SDK
             usage: Usage dict from ResultMessage with input_tokens, output_tokens, cache tokens, etc.
         """
         if not self.langfuse_client and not self.mlflow_tracing_active:
@@ -871,7 +920,8 @@ def _emit_session_summary(self) -> None:
             scores = metric.to_flat_scores()

         span_metadata = {
-            "source": "claude-code-metrics",
+            "source": SESSION_METRICS_SOURCE,
+            "runner_type": _runner_type_slug(),
             "session_id": self.session_id,
             "user_id": self.user_id,
             "namespace": self.namespace,
@@ -883,7 +933,7 @@ def _emit_session_summary(self) -> None:

         if self.langfuse_client:
             with self.langfuse_client.start_as_current_span(
-                name="Claude Code - Session Metrics",
+                name=SESSION_METRICS_SPAN_NAME,
                 input={
                     "session_id": self.session_id,
                     "user_id": self.user_id,

components/runners/ambient-runner/tests/test_observability.py

Lines changed: 2 additions & 2 deletions
@@ -182,7 +182,7 @@ async def test_init_successful(
         call_kwargs = mock_propagate.call_args[1]
         assert call_kwargs["user_id"] == manager.user_id
         assert call_kwargs["session_id"] == manager.session_id
-        assert "claude-code" in call_kwargs["tags"]
+        assert "runner:claude-agent-sdk" in call_kwargs["tags"]

         assert "Session tracking enabled" in caplog.text

@@ -291,7 +291,7 @@ def test_start_turn_creates_generation(self, manager):
         mock_client.start_as_current_observation.assert_called_once()
         call_kwargs = mock_client.start_as_current_observation.call_args[1]
         assert call_kwargs["as_type"] == "generation"
-        assert call_kwargs["name"] == "claude_interaction"
+        assert call_kwargs["name"] == "llm_interaction"
         assert call_kwargs["model"] == "claude-3-5-sonnet"

         assert manager._current_turn_generation is not None

components/runners/ambient-runner/tests/test_observability_metrics.py

Lines changed: 3 additions & 2 deletions
@@ -369,10 +369,11 @@ def test_emit_summary_with_metrics(self, manager_with_langfuse):
         # Verify Langfuse span was created
         assert mgr.langfuse_client.start_as_current_span.called
         call_kwargs = mgr.langfuse_client.start_as_current_span.call_args[1]
-        assert call_kwargs["name"] == "Claude Code - Session Metrics"
+        assert call_kwargs["name"] == "Session Metrics"

         meta = call_kwargs["metadata"]
-        assert meta["source"] == "claude-code-metrics"
+        assert meta["source"] == "ambient-runner-metrics"
+        assert meta["runner_type"] == "claude-agent-sdk"
         # Verify consolidated metadata matches trace-level fields
         assert meta["namespace"] == ""
         assert meta["user_name"] == "Test User"
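The updated tests read the keyword arguments of a mocked Langfuse client through `call_args[1]`. A self-contained sketch of that pattern with `unittest.mock`, where the bare `client` mock stands in for the real Langfuse client and the argument values are copied from the assertions above:

```python
from unittest.mock import MagicMock

# Stand-in for the Langfuse client; the real one is not needed for this pattern.
client = MagicMock()
client.start_as_current_span(
    name="Session Metrics",
    metadata={"source": "ambient-runner-metrics", "runner_type": "claude-agent-sdk"},
)

# call_args[1] is the kwargs dict of the most recent call (same as call_args.kwargs).
call_kwargs = client.start_as_current_span.call_args[1]
meta = call_kwargs["metadata"]
```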
