From 147d22ab6af83ec0f6fffce88c5a7a6617e51db4 Mon Sep 17 00:00:00 2001 From: Fabien Penso Date: Wed, 1 Apr 2026 11:33:16 +0100 Subject: [PATCH 1/4] feat(observability): add langfuse tracing --- Cargo.lock | 186 ++- Cargo.toml | 2 +- crates/agents/Cargo.toml | 2 + crates/agents/src/runner.rs | 723 +++++++-- crates/chat/Cargo.toml | 2 + crates/chat/src/lib.rs | 1401 +++++++++++------ crates/cli/Cargo.toml | 5 + crates/cli/src/main.rs | 124 +- crates/cli/src/telemetry.rs | 172 ++ crates/common/src/lib.rs | 1 + crates/common/src/observability.rs | 260 +++ crates/config/src/schema.rs | 120 ++ crates/config/src/template.rs | 15 + crates/config/src/validate.rs | 15 + crates/sessions/src/message.rs | 39 + crates/web/src/assets/js/chat-ui.js | 5 +- .../src/assets/js/components/run-detail.js | 22 + crates/web/src/assets/js/sessions.js | 6 + crates/web/src/assets/js/websocket.js | 24 +- docs/src/metrics-and-tracing.md | 42 + 20 files changed, 2481 insertions(+), 685 deletions(-) create mode 100644 crates/cli/src/telemetry.rs create mode 100644 crates/common/src/observability.rs diff --git a/Cargo.lock b/Cargo.lock index 7b828f96a..0ee4a5efc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,7 +161,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -172,7 +172,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2403,7 +2403,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2701,7 +2701,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - 
"windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5123,7 +5123,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5772,6 +5772,7 @@ name = "moltis" version = "0.1.0" dependencies = [ "anyhow", + "base64 0.22.1", "clap", "dotenvy", "moltis-agents", @@ -5793,6 +5794,9 @@ dependencies = [ "moltis-tools", "moltis-web", "open", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "rand 0.9.2", "reqwest 0.12.28", "secrecy 0.8.0", @@ -5803,6 +5807,7 @@ dependencies = [ "time", "tokio", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", "which 8.0.0", @@ -5823,6 +5828,7 @@ dependencies = [ "moltis-metrics", "moltis-sessions", "moltis-skills", + "opentelemetry", "reqwest 0.12.28", "secrecy 0.8.0", "serde", @@ -5832,6 +5838,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "tracing-opentelemetry", "uuid", ] @@ -5982,6 +5989,7 @@ dependencies = [ "moltis-telegram", "moltis-tools", "moltis-voice", + "opentelemetry", "serde", "serde_json", "sqlx", @@ -5991,6 +5999,7 @@ dependencies = [ "tokio-stream", "toml", "tracing", + "tracing-opentelemetry", "uuid", ] @@ -7146,7 +7155,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -7380,6 +7389,83 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.18", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +dependencies = [ + "async-trait", + "bytes", + "http 1.4.0", + "opentelemetry", + "reqwest 0.12.28", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +dependencies = [ + "futures-core", + "http 1.4.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.13.5", + "reqwest 0.12.28", + "thiserror 2.0.18", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.13.5", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.9.2", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7962,6 +8048,16 @@ dependencies = [ "parking_lot 0.12.5", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", +] + [[package]] name = "prost" version = "0.14.3" @@ -7969,7 +8065,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.14.3", ] 
[[package]] @@ -7978,17 +8074,30 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "itertools 0.14.0", "log", "multimap", "petgraph", - "prost", + "prost 0.14.3", "prost-types", "regex", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "prost-derive" version = "0.14.3" @@ -8008,7 +8117,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" dependencies = [ - "prost", + "prost 0.14.3", ] [[package]] @@ -8783,7 +8892,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -8884,7 +8993,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -10208,7 +10317,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.3", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -10613,6 +10722,27 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "percent-encoding", + "pin-project", + "prost 0.13.5", + 
"tokio-stream", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "topological-sort" version = "0.2.2" @@ -10728,6 +10858,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -11269,7 +11417,7 @@ dependencies = [ "md5", "once_cell", "pbkdf2", - "prost", + "prost 0.14.3", "protobuf", "rand 0.9.2", "rand_core 0.9.5", @@ -11291,7 +11439,7 @@ checksum = "d8128197fd310dbc350cf7d417666d31591a84fa2d42b3b5692e48da9e0500ea" dependencies = [ "anyhow", "hkdf", - "prost", + "prost 0.14.3", "serde", "serde-big-array", "serde_json", @@ -11338,7 +11486,7 @@ dependencies = [ "hmac", "itertools 0.14.0", "log", - "prost", + "prost 0.14.3", "rand 0.9.2", "serde", "sha1", @@ -11381,7 +11529,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0fac064a4b1d339d7343612c0544739118e23969464f3e215eae2813e4f65d0" dependencies = [ - "prost", + "prost 0.14.3", "prost-build", "serde", ] @@ -12150,7 +12298,7 @@ dependencies = [ "indexmap 2.13.0", "log", "moka", - "prost", + "prost 0.14.3", "rand 0.9.2", "rand_core 0.9.5", "scopeguard", @@ -12295,7 +12443,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9b9fffa63..1da8fc794 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,7 +143,7 @@ thiserror = "2" openssl = { features = ["vendored"], version = "0.10" } 
# Logging opentelemetry = { version = "0.29" } -opentelemetry-otlp = { features = ["tonic"], version = "0.29" } +opentelemetry-otlp = { default-features = false, features = ["http-proto", "reqwest-client", "trace"], version = "0.29" } opentelemetry_sdk = { features = ["rt-tokio"], version = "0.29" } tracing = "0.1" tracing-opentelemetry = "0.30" diff --git a/crates/agents/Cargo.toml b/crates/agents/Cargo.toml index ee8ae78d4..e31e35079 100644 --- a/crates/agents/Cargo.toml +++ b/crates/agents/Cargo.toml @@ -16,6 +16,7 @@ moltis-common = { workspace = true } moltis-config = { workspace = true } moltis-sessions = { workspace = true } moltis-skills = { workspace = true } +opentelemetry = { workspace = true } reqwest = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } @@ -24,6 +25,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } uuid = { workspace = true } # Optional diff --git a/crates/agents/src/runner.rs b/crates/agents/src/runner.rs index eb739168b..068e5cdf7 100644 --- a/crates/agents/src/runner.rs +++ b/crates/agents/src/runner.rs @@ -2,13 +2,20 @@ use std::{borrow::Cow, fmt::Write, sync::Arc}; use { anyhow::{Result, bail}, - tracing::{debug, info, trace, warn}, + tracing::{Instrument, Span, debug, info, trace, warn}, + tracing_opentelemetry::OpenTelemetrySpanExt, }; #[cfg(feature = "metrics")] use moltis_metrics::{counter, histogram, labels, llm as llm_metrics}; -use moltis_common::hooks::{HookAction, HookPayload, HookRegistry}; +use { + moltis_common::{ + hooks::{HookAction, HookPayload, HookRegistry}, + observability::{sanitize_json_for_observability, sanitize_text_for_observability}, + }, + moltis_config::schema::TraceContentMode, +}; use crate::{ model::{ @@ -80,6 +87,23 @@ fn sanitize_tool_name(name: &str) -> Cow<'_, str> { const MALFORMED_TOOL_RETRY_PROMPT: &str = "Your tool call was malformed. 
Retry with exact format:\n\ ```tool_call\n{\"tool\": \"name\", \"arguments\": {...}}\n```"; const EMPTY_TOOL_NAME_RETRY_PROMPT: &str = "Your structured tool call had an empty tool name. Retry the same tool call using the intended tool's exact name and the same arguments."; +const TELEMETRY_TRACE_CONTENT_KEY: &str = "__telemetry_trace_content"; +const TELEMETRY_MAX_CONTENT_BYTES_KEY: &str = "__telemetry_max_content_bytes"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct ObservabilitySettings { + trace_content: TraceContentMode, + max_content_bytes: usize, +} + +impl Default for ObservabilitySettings { + fn default() -> Self { + Self { + trace_content: TraceContentMode::Off, + max_content_bytes: 8_192, + } + } +} fn find_empty_tool_name_call(tool_calls: &[ToolCall]) -> Option<&ToolCall> { tool_calls @@ -124,6 +148,129 @@ fn streaming_tool_call_message_content( } } +fn observability_settings(tool_context: &Option) -> ObservabilitySettings { + let Some(ctx) = tool_context.as_ref().and_then(serde_json::Value::as_object) else { + return ObservabilitySettings::default(); + }; + + let trace_content = match ctx + .get(TELEMETRY_TRACE_CONTENT_KEY) + .and_then(serde_json::Value::as_str) + { + Some("sanitized") => TraceContentMode::Sanitized, + Some("full") => TraceContentMode::Full, + _ => TraceContentMode::Off, + }; + let max_content_bytes = ctx + .get(TELEMETRY_MAX_CONTENT_BYTES_KEY) + .and_then(serde_json::Value::as_u64) + .and_then(|value| usize::try_from(value).ok()) + .filter(|value| *value > 0) + .unwrap_or(8_192); + + ObservabilitySettings { + trace_content, + max_content_bytes, + } +} + +fn is_internal_tool_context_key(key: &str) -> bool { + key.starts_with("__telemetry_") +} + +fn merge_tool_context(args: &mut serde_json::Value, tool_context: &Option) { + if let Some(ctx) = tool_context.as_ref() + && let (Some(args_obj), Some(ctx_obj)) = (args.as_object_mut(), ctx.as_object()) + { + for (key, value) in ctx_obj { + if is_internal_tool_context_key(key) { + 
continue; + } + args_obj.insert(key.clone(), value.clone()); + } + } +} + +fn tool_calls_to_json(tool_calls: &[ToolCall]) -> serde_json::Value { + serde_json::Value::Array( + tool_calls + .iter() + .map(|tool_call| { + serde_json::json!({ + "id": tool_call.id, + "name": tool_call.name, + "arguments": tool_call.arguments, + }) + }) + .collect(), + ) +} + +fn usage_to_json(usage: &Usage) -> serde_json::Value { + serde_json::json!({ + "input": usage.input_tokens, + "output": usage.output_tokens, + "cache_read": usage.cache_read_tokens, + "cache_write": usage.cache_write_tokens, + }) +} + +fn set_json_attribute( + span: &Span, + key: &'static str, + value: &serde_json::Value, + settings: ObservabilitySettings, +) { + let serialized = match settings.trace_content { + TraceContentMode::Off => None, + TraceContentMode::Sanitized => serde_json::to_string(&sanitize_json_for_observability( + value, + settings.max_content_bytes, + true, + )) + .ok(), + TraceContentMode::Full => serde_json::to_string(&sanitize_json_for_observability( + value, + settings.max_content_bytes, + false, + )) + .ok(), + }; + if let Some(serialized) = serialized { + span.set_attribute(key, serialized); + } +} + +fn set_text_attribute( + span: &Span, + key: &'static str, + value: &str, + settings: ObservabilitySettings, +) { + let serialized = match settings.trace_content { + TraceContentMode::Off => None, + TraceContentMode::Sanitized => Some(sanitize_text_for_observability( + value, + settings.max_content_bytes, + true, + )), + TraceContentMode::Full => Some(sanitize_text_for_observability( + value, + settings.max_content_bytes, + false, + )), + }; + if let Some(serialized) = serialized { + span.set_attribute(key, serialized); + } +} + +fn set_usage_attributes(span: &Span, usage: &Usage) { + if let Ok(serialized) = serde_json::to_string(&usage_to_json(usage)) { + span.set_attribute("langfuse.observation.usage_details", serialized); + } +} + /// Error patterns that indicate the context window has been 
exceeded. const CONTEXT_WINDOW_PATTERNS: &[&str] = &[ "context_length_exceeded", @@ -721,6 +868,7 @@ pub async fn run_agent_loop_with_context( .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); + let observability = observability_settings(&tool_context); let mut iterations = 0; let mut total_tool_calls = 0; @@ -761,6 +909,33 @@ pub async fn run_agent_loop_with_context( ); trace!(iteration = iterations, messages = ?messages, "LLM request messages"); + let llm_span = tracing::info_span!( + "llm.request", + iteration = iterations, + provider = %provider.name(), + model = %provider.id() + ); + llm_span.set_attribute("langfuse.observation.type", "generation"); + llm_span.set_attribute("langfuse.observation.model.name", provider.id().to_string()); + if let Ok(metadata) = serde_json::to_string(&serde_json::json!({ + "iteration": iterations, + "message_count": messages.len(), + "tool_schema_count": schemas_for_api.len(), + "native_tools": native_tools, + })) { + llm_span.set_attribute("langfuse.observation.metadata", metadata); + } + let request_payload = serde_json::json!({ + "messages": messages.iter().map(ChatMessage::to_openai_value).collect::>(), + "tool_schema_count": schemas_for_api.len(), + }); + set_json_attribute( + &llm_span, + "langfuse.observation.input", + &request_payload, + observability, + ); + // Dispatch BeforeLLMCall hook — may block the LLM call. 
if let Some(ref hooks) = hook_registry { let msgs_json: Vec = @@ -794,45 +969,68 @@ pub async fn run_agent_loop_with_context( cb(RunnerEvent::Thinking); } - let mut response: CompletionResponse = - match provider.complete(&messages, schemas_for_api).await { - Ok(r) => r, - Err(e) => { - let msg = e.to_string(); - if is_context_window_error(&msg) { - return Err(AgentRunError::ContextWindowExceeded(msg)); - } - if let Some(delay_ms) = next_retry_delay_ms( - &msg, - &mut server_retries_remaining, - &mut rate_limit_retries_remaining, - &mut rate_limit_backoff_ms, - ) { - iterations -= 1; - warn!( - error = %msg, + let mut response: CompletionResponse = match provider + .complete(&messages, schemas_for_api) + .instrument(llm_span.clone()) + .await + { + Ok(r) => r, + Err(e) => { + let msg = e.to_string(); + llm_span.set_attribute("langfuse.observation.level", "ERROR"); + llm_span.set_attribute("langfuse.observation.status_message", msg.clone()); + set_text_attribute( + &llm_span, + "langfuse.observation.output", + &msg, + observability, + ); + if is_context_window_error(&msg) { + return Err(AgentRunError::ContextWindowExceeded(msg)); + } + if let Some(delay_ms) = next_retry_delay_ms( + &msg, + &mut server_retries_remaining, + &mut rate_limit_retries_remaining, + &mut rate_limit_backoff_ms, + ) { + iterations -= 1; + warn!( + error = %msg, + delay_ms, + server_retries_remaining, + rate_limit_retries_remaining, + "transient LLM error, retrying after delay" + ); + if let Some(cb) = on_event { + cb(RunnerEvent::RetryingAfterError { + error: msg, delay_ms, - server_retries_remaining, - rate_limit_retries_remaining, - "transient LLM error, retrying after delay" - ); - if let Some(cb) = on_event { - cb(RunnerEvent::RetryingAfterError { - error: msg, - delay_ms, - }); - } - tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; - continue; + }); } - return Err(AgentRunError::Other(e)); - }, - }; + 
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + continue; + } + return Err(AgentRunError::Other(e)); + }, + }; if let Some(cb) = on_event { cb(RunnerEvent::ThinkingDone); } + set_usage_attributes(&llm_span, &response.usage); + let response_payload = serde_json::json!({ + "text": response.text, + "tool_calls": tool_calls_to_json(&response.tool_calls), + }); + set_json_attribute( + &llm_span, + "langfuse.observation.output", + &response_payload, + observability, + ); + total_input_tokens = total_input_tokens.saturating_add(response.usage.input_tokens); total_output_tokens = total_output_tokens.saturating_add(response.usage.output_tokens); @@ -1064,14 +1262,27 @@ pub async fn run_agent_loop_with_context( let tc_name = sanitized.to_string(); let _tc_id = tc.id.clone(); - if let Some(ref ctx) = tool_context - && let (Some(args_obj), Some(ctx_obj)) = (args.as_object_mut(), ctx.as_object()) - { - for (k, v) in ctx_obj { - args_obj.insert(k.clone(), v.clone()); - } + merge_tool_context(&mut args, &tool_context); + let tool_span = tracing::info_span!( + "tool.call", + tool = %tc_name, + tool_call_id = %tc.id, + ); + tool_span.set_attribute("langfuse.observation.type", "tool"); + if let Ok(metadata) = serde_json::to_string(&serde_json::json!({ + "tool_call_id": tc.id, + "iteration": iterations, + })) { + tool_span.set_attribute("langfuse.observation.metadata", metadata); } + set_json_attribute( + &tool_span, + "langfuse.observation.input", + &args, + observability, + ); async move { + let tool_span = Span::current(); // Run BeforeToolCall hook. 
if let Some(ref hooks) = hook_registry { let payload = HookPayload::BeforeToolCall { @@ -1083,6 +1294,15 @@ pub async fn run_agent_loop_with_context( Ok(HookAction::Block(reason)) => { warn!(tool = %tc_name, reason = %reason, "tool call blocked by hook"); let err_str = format!("blocked by hook: {reason}"); + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + tool_span + .set_attribute("langfuse.observation.status_message", err_str.clone()); + set_text_attribute( + &tool_span, + "langfuse.observation.output", + &err_str, + observability, + ); return ( false, serde_json::json!({ "error": err_str }), @@ -1128,14 +1348,42 @@ pub async fn run_agent_loop_with_context( } if has_error { + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + if let Some(error) = error_msg.as_ref() { + tool_span.set_attribute( + "langfuse.observation.status_message", + error.clone(), + ); + } + set_json_attribute( + &tool_span, + "langfuse.observation.output", + &val, + observability, + ); // Tool executed but returned an error in the result (false, serde_json::json!({ "result": val }), error_msg) } else { + set_json_attribute( + &tool_span, + "langfuse.observation.output", + &val, + observability, + ); (true, serde_json::json!({ "result": val }), None) } }, Err(e) => { let err_str = e.to_string(); + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + tool_span + .set_attribute("langfuse.observation.status_message", err_str.clone()); + set_text_attribute( + &tool_span, + "langfuse.observation.output", + &err_str, + observability, + ); // Dispatch AfterToolCall hook on failure. 
if let Some(ref hooks) = hook_registry { let payload = HookPayload::AfterToolCall { @@ -1157,6 +1405,14 @@ pub async fn run_agent_loop_with_context( } } else { let err_str = format!("unknown tool: {tc_name}"); + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + tool_span.set_attribute("langfuse.observation.status_message", err_str.clone()); + set_text_attribute( + &tool_span, + "langfuse.observation.output", + &err_str, + observability, + ); ( false, serde_json::json!({ "error": err_str }), @@ -1164,6 +1420,7 @@ pub async fn run_agent_loop_with_context( ) } } + .instrument(tool_span) }) .collect(); @@ -1272,6 +1529,7 @@ pub async fn run_agent_loop_streaming( .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); + let observability = observability_settings(&tool_context); let mut iterations = 0; let mut total_tool_calls = 0; @@ -1354,119 +1612,162 @@ pub async fn run_agent_loop_streaming( // Use streaming API. #[cfg(feature = "metrics")] let iter_start = std::time::Instant::now(); - let mut stream = provider.stream_with_tools(messages.clone(), schemas_for_api.clone()); - - // Accumulate answer text, reasoning text, and tool calls from the stream. - let mut accumulated_text = String::new(); - let mut accumulated_reasoning = String::new(); - let mut tool_calls: Vec = Vec::new(); - // Map streaming index → accumulated JSON args string. - let mut tool_call_args: std::collections::HashMap = - std::collections::HashMap::new(); - // Map streaming index → position in the `tool_calls` vec. - // The streaming index may not start at 0 (e.g. Copilot proxying - // Anthropic uses the content-block index, so a text block at index 0 - // pushes the tool_use to index 1). 
- let mut stream_idx_to_vec_pos: std::collections::HashMap = - std::collections::HashMap::new(); - let mut input_tokens: u32 = 0; - let mut output_tokens: u32 = 0; - let mut stream_error: Option = None; - - while let Some(event) = stream.next().await { - match event { - StreamEvent::Delta(text) => { - accumulated_text.push_str(&text); - if let Some(cb) = on_event { - cb(RunnerEvent::TextDelta(text)); - } - }, - StreamEvent::ProviderRaw(raw) => { - if raw_llm_responses.len() < 256 { - raw_llm_responses.push(raw); - } - }, - StreamEvent::ReasoningDelta(text) => { - accumulated_reasoning.push_str(&text); - if let Some(cb) = on_event { - cb(RunnerEvent::ThinkingText(accumulated_reasoning.clone())); - } - }, - StreamEvent::ToolCallStart { id, name, index } => { - let vec_pos = tool_calls.len(); - debug!(tool = %name, id = %id, stream_index = index, vec_pos, "tool call started in stream"); - tool_calls.push(ToolCall { - id, - name, - arguments: serde_json::json!({}), - }); - stream_idx_to_vec_pos.insert(index, vec_pos); - tool_call_args.insert(index, String::new()); - }, - StreamEvent::ToolCallArgumentsDelta { index, delta } => { - if let Some(args) = tool_call_args.get_mut(&index) { - args.push_str(&delta); - } - }, - StreamEvent::ToolCallComplete { index } => { - // Arguments are finalized after stream completes. - // Just log for now - we'll parse accumulated args later. 
- debug!(index, "tool call arguments complete"); - }, - StreamEvent::Done(usage) => { - input_tokens = usage.input_tokens; - output_tokens = usage.output_tokens; - debug!(input_tokens, output_tokens, "stream done"); - - #[cfg(feature = "metrics")] - { - let provider_name = provider.name().to_string(); - let model_id = provider.id().to_string(); - let duration = iter_start.elapsed().as_secs_f64(); - counter!( - llm_metrics::COMPLETIONS_TOTAL, - labels::PROVIDER => provider_name.clone(), - labels::MODEL => model_id.clone() - ) - .increment(1); - counter!( - llm_metrics::INPUT_TOKENS_TOTAL, - labels::PROVIDER => provider_name.clone(), - labels::MODEL => model_id.clone() - ) - .increment(u64::from(usage.input_tokens)); - counter!( - llm_metrics::OUTPUT_TOKENS_TOTAL, - labels::PROVIDER => provider_name.clone(), - labels::MODEL => model_id.clone() - ) - .increment(u64::from(usage.output_tokens)); - counter!( - llm_metrics::CACHE_READ_TOKENS_TOTAL, - labels::PROVIDER => provider_name.clone(), - labels::MODEL => model_id.clone() - ) - .increment(u64::from(usage.cache_read_tokens)); - counter!( - llm_metrics::CACHE_WRITE_TOKENS_TOTAL, - labels::PROVIDER => provider_name.clone(), - labels::MODEL => model_id.clone() - ) - .increment(u64::from(usage.cache_write_tokens)); - histogram!( - llm_metrics::COMPLETION_DURATION_SECONDS, - labels::PROVIDER => provider_name, - labels::MODEL => model_id - ) - .record(duration); - } - }, - StreamEvent::Error(msg) => { - stream_error = Some(msg); - break; - }, + let llm_span = tracing::info_span!( + "llm.request", + iteration = iterations, + provider = %provider.name(), + model = %provider.id() + ); + llm_span.set_attribute("langfuse.observation.type", "generation"); + llm_span.set_attribute("langfuse.observation.model.name", provider.id().to_string()); + if let Ok(metadata) = serde_json::to_string(&serde_json::json!({ + "iteration": iterations, + "message_count": messages.len(), + "tool_schema_count": schemas_for_api.len(), + 
"native_tools": native_tools, + "streaming": true, + })) { + llm_span.set_attribute("langfuse.observation.metadata", metadata); + } + let request_payload = serde_json::json!({ + "messages": messages.iter().map(ChatMessage::to_openai_value).collect::>(), + "tool_schema_count": schemas_for_api.len(), + }); + set_json_attribute( + &llm_span, + "langfuse.observation.input", + &request_payload, + observability, + ); + + let ( + mut accumulated_text, + accumulated_reasoning, + mut tool_calls, + tool_call_args, + stream_idx_to_vec_pos, + input_tokens, + output_tokens, + stream_error, + ) = async { + let mut stream = provider.stream_with_tools(messages.clone(), schemas_for_api.clone()); + let mut accumulated_text = String::new(); + let mut accumulated_reasoning = String::new(); + let mut tool_calls: Vec = Vec::new(); + let mut tool_call_args: std::collections::HashMap = + std::collections::HashMap::new(); + let mut stream_idx_to_vec_pos: std::collections::HashMap = + std::collections::HashMap::new(); + let mut input_tokens: u32 = 0; + let mut output_tokens: u32 = 0; + let mut stream_error: Option = None; + + while let Some(event) = stream.next().await { + match event { + StreamEvent::Delta(text) => { + accumulated_text.push_str(&text); + if let Some(cb) = on_event { + cb(RunnerEvent::TextDelta(text)); + } + }, + StreamEvent::ProviderRaw(raw) => { + if raw_llm_responses.len() < 256 { + raw_llm_responses.push(raw); + } + }, + StreamEvent::ReasoningDelta(text) => { + accumulated_reasoning.push_str(&text); + if let Some(cb) = on_event { + cb(RunnerEvent::ThinkingText(accumulated_reasoning.clone())); + } + }, + StreamEvent::ToolCallStart { id, name, index } => { + let vec_pos = tool_calls.len(); + debug!(tool = %name, id = %id, stream_index = index, vec_pos, "tool call started in stream"); + tool_calls.push(ToolCall { + id, + name, + arguments: serde_json::json!({}), + }); + stream_idx_to_vec_pos.insert(index, vec_pos); + tool_call_args.insert(index, String::new()); + }, + 
StreamEvent::ToolCallArgumentsDelta { index, delta } => { + if let Some(args) = tool_call_args.get_mut(&index) { + args.push_str(&delta); + } + }, + StreamEvent::ToolCallComplete { index } => { + debug!(index, "tool call arguments complete"); + }, + StreamEvent::Done(usage) => { + input_tokens = usage.input_tokens; + output_tokens = usage.output_tokens; + debug!(input_tokens, output_tokens, "stream done"); + + #[cfg(feature = "metrics")] + { + let provider_name = provider.name().to_string(); + let model_id = provider.id().to_string(); + let duration = iter_start.elapsed().as_secs_f64(); + counter!( + llm_metrics::COMPLETIONS_TOTAL, + labels::PROVIDER => provider_name.clone(), + labels::MODEL => model_id.clone() + ) + .increment(1); + counter!( + llm_metrics::INPUT_TOKENS_TOTAL, + labels::PROVIDER => provider_name.clone(), + labels::MODEL => model_id.clone() + ) + .increment(u64::from(usage.input_tokens)); + counter!( + llm_metrics::OUTPUT_TOKENS_TOTAL, + labels::PROVIDER => provider_name.clone(), + labels::MODEL => model_id.clone() + ) + .increment(u64::from(usage.output_tokens)); + counter!( + llm_metrics::CACHE_READ_TOKENS_TOTAL, + labels::PROVIDER => provider_name.clone(), + labels::MODEL => model_id.clone() + ) + .increment(u64::from(usage.cache_read_tokens)); + counter!( + llm_metrics::CACHE_WRITE_TOKENS_TOTAL, + labels::PROVIDER => provider_name.clone(), + labels::MODEL => model_id.clone() + ) + .increment(u64::from(usage.cache_write_tokens)); + histogram!( + llm_metrics::COMPLETION_DURATION_SECONDS, + labels::PROVIDER => provider_name, + labels::MODEL => model_id + ) + .record(duration); + } + }, + StreamEvent::Error(msg) => { + stream_error = Some(msg); + break; + }, + } } + + ( + accumulated_text, + accumulated_reasoning, + tool_calls, + tool_call_args, + stream_idx_to_vec_pos, + input_tokens, + output_tokens, + stream_error, + ) } + .instrument(llm_span.clone()) + .await; if let Some(cb) = on_event { cb(RunnerEvent::ThinkingDone); @@ -1474,6 +1775,14 @@ 
pub async fn run_agent_loop_streaming( // Handle stream errors — retry on transient failures/rate limits. if let Some(err) = stream_error { + llm_span.set_attribute("langfuse.observation.level", "ERROR"); + llm_span.set_attribute("langfuse.observation.status_message", err.clone()); + set_text_attribute( + &llm_span, + "langfuse.observation.output", + &err, + observability, + ); if is_context_window_error(&err) { return Err(AgentRunError::ContextWindowExceeded(err)); } @@ -1504,6 +1813,24 @@ pub async fn run_agent_loop_streaming( return Err(AgentRunError::Other(anyhow::anyhow!(err))); } + let stream_usage = Usage { + input_tokens, + output_tokens, + ..Default::default() + }; + set_usage_attributes(&llm_span, &stream_usage); + let response_payload = serde_json::json!({ + "text": accumulated_text, + "reasoning": accumulated_reasoning, + "tool_calls": tool_calls_to_json(&tool_calls), + }); + set_json_attribute( + &llm_span, + "langfuse.observation.output", + &response_payload, + observability, + ); + total_input_tokens = total_input_tokens.saturating_add(input_tokens); total_output_tokens = total_output_tokens.saturating_add(output_tokens); @@ -1747,14 +2074,27 @@ pub async fn run_agent_loop_streaming( let session_key = session_key_for_hooks.clone(); let tc_name = sanitized.to_string(); - if let Some(ref ctx) = tool_context - && let (Some(args_obj), Some(ctx_obj)) = (args.as_object_mut(), ctx.as_object()) - { - for (k, v) in ctx_obj { - args_obj.insert(k.clone(), v.clone()); - } + merge_tool_context(&mut args, &tool_context); + let tool_span = tracing::info_span!( + "tool.call", + tool = %tc_name, + tool_call_id = %tc.id, + ); + tool_span.set_attribute("langfuse.observation.type", "tool"); + if let Ok(metadata) = serde_json::to_string(&serde_json::json!({ + "tool_call_id": tc.id, + "iteration": iterations, + })) { + tool_span.set_attribute("langfuse.observation.metadata", metadata); } + set_json_attribute( + &tool_span, + "langfuse.observation.input", + &args, + 
observability, + ); async move { + let tool_span = Span::current(); // Run BeforeToolCall hook. if let Some(ref hooks) = hook_registry { let payload = HookPayload::BeforeToolCall { @@ -1766,6 +2106,15 @@ pub async fn run_agent_loop_streaming( Ok(HookAction::Block(reason)) => { warn!(tool = %tc_name, reason = %reason, "tool call blocked by hook"); let err_str = format!("blocked by hook: {reason}"); + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + tool_span + .set_attribute("langfuse.observation.status_message", err_str.clone()); + set_text_attribute( + &tool_span, + "langfuse.observation.output", + &err_str, + observability, + ); return ( false, serde_json::json!({ "error": err_str }), @@ -1810,13 +2159,41 @@ pub async fn run_agent_loop_streaming( } if has_error { + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + if let Some(error) = error_msg.as_ref() { + tool_span.set_attribute( + "langfuse.observation.status_message", + error.clone(), + ); + } + set_json_attribute( + &tool_span, + "langfuse.observation.output", + &val, + observability, + ); (false, serde_json::json!({ "result": val }), error_msg) } else { + set_json_attribute( + &tool_span, + "langfuse.observation.output", + &val, + observability, + ); (true, serde_json::json!({ "result": val }), None) } } Err(e) => { let err_str = e.to_string(); + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + tool_span + .set_attribute("langfuse.observation.status_message", err_str.clone()); + set_text_attribute( + &tool_span, + "langfuse.observation.output", + &err_str, + observability, + ); if let Some(ref hooks) = hook_registry { let payload = HookPayload::AfterToolCall { session_key: session_key.clone(), @@ -1837,6 +2214,14 @@ pub async fn run_agent_loop_streaming( } } else { let err_str = format!("unknown tool: {tc_name}"); + tool_span.set_attribute("langfuse.observation.level", "ERROR"); + tool_span.set_attribute("langfuse.observation.status_message", 
err_str.clone()); + set_text_attribute( + &tool_span, + "langfuse.observation.output", + &err_str, + observability, + ); ( false, serde_json::json!({ "error": err_str }), @@ -1844,6 +2229,7 @@ pub async fn run_agent_loop_streaming( ) } } + .instrument(tool_span) }) .collect(); @@ -1961,6 +2347,29 @@ mod tests { assert!(long_prefix_id.len() <= 40); } + #[test] + fn observability_settings_default_to_off_without_tool_context() { + let settings = observability_settings(&None); + assert_eq!(settings.trace_content, TraceContentMode::Off); + assert_eq!(settings.max_content_bytes, 8_192); + } + + #[test] + fn merge_tool_context_skips_internal_telemetry_keys() { + let mut args = serde_json::json!({ "command": "pwd" }); + let context = Some(serde_json::json!({ + "_session_key": "chat-123", + "__telemetry_trace_content": "sanitized", + "__telemetry_max_content_bytes": 4096, + })); + + merge_tool_context(&mut args, &context); + + assert_eq!(args["_session_key"], "chat-123"); + assert!(args.get("__telemetry_trace_content").is_none()); + assert!(args.get("__telemetry_max_content_bytes").is_none()); + } + #[test] fn test_parse_tool_call_function_block_with_wrapper_and_text() { let text = "I'll do it.\n\n\nstart\npwd\n\n\nDone."; diff --git a/crates/chat/Cargo.toml b/crates/chat/Cargo.toml index f43385067..1fa6fb6ef 100644 --- a/crates/chat/Cargo.toml +++ b/crates/chat/Cargo.toml @@ -26,12 +26,14 @@ moltis-skills = { workspace = true } moltis-telegram = { workspace = true } moltis-tools = { workspace = true } moltis-voice = { workspace = true } +opentelemetry = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } uuid = { workspace = true } [features] diff --git a/crates/chat/src/lib.rs b/crates/chat/src/lib.rs index 245b9a904..77600187d 100644 --- a/crates/chat/src/lib.rs +++ 
b/crates/chat/src/lib.rs @@ -10,6 +10,7 @@ use std::{ use { async_trait::async_trait, + opentelemetry::{Array as OtelArray, Value as OtelValue, trace::TraceContextExt}, serde::{Deserialize, Serialize}, serde_json::Value, tokio::{ @@ -17,10 +18,14 @@ use { task::AbortHandle, }, tokio_stream::StreamExt, - tracing::{debug, info, warn}, + tracing::{Instrument, Span, debug, info, warn}, + tracing_opentelemetry::OpenTelemetrySpanExt, }; -use moltis_config::{MessageQueueMode, ToolMode}; +use { + moltis_common::observability::sanitize_json_for_observability, + moltis_config::{MessageQueueMode, ToolMode, schema::TraceContentMode}, +}; use { moltis_agents::{ @@ -215,6 +220,8 @@ struct ChatFinalBroadcast { run_id: String, session_key: String, state: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + trace_id: Option, text: String, model: String, provider: String, @@ -248,6 +255,8 @@ struct ChatErrorBroadcast { run_id: String, session_key: String, state: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + trace_id: Option, error: Value, #[serde(skip_serializing_if = "Option::is_none")] seq: Option, @@ -263,6 +272,161 @@ struct AssistantTurnOutput { audio_path: Option, reasoning: Option, llm_api_response: Option, + trace_id: Option, +} + +#[derive(Debug, Clone, Copy)] +struct LangfuseContentSettings { + trace_content: TraceContentMode, + max_content_bytes: usize, +} + +fn langfuse_content_settings( + config: &moltis_config::MoltisConfig, +) -> Option { + let langfuse = &config.metrics.langfuse; + if !langfuse.enabled { + return None; + } + Some(LangfuseContentSettings { + trace_content: langfuse.trace_content, + max_content_bytes: langfuse.max_content_bytes.max(1), + }) +} + +fn set_langfuse_json_attribute( + span: &Span, + key: &'static str, + value: &Value, + settings: Option, +) { + let Some(settings) = settings else { + return; + }; + let serialized = match settings.trace_content { + TraceContentMode::Off => None, + 
TraceContentMode::Sanitized => serde_json::to_string(&sanitize_json_for_observability( + value, + settings.max_content_bytes, + true, + )) + .ok(), + TraceContentMode::Full => serde_json::to_string(&sanitize_json_for_observability( + value, + settings.max_content_bytes, + false, + )) + .ok(), + }; + if let Some(serialized) = serialized { + span.set_attribute(key, serialized); + } +} + +fn set_langfuse_trace_tags(span: &Span, tags: &[String]) { + if tags.is_empty() { + return; + } + span.set_attribute( + "langfuse.trace.tags", + OtelValue::Array(OtelArray::String( + tags.iter().cloned().map(Into::into).collect(), + )), + ); +} + +fn trace_id_for_span(span: &Span) -> Option { + let context = span.context(); + let trace_span = context.span(); + let span_context = trace_span.span_context(); + span_context + .is_valid() + .then(|| span_context.trace_id().to_string()) +} + +fn user_content_observability_value(content: &UserContent) -> Value { + match content { + UserContent::Text(text) => serde_json::json!({ + "type": "text", + "text": text, + }), + UserContent::Multimodal(parts) => serde_json::json!({ + "type": "multimodal", + "parts": parts + .iter() + .map(|part| match part { + ContentPart::Text(text) => serde_json::json!({ + "type": "text", + "text": text, + }), + ContentPart::Image { media_type, data } => serde_json::json!({ + "type": "image", + "media_type": media_type, + "bytes": data.len(), + }), + }) + .collect::>(), + }), + } +} + +fn usage_observability_value(usage: &moltis_agents::model::Usage) -> Value { + serde_json::json!({ + "input": usage.input_tokens, + "output": usage.output_tokens, + "cache_read": usage.cache_read_tokens, + "cache_write": usage.cache_write_tokens, + }) +} + +fn configure_chat_run_span( + span: &Span, + config: &moltis_config::MoltisConfig, + settings: Option, + run_id: &str, + session_key: &str, + agent_id: &str, + provider_name: &str, + model_id: &str, + desired_reply_medium: ReplyMedium, + user_message_index: usize, + tool_mode: 
ToolMode, + mcp_disabled: bool, + session_is_sandboxed: bool, + conn_id: Option<&str>, + user_content: &UserContent, +) { + let trace_name = format!("chat:{agent_id}"); + span.set_attribute("langfuse.trace.name", trace_name); + span.set_attribute("langfuse.observation.type", "agent"); + span.set_attribute("session.id", session_key.to_string()); + set_langfuse_trace_tags(span, &config.metrics.langfuse.tags); + + let metadata = serde_json::json!({ + "run_id": run_id, + "agent_id": agent_id, + "provider": provider_name, + "model": model_id, + "reply_medium": desired_reply_medium, + "user_message_index": user_message_index, + "tool_mode": match tool_mode { + ToolMode::Native => "native", + ToolMode::Text => "text", + ToolMode::Off => "off", + ToolMode::Auto => "auto", + }, + "mcp_disabled": mcp_disabled, + "session_is_sandboxed": session_is_sandboxed, + "conn_id": conn_id, + }); + if let Ok(metadata_str) = serde_json::to_string(&metadata) { + span.set_attribute("langfuse.trace.metadata", metadata_str.clone()); + span.set_attribute("langfuse.observation.metadata", metadata_str); + } + + let input = user_content_observability_value(user_content); + set_langfuse_json_attribute(span, "langfuse.trace.input", &input, settings); + set_langfuse_json_attribute(span, "langfuse.observation.input", &input, settings); } #[derive(Debug, Clone, Copy, Default)] @@ -2281,6 +2445,7 @@ struct ActiveAssistantDraft { provider: String, seq: Option, run_id: String, + trace_id: Option, } impl ActiveAssistantDraft { @@ -2292,6 +2457,7 @@ impl ActiveAssistantDraft { provider: provider.to_string(), seq, run_id: run_id.to_string(), + trace_id: None, } } @@ -2306,6 +2472,10 @@ impl ActiveAssistantDraft { self.reasoning.push_str(reasoning); } + fn set_trace_id(&mut self, trace_id: Option) { + self.trace_id = trace_id; + } + fn has_visible_content(&self) -> bool { !self.content.trim().is_empty() || !self.reasoning.trim().is_empty() } @@ -2328,6 +2498,7 @@ impl ActiveAssistantDraft { audio: None, 
seq: self.seq, run_id: Some(self.run_id.clone()), + trace_id: self.trace_id.clone(), } } } @@ -2376,6 +2547,7 @@ fn build_tool_call_assistant_message( audio: None, seq, run_id: run_id.map(str::to_string), + trace_id: None, } } @@ -3091,6 +3263,7 @@ impl ChatService for LiveChatService { audio: assistant_output.audio_path, seq: client_seq, run_id: Some(run_id_clone.clone()), + trace_id: assistant_output.trace_id, }; if let Err(e) = session_store .append(&session_key_clone, &assistant_msg.to_value()) @@ -3748,6 +3921,7 @@ impl ChatService for LiveChatService { audio: assistant_output.audio_path, seq: client_seq, run_id: Some(run_id_clone.clone()), + trace_id: assistant_output.trace_id, }; if let Err(e) = session_store .append(&session_key_clone, &assistant_msg.to_value()) @@ -4048,6 +4222,7 @@ impl ChatService for LiveChatService { audio: assistant_output.audio_path.clone(), seq: None, run_id: Some(run_id.clone()), + trace_id: assistant_output.trace_id.clone(), }; if let Err(e) = self .session_store @@ -5463,6 +5638,7 @@ async fn run_explicit_shell_command( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "final", + trace_id: None, text: final_text.clone(), model: String::new(), provider: String::new(), @@ -5495,6 +5671,7 @@ async fn run_explicit_shell_command( audio_path: None, reasoning: None, llm_api_response: None, + trace_id: None, } } @@ -5994,6 +6171,39 @@ async fn run_with_tools( } else { false }; + let langfuse_settings = langfuse_content_settings(&persona.config); + let run_span = tracing::info_span!( + "chat.run", + run_id = %run_id, + session_key = %session_key, + agent_id = %agent_id, + provider = %provider_name, + model = %model_id, + tools_enabled, + ); + configure_chat_run_span( + &run_span, + &persona.config, + langfuse_settings, + run_id, + session_key, + agent_id, + provider_name, + model_id, + desired_reply_medium, + user_message_index, + tool_mode, + mcp_disabled, + session_is_sandboxed, + conn_id.as_deref(), + 
user_content, + ); + let trace_id = trace_id_for_span(&run_span); + if let Some(ref map) = active_partial_assistant + && let Some(draft) = map.write().await.get_mut(session_key) + { + draft.set_trace_id(trace_id.clone()); + } // Broadcast tool events to the UI in the order emitted by the runner. let state_for_events = Arc::clone(state); @@ -6007,363 +6217,208 @@ async fn run_with_tools( .await .map(|dispatcher| Arc::new(Mutex::new(dispatcher))); let channel_stream_for_events = channel_stream_dispatcher.as_ref().map(Arc::clone); - let event_forwarder = tokio::spawn(async move { - // Track tool call arguments from ToolCallStart so they can be persisted in ToolCallEnd. - let mut tool_args_map: HashMap = HashMap::new(); - // Track reasoning text that should be persisted with the first tool call after thinking. - let mut tool_reasoning_map: HashMap = HashMap::new(); - let mut latest_reasoning = String::new(); - while let Some(event) = event_rx.recv().await { - let state = Arc::clone(&state_for_events); - let run_id = run_id_for_events.clone(); - let sk = session_key_for_events.clone(); - let store = session_store_for_events.clone(); - let seq = client_seq; - let payload = match event { - RunnerEvent::Thinking => serde_json::json!({ - "runId": run_id, - "sessionKey": sk, - "state": "thinking", - "seq": seq, - }), - RunnerEvent::ThinkingDone => serde_json::json!({ - "runId": run_id, - "sessionKey": sk, - "state": "thinking_done", - "seq": seq, - }), - RunnerEvent::ToolCallStart { - id, - name, - arguments, - } => { - tool_args_map.insert(id.clone(), arguments.clone()); - - // Track active tool call for chat.peek. - if let Some(ref map) = active_tool_calls { - map.write() - .await - .entry(sk.clone()) - .or_default() - .push(ActiveToolCall { - id: id.clone(), - name: name.clone(), - arguments: arguments.clone(), - started_at: now_ms(), - }); - } - - // Attach reasoning to the first tool call after thinking. 
- if !latest_reasoning.is_empty() { - tool_reasoning_map - .insert(id.clone(), std::mem::take(&mut latest_reasoning)); - } - - // Send tool status to channels (Telegram, etc.) - let state_clone = Arc::clone(&state); - let sk_clone = sk.clone(); - let name_clone = name.clone(); - let args_clone = arguments.clone(); - tokio::spawn(async move { - send_tool_status_to_channels( - &state_clone, - &sk_clone, - &name_clone, - &args_clone, - ) - .await; - }); - - let is_browser = name == "browser"; - let mut payload = serde_json::json!({ + let event_forwarder = tokio::spawn( + async move { + // Track tool call arguments from ToolCallStart so they can be persisted in ToolCallEnd. + let mut tool_args_map: HashMap = HashMap::new(); + // Track reasoning text that should be persisted with the first tool call after thinking. + let mut tool_reasoning_map: HashMap = HashMap::new(); + let mut latest_reasoning = String::new(); + while let Some(event) = event_rx.recv().await { + let state = Arc::clone(&state_for_events); + let run_id = run_id_for_events.clone(); + let sk = session_key_for_events.clone(); + let store = session_store_for_events.clone(); + let seq = client_seq; + let payload = match event { + RunnerEvent::Thinking => serde_json::json!({ "runId": run_id, "sessionKey": sk, - "state": "tool_call_start", - "toolCallId": id, - "toolName": name, - "arguments": arguments, + "state": "thinking", "seq": seq, - }); - if is_browser { - payload["executionMode"] = serde_json::json!(if session_is_sandboxed { - "sandbox" - } else { - "host" - }); - } - payload - }, - RunnerEvent::ToolCallEnd { - id, - name, - success, - error, - result, - } => { - // Remove from active tool calls tracking. 
- if let Some(ref map) = active_tool_calls { - let mut guard = map.write().await; - if let Some(calls) = guard.get_mut(&sk) { - calls.retain(|tc| tc.id != id); - if calls.is_empty() { - guard.remove(&sk); - } - } - } - - let mut payload = serde_json::json!({ + }), + RunnerEvent::ThinkingDone => serde_json::json!({ "runId": run_id, "sessionKey": sk, - "state": "tool_call_end", - "toolCallId": id, - "toolName": name, - "success": success, + "state": "thinking_done", "seq": seq, - }); - if let Some(ref err) = error { - payload["error"] = serde_json::json!(parse_chat_error(err, None)); - } - // Check for screenshot/image to send to channel (Telegram, etc.) - let screenshot_to_send = result - .as_ref() - .and_then(|r| r.get("screenshot")) - .and_then(|s| s.as_str()) - .filter(|s| s.starts_with("data:image/")) - .map(String::from); - - let image_caption = result - .as_ref() - .and_then(|r| r.get("caption")) - .and_then(|c| c.as_str()) - .map(String::from); - - // Check for document file to send to channel. - // New path: `document_ref` (lightweight media-dir reference). - // Legacy path: `document` with `data:` URI. 
- let document_ref_to_send = result - .as_ref() - .and_then(|r| r.get("document_ref")) - .and_then(|d| d.as_str()) - .map(String::from); - - let document_ref_mime = if document_ref_to_send.is_some() { - result - .as_ref() - .and_then(|r| r.get("mime_type")) - .and_then(|m| m.as_str()) - .map(String::from) - } else { - None - }; - - let document_to_send = if document_ref_to_send.is_none() { - result - .as_ref() - .and_then(|r| r.get("document")) - .and_then(|d| d.as_str()) - .filter(|d| d.starts_with("data:")) - .map(String::from) - } else { - None - }; - - let has_document = document_ref_to_send.is_some() || document_to_send.is_some(); - - let document_filename = if has_document { - result - .as_ref() - .and_then(|r| r.get("filename")) - .and_then(|f| f.as_str()) - .map(String::from) - } else { - None - }; - - let document_caption = if has_document { - result - .as_ref() - .and_then(|r| r.get("caption")) - .and_then(|c| c.as_str()) - .map(String::from) - } else { - None - }; - - // Extract location from show_map results for native pin - let location_to_send = if name == "show_map" { - result.as_ref().and_then(|r| { - let lat = r.get("latitude")?.as_f64()?; - let lon = r.get("longitude")?.as_f64()?; - let label = r.get("label").and_then(|l| l.as_str()).map(String::from); - Some((lat, lon, label)) - }) - } else { - None - }; - - if let Some(ref res) = result { - // Cap output sent to the UI to avoid huge WS frames. - let mut capped = res.clone(); - for field in &["stdout", "stderr"] { - if let Some(s) = capped.get(*field).and_then(|v| v.as_str()) - && s.len() > 10_000 - { - let truncated = format!( - "{}\n\n... [truncated — {} bytes total]", - truncate_at_char_boundary(s, 10_000), - s.len() - ); - capped[*field] = Value::String(truncated); - } + }), + RunnerEvent::ToolCallStart { + id, + name, + arguments, + } => { + tool_args_map.insert(id.clone(), arguments.clone()); + + // Track active tool call for chat.peek. 
+ if let Some(ref map) = active_tool_calls { + map.write() + .await + .entry(sk.clone()) + .or_default() + .push(ActiveToolCall { + id: id.clone(), + name: name.clone(), + arguments: arguments.clone(), + started_at: now_ms(), + }); } - // Cap legacy document data URIs — the LLM never sees - // these and the UI doesn't render them. - if let Some(doc) = capped.get("document").and_then(|v| v.as_str()) - && doc.starts_with("data:") - && doc.len() > 200 - { - capped["document"] = - Value::String("[document data omitted]".to_string()); + + // Attach reasoning to the first tool call after thinking. + if !latest_reasoning.is_empty() { + tool_reasoning_map + .insert(id.clone(), std::mem::take(&mut latest_reasoning)); } - payload["result"] = capped; - } - // Send native location pin to channels before the screenshot. - if let Some((lat, lon, label)) = location_to_send { + // Send tool status to channels (Telegram, etc.) let state_clone = Arc::clone(&state); let sk_clone = sk.clone(); + let name_clone = name.clone(); + let args_clone = arguments.clone(); tokio::spawn(async move { - send_location_to_channels( + send_tool_status_to_channels( &state_clone, &sk_clone, - lat, - lon, - label.as_deref(), + &name_clone, + &args_clone, ) .await; }); - } - // Send screenshot/image to channel targets (Telegram) if present. - if let Some(screenshot_data) = screenshot_to_send { - let state_clone = Arc::clone(&state); - let sk_clone = sk.clone(); - tokio::spawn(async move { - send_screenshot_to_channels( - &state_clone, - &sk_clone, - &screenshot_data, - image_caption.as_deref(), - ) - .await; + let is_browser = name == "browser"; + let mut payload = serde_json::json!({ + "runId": run_id, + "sessionKey": sk, + "state": "tool_call_start", + "toolCallId": id, + "toolName": name, + "arguments": arguments, + "seq": seq, }); - } - - // Send document to channel targets if present. - if let Some(media_ref) = document_ref_to_send { - // New path: read from media dir at upload time. 
- let state_clone = Arc::clone(&state); - let sk_clone = sk.clone(); - let store_clone = store.clone(); - let mime = document_ref_mime - .unwrap_or_else(|| "application/octet-stream".to_string()); - tokio::spawn(async move { - if let Some(payload) = document_payload_from_ref( - store_clone.as_ref(), - &sk_clone, - &media_ref, - &mime, - document_filename.as_deref(), - document_caption.as_deref(), - ) - .await - { - dispatch_document_to_channels(&state_clone, &sk_clone, payload) - .await; + if is_browser { + payload["executionMode"] = serde_json::json!(if session_is_sandboxed { + "sandbox" + } else { + "host" + }); + } + payload + }, + RunnerEvent::ToolCallEnd { + id, + name, + success, + error, + result, + } => { + // Remove from active tool calls tracking. + if let Some(ref map) = active_tool_calls { + let mut guard = map.write().await; + if let Some(calls) = guard.get_mut(&sk) { + calls.retain(|tc| tc.id != id); + if calls.is_empty() { + guard.remove(&sk); + } } + } + + let mut payload = serde_json::json!({ + "runId": run_id, + "sessionKey": sk, + "state": "tool_call_end", + "toolCallId": id, + "toolName": name, + "success": success, + "seq": seq, }); - } else if let Some(document_data) = document_to_send { - // Legacy fallback: data URI. - let state_clone = Arc::clone(&state); - let sk_clone = sk.clone(); - let payload = document_payload_from_data_uri( - &document_data, - document_filename.as_deref(), - document_caption.as_deref(), - ); - tokio::spawn(async move { - dispatch_document_to_channels(&state_clone, &sk_clone, payload).await; - }); - } + if let Some(ref err) = error { + payload["error"] = serde_json::json!(parse_chat_error(err, None)); + } + // Check for screenshot/image to send to channel (Telegram, etc.) + let screenshot_to_send = result + .as_ref() + .and_then(|r| r.get("screenshot")) + .and_then(|s| s.as_str()) + .filter(|s| s.starts_with("data:image/")) + .map(String::from); - // Buffer tool error result for the channel logbook. 
- if !success { - send_tool_result_to_channels(&state, &sk, &name, success, &error, &result) - .await; - } + let image_caption = result + .as_ref() + .and_then(|r| r.get("caption")) + .and_then(|c| c.as_str()) + .map(String::from); - // Persist tool result to the session JSONL file. - if let Some(ref store) = store { - let tracked_args = tool_args_map.remove(&id); - // Save screenshot to media dir (if present) and replace - // with a lightweight path reference. Strip screenshot_scale - // (only needed for live rendering). Cap stdout/stderr at - // 10 KB, matching the WS broadcast cap. - let store_media = Arc::clone(store); - let sk_media = sk.clone(); - let tool_call_id = id.clone(); - let persisted_result = result.as_ref().map(|res| { - let mut r = res.clone(); - // Try to decode and persist the screenshot to the media - // directory. Extract base64 into an owned Vec first to - // release the borrow on `r`. - let decoded_screenshot = r - .get("screenshot") - .and_then(|v| v.as_str()) - .filter(|s| s.starts_with("data:image/")) - .and_then(|uri| uri.split(',').nth(1)) - .and_then(|b64| { - use base64::Engine; - base64::engine::general_purpose::STANDARD.decode(b64).ok() - }); - if let Some(bytes) = decoded_screenshot { - let filename = format!("{tool_call_id}.png"); - let store_ref = Arc::clone(&store_media); - let sk_ref = sk_media.clone(); - tokio::spawn(async move { - if let Err(e) = - store_ref.save_media(&sk_ref, &filename, &bytes).await - { - warn!("failed to save screenshot media: {e}"); - } - }); - let sanitized = SessionStore::key_to_filename(&sk_media); - r["screenshot"] = - Value::String(format!("media/{sanitized}/{tool_call_id}.png")); - } - // If screenshot is still a data URI (decode failed), strip it. 
- let strip_screenshot = r - .get("screenshot") - .and_then(|v| v.as_str()) - .is_some_and(|s| s.starts_with("data:")); - // Strip legacy document data URIs — they are only - // needed by the channel dispatch (already extracted - // above) and should not be persisted. - let strip_document = r - .get("document") - .and_then(|v| v.as_str()) - .is_some_and(|s| s.starts_with("data:")); - if let Some(obj) = r.as_object_mut() { - if strip_screenshot { - obj.remove("screenshot"); - } - if strip_document { - obj.remove("document"); - } - obj.remove("screenshot_scale"); - } + // Check for document file to send to channel. + // New path: `document_ref` (lightweight media-dir reference). + // Legacy path: `document` with `data:` URI. + let document_ref_to_send = result + .as_ref() + .and_then(|r| r.get("document_ref")) + .and_then(|d| d.as_str()) + .map(String::from); + + let document_ref_mime = if document_ref_to_send.is_some() { + result + .as_ref() + .and_then(|r| r.get("mime_type")) + .and_then(|m| m.as_str()) + .map(String::from) + } else { + None + }; + + let document_to_send = if document_ref_to_send.is_none() { + result + .as_ref() + .and_then(|r| r.get("document")) + .and_then(|d| d.as_str()) + .filter(|d| d.starts_with("data:")) + .map(String::from) + } else { + None + }; + + let has_document = + document_ref_to_send.is_some() || document_to_send.is_some(); + + let document_filename = if has_document { + result + .as_ref() + .and_then(|r| r.get("filename")) + .and_then(|f| f.as_str()) + .map(String::from) + } else { + None + }; + + let document_caption = if has_document { + result + .as_ref() + .and_then(|r| r.get("caption")) + .and_then(|c| c.as_str()) + .map(String::from) + } else { + None + }; + + // Extract location from show_map results for native pin + let location_to_send = if name == "show_map" { + result.as_ref().and_then(|r| { + let lat = r.get("latitude")?.as_f64()?; + let lon = r.get("longitude")?.as_f64()?; + let label = + r.get("label").and_then(|l| 
l.as_str()).map(String::from); + Some((lat, lon, label)) + }) + } else { + None + }; + + if let Some(ref res) = result { + // Cap output sent to the UI to avoid huge WS frames. + let mut capped = res.clone(); for field in &["stdout", "stderr"] { - if let Some(s) = r.get(*field).and_then(|v| v.as_str()) + if let Some(s) = capped.get(*field).and_then(|v| v.as_str()) && s.len() > 10_000 { let truncated = format!( @@ -6371,143 +6426,308 @@ async fn run_with_tools( truncate_at_char_boundary(s, 10_000), s.len() ); - r[*field] = Value::String(truncated); + capped[*field] = Value::String(truncated); } } - r - }); - let tracked_reasoning = tool_reasoning_map.remove(&id); - let assistant_tool_call_msg = build_tool_call_assistant_message( - id.clone(), - name.clone(), - tracked_args.clone(), - seq, - Some(run_id.as_str()), - ); - let tool_result_msg = PersistedMessage::ToolResult { - tool_call_id: id, - tool_name: name, - arguments: tracked_args, - success, - result: persisted_result, - error, - reasoning: tracked_reasoning, - created_at: Some(now_ms()), - run_id: Some(run_id.clone()), - }; - persist_tool_history_pair( - store, - &sk, - assistant_tool_call_msg, - tool_result_msg, - "failed to persist assistant tool call", - "failed to persist tool result", - ) - .await; - } + // Cap legacy document data URIs — the LLM never sees + // these and the UI doesn't render them. 
+ if let Some(doc) = capped.get("document").and_then(|v| v.as_str()) + && doc.starts_with("data:") + && doc.len() > 200 + { + capped["document"] = + Value::String("[document data omitted]".to_string()); + } + payload["result"] = capped; + } - payload - }, - RunnerEvent::ThinkingText(text) => { - latest_reasoning = text.clone(); - if let Some(ref map) = active_thinking_text { - map.write().await.insert(sk.clone(), text.clone()); - } - if let Some(ref map) = active_partial_for_events - && let Some(draft) = map.write().await.get_mut(&sk) - { - draft.set_reasoning(&text); - } - serde_json::json!({ + // Send native location pin to channels before the screenshot. + if let Some((lat, lon, label)) = location_to_send { + let state_clone = Arc::clone(&state); + let sk_clone = sk.clone(); + tokio::spawn(async move { + send_location_to_channels( + &state_clone, + &sk_clone, + lat, + lon, + label.as_deref(), + ) + .await; + }); + } + + // Send screenshot/image to channel targets (Telegram) if present. + if let Some(screenshot_data) = screenshot_to_send { + let state_clone = Arc::clone(&state); + let sk_clone = sk.clone(); + tokio::spawn(async move { + send_screenshot_to_channels( + &state_clone, + &sk_clone, + &screenshot_data, + image_caption.as_deref(), + ) + .await; + }); + } + + // Send document to channel targets if present. + if let Some(media_ref) = document_ref_to_send { + // New path: read from media dir at upload time. 
+ let state_clone = Arc::clone(&state); + let sk_clone = sk.clone(); + let store_clone = store.clone(); + let mime = document_ref_mime + .unwrap_or_else(|| "application/octet-stream".to_string()); + tokio::spawn(async move { + if let Some(payload) = document_payload_from_ref( + store_clone.as_ref(), + &sk_clone, + &media_ref, + &mime, + document_filename.as_deref(), + document_caption.as_deref(), + ) + .await + { + dispatch_document_to_channels(&state_clone, &sk_clone, payload) + .await; + } + }); + } else if let Some(document_data) = document_to_send { + // Legacy fallback: data URI. + let state_clone = Arc::clone(&state); + let sk_clone = sk.clone(); + let payload = document_payload_from_data_uri( + &document_data, + document_filename.as_deref(), + document_caption.as_deref(), + ); + tokio::spawn(async move { + dispatch_document_to_channels(&state_clone, &sk_clone, payload) + .await; + }); + } + + // Buffer tool error result for the channel logbook. + if !success { + send_tool_result_to_channels( + &state, &sk, &name, success, &error, &result, + ) + .await; + } + + // Persist tool result to the session JSONL file. + if let Some(ref store) = store { + let tracked_args = tool_args_map.remove(&id); + // Save screenshot to media dir (if present) and replace + // with a lightweight path reference. Strip screenshot_scale + // (only needed for live rendering). Cap stdout/stderr at + // 10 KB, matching the WS broadcast cap. + let store_media = Arc::clone(store); + let sk_media = sk.clone(); + let tool_call_id = id.clone(); + let persisted_result = result.as_ref().map(|res| { + let mut r = res.clone(); + // Try to decode and persist the screenshot to the media + // directory. Extract base64 into an owned Vec first to + // release the borrow on `r`. 
+ let decoded_screenshot = r + .get("screenshot") + .and_then(|v| v.as_str()) + .filter(|s| s.starts_with("data:image/")) + .and_then(|uri| uri.split(',').nth(1)) + .and_then(|b64| { + use base64::Engine; + base64::engine::general_purpose::STANDARD.decode(b64).ok() + }); + if let Some(bytes) = decoded_screenshot { + let filename = format!("{tool_call_id}.png"); + let store_ref = Arc::clone(&store_media); + let sk_ref = sk_media.clone(); + tokio::spawn(async move { + if let Err(e) = + store_ref.save_media(&sk_ref, &filename, &bytes).await + { + warn!("failed to save screenshot media: {e}"); + } + }); + let sanitized = SessionStore::key_to_filename(&sk_media); + r["screenshot"] = Value::String(format!( + "media/{sanitized}/{tool_call_id}.png" + )); + } + // If screenshot is still a data URI (decode failed), strip it. + let strip_screenshot = r + .get("screenshot") + .and_then(|v| v.as_str()) + .is_some_and(|s| s.starts_with("data:")); + // Strip legacy document data URIs — they are only + // needed by the channel dispatch (already extracted + // above) and should not be persisted. + let strip_document = r + .get("document") + .and_then(|v| v.as_str()) + .is_some_and(|s| s.starts_with("data:")); + if let Some(obj) = r.as_object_mut() { + if strip_screenshot { + obj.remove("screenshot"); + } + if strip_document { + obj.remove("document"); + } + obj.remove("screenshot_scale"); + } + for field in &["stdout", "stderr"] { + if let Some(s) = r.get(*field).and_then(|v| v.as_str()) + && s.len() > 10_000 + { + let truncated = format!( + "{}\n\n... 
[truncated — {} bytes total]", + truncate_at_char_boundary(s, 10_000), + s.len() + ); + r[*field] = Value::String(truncated); + } + } + r + }); + let tracked_reasoning = tool_reasoning_map.remove(&id); + let assistant_tool_call_msg = build_tool_call_assistant_message( + id.clone(), + name.clone(), + tracked_args.clone(), + seq, + Some(run_id.as_str()), + ); + let tool_result_msg = PersistedMessage::ToolResult { + tool_call_id: id, + tool_name: name, + arguments: tracked_args, + success, + result: persisted_result, + error, + reasoning: tracked_reasoning, + created_at: Some(now_ms()), + run_id: Some(run_id.clone()), + }; + persist_tool_history_pair( + store, + &sk, + assistant_tool_call_msg, + tool_result_msg, + "failed to persist assistant tool call", + "failed to persist tool result", + ) + .await; + } + + payload + }, + RunnerEvent::ThinkingText(text) => { + latest_reasoning = text.clone(); + if let Some(ref map) = active_thinking_text { + map.write().await.insert(sk.clone(), text.clone()); + } + if let Some(ref map) = active_partial_for_events + && let Some(draft) = map.write().await.get_mut(&sk) + { + draft.set_reasoning(&text); + } + serde_json::json!({ + "runId": run_id, + "sessionKey": sk, + "state": "thinking_text", + "text": text, + "seq": seq, + }) + }, + RunnerEvent::TextDelta(text) => { + if let Some(ref map) = active_partial_for_events + && let Some(draft) = map.write().await.get_mut(&sk) + { + draft.append_text(&text); + } + if let Some(ref dispatcher) = channel_stream_for_events { + dispatcher.lock().await.send_delta(&text).await; + } + serde_json::json!({ + "runId": run_id, + "sessionKey": sk, + "state": "delta", + "text": text, + "seq": seq, + }) + }, + RunnerEvent::Iteration(n) => serde_json::json!({ "runId": run_id, "sessionKey": sk, - "state": "thinking_text", - "text": text, + "state": "iteration", + "iteration": n, "seq": seq, - }) - }, - RunnerEvent::TextDelta(text) => { - if let Some(ref map) = active_partial_for_events - && let Some(draft) 
= map.write().await.get_mut(&sk) - { - draft.append_text(&text); - } - if let Some(ref dispatcher) = channel_stream_for_events { - dispatcher.lock().await.send_delta(&text).await; - } - serde_json::json!({ + }), + RunnerEvent::SubAgentStart { task, model, depth } => serde_json::json!({ "runId": run_id, "sessionKey": sk, - "state": "delta", - "text": text, + "state": "sub_agent_start", + "task": task, + "model": model, + "depth": depth, "seq": seq, - }) - }, - RunnerEvent::Iteration(n) => serde_json::json!({ - "runId": run_id, - "sessionKey": sk, - "state": "iteration", - "iteration": n, - "seq": seq, - }), - RunnerEvent::SubAgentStart { task, model, depth } => serde_json::json!({ - "runId": run_id, - "sessionKey": sk, - "state": "sub_agent_start", - "task": task, - "model": model, - "depth": depth, - "seq": seq, - }), - RunnerEvent::SubAgentEnd { - task, - model, - depth, - iterations, - tool_calls_made, - } => serde_json::json!({ - "runId": run_id, - "sessionKey": sk, - "state": "sub_agent_end", - "task": task, - "model": model, - "depth": depth, - "iterations": iterations, - "toolCallsMade": tool_calls_made, - "seq": seq, - }), - RunnerEvent::RetryingAfterError { error, delay_ms } => { - let error_obj = - parse_chat_error(&error, Some(provider_name_for_events.as_str())); - if error_obj.get("type").and_then(|v| v.as_str()) == Some("rate_limit_exceeded") - { - let state_clone = Arc::clone(&state); - let sk_clone = sk.clone(); - let error_clone = error_obj.clone(); - tokio::spawn(async move { - send_retry_status_to_channels( - &state_clone, - &sk_clone, - &error_clone, - Duration::from_millis(delay_ms), - ) - .await; - }); - } - serde_json::json!({ + }), + RunnerEvent::SubAgentEnd { + task, + model, + depth, + iterations, + tool_calls_made, + } => serde_json::json!({ "runId": run_id, "sessionKey": sk, - "state": "retrying", - "error": error_obj, - "retryAfterMs": delay_ms, + "state": "sub_agent_end", + "task": task, + "model": model, + "depth": depth, + 
"iterations": iterations, + "toolCallsMade": tool_calls_made, "seq": seq, - }) - }, - }; - broadcast(&state, "chat", payload, BroadcastOpts::default()).await; + }), + RunnerEvent::RetryingAfterError { error, delay_ms } => { + let error_obj = + parse_chat_error(&error, Some(provider_name_for_events.as_str())); + if error_obj.get("type").and_then(|v| v.as_str()) + == Some("rate_limit_exceeded") + { + let state_clone = Arc::clone(&state); + let sk_clone = sk.clone(); + let error_clone = error_obj.clone(); + tokio::spawn(async move { + send_retry_status_to_channels( + &state_clone, + &sk_clone, + &error_clone, + Duration::from_millis(delay_ms), + ) + .await; + }); + } + serde_json::json!({ + "runId": run_id, + "sessionKey": sk, + "state": "retrying", + "error": error_obj, + "retryAfterMs": delay_ms, + "seq": seq, + }) + }, + }; + broadcast(&state, "chat", payload, BroadcastOpts::default()).await; + } + latest_reasoning } - latest_reasoning - }); + .instrument(run_span.clone()), + ); active_event_forwarders .write() .await @@ -6540,6 +6760,16 @@ async fn run_with_tools( if let Some(cid) = conn_id.as_deref() { tool_context["_conn_id"] = serde_json::json!(cid); } + if let Some(settings) = langfuse_settings { + tool_context["__telemetry_trace_content"] = + serde_json::json!(match settings.trace_content { + TraceContentMode::Off => "off", + TraceContentMode::Sanitized => "sanitized", + TraceContentMode::Full => "full", + }); + tool_context["__telemetry_max_content_bytes"] = + serde_json::json!(settings.max_content_bytes); + } let provider_ref = provider.clone(); let first_result = run_agent_loop_streaming( @@ -6552,6 +6782,7 @@ async fn run_with_tools( Some(tool_context.clone()), hook_registry.clone(), ) + .instrument(run_span.clone()) .await; // On context-window overflow, compact the session and retry once. 
@@ -6621,6 +6852,7 @@ async fn run_with_tools( Some(tool_context), hook_registry, ) + .instrument(run_span.clone()) .await }, Err(e) => { @@ -6703,9 +6935,31 @@ async fn run_with_tools( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "error", + trace_id: trace_id.clone(), error: error_obj, seq: client_seq, }; + run_span.set_attribute("langfuse.observation.level", "ERROR"); + run_span.set_attribute( + "langfuse.observation.status_message", + "empty response with zero tokens", + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.trace.output", + &serde_json::json!({ + "error": "empty response with zero tokens", + }), + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.observation.output", + &serde_json::json!({ + "error": "empty response with zero tokens", + }), + langfuse_settings, + ); #[allow(clippy::unwrap_used)] // serializing known-valid struct let payload_val = serde_json::to_value(&error_payload).unwrap(); terminal_runs.write().await.insert(run_id.to_string()); @@ -6757,6 +7011,7 @@ async fn run_with_tools( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "final", + trace_id: trace_id.clone(), text: display_text.clone(), model: provider_ref.id().to_string(), provider: provider_name.to_string(), @@ -6774,6 +7029,26 @@ async fn run_with_tools( reasoning: reasoning.clone(), seq: client_seq, }; + let final_output = serde_json::json!({ + "text": display_text.clone(), + "reasoning": reasoning.clone(), + "usage": usage_observability_value(&usage), + "request_usage": usage_observability_value(&request_usage), + "iterations": iterations, + "tool_calls_made": tool_calls_made, + }); + set_langfuse_json_attribute( + &run_span, + "langfuse.trace.output", + &final_output, + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.observation.output", + &final_output, + langfuse_settings, + ); #[allow(clippy::unwrap_used)] // serializing known-valid struct let 
payload_val = serde_json::to_value(&final_payload).unwrap(); terminal_runs.write().await.insert(run_id.to_string()); @@ -6805,6 +7080,7 @@ async fn run_with_tools( audio_path, reasoning, llm_api_response, + trace_id: trace_id.clone(), }) }, Err(e) => { @@ -6818,9 +7094,24 @@ async fn run_with_tools( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "error", + trace_id, error: error_obj, seq: client_seq, }; + run_span.set_attribute("langfuse.observation.level", "ERROR"); + run_span.set_attribute("langfuse.observation.status_message", error_str.clone()); + set_langfuse_json_attribute( + &run_span, + "langfuse.trace.output", + &serde_json::json!({ "error": error_str }), + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.observation.output", + &serde_json::json!({ "error": error_str }), + langfuse_settings, + ); #[allow(clippy::unwrap_used)] // serializing known-valid struct let payload_val = serde_json::to_value(&error_payload).unwrap(); terminal_runs.write().await.insert(run_id.to_string()); @@ -7017,6 +7308,39 @@ async fn run_streaming( // Layer 1: instruct the LLM to write speech-friendly output when voice is active. 
let system_prompt = apply_voice_reply_suffix(system_prompt, desired_reply_medium); + let langfuse_settings = langfuse_content_settings(&persona.config); + let run_span = tracing::info_span!( + "chat.run", + run_id = %run_id, + session_key = %session_key, + agent_id = %agent_id, + provider = %provider_name, + model = %model_id, + tools_enabled = false, + ); + configure_chat_run_span( + &run_span, + &persona.config, + langfuse_settings, + run_id, + session_key, + agent_id, + provider_name, + model_id, + desired_reply_medium, + user_message_index, + ToolMode::Off, + false, + false, + None, + user_content, + ); + let trace_id = trace_id_for_span(&run_span); + if let Some(ref map) = active_partial_assistant + && let Some(draft) = map.write().await.get_mut(session_key) + { + draft.set_trace_id(trace_id.clone()); + } let mut messages: Vec = Vec::new(); messages.push(ChatMessage::system(system_prompt)); @@ -7041,6 +7365,33 @@ async fn run_streaming( #[cfg(feature = "metrics")] let stream_start = Instant::now(); + let llm_span = tracing::info_span!( + parent: &run_span, + "llm.request", + provider = %provider_name, + model = %model_id + ); + llm_span.set_attribute("langfuse.observation.type", "generation"); + llm_span.set_attribute("langfuse.observation.model.name", model_id.to_string()); + if let Ok(metadata) = serde_json::to_string(&serde_json::json!({ + "message_count": messages.len(), + "streaming": true, + "tool_mode": "off", + "retryable_server_retries_remaining": server_retries_remaining, + "retryable_rate_limit_retries_remaining": rate_limit_retries_remaining, + })) { + llm_span.set_attribute("langfuse.observation.metadata", metadata); + } + let request_payload = serde_json::json!({ + "messages": messages.iter().map(ChatMessage::to_openai_value).collect::>(), + }); + set_langfuse_json_attribute( + &llm_span, + "langfuse.observation.input", + &request_payload, + langfuse_settings, + ); + let mut stream = provider.stream(messages.clone()); let mut accumulated = 
String::new(); let mut accumulated_reasoning = String::new(); @@ -7179,9 +7530,38 @@ async fn run_streaming( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "error", + trace_id: trace_id.clone(), error: error_obj, seq: client_seq, }; + llm_span.set_attribute("langfuse.observation.level", "ERROR"); + llm_span.set_attribute( + "langfuse.observation.status_message", + "empty stream with zero tokens", + ); + set_langfuse_json_attribute( + &llm_span, + "langfuse.observation.output", + &serde_json::json!({ "error": "empty stream with zero tokens" }), + langfuse_settings, + ); + run_span.set_attribute("langfuse.observation.level", "ERROR"); + run_span.set_attribute( + "langfuse.observation.status_message", + "empty stream with zero tokens", + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.trace.output", + &serde_json::json!({ "error": "empty stream with zero tokens" }), + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.observation.output", + &serde_json::json!({ "error": "empty stream with zero tokens" }), + langfuse_settings, + ); #[allow(clippy::unwrap_used)] // serializing known-valid struct let payload_val = serde_json::to_value(&error_payload).unwrap(); terminal_runs.write().await.insert(run_id.to_string()); @@ -7232,6 +7612,7 @@ async fn run_streaming( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "final", + trace_id: trace_id.clone(), text: accumulated.clone(), model: provider.id().to_string(), provider: provider_name.to_string(), @@ -7249,6 +7630,34 @@ async fn run_streaming( reasoning: reasoning.clone(), seq: client_seq, }; + let final_output = serde_json::json!({ + "text": accumulated.clone(), + "reasoning": reasoning.clone(), + "usage": usage_observability_value(&usage), + }); + if let Ok(usage_json) = + serde_json::to_string(&usage_observability_value(&usage)) + { + llm_span.set_attribute("langfuse.observation.usage_details", usage_json); + } + 
set_langfuse_json_attribute( + &llm_span, + "langfuse.observation.output", + &final_output, + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.trace.output", + &final_output, + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.observation.output", + &final_output, + langfuse_settings, + ); #[allow(clippy::unwrap_used)] // serializing known-valid struct let payload_val = serde_json::to_value(&final_payload).unwrap(); terminal_runs.write().await.insert(run_id.to_string()); @@ -7282,6 +7691,7 @@ async fn run_streaming( audio_path, reasoning, llm_api_response, + trace_id: trace_id.clone(), }); }, StreamEvent::Error(msg) => { @@ -7298,6 +7708,14 @@ async fn run_streaming( &mut rate_limit_backoff_ms, ) { + llm_span.set_attribute("langfuse.observation.level", "ERROR"); + llm_span.set_attribute("langfuse.observation.status_message", msg.clone()); + set_langfuse_json_attribute( + &llm_span, + "langfuse.observation.output", + &serde_json::json!({ "error": msg.clone(), "retry_after_ms": delay_ms }), + langfuse_settings, + ); warn!( run_id, error = %msg, @@ -7347,9 +7765,32 @@ async fn run_streaming( run_id: run_id.to_string(), session_key: session_key.to_string(), state: "error", + trace_id, error: error_obj, seq: client_seq, }; + llm_span.set_attribute("langfuse.observation.level", "ERROR"); + llm_span.set_attribute("langfuse.observation.status_message", msg.clone()); + set_langfuse_json_attribute( + &llm_span, + "langfuse.observation.output", + &serde_json::json!({ "error": msg.clone() }), + langfuse_settings, + ); + run_span.set_attribute("langfuse.observation.level", "ERROR"); + run_span.set_attribute("langfuse.observation.status_message", msg.clone()); + set_langfuse_json_attribute( + &run_span, + "langfuse.trace.output", + &serde_json::json!({ "error": msg }), + langfuse_settings, + ); + set_langfuse_json_attribute( + &run_span, + "langfuse.observation.output", + &serde_json::json!({ "error": msg }), + 
langfuse_settings, + ); #[allow(clippy::unwrap_used)] // serializing known-valid struct let payload_val = serde_json::to_value(&error_payload).unwrap(); terminal_runs.write().await.insert(run_id.to_string()); @@ -11915,6 +12356,46 @@ mod tests { assert!(json.get("started_at").is_none()); } + #[test] + fn chat_final_broadcast_serializes_trace_id_in_camel_case() { + let payload = ChatFinalBroadcast { + run_id: "run-1".to_string(), + session_key: "session-1".to_string(), + state: "final", + trace_id: Some("trace-1".to_string()), + text: "done".to_string(), + model: "gpt-5".to_string(), + provider: "openai".to_string(), + input_tokens: 12, + output_tokens: 34, + duration_ms: 56, + request_input_tokens: Some(12), + request_output_tokens: Some(34), + message_index: 7, + reply_medium: ReplyMedium::Text, + iterations: None, + tool_calls_made: None, + audio: None, + audio_warning: None, + reasoning: None, + seq: None, + }; + let json = serde_json::to_value(payload).unwrap(); + assert_eq!(json["traceId"], "trace-1"); + assert!(json.get("trace_id").is_none()); + } + + #[test] + fn active_assistant_draft_persists_trace_id() { + let mut draft = ActiveAssistantDraft::new("run-1", "gpt-5", "openai", None); + draft.append_text("partial"); + draft.set_trace_id(Some("trace-1".to_string())); + + let persisted = draft.to_persisted_message().to_value(); + assert_eq!(persisted["trace_id"], "trace-1"); + assert_eq!(persisted["run_id"], "run-1"); + } + #[tokio::test] async fn peek_returns_inactive_when_no_run() { let dir = tempfile::tempdir().expect("tempdir"); @@ -12021,6 +12502,7 @@ mod tests { let mut draft = ActiveAssistantDraft::new(run_id, "test-model", "test-provider", None); draft.append_text("Partial answer"); + draft.set_trace_id(Some("trace-abort".to_string())); draft }); service @@ -12096,6 +12578,7 @@ mod tests { assert_eq!(history[1]["role"].as_str(), Some("tool_result")); assert_eq!(history[2]["role"].as_str(), Some("assistant")); assert_eq!(history[2]["content"].as_str(), 
Some("Partial answer")); + assert_eq!(history[2]["trace_id"].as_str(), Some("trace-abort")); } #[tokio::test] diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index db2f36dd7..5d535437e 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -61,6 +61,7 @@ path = "src/main.rs" [dependencies] anyhow = { workspace = true } +base64 = { workspace = true } clap = { workspace = true } dotenvy = { workspace = true } moltis-agents = { workspace = true } @@ -82,6 +83,9 @@ moltis-skills = { workspace = true } moltis-tools = { workspace = true } moltis-web = { optional = true, workspace = true } open = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry_sdk = { workspace = true } rand = { workspace = true } reqwest = { workspace = true } secrecy = { workspace = true } @@ -90,6 +94,7 @@ sqlx = { workspace = true } time = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } uuid = { workspace = true } which = { workspace = true } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 98543c1e6..70056733e 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -39,11 +39,14 @@ mod sandbox_commands; mod service_commands; #[cfg(feature = "tailscale")] mod tailscale_commands; +mod telemetry; use { anyhow::anyhow, clap::{Parser, Subcommand}, moltis_gateway::logs::{EnabledLogLevels, LogBroadcastLayer, LogBuffer}, + opentelemetry::trace::TracerProvider as _, + telemetry::{TelemetryHandles, build_langfuse_tracing}, tracing::info, tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}, }; @@ -236,7 +239,11 @@ enum SkillAction { /// Initialise tracing and optionally attach a [`LogBroadcastLayer`] that /// captures events into an in-memory ring buffer for the web UI. 
-fn init_telemetry(cli: &Cli, log_buffer: Option) { +fn init_telemetry( + cli: &Cli, + config: &moltis_config::MoltisConfig, + log_buffer: Option, +) -> anyhow::Result { // Start with user-specified or default log level let base_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&cli.log_level)); @@ -257,25 +264,50 @@ fn init_telemetry(cli: &Cli, log_buffer: Option) { } let registry = tracing_subscriber::registry().with(filter); - - // Optionally attach the in-memory capture layer. - let log_layer = log_buffer.map(LogBroadcastLayer::new); + let langfuse = build_langfuse_tracing(config)?; if cli.json_logs { - registry - .with(fmt::layer().json().with_target(true).with_thread_ids(false)) - .with(log_layer) - .init(); + if let Some(langfuse) = langfuse { + let fmt_layer = fmt::layer().json().with_target(true).with_thread_ids(false); + let log_layer = log_buffer.map(LogBroadcastLayer::new); + let otel_layer = + tracing_opentelemetry::layer().with_tracer(langfuse.provider.tracer("moltis")); + registry + .with(otel_layer) + .with(fmt_layer) + .with(log_layer) + .init(); + Ok(TelemetryHandles::new(langfuse.provider)) + } else { + let fmt_layer = fmt::layer().json().with_target(true).with_thread_ids(false); + let log_layer = log_buffer.map(LogBroadcastLayer::new); + registry.with(fmt_layer).with(log_layer).init(); + Ok(TelemetryHandles::empty()) + } } else { - registry - .with( - fmt::layer() - .with_target(true) - .with_thread_ids(false) - .with_ansi(true), - ) - .with(log_layer) - .init(); + if let Some(langfuse) = langfuse { + let fmt_layer = fmt::layer() + .with_target(true) + .with_thread_ids(false) + .with_ansi(true); + let log_layer = log_buffer.map(LogBroadcastLayer::new); + let otel_layer = + tracing_opentelemetry::layer().with_tracer(langfuse.provider.tracer("moltis")); + registry + .with(otel_layer) + .with(fmt_layer) + .with(log_layer) + .init(); + Ok(TelemetryHandles::new(langfuse.provider)) + } else { + let fmt_layer = fmt::layer() + 
.with_target(true) + .with_thread_ids(false) + .with_ansi(true); + let log_layer = log_buffer.map(LogBroadcastLayer::new); + registry.with(fmt_layer).with(log_layer).init(); + Ok(TelemetryHandles::empty()) + } } } @@ -358,21 +390,8 @@ async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); let cli = Cli::parse(); - // Create the log buffer only for the gateway command so the web UI can - // display captured log entries. Default capacity (1000) can be overridden - // via `server.log_buffer_size` in moltis.toml. - let log_buffer = if matches!(cli.command, None | Some(Commands::Gateway)) { - Some(LogBuffer::default()) - } else { - None - }; - - init_telemetry(&cli, log_buffer.clone()); - - info!(version = moltis_config::VERSION, "moltis starting"); - - // Apply directory overrides before any command so all subcommands - // (config check, db, sandbox, etc.) respect --config-dir / --data-dir. + // Apply directory overrides before loading config so telemetry reads the + // same configuration path as the rest of the process. if let Some(ref dir) = cli.config_dir { moltis_config::set_config_dir(dir.clone()); } @@ -387,27 +406,39 @@ async fn main() -> anyhow::Result<()> { // hard requirement for startup; fail fast if directory initialization fails. 
let config_dir = moltis_config::config_dir().ok_or_else(|| anyhow!("unable to resolve config directory"))?; - std::fs::create_dir_all(&config_dir).unwrap_or_else(|e| { - panic!( - "failed to create config directory {}: {e}", + std::fs::create_dir_all(&config_dir).map_err(|error| { + anyhow!( + "failed to create config directory {}: {error}", config_dir.display() ) - }); + })?; let data_dir = moltis_config::data_dir(); - std::fs::create_dir_all(&data_dir).unwrap_or_else(|e| { - panic!( - "failed to create data directory {}: {e}", + std::fs::create_dir_all(&data_dir).map_err(|error| { + anyhow!( + "failed to create data directory {}: {error}", data_dir.display() ) - }); + })?; - match cli.command { + let config = moltis_config::discover_and_load(); + + // Create the log buffer only for the gateway command so the web UI can + // display captured log entries. Default capacity (1000) can be overridden + // via `server.log_buffer_size` in moltis.toml. + let log_buffer = if matches!(cli.command, None | Some(Commands::Gateway)) { + Some(LogBuffer::default()) + } else { + None + }; + + let telemetry = init_telemetry(&cli, &config, log_buffer.clone())?; + + info!(version = moltis_config::VERSION, "moltis starting"); + + let result = match cli.command { // Default: start gateway when no subcommand is provided None | Some(Commands::Gateway) => { - // Load config to get server settings - let config = moltis_config::discover_and_load(); - // CLI args override config values let bind = cli.bind.unwrap_or(config.server.bind); let port = cli.port.unwrap_or(config.server.port); @@ -474,7 +505,10 @@ async fn main() -> anyhow::Result<()> { eprintln!("command not yet implemented"); Ok(()) }, - } + }; + + telemetry.shutdown(); + result } async fn handle_skills(action: SkillAction) -> anyhow::Result<()> { diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs new file mode 100644 index 000000000..839eafdfd --- /dev/null +++ b/crates/cli/src/telemetry.rs @@ -0,0 +1,172 @@ 
+use std::{collections::HashMap, time::Duration}; + +use { + anyhow::{Context, Result, anyhow}, + base64::Engine, + moltis_config::{MoltisConfig, VERSION}, + opentelemetry::{KeyValue, global}, + opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig}, + opentelemetry_sdk::{ + Resource, + trace::{Sampler, SdkTracerProvider}, + }, + secrecy::ExposeSecret, +}; + +pub struct TelemetryHandles { + tracer_provider: Option, +} + +impl TelemetryHandles { + #[must_use] + pub const fn empty() -> Self { + Self { + tracer_provider: None, + } + } + + #[must_use] + pub const fn new(tracer_provider: SdkTracerProvider) -> Self { + Self { + tracer_provider: Some(tracer_provider), + } + } + + pub fn shutdown(self) { + if let Some(provider) = self.tracer_provider + && let Err(error) = provider.shutdown() + { + eprintln!("failed to flush OpenTelemetry spans during shutdown: {error}"); + } + } +} + +#[derive(Debug)] +pub struct LangfuseTracing { + pub provider: SdkTracerProvider, +} + +pub fn build_langfuse_tracing(config: &MoltisConfig) -> Result> { + let langfuse = &config.metrics.langfuse; + if !langfuse.enabled { + return Ok(None); + } + + let host = langfuse + .host + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| { + anyhow!("metrics.langfuse.enabled = true but metrics.langfuse.host is missing") + })?; + let public_key = langfuse.public_key.as_ref().ok_or_else(|| { + anyhow!("metrics.langfuse.enabled = true but metrics.langfuse.public_key is missing") + })?; + let secret_key = langfuse.secret_key.as_ref().ok_or_else(|| { + anyhow!("metrics.langfuse.enabled = true but metrics.langfuse.secret_key is missing") + })?; + + let endpoint = build_langfuse_traces_endpoint(host); + let exporter = SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .with_endpoint(endpoint) + .with_timeout(Duration::from_secs(10)) + .with_headers(langfuse_headers( + public_key.expose_secret(), + secret_key.expose_secret(), 
+ )) + .build() + .context("failed to build Langfuse OTLP exporter")?; + + let mut resource = vec![ + KeyValue::new("service.name", "moltis"), + KeyValue::new("service.version", VERSION.to_string()), + ]; + if let Some(environment) = langfuse + .environment + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + { + resource.push(KeyValue::new( + "deployment.environment", + environment.to_string(), + )); + } + + let sample_rate = langfuse.sample_rate.clamp(0.0, 1.0); + let provider = SdkTracerProvider::builder() + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + sample_rate, + )))) + .with_resource(Resource::builder_empty().with_attributes(resource).build()) + .with_batch_exporter(exporter) + .build(); + + global::set_tracer_provider(provider.clone()); + + Ok(Some(LangfuseTracing { provider })) +} + +fn build_langfuse_traces_endpoint(host: &str) -> String { + let trimmed = host.trim().trim_end_matches('/'); + format!("{trimmed}/api/public/otel/v1/traces") +} + +fn langfuse_headers(public_key: &str, secret_key: &str) -> HashMap { + let auth = + base64::engine::general_purpose::STANDARD.encode(format!("{public_key}:{secret_key}")); + HashMap::from([ + ("Authorization".to_string(), format!("Basic {auth}")), + ("x-langfuse-ingestion-version".to_string(), "4".to_string()), + ]) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn builds_langfuse_traces_endpoint() { + assert_eq!( + build_langfuse_traces_endpoint("https://cloud.langfuse.com/"), + "https://cloud.langfuse.com/api/public/otel/v1/traces" + ); + } + + #[test] + fn langfuse_headers_include_basic_auth_and_ingestion_version() { + let headers = langfuse_headers("pk", "sk"); + assert_eq!( + headers.get("Authorization").map(String::as_str), + Some("Basic cGs6c2s=") + ); + assert_eq!( + headers + .get("x-langfuse-ingestion-version") + .map(String::as_str), + Some("4") + ); + } + + #[test] + fn disabled_langfuse_returns_none() { + let config = 
MoltisConfig::default(); + assert!(build_langfuse_tracing(&config).expect("ok").is_none()); + } + + #[test] + fn enabled_langfuse_requires_credentials() { + let mut config = MoltisConfig::default(); + config.metrics.langfuse = moltis_config::schema::LangfuseConfig { + enabled: true, + host: Some("https://cloud.langfuse.com".to_string()), + ..moltis_config::schema::LangfuseConfig::default() + }; + + let error = build_langfuse_tracing(&config).expect_err("should fail"); + assert!(error.to_string().contains("public_key")); + } +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 8206b1a7e..79b4b2a38 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod hooks; +pub mod observability; pub mod secret_serde; pub mod types; diff --git a/crates/common/src/observability.rs b/crates/common/src/observability.rs new file mode 100644 index 000000000..95f2041e4 --- /dev/null +++ b/crates/common/src/observability.rs @@ -0,0 +1,260 @@ +use serde_json::Value; + +const REDACTED: &str = "[REDACTED]"; +const SENSITIVE_KEYS: &[&str] = &[ + "api_key", + "apikey", + "token", + "access_token", + "refresh_token", + "password", + "secret", + "authorization", + "cookie", + "set-cookie", + "bearer", +]; +const TOKEN_PREFIXES: &[&str] = &[ + "sk-", + "pk-lf-", + "xoxb-", + "xapp-", + "xoxp-", + "ghp_", + "ghu_", + "github_pat_", +]; + +#[must_use] +pub fn truncate_text(input: &str, max_bytes: usize) -> String { + if input.len() <= max_bytes { + return input.to_string(); + } + + let original_len = input.len(); + let mut end = max_bytes; + while end > 0 && !input.is_char_boundary(end) { + end -= 1; + } + + format!( + "{}\n\n[truncated — {original_len} bytes total]", + &input[..end] + ) +} + +#[must_use] +pub fn is_sensitive_key(key: &str) -> bool { + let normalized = key.trim().to_ascii_lowercase().replace(['-', ' '], "_"); + SENSITIVE_KEYS + .iter() + .any(|needle| normalized.contains(needle)) +} + +#[must_use] +pub fn 
redact_json_value(value: &Value) -> Value { + match value { + Value::Object(map) => { + let mut redacted = serde_json::Map::with_capacity(map.len()); + for (key, value) in map { + if is_sensitive_key(key) { + redacted.insert(key.clone(), Value::String(REDACTED.to_string())); + } else { + redacted.insert(key.clone(), redact_json_value(value)); + } + } + Value::Object(redacted) + }, + Value::Array(values) => Value::Array(values.iter().map(redact_json_value).collect()), + _ => value.clone(), + } +} + +#[must_use] +pub fn sanitize_json_for_observability(value: &Value, max_bytes: usize, redact: bool) -> Value { + let prepared = if redact { + redact_json_value(value) + } else { + value.clone() + }; + + match serde_json::to_string(&prepared) { + Ok(serialized) if serialized.len() <= max_bytes => prepared, + Ok(serialized) => Value::String(truncate_text(&serialized, max_bytes)), + Err(_) => Value::String("[unserializable json]".to_string()), + } +} + +#[must_use] +pub fn sanitize_text_for_observability(input: &str, max_bytes: usize, redact: bool) -> String { + let text = if redact { + redact_text(input) + } else { + input.to_string() + }; + truncate_text(&text, max_bytes) +} + +#[must_use] +pub fn redact_text(input: &str) -> String { + let mut result = String::with_capacity(input.len()); + + for segment in input.split_inclusive('\n') { + let (line, newline) = if let Some(stripped) = segment.strip_suffix('\n') { + (stripped, "\n") + } else { + (segment, "") + }; + result.push_str(&redact_line(line)); + result.push_str(newline); + } + + result +} + +fn redact_line(line: &str) -> String { + let trimmed = line.trim(); + if trimmed.is_empty() { + return line.to_string(); + } + + for separator in [":", "="] { + if let Some(index) = line.find(separator) { + let key = &line[..index]; + if is_sensitive_key(key) { + let separator_end = index + separator.len(); + let prefix = line[..separator_end].trim_end(); + return format!("{prefix} {REDACTED}"); + } + } + } + + let lower = 
line.to_ascii_lowercase(); + if let Some(index) = lower.find("authorization:") { + return format!("{}Authorization: {REDACTED}", &line[..index]); + } + if let Some(index) = lower.find("cookie:") { + return format!("{}Cookie: {REDACTED}", &line[..index]); + } + if let Some(index) = lower.find("set-cookie:") { + return format!("{}Set-Cookie: {REDACTED}", &line[..index]); + } + + let bearer_redacted = redact_bearer_token(line); + redact_prefixed_tokens(&bearer_redacted) +} + +fn redact_bearer_token(line: &str) -> String { + let lower = line.to_ascii_lowercase(); + let Some(index) = lower.find("bearer ") else { + return line.to_string(); + }; + + let token_start = index + "bearer ".len(); + let token_len = line[token_start..] + .chars() + .take_while(|ch| is_token_char(*ch)) + .map(char::len_utf8) + .sum::<usize>(); + if token_len == 0 { + return line.to_string(); + } + + let token_end = token_start + token_len; + format!("{}Bearer {REDACTED}{}", &line[..index], &line[token_end..]) +} + +fn redact_prefixed_tokens(line: &str) -> String { + let mut current = line.to_string(); + for prefix in TOKEN_PREFIXES { + current = redact_token_prefix(&current, prefix); + } + current +} + +fn redact_token_prefix(line: &str, prefix: &str) -> String { + let mut result = String::with_capacity(line.len()); + let mut rest = line; + + while let Some(index) = rest.find(prefix) { + let token_start = index; + let token_body = &rest[token_start + prefix.len()..]; + let token_len = token_body + .chars() + .take_while(|ch| is_token_char(*ch)) + .map(char::len_utf8) + .sum::<usize>(); + + if token_len < 8 { + result.push_str(&rest[..token_start + prefix.len()]); + rest = &rest[token_start + prefix.len()..]; + continue; + } + + let token_end = token_start + prefix.len() + token_len; + result.push_str(&rest[..token_start]); + result.push_str(REDACTED); + rest = &rest[token_end..]; + } + + result.push_str(rest); + result +} + +fn is_token_char(ch: char) -> bool { + ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' 
| '.' | '~') +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn redact_json_replaces_sensitive_keys() { + let input = serde_json::json!({ + "api_key": "sk-123", + "nested": { + "access_token": "abc", + "safe": "ok" + } + }); + + let redacted = redact_json_value(&input); + + assert_eq!(redacted["api_key"], REDACTED); + assert_eq!(redacted["nested"]["access_token"], REDACTED); + assert_eq!(redacted["nested"]["safe"], "ok"); + } + + #[test] + fn redact_text_replaces_sensitive_assignments() { + let input = "api_key = sk-secret\nAuthorization: Bearer abc123456789\nsafe = ok"; + let output = redact_text(input); + + assert!(output.contains("api_key = [REDACTED]")); + assert!(output.contains("Authorization: [REDACTED]")); + assert!(output.contains("safe = ok")); + } + + #[test] + fn redact_text_replaces_known_prefix_tokens() { + let input = "Langfuse pk-lf-1234567890abcdef and OpenAI sk-abcdefghijklmnop"; + let output = redact_text(input); + + assert!(!output.contains("pk-lf-1234567890abcdef")); + assert!(!output.contains("sk-abcdefghijklmnop")); + assert!(output.contains(REDACTED)); + } + + #[test] + fn sanitize_json_truncates_large_payloads() { + let input = serde_json::json!({ + "value": "x".repeat(200) + }); + + let output = sanitize_json_for_observability(&input, 32, false); + let output = output.as_str().expect("string output"); + + assert!(output.contains("[truncated")); + } +} diff --git a/crates/config/src/schema.rs b/crates/config/src/schema.rs index d8cf0e90a..58f2b4f74 100644 --- a/crates/config/src/schema.rs +++ b/crates/config/src/schema.rs @@ -1181,6 +1181,122 @@ impl Default for GraphqlConfig { } } +/// How much content Moltis should attach to Langfuse traces. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum TraceContentMode { + /// Only structural metadata, never prompt/output bodies. + Off, + /// Prompt/output bodies are truncated and redacted before export. 
+ #[default] + Sanitized, + /// Prompt/output bodies are truncated but otherwise sent as-is. + Full, +} + +const fn is_default_trace_content_mode(value: &TraceContentMode) -> bool { + matches!(value, TraceContentMode::Sanitized) +} + +const fn default_langfuse_sample_rate() -> f64 { + 1.0 +} + +const fn is_default_langfuse_sample_rate(value: &f64) -> bool { + *value == 1.0 +} + +const fn default_langfuse_max_content_bytes() -> usize { + 8_192 +} + +const fn is_default_langfuse_max_content_bytes(value: &usize) -> bool { + *value == default_langfuse_max_content_bytes() +} + +/// Langfuse tracing export configuration. +#[derive(Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct LangfuseConfig { + /// Enable OTLP trace export to Langfuse. + pub enabled: bool, + /// Base URL of the Langfuse instance. + pub host: Option<String>, + /// Langfuse public key. + #[serde( + default, + serialize_with = "serialize_option_secret", + skip_serializing_if = "Option::is_none" + )] + pub public_key: Option<Secret<String>>, + /// Langfuse secret key. + #[serde( + default, + serialize_with = "serialize_option_secret", + skip_serializing_if = "Option::is_none" + )] + pub secret_key: Option<Secret<String>>, + /// Deployment environment label shown in Langfuse. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub environment: Option<String>, + /// Additional immutable trace tags. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub tags: Vec<String>, + /// Trace sampling ratio in the range `0.0..=1.0`. + #[serde( + default = "default_langfuse_sample_rate", + skip_serializing_if = "is_default_langfuse_sample_rate" + )] + pub sample_rate: f64, + /// How much prompt/output content to attach to traces. + #[serde(default, skip_serializing_if = "is_default_trace_content_mode")] + pub trace_content: TraceContentMode, + /// Maximum bytes to keep for any prompt/output/tool attribute. 
+ #[serde( + default = "default_langfuse_max_content_bytes", + skip_serializing_if = "is_default_langfuse_max_content_bytes" + )] + pub max_content_bytes: usize, +} + +impl std::fmt::Debug for LangfuseConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LangfuseConfig") + .field("enabled", &self.enabled) + .field("host", &self.host) + .field( + "public_key", + &self.public_key.as_ref().map(|_| "[REDACTED]"), + ) + .field( + "secret_key", + &self.secret_key.as_ref().map(|_| "[REDACTED]"), + ) + .field("environment", &self.environment) + .field("tags", &self.tags) + .field("sample_rate", &self.sample_rate) + .field("trace_content", &self.trace_content) + .field("max_content_bytes", &self.max_content_bytes) + .finish() + } +} + +impl Default for LangfuseConfig { + fn default() -> Self { + Self { + enabled: false, + host: None, + public_key: None, + secret_key: None, + environment: None, + tags: Vec::new(), + sample_rate: default_langfuse_sample_rate(), + trace_content: TraceContentMode::Sanitized, + max_content_bytes: default_langfuse_max_content_bytes(), + } + } +} + /// Metrics and observability configuration. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -1199,6 +1315,9 @@ pub struct MetricsConfig { /// Additional labels to add to all metrics. #[serde(default)] pub labels: HashMap, + /// Optional Langfuse export configuration for traces. 
+ #[serde(default)] + pub langfuse: LangfuseConfig, } fn default_metrics_history_points() -> usize { @@ -1212,6 +1331,7 @@ impl Default for MetricsConfig { prometheus_endpoint: true, history_points: default_metrics_history_points(), labels: HashMap::new(), + langfuse: LangfuseConfig::default(), } } } diff --git a/crates/config/src/template.rs b/crates/config/src/template.rs index 16dfb1062..6a1df14ea 100644 --- a/crates/config/src/template.rs +++ b/crates/config/src/template.rs @@ -502,6 +502,21 @@ prometheus_endpoint = true # Expose /metrics endpoint for Prometheus scra # labels = {{ environment = "production", instance = "main" }} # Additional labels to add to all metrics +# ── Langfuse Trace Export (optional) ───────────────────────────────────────── +# Exports tracing spans to Langfuse via OTLP/HTTP. +# This is separate from Prometheus metrics and is disabled by default. + +# [metrics.langfuse] +# enabled = true +# host = "https://cloud.langfuse.com" # Base URL of your Langfuse instance +# public_key = "pk-lf-..." # Or use env substitution +# secret_key = "sk-lf-..." 
# Or use env substitution +# environment = "development" # Optional environment label in Langfuse +# tags = ["moltis", "local"] # Optional immutable trace tags +# sample_rate = 1.0 # 0.0..=1.0, applied at the root trace +# trace_content = "sanitized" # "off" | "sanitized" | "full" +# max_content_bytes = 8192 # Per prompt/output/tool attribute cap + # ══════════════════════════════════════════════════════════════════════════════ # HEARTBEAT # ══════════════════════════════════════════════════════════════════════════════ diff --git a/crates/config/src/validate.rs b/crates/config/src/validate.rs index 34fafcd06..b3a19708d 100644 --- a/crates/config/src/validate.rs +++ b/crates/config/src/validate.rs @@ -263,6 +263,20 @@ fn build_schema_map() -> KnownKeys { ])) }; + let langfuse = || { + Struct(HashMap::from([ + ("enabled", Leaf), + ("host", Leaf), + ("public_key", Leaf), + ("secret_key", Leaf), + ("environment", Leaf), + ("tags", Leaf), + ("sample_rate", Leaf), + ("trace_content", Leaf), + ("max_content_bytes", Leaf), + ])) + }; + let mcp_oauth_override = || { Struct(HashMap::from([ ("client_id", Leaf), @@ -433,6 +447,7 @@ fn build_schema_map() -> KnownKeys { ("prometheus_endpoint", Leaf), ("history_points", Leaf), ("labels", Map(Box::new(Leaf))), + ("langfuse", langfuse()), ])), ), ( diff --git a/crates/sessions/src/message.rs b/crates/sessions/src/message.rs index 4e339303e..80a595f7f 100644 --- a/crates/sessions/src/message.rs +++ b/crates/sessions/src/message.rs @@ -84,6 +84,9 @@ pub enum PersistedMessage { /// Agent run ID linking this response to its parent user message. #[serde(skip_serializing_if = "Option::is_none")] run_id: Option, + /// OpenTelemetry trace ID for cross-system debugging. 
+ #[serde(skip_serializing_if = "Option::is_none")] + trace_id: Option<String>, + }, Tool { tool_call_id: String, @@ -232,6 +235,7 @@ impl PersistedMessage { audio, seq: None, run_id: None, + trace_id: None, } } @@ -429,6 +433,7 @@ mod tests { audio: None, seq: None, run_id: None, + trace_id: None, }; let json = serde_json::to_value(&msg).unwrap(); assert_eq!(json["role"], "assistant"); @@ -604,6 +609,40 @@ mod tests { } } + #[test] + fn assistant_with_trace_id_roundtrips() { + let original = PersistedMessage::Assistant { + content: "response".to_string(), + created_at: Some(12345), + model: Some("gpt-4o".to_string()), + provider: Some("openai".to_string()), + input_tokens: Some(100), + output_tokens: Some(50), + duration_ms: Some(2_000), + request_input_tokens: Some(100), + request_output_tokens: Some(50), + tool_calls: None, + reasoning: None, + llm_api_response: None, + audio: None, + seq: None, + run_id: Some("run-123".to_string()), + trace_id: Some("trace-abc".to_string()), + }; + let json = original.to_value(); + assert_eq!(json["trace_id"], "trace-abc"); + let parsed: PersistedMessage = serde_json::from_value(json).unwrap(); + match parsed { + PersistedMessage::Assistant { + trace_id, run_id, .. + } => { + assert_eq!(run_id.as_deref(), Some("run-123")); + assert_eq!(trace_id.as_deref(), Some("trace-abc")); + }, + _ => panic!("expected Assistant message"), + } + } + + #[test] + fn assistant_without_audio_field_deserializes() { // Old sessions without audio field should still parse correctly. 
diff --git a/crates/web/src/assets/js/chat-ui.js b/crates/web/src/assets/js/chat-ui.js index 3975f3b82..e3097d1ab 100644 --- a/crates/web/src/assets/js/chat-ui.js +++ b/crates/web/src/assets/js/chat-ui.js @@ -134,7 +134,7 @@ export function appendReasoningDisclosure(messageEl, reasoningText) { } export function chatAddErrorCard(err) { - if (!S.chatMsgBox) return; + if (!S.chatMsgBox) return null; clearChatEmptyState(); var el = document.createElement("div"); el.className = "msg error-card"; @@ -183,10 +183,11 @@ export function chatAddErrorCard(err) { S.chatMsgBox.appendChild(el); S.chatMsgBox.scrollTop = S.chatMsgBox.scrollHeight; + return el; } export function chatAddErrorMsg(message) { - chatAddErrorCard(parseErrorMessage(message)); + return chatAddErrorCard(parseErrorMessage(message)); } export function renderApprovalCard(requestId, command) { diff --git a/crates/web/src/assets/js/components/run-detail.js b/crates/web/src/assets/js/components/run-detail.js index 08d9b5890..b6de90679 100644 --- a/crates/web/src/assets/js/components/run-detail.js +++ b/crates/web/src/assets/js/components/run-detail.js @@ -30,15 +30,22 @@ function OverviewTab({ data }) { var provider = null; var totalInput = 0; var totalOutput = 0; + var traceIds = new Set(); for (var m of messages) { if (m.role === "assistant") { if (m.model) model = m.model; if (m.provider) provider = m.provider; totalInput += m.inputTokens || 0; totalOutput += m.outputTokens || 0; + if (m.trace_id) traceIds.add(m.trace_id); } } + var traceList = Array.from(traceIds); return html`
+
+ Run: + ${data.runId || "unknown"} +
User messages: ${summary.userMessages || 0} @@ -67,6 +74,14 @@ function OverviewTab({ data }) {
` : null } + ${ + traceList.length > 0 + ? html`
+ Trace: + ${traceList.join(", ")} +
` + : null + }
`; } @@ -115,6 +130,13 @@ function MessagesTab({ data }) { >${m.role} #${i} + ${ + m.trace_id + ? html`trace ${m.trace_id}` + : null + } ${ typeof m.content === "string" && m.content ? html`