diff --git a/src/config.rs b/src/config.rs index 2f4c0c6..f03056a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,28 @@ pub struct ProvidersConfig { pub discord: DiscordConfig, #[serde(default)] pub slack: SlackConfig, + #[serde(default)] + pub gemini: GeminiConfig, + #[serde(default)] + pub openrouter: OpenRouterConfig, + #[serde(default)] + pub openai: OpenAiConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GeminiConfig { + pub api_key: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct OpenRouterConfig { + pub api_key: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct OpenAiConfig { + pub api_key: Option, + pub base_url: Option, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -287,6 +309,25 @@ pub struct TmuxSessionMonitor { pub channel: Option, pub mention: Option, pub format: Option, + // ── Optional tmux content transformation ── + #[serde(default)] + pub summarize: bool, + #[serde(default = "default_summarizer")] + pub summarizer: String, + #[serde(default)] + pub heartbeat_mins: u64, + /// Minimum number of new lines added before triggering summarization. 0 = no filter. + #[serde(default)] + pub min_new_lines: usize, + /// Minimum minutes between LLM summarization calls for this session. 0 = no throttle. + #[serde(default)] + pub summarize_interval_mins: u64, + /// Minutes between heartbeat events. 0 = disable. Overrides heartbeat_mins when set. + #[serde(default)] + pub heartbeat_interval: u64, + /// Minimum minutes between AI summary events. 0 = use summarize_interval_mins. + #[serde(default)] + pub summary_interval: u64, } impl Default for TmuxSessionMonitor { @@ -299,6 +340,13 @@ impl Default for TmuxSessionMonitor { channel: None, mention: None, format: None, + summarize: false, + summarizer: default_summarizer(), + heartbeat_mins: 0, + min_new_lines: 0, + summarize_interval_mins: 0, + heartbeat_interval: 0, + summary_interval: 0, } } } @@ -428,6 +476,10 @@ fn default_true() -> bool { true } +fn default_summarizer() -> String { + "gemini:gemini-2.5-flash".to_string() +} + pub fn default_sink_name() -> String { "discord".to_string() } @@ -1082,6 +1134,7 @@ mod tests { legacy_default_channel: None, }, slack: SlackConfig::default(), + ..Default::default() }, routes: vec![RouteRule { event: "tmux.keyword".into(), @@ -1319,6 +1372,7 @@ message = " ping " legacy_default_channel: None, }, slack: SlackConfig::default(), + ..Default::default() }, cron: CronConfig { poll_interval_secs: 30, diff --git a/src/daemon.rs b/src/daemon.rs index a99eac5..6e352af 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -734,6 +734,7 @@ mod tests { name: Some("codex".into()), }), active_wrapper_monitor: true, + ..Default::default() }, ); let state = AppState { diff --git a/src/discord.rs b/src/discord.rs index 589c0f2..e02423d 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -113,7 +113,9 @@ impl DiscordClient { return Ok(()); } Err(error) => { - self.record_failure(&key); + if is_infrastructure_failure(&error) { + self.record_failure(&key); + } if let Some(retry_after) = error.retry_after && attempt < MAX_ATTEMPTS { @@ -270,6 +272,18 @@ impl DiscordClient { } } +/// Returns true only for infrastructure failures (5xx, network/connection errors). +/// Discord API policy rejections (4xx: 404, 400/30046, 401, 429) are NOT infrastructure +/// failures and should not open the circuit breaker — they are handled at each call site. +fn is_infrastructure_failure(error: &DiscordSendError) -> bool { + let m = &error.message; + m.contains("500 ") + || m.contains("502 ") + || m.contains("503 ") + || m.contains("504 ") + || (m.contains("failed:") && !m.contains("failed with")) +} + fn parse_retry_after(status: StatusCode, body: &str) -> Option { if status != StatusCode::TOO_MANY_REQUESTS { return None; diff --git a/src/dispatch.rs b/src/dispatch.rs index 98a9277..ec4041d 100644 --- a/src/dispatch.rs +++ b/src/dispatch.rs @@ -688,6 +688,7 @@ fn should_bypass_routine_batch(event: &IncomingEvent) -> bool { kind.ends_with(".failed") || kind.ends_with(".blocked") || kind == "tmux.stale" + || kind == "tmux.session_ended" || kind.starts_with("github.ci-") } diff --git a/src/events.rs b/src/events.rs index 420e80d..0b01b45 100644 --- a/src/events.rs +++ b/src/events.rs @@ -605,6 +605,53 @@ impl IncomingEvent { } } + /// Tmux content changed — with AI-generated summary. + #[allow(clippy::too_many_arguments)] + pub fn tmux_content_changed_with_metadata( + session: String, + pane_name: String, + summary: String, + raw_truncated: String, + backend: String, + content_mode: String, + channel: Option, + ) -> Self { + Self { + kind: "tmux.content_changed".to_string(), + channel, + mention: None, + format: None, + template: None, + payload: json!({ + "session": session, + "pane": pane_name, + "summary": summary, + "raw_truncated": raw_truncated, + "backend": backend, + "content_mode": content_mode, + }), + } + } + + /// Heartbeat — no changes detected for a given interval. + pub fn tmux_heartbeat( + session: String, + minutes_since_change: u64, + channel: Option, + ) -> Self { + Self { + kind: "tmux.heartbeat".to_string(), + channel, + mention: None, + format: None, + template: None, + payload: json!({ + "session": session, + "minutes_since_change": minutes_since_change, + }), + } + } + pub fn with_mention(mut self, mention: Option) -> Self { self.mention = mention; self diff --git a/src/main.rs b/src/main.rs index 6f0e423..10abbc1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,7 @@ mod router; mod sink; mod slack; mod source; +mod summarize; mod tmux_wrapper; mod update; @@ -399,6 +400,7 @@ mod tests { name: Some("codex".into()), }), active_wrapper_monitor: true, + ..Default::default() }]); assert!(output.contains( diff --git a/src/render/default.rs b/src/render/default.rs index 2052082..d770e8a 100644 --- a/src/render/default.rs +++ b/src/render/default.rs @@ -303,6 +303,53 @@ impl Renderer for DefaultRenderer { ), ("tmux.stale", MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, + ("tmux.content_changed", MessageFormat::Compact) => { + let session = string_field(payload, "session")?; + if content_mode(payload) == "raw" { + let raw = string_field(payload, "raw_truncated")?; + format!("📋 **{session}**: {raw}") + } else { + let summary = string_field(payload, "summary")?; + format!("🤖 **{session}**: {summary}") + } + } + ("tmux.content_changed", MessageFormat::Alert) => { + let session = string_field(payload, "session")?; + if content_mode(payload) == "raw" { + let raw = string_field(payload, "raw_truncated")?; + format!("📋 **{session}**\n```\n{raw}\n```") + } else { + let summary = string_field(payload, "summary")?; + format!("🤖 **{session}**\n{summary}") + } + } + ("tmux.content_changed", MessageFormat::Inline) => { + let session = string_field(payload, "session")?; + if content_mode(payload) == "raw" { + format!("📋 [{session}]") + } else { + let summary = string_field(payload, "summary")?; + format!("🤖 [{session}] {summary}") + } + } + ("tmux.content_changed", MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, + + ("tmux.heartbeat", MessageFormat::Compact) => { + let session = string_field(payload, "session")?; + let minutes = payload.field_u64("minutes_since_change")?; + format!("💓 **{session}**: idle for {minutes}m") + } + ("tmux.heartbeat", MessageFormat::Alert) => { + let session = string_field(payload, "session")?; + let minutes = payload.field_u64("minutes_since_change")?; + format!("💓 **{session}**: idle for {minutes}m") + } + ("tmux.heartbeat", MessageFormat::Inline) => { + let session = string_field(payload, "session")?; + format!("💓 [{session}]") + } + ("tmux.heartbeat", MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, + (_, MessageFormat::Raw) => serde_json::to_string_pretty(payload)?, (_, _) => serde_json::to_string(payload)?, }; @@ -330,6 +377,10 @@ fn optional_string_field(payload: &Value, key: &str) -> Option { .map(ToString::to_string) } +fn content_mode(payload: &Value) -> String { + optional_string_field(payload, "content_mode").unwrap_or_else(|| "summary".to_string()) +} + fn optional_u64_field(payload: &Value, key: &str) -> Option { payload.get(key).and_then(Value::as_u64) } @@ -965,4 +1016,65 @@ mod tests { assert!(rendered.starts_with("🚨")); assert!(rendered.contains("release published")); } + + #[test] + fn renders_tmux_content_changed_formats() { + let renderer = DefaultRenderer; + let event = IncomingEvent::tmux_content_changed_with_metadata( + "issue-24".into(), + "0.1".into(), + "Agent fixed the failing test".into(), + "tail".into(), + "gemini-cli".into(), + "summary".into(), + None, + ); + + assert_eq!( + renderer.render(&event, &MessageFormat::Compact).unwrap(), + "🤖 **issue-24**: Agent fixed the failing test" + ); + assert_eq!( + renderer.render(&event, &MessageFormat::Inline).unwrap(), + "🤖 [issue-24] Agent fixed the failing test" + ); + } + + #[test] + fn renders_tmux_content_changed_raw_formats() { + let renderer = DefaultRenderer; + let event = IncomingEvent::tmux_content_changed_with_metadata( + "issue-24".into(), + "0.1".into(), + "ignored".into(), + "cargo build\nerror: failed".into(), + "raw".into(), + "raw".into(), + None, + ); + + assert_eq!( + renderer.render(&event, &MessageFormat::Compact).unwrap(), + "📋 **issue-24**: cargo build\nerror: failed" + ); + assert_eq!( + renderer.render(&event, &MessageFormat::Inline).unwrap(), + "📋 [issue-24]" + ); + } + + #[test] + fn renders_tmux_heartbeat_formats() { + let renderer = DefaultRenderer; + let event = IncomingEvent::tmux_heartbeat("issue-24".into(), 42, None); + + assert_eq!( + renderer.render(&event, &MessageFormat::Compact).unwrap(), + "💓 **issue-24**: idle for 42m" + ); + assert_eq!( + renderer.render(&event, &MessageFormat::Inline).unwrap(), + "💓 [issue-24]" + ); + } } diff --git a/src/router.rs b/src/router.rs index 00b0be5..acfd41f 100644 --- a/src/router.rs +++ b/src/router.rs @@ -321,6 +321,9 @@ fn route_candidates(kind: &str) -> Vec<&str> { | "session.handoff-needed" => { vec![kind, "session.*"] } + "tmux.content_changed" | "tmux.heartbeat" => { + vec![kind, "tmux.*"] + } other => vec![other], } } diff --git a/src/source/tmux.rs b/src/source/tmux.rs index 84b8521..2e22044 100644 --- a/src/source/tmux.rs +++ b/src/source/tmux.rs @@ -16,6 +16,7 @@ use crate::events::{IncomingEvent, MessageFormat, RoutingMetadata}; use crate::keyword_window::{PendingKeywordHits, collect_keyword_hits}; use crate::router::glob_match; use crate::source::Source; +use crate::summarize::{SummarizedContent, build_summarizer}; pub type SharedTmuxRegistry = Arc>>; @@ -44,7 +45,7 @@ pub struct ParentProcessInfo { pub name: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct RegisteredTmuxSession { pub session: String, pub channel: Option, @@ -65,6 +66,40 @@ pub struct RegisteredTmuxSession { pub parent_process: Option, #[serde(default)] pub active_wrapper_monitor: bool, + #[serde(default)] + pub summarize: bool, + #[serde(default)] + pub summarizer: String, + #[serde(default)] + pub heartbeat_mins: u64, + #[serde(default)] + pub min_new_lines: usize, + #[serde(default)] + pub summarize_interval_mins: u64, + #[serde(default)] + pub heartbeat_interval: u64, + #[serde(default)] + pub summary_interval: u64, +} + +impl RegisteredTmuxSession { + /// Effective heartbeat interval: heartbeat_interval overrides heartbeat_mins when > 0. + pub fn effective_heartbeat_mins(&self) -> u64 { + if self.heartbeat_interval > 0 { + self.heartbeat_interval + } else { + self.heartbeat_mins + } + } + + /// Effective summary throttle: summary_interval overrides summarize_interval_mins when > 0. + pub fn effective_summary_interval(&self) -> u64 { + if self.summary_interval > 0 { + self.summary_interval + } else { + self.summarize_interval_mins + } + } } impl From<&TmuxSessionMonitor> for RegisteredTmuxSession { @@ -82,6 +117,13 @@ impl From<&TmuxSessionMonitor> for RegisteredTmuxSession { registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + summarize: value.summarize, + summarizer: value.summarizer.clone(), + heartbeat_mins: value.heartbeat_mins, + min_new_lines: value.min_new_lines, + summarize_interval_mins: value.summarize_interval_mins, + heartbeat_interval: value.heartbeat_interval, + summary_interval: value.summary_interval, } } } @@ -151,6 +193,9 @@ struct TmuxPaneState { struct TmuxMonitorState { panes: HashMap, pending_keyword_hits: HashMap, + session_last_heartbeat: HashMap, + session_last_summarized: HashMap, + session_last_summarized_snapshot: HashMap, } struct TmuxPaneSnapshot { @@ -164,9 +209,13 @@ struct TmuxPaneSnapshot { pub async fn monitor_registered_session( registration: RegisteredTmuxSession, client: DaemonClient, + providers: crate::config::ProvidersConfig, ) -> Result<()> { let mut panes = HashMap::new(); let mut pending_keyword_hits = None; + let mut last_heartbeat = None; + let mut last_summarized: Option = None; + let mut last_summarized_snapshot: Option = None; let poll_interval = Duration::from_secs(1); loop { @@ -236,6 +285,7 @@ pub async fn monitor_registered_session( existing.snapshot = pane.content; existing.last_change = now; existing.last_stale_notification = None; + last_heartbeat = Some(now); } else if should_emit_stale(existing, now, registration.stale_minutes) { client .emit(tmux_stale_event( @@ -252,6 +302,24 @@ pub async fn monitor_registered_session( } panes.retain(|pane_id, _| active_panes.contains(pane_id)); + maybe_emit_registered_session_heartbeat( + ®istration, + &client, + &panes, + &mut last_heartbeat, + Instant::now(), + ) + .await?; + maybe_emit_registered_session_summary( + ®istration, + &client, + &panes, + &mut last_summarized, + &mut last_summarized_snapshot, + providers.clone(), + Instant::now(), + ) + .await?; sleep(poll_interval).await; } @@ -311,6 +379,7 @@ async fn poll_tmux( for (session_name, registration) in &sessions { if registration.active_wrapper_monitor { state.pending_keyword_hits.remove(session_name); + state.session_last_heartbeat.remove(session_name); continue; } @@ -338,6 +407,7 @@ async fn poll_tmux( ) .await?; state.panes.retain(|_, pane| pane.session != *session_name); + state.session_last_heartbeat.remove(session_name); continue; } Err(error) => { @@ -352,6 +422,7 @@ async fn poll_tmux( match snapshot_tmux_session(session_name).await { Ok(panes) => { + let mut session_changed = false; for pane in panes { let pane_key = format!("{}::{}", pane.session, pane.pane_id); active_panes.insert(pane_key.clone()); @@ -373,6 +444,10 @@ async fn poll_tmux( pane_dead: pane.pane_dead, }, ); + state + .session_last_heartbeat + .insert(session_name.clone(), now); + session_changed = true; None } Some(existing) => { @@ -388,6 +463,10 @@ async fn poll_tmux( existing.content_hash = hash; existing.last_change = now; existing.last_stale_notification = None; + state + .session_last_heartbeat + .insert(session_name.clone(), now); + session_changed = true; Some(hits) } else { if should_emit_stale(existing, now, registration.stale_minutes) { @@ -414,6 +493,75 @@ async fn poll_tmux( ); } } + maybe_emit_session_heartbeat( + session_name, + registration, + tx, + state, + Instant::now(), + session_changed, + ) + .await?; + + // Interval-based summarization: fire only when stable content changed since last summary + if registration.summarize { + let last_sum = state.session_last_summarized.get(session_name).copied(); + let last_sum_snapshot = state.session_last_summarized_snapshot + .get(session_name) + .cloned(); + + let interval_secs = registration.effective_summary_interval() * 60; + let interval_elapsed = if interval_secs == 0 { + true + } else { + last_sum + .map(|t| now.duration_since(t).as_secs() >= interval_secs) + .unwrap_or(true) + }; + + if interval_elapsed { + let last_change = state + .panes + .values() + .filter(|p| p.session == *session_name) + .map(|p| p.last_change) + .max(); + let changed = if let Some(ls) = last_change { + let last_sum_time = last_sum + .unwrap_or_else(|| now - Duration::from_secs(interval_secs + 1)); + ls > last_sum_time || last_sum.is_none() + } else { + false + }; + + if changed + && let Some(pane) = state + .panes + .values() + .find(|p| p.session == *session_name) + { + let min_ok = registration.min_new_lines == 0 + || count_new_lines( + last_sum_snapshot.as_deref().unwrap_or(""), + &pane.snapshot, + ) >= registration.min_new_lines; + if min_ok { + let snapshot = pane.snapshot.clone(); + let pane_name = pane.pane_name.clone(); + spawn_content_changed_task( + tx.clone(), + registration.clone(), + session_name.clone(), + pane_name, + snapshot.clone(), + config.providers.clone(), + ); + state.session_last_summarized.insert(session_name.to_string(), now); + state.session_last_summarized_snapshot.insert(session_name.to_string(), snapshot); + } + } + } + } } Err(error) => eprintln!( "clawhip source tmux snapshot failed for {}: {error}", @@ -434,6 +582,15 @@ async fn poll_tmux( state .pending_keyword_hits .retain(|session, _| sessions.contains_key(session)); + state + .session_last_heartbeat + .retain(|session, _| sessions.contains_key(session)); + state + .session_last_summarized + .retain(|session, _| sessions.contains_key(session)); + state + .session_last_summarized_snapshot + .retain(|session, _| sessions.contains_key(session)); Ok(()) } @@ -652,6 +809,246 @@ fn tmux_stale_event( .with_format(registration.format.clone()) } +fn tmux_content_changed_event( + registration: &RegisteredTmuxSession, + session: String, + pane: String, + content: SummarizedContent, +) -> IncomingEvent { + IncomingEvent::tmux_content_changed_with_metadata( + session, + pane, + content.summary, + content.raw_truncated, + content.backend, + content.content_mode.as_str().to_string(), + registration.channel.clone(), + ) + .with_mention(registration.mention.clone()) + .with_format(registration.format.clone()) +} + +fn spawn_content_changed_task( + emitter: E, + registration: RegisteredTmuxSession, + session_name: String, + pane_name: String, + content: String, + providers: crate::config::ProvidersConfig, +) where + E: EventEmitter + Clone + Send + Sync + 'static, +{ + tokio::spawn(async move { + match build_summarizer(®istration.summarizer, &providers) { + Ok(summarizer) => match summarizer.summarize(&content, &session_name).await { + Ok(transformed) => { + let event = tmux_content_changed_event( + ®istration, + session_name, + pane_name, + transformed, + ); + let _ = emitter.emit(event).await; + } + Err(error) => { + eprintln!("clawhip: summarize failed for {session_name}: {error}"); + } + }, + Err(error) => { + eprintln!( + "clawhip: could not initialize summarizer '{}' for {session_name}: {error}", + registration.summarizer + ); + } + } + }); +} + +fn tmux_heartbeat_event( + registration: &RegisteredTmuxSession, + session: String, + minutes_since_change: u64, +) -> IncomingEvent { + IncomingEvent::tmux_heartbeat(session, minutes_since_change, registration.channel.clone()) + .with_mention(registration.mention.clone()) + .with_format(registration.format.clone()) +} + +fn count_new_lines(old: &str, new: &str) -> usize { + new.lines().count().saturating_sub(old.lines().count()) +} + +#[cfg(test)] +fn should_summarize_now( + last_summarized: Option, + interval_mins: u64, + min_new_lines: usize, + old_content: &str, + new_content: &str, + now: Instant, +) -> bool { + if min_new_lines > 0 && count_new_lines(old_content, new_content) < min_new_lines { + return false; + } + if interval_mins == 0 { + return true; + } + last_summarized + .map(|t| now.duration_since(t) >= Duration::from_secs(interval_mins * 60)) + .unwrap_or(true) +} + +async fn maybe_emit_session_heartbeat( + session_name: &str, + registration: &RegisteredTmuxSession, + emitter: &E, + state: &mut TmuxMonitorState, + now: Instant, + session_changed: bool, +) -> Result<()> { + if registration.effective_heartbeat_mins() == 0 { + state.session_last_heartbeat.remove(session_name); + return Ok(()); + } + + if session_changed { + state + .session_last_heartbeat + .insert(session_name.to_string(), now); + } + + let interval = Duration::from_secs(registration.effective_heartbeat_mins() * 60); + let Some(last_change) = state + .panes + .values() + .filter(|pane| pane.session == session_name) + .map(|pane| pane.last_change) + .max() + else { + state + .session_last_heartbeat + .entry(session_name.to_string()) + .or_insert(now); + return Ok(()); + }; + + let last_heartbeat = state + .session_last_heartbeat + .entry(session_name.to_string()) + .or_insert(now); + if now.duration_since(last_change) < interval || now.duration_since(*last_heartbeat) < interval + { + return Ok(()); + } + + emitter + .emit(tmux_heartbeat_event( + registration, + session_name.to_string(), + now.duration_since(last_change).as_secs() / 60, + )) + .await?; + *last_heartbeat = now; + Ok(()) +} + +async fn maybe_emit_registered_session_heartbeat( + registration: &RegisteredTmuxSession, + emitter: &E, + panes: &HashMap, + last_heartbeat: &mut Option, + now: Instant, +) -> Result<()> { + if registration.effective_heartbeat_mins() == 0 { + *last_heartbeat = None; + return Ok(()); + } + + let interval = Duration::from_secs(registration.effective_heartbeat_mins() * 60); + let Some(last_change) = panes.values().map(|pane| pane.last_change).max() else { + last_heartbeat.get_or_insert(now); + return Ok(()); + }; + + let last_heartbeat_at = last_heartbeat.get_or_insert(now); + if now.duration_since(last_change) < interval + || now.duration_since(*last_heartbeat_at) < interval + { + return Ok(()); + } + + emitter + .emit(tmux_heartbeat_event( + registration, + registration.session.clone(), + now.duration_since(last_change).as_secs() / 60, + )) + .await?; + *last_heartbeat_at = now; + Ok(()) +} + +async fn maybe_emit_registered_session_summary( + registration: &RegisteredTmuxSession, + emitter: &E, + panes: &HashMap, + last_summarized: &mut Option, + last_summarized_snapshot: &mut Option, + providers: crate::config::ProvidersConfig, + now: Instant, +) -> Result<()> { + if !registration.summarize || registration.effective_summary_interval() == 0 { + return Ok(()); + } + + let interval = Duration::from_secs(registration.effective_summary_interval() * 60); + + // Check: has enough time elapsed since last summary? + let interval_elapsed = last_summarized + .map(|t| now.duration_since(t) >= interval) + .unwrap_or(true); + if !interval_elapsed { + return Ok(()); + } + + // Check: did content change since last summary? + let Some(last_change) = panes.values().map(|p| p.last_change).max() else { + return Ok(()); + }; + let last_sum_time = last_summarized + .unwrap_or_else(|| now - interval - Duration::from_secs(1)); + if last_change <= last_sum_time && last_summarized.is_some() { + return Ok(()); + } + + // Get current pane snapshot + let Some(pane) = panes.values().next() else { + return Ok(()); + }; + + // Check min_new_lines threshold against snapshot at last summary + if registration.min_new_lines > 0 { + let old = last_summarized_snapshot.as_deref().unwrap_or(""); + if count_new_lines(old, &pane.snapshot) < registration.min_new_lines { + return Ok(()); + } + } + + // All conditions met — fire summary + *last_summarized = Some(now); + *last_summarized_snapshot = Some(pane.snapshot.clone()); + + spawn_content_changed_task( + emitter.clone(), + registration.clone(), + pane.session.clone(), + pane.pane_name.clone(), + pane.snapshot.clone(), + providers, + ); + Ok(()) +} + async fn flush_pending_keyword_hits( pending_keyword_hits: &mut Option, registration: &RegisteredTmuxSession, @@ -870,6 +1267,7 @@ mod tests { registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() } } @@ -988,6 +1386,7 @@ PR created #7", keyword_window_secs: 30, stale_minutes: 10, format: None, + ..Default::default() }; let registration = RegisteredTmuxSession::from(&monitor); @@ -1018,6 +1417,7 @@ PR created #7", registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ), ( @@ -1038,6 +1438,7 @@ PR created #7", name: Some("codex".into()), }), active_wrapper_monitor: true, + ..Default::default() }, ), ( @@ -1055,6 +1456,7 @@ PR created #7", registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ), ]); @@ -1076,6 +1478,7 @@ PR created #7", registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, )]), ); @@ -1386,6 +1789,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }], Some(&available_sessions), ); @@ -1416,6 +1820,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, RegisteredTmuxSession { session: "omx-*".into(), @@ -1430,6 +1835,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ], Some(&available_sessions), @@ -1458,6 +1864,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, RegisteredTmuxSession { session: "rcc-*".into(), @@ -1472,6 +1879,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ], None, @@ -1499,6 +1907,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, RegisteredTmuxSession { session: "rcc-api".into(), @@ -1513,6 +1922,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ], Some(&available_sessions), @@ -1540,6 +1950,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, RegisteredTmuxSession { session: "rcc-*".into(), @@ -1554,6 +1965,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ], Some(&available_sessions), @@ -1586,6 +1998,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, RegisteredTmuxSession { session: "abc*".into(), @@ -1600,6 +2013,7 @@ error: failed"; registration_source: RegistrationSource::ConfigMonitor, parent_process: None, active_wrapper_monitor: false, + ..Default::default() }, ], Some(&available_sessions), @@ -1656,4 +2070,84 @@ error: failed"; // Dead pane should never emit stale, even after 1 hour idle assert!(!should_emit_stale(&pane, Instant::now(), 1)); } + + #[test] + fn count_new_lines_no_change_returns_zero() { + assert_eq!(count_new_lines("a\nb\n", "a\nb\n"), 0); + } + + #[test] + fn count_new_lines_fewer_lines_returns_zero() { + assert_eq!(count_new_lines("a\nb\nc\n", "a\n"), 0); + } + + #[test] + fn count_new_lines_returns_net_addition() { + assert_eq!(count_new_lines("a\nb\n", "a\nb\nc\nd\n"), 2); + } + + #[test] + fn should_summarize_now_no_filter_no_throttle() { + assert!(should_summarize_now( + None, + 0, + 0, + "old", + "new", + Instant::now() + )); + } + + #[test] + fn should_summarize_now_no_prior_summarize_always_allowed_with_throttle() { + assert!(should_summarize_now( + None, + 5, + 0, + "old", + "new", + Instant::now() + )); + } + + #[test] + fn should_summarize_now_interval_allows_when_expired() { + let now = Instant::now(); + let old_enough = now - Duration::from_secs(6 * 60); + assert!(should_summarize_now( + Some(old_enough), + 5, + 0, + "old", + "new", + now + )); + } + + #[test] + fn should_summarize_now_interval_blocks_when_too_soon() { + let now = Instant::now(); + let recent = now - Duration::from_secs(30); + // interval_mins=5 but only 30 seconds elapsed + assert!(!should_summarize_now(Some(recent), 5, 0, "old", "new", now)); + } + + #[test] + fn should_summarize_now_min_new_lines_allows_when_met() { + let old = "a\n".repeat(10); + let new = format!("{}{}", old, "b\n".repeat(5)); + assert!(should_summarize_now(None, 0, 5, &old, &new, Instant::now())); + } + + #[test] + fn should_summarize_now_min_new_lines_blocks_when_insufficient() { + assert!(!should_summarize_now( + None, + 0, + 5, + "a\nb\n", + "a\nb\nc\n", + Instant::now() + )); + } } diff --git a/src/summarize.rs b/src/summarize.rs new file mode 100644 index 0000000..3c39e21 --- /dev/null +++ b/src/summarize.rs @@ -0,0 +1,580 @@ +use std::error::Error; +use std::time::Duration; + +use async_trait::async_trait; +use reqwest::Client; +use serde_json::{Value, json}; +use tokio::process::Command; + +use crate::config::ProvidersConfig; + +const MAX_INPUT_CHARS: usize = 4000; +const DEFAULT_GEMINI_MODEL: &str = "gemini-2.5-flash"; +const DEFAULT_OPENROUTER_MODEL: &str = "openai/gpt-4o-mini"; +const DEFAULT_OPENAI_MODEL: &str = "gpt-4o-mini"; +const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com/v1"; +const OPENROUTER_BASE_URL: &str = "https://openrouter.ai/api/v1"; +const HTTP_TIMEOUT_SECS: u64 = 30; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ContentMode { + Summary, + Raw, +} + +impl ContentMode { + pub fn as_str(&self) -> &'static str { + match self { + Self::Summary => "summary", + Self::Raw => "raw", + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SummarizedContent { + pub summary: String, + pub raw_truncated: String, + pub backend: String, + pub content_mode: ContentMode, +} + +/// Trait for tmux content summarizers/transformers. +#[async_trait] +pub trait Summarizer: Send + Sync { + fn name(&self) -> &str; + + async fn summarize( + &self, + content: &str, + session: &str, + ) -> Result>; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SummarizerSpec { + Gemini { model: String }, + OpenRouter { model: String }, + OpenAiCompatible { model: String }, + Raw, +} + +pub fn parse_summarizer_spec( + summarizer: &str, +) -> Result> { + let trimmed = summarizer.trim(); + let gemini_default = || SummarizerSpec::Gemini { + model: DEFAULT_GEMINI_MODEL.to_string(), + }; + if trimmed.eq_ignore_ascii_case("raw") { + return Ok(SummarizerSpec::Raw); + } + if trimmed.eq_ignore_ascii_case("openrouter") { + return Ok(SummarizerSpec::OpenRouter { + model: DEFAULT_OPENROUTER_MODEL.to_string(), + }); + } + if let Some(model) = trimmed.strip_prefix("openrouter:") { + return Ok(SummarizerSpec::OpenRouter { + model: default_if_empty(model, DEFAULT_OPENROUTER_MODEL), + }); + } + if trimmed.eq_ignore_ascii_case("openai") || trimmed.eq_ignore_ascii_case("openai-compatible") { + return Ok(SummarizerSpec::OpenAiCompatible { + model: DEFAULT_OPENAI_MODEL.to_string(), + }); + } + if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("gemini") { + return Ok(gemini_default()); + } + if let Some(model) = trimmed + .strip_prefix("openai:") + .or_else(|| trimmed.strip_prefix("openai-compatible:")) + { + return Ok(SummarizerSpec::OpenAiCompatible { + model: default_if_empty(model, DEFAULT_OPENAI_MODEL), + }); + } + if let Some(model) = trimmed.strip_prefix("gemini:") { + return Ok(SummarizerSpec::Gemini { + model: default_if_empty(model, DEFAULT_GEMINI_MODEL), + }); + } + Err(format!("unsupported summarizer backend '{trimmed}'").into()) +} + +pub fn build_summarizer( + summarizer: &str, + providers: &ProvidersConfig, +) -> Result, Box> { + match parse_summarizer_spec(summarizer)? { + SummarizerSpec::Gemini { model } => Ok(Box::new(GeminiCli { model })), + SummarizerSpec::OpenRouter { model } => { + Ok(Box::new(OpenAiCompatibleSummarizer::new_openrouter( + model, + providers.openrouter.api_key.as_deref(), + )?)) + } + SummarizerSpec::OpenAiCompatible { model } => { + Ok(Box::new(OpenAiCompatibleSummarizer::new_openai_compatible( + model, + providers.openai.api_key.as_deref(), + providers.openai.base_url.as_deref(), + )?)) + } + SummarizerSpec::Raw => Ok(Box::new(RawPassthroughSummarizer)), + } +} + +/// Truncate content to the last `MAX_INPUT_CHARS` characters. +pub fn truncate_for_summarizer(content: &str) -> &str { + if content.len() <= MAX_INPUT_CHARS { + return content; + } + let start = content.len() - MAX_INPUT_CHARS; + let start = content[start..] + .char_indices() + .next() + .map(|(i, _)| start + i) + .unwrap_or(start); + &content[start..] +} + +fn resolve_key( + config_key: Option<&str>, + env_var: &str, + backend: &str, +) -> Result> { + if let Some(key) = config_key.filter(|k| !k.trim().is_empty()) { + return Ok(key.to_string()); + } + std::env::var(env_var).map_err(|_| { + format!( + "{env_var} is required for {backend} summarizer; set it via [providers.{backend}].api_key in config.toml or as an environment variable" + ) + .into() + }) +} + +fn default_if_empty(value: &str, default: &str) -> String { + let trimmed = value.trim(); + if trimmed.is_empty() { + default.to_string() + } else { + trimmed.to_string() + } +} + +fn summarize_system_prompt() -> &'static str { + "You summarize tmux session output for developer monitoring. \ + Focus on what the agent is doing, any errors encountered, and current status. \ + Keep it concise (2-5 sentences).\n\n\ + IMPORTANT: If the terminal output indicates the session is waiting for user input — for example: \ + a [Y/n] confirmation prompt, 'Press enter to continue', 'Allow, Deny, Always allow' tool approval \ + (Claude Code style), 'continue?', 'proceed?', 'overwrite?', an interactive menu asking for a choice, \ + or a shell/REPL prompt awaiting a command — begin your response with exactly this line:\n\ + STATUS: WAITING_FOR_INPUT\n\n\ + Otherwise do not include a STATUS line. Respond in plain text." +} + +fn summarize_user_prompt(session: &str, content: &str) -> String { + format!("Session: {session}\n\n{}", truncate_for_summarizer(content)) +} + +fn summarize_result(summary: String, raw_truncated: String, backend: &str) -> SummarizedContent { + SummarizedContent { + summary, + raw_truncated, + backend: backend.to_string(), + content_mode: ContentMode::Summary, + } +} + +fn raw_result(raw_truncated: String, backend: &str) -> SummarizedContent { + SummarizedContent { + summary: raw_truncated.clone(), + raw_truncated, + backend: backend.to_string(), + content_mode: ContentMode::Raw, + } +} + +fn openai_chat_request(model: &str, session: &str, content: &str) -> Value { + json!({ + "model": model, + "messages": [ + { + "role": "system", + "content": summarize_system_prompt(), + }, + { + "role": "user", + "content": summarize_user_prompt(session, content), + } + ], + "temperature": 0.2 + }) +} + +fn extract_openai_response_text(value: &Value) -> Option { + let content = value.pointer("/choices/0/message/content")?; + match content { + Value::String(text) => Some(text.trim().to_string()).filter(|text| !text.is_empty()), + Value::Array(parts) => { + let joined = parts + .iter() + .filter_map(|part| { + part.get("text") + .and_then(Value::as_str) + .map(str::trim) + .filter(|text| !text.is_empty()) + }) + .collect::>() + .join("\n"); + if joined.trim().is_empty() { + None + } else { + Some(joined) + } + } + _ => None, + } +} + +pub struct GeminiCli { + model: String, +} + +#[async_trait] +impl Summarizer for GeminiCli { + fn name(&self) -> &str { + "gemini-cli" + } + + async fn summarize( + &self, + content: &str, + session: &str, + ) -> Result> { + let raw_truncated = truncate_for_summarizer(content).to_string(); + let prompt = format!( + "{}\n\n{}", + summarize_system_prompt(), + summarize_user_prompt(session, content) + ); + let output = Command::new("gemini") + .arg("-m") + .arg(&self.model) + .arg("-p") + .arg(&prompt) + .output() + .await + .map_err(|e| format!("failed to spawn gemini CLI: {e}"))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!( + "gemini CLI exited with status {}: {}", + output.status, + stderr.trim() + ) + .into()); + } + + let summary = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if summary.is_empty() { + return Err("gemini returned an empty summary".into()); + } + + Ok(summarize_result(summary, raw_truncated, self.name())) + } +} + +pub struct RawPassthroughSummarizer; + +#[async_trait] +impl Summarizer for RawPassthroughSummarizer { + fn name(&self) -> &str { + "raw" + } + + async fn summarize( + &self, + content: &str, + _session: &str, + ) -> Result> { + Ok(raw_result( + truncate_for_summarizer(content).to_string(), + self.name(), + )) + } +} + +struct OpenAiCompatibleSummarizer { + client: Client, + backend_name: &'static str, + base_url: String, + api_key: String, + model: String, +} + +impl OpenAiCompatibleSummarizer { + fn new_openrouter( + model: String, + config_key: Option<&str>, + ) -> Result> { + let api_key = resolve_key(config_key, "OPENROUTER_API_KEY", "openrouter")?; + Self::new( + "openrouter", + OPENROUTER_BASE_URL.to_string(), + api_key, + model, + ) + } + + fn new_openai_compatible( + model: String, + config_key: Option<&str>, + config_base_url: Option<&str>, + ) -> Result> { + let base_url = config_base_url + .filter(|v| !v.trim().is_empty()) + .map(str::to_string) + .or_else(|| { + std::env::var("OPENAI_BASE_URL") + .ok() + .filter(|v| !v.trim().is_empty()) + }) + .unwrap_or_else(|| DEFAULT_OPENAI_BASE_URL.to_string()); + let api_key = resolve_key(config_key, "OPENAI_API_KEY", "openai")?; + Self::new("openai-compatible", base_url, api_key, model) + } + + fn new( + backend_name: &'static str, + base_url: String, + api_key: String, + model: String, + ) -> Result> { + let client = Client::builder() + .timeout(Duration::from_secs(HTTP_TIMEOUT_SECS)) + .build()?; + Ok(Self { + client, + backend_name, + base_url: base_url.trim_end_matches('/').to_string(), + api_key, + model, + }) + } + + fn chat_completions_url(&self) -> String { + format!("{}/chat/completions", self.base_url) + } +} + +#[async_trait] +impl Summarizer for OpenAiCompatibleSummarizer { + fn name(&self) -> &str { + self.backend_name + } + + async fn summarize( + &self, + content: &str, + session: &str, + ) -> Result> { + let raw_truncated = truncate_for_summarizer(content).to_string(); + let response = self + .client + .post(self.chat_completions_url()) + .bearer_auth(&self.api_key) + .json(&openai_chat_request(&self.model, session, content)) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(format!( + "{} API request failed with {}: {}", + self.backend_name, status, body + ) + .into()); + } + + let payload = response.json::().await?; + let Some(summary) = extract_openai_response_text(&payload) else { + return Err(format!("{} API returned no summary text", self.backend_name).into()); + }; + + Ok(summarize_result(summary, raw_truncated, self.name())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn truncate_short_content_unchanged() { + let content = "hello world"; + assert_eq!(truncate_for_summarizer(content), "hello world"); + } + + #[test] + fn truncate_long_content() { + let content = "A".repeat(5000); + let truncated = truncate_for_summarizer(&content); + assert_eq!(truncated.len(), 4000); + assert_eq!(truncated, "A".repeat(4000)); + } + + #[test] + fn truncate_at_char_boundary() { + let prefix = "A".repeat(3998); + let content = format!("{}€{}", prefix, "Z"); + let truncated = truncate_for_summarizer(&content); + assert!(truncated.len() <= 4000); + } + + #[test] + fn parse_summarizer_spec_supports_raw() { + assert_eq!(parse_summarizer_spec("raw").unwrap(), SummarizerSpec::Raw); + } + + #[test] + fn parse_summarizer_spec_supports_openrouter() { + assert_eq!( + parse_summarizer_spec("openrouter:google/gemini-2.5-flash").unwrap(), + SummarizerSpec::OpenRouter { + model: "google/gemini-2.5-flash".into() + } + ); + } + + #[test] + fn parse_summarizer_spec_supports_openai() { + assert_eq!( + parse_summarizer_spec("openai:gpt-4.1-mini").unwrap(), + SummarizerSpec::OpenAiCompatible { + model: "gpt-4.1-mini".into() + } + ); + } + + #[test] + fn parse_summarizer_spec_defaults_gemini() { + assert_eq!( + parse_summarizer_spec("gemini").unwrap(), + SummarizerSpec::Gemini { + model: DEFAULT_GEMINI_MODEL.into() + } + ); + } + + #[test] + fn parse_summarizer_spec_with_gemini_model() { + assert_eq!( + parse_summarizer_spec("gemini:gemini-2.5-pro").unwrap(), + SummarizerSpec::Gemini { + model: "gemini-2.5-pro".into() + } + ); + } + + #[test] + fn parse_summarizer_spec_empty_gemini_model_uses_default() { + assert_eq!( + parse_summarizer_spec("gemini:").unwrap(), + SummarizerSpec::Gemini { + model: DEFAULT_GEMINI_MODEL.into() + } + ); + } + + #[test] + fn parse_summarizer_spec_rejects_unknown_prefix() { + assert!(parse_summarizer_spec("custom:something").is_err()); + } + + #[tokio::test] + async fn raw_passthrough_returns_truncated_content() { + let output = RawPassthroughSummarizer + .summarize("hello world", "issue-24") + .await + .unwrap(); + assert_eq!(output.summary, "hello world"); + assert_eq!(output.raw_truncated, "hello world"); + assert_eq!(output.backend, "raw"); + assert_eq!(output.content_mode, ContentMode::Raw); + } + + #[test] + fn openai_chat_request_contains_model_and_messages() { + let payload = openai_chat_request("gpt-4o-mini", "issue-24", "build failed"); + assert_eq!(payload["model"], "gpt-4o-mini"); + assert_eq!(payload["messages"][0]["role"], "system"); + assert_eq!(payload["messages"][1]["role"], "user"); + assert!( + payload["messages"][1]["content"] + .as_str() + .unwrap() + .contains("issue-24") + ); + } + + #[test] + fn extract_openai_response_text_supports_string_content() { + let payload = json!({ + "choices": [ + { + "message": { + "content": "agent fixed the test" + } + } + ] + }); + assert_eq!( + extract_openai_response_text(&payload).as_deref(), + Some("agent fixed the test") + ); + } + + #[test] + fn extract_openai_response_text_supports_content_parts() { + let payload = json!({ + "choices": [ + { + "message": { + "content": [ + {"type": "text", "text": "agent is compiling"}, + {"type": "text", "text": "waiting on cargo"} + ] + } + } + ] + }); + assert_eq!( + extract_openai_response_text(&payload).as_deref(), + Some("agent is compiling\nwaiting on cargo") + ); + } + + #[test] + fn build_summarizer_supports_raw() { + assert!(build_summarizer("raw", &ProvidersConfig::default()).is_ok()); + } + + #[test] + fn system_prompt_includes_waiting_for_input_status_marker() { + let prompt = summarize_system_prompt(); + assert!( + prompt.contains("STATUS: WAITING_FOR_INPUT"), + "system prompt must instruct the LLM to emit STATUS: WAITING_FOR_INPUT" + ); + // Confirm it covers the key OMC/OMX trigger patterns + assert!(prompt.contains("[Y/n]") || prompt.contains("Allow, Deny")); + assert!(prompt.contains("continue?") || prompt.contains("proceed?")); + } +} diff --git a/src/tmux_wrapper.rs b/src/tmux_wrapper.rs index 0ae3554..421b2c9 100644 --- a/src/tmux_wrapper.rs +++ b/src/tmux_wrapper.rs @@ -119,6 +119,7 @@ impl From for RegisteredTmuxSession { registration_source: value.registration_source, parent_process: value.parent_process, active_wrapper_monitor: true, + ..Default::default() } } } @@ -207,8 +208,9 @@ async fn register_and_start_monitor( client.register_tmux(®istration).await?; let monitor_client = client.clone(); + let providers = config.providers.clone(); Ok(tokio::spawn(async move { - monitor_registered_session(registration, monitor_client).await + monitor_registered_session(registration, monitor_client, providers).await })) } @@ -622,6 +624,7 @@ mod tests { name: Some("codex".into()), }), active_wrapper_monitor: true, + ..Default::default() }); assert!(log.contains("session=issue-105"));