diff --git a/codex-rs/core/src/unified_exec/async_watcher.rs b/codex-rs/core/src/unified_exec/async_watcher.rs
index 53b6b7c77d4..22aca62aa2f 100644
--- a/codex-rs/core/src/unified_exec/async_watcher.rs
+++ b/codex-rs/core/src/unified_exec/async_watcher.rs
@@ -7,6 +7,8 @@ use tokio::time::Duration;
 use tokio::time::Instant;
 use tokio::time::Sleep;
 
+use super::UnifiedExecContext;
+use super::session::UnifiedExecSession;
 use crate::codex::Session;
 use crate::codex::TurnContext;
 use crate::exec::ExecToolCallOutput;
@@ -19,10 +21,7 @@ use crate::protocol::ExecOutputStream;
 use crate::tools::events::ToolEmitter;
 use crate::tools::events::ToolEventCtx;
 use crate::tools::events::ToolEventStage;
-
-use super::CommandTranscript;
-use super::UnifiedExecContext;
-use super::session::UnifiedExecSession;
+use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
 
 pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
 
@@ -40,7 +39,7 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
 pub(crate) fn start_streaming_output(
     session: &UnifiedExecSession,
     context: &UnifiedExecContext,
-    transcript: Arc<Mutex<CommandTranscript>>,
+    transcript: Arc<Mutex<HeadTailBuffer>>,
 ) {
     let mut receiver = session.output_receiver();
     let output_drained = session.output_drained_notify();
@@ -112,7 +111,7 @@ pub(crate) fn spawn_exit_watcher(
     command: Vec<String>,
     cwd: PathBuf,
     process_id: String,
-    transcript: Arc<Mutex<CommandTranscript>>,
+    transcript: Arc<Mutex<HeadTailBuffer>>,
     started_at: Instant,
 ) {
     let exit_token = session.cancellation_token();
@@ -142,7 +141,7 @@ pub(crate) fn spawn_exit_watcher(
 
 async fn process_chunk(
     pending: &mut Vec<u8>,
-    transcript: &Arc<Mutex<CommandTranscript>>,
+    transcript: &Arc<Mutex<HeadTailBuffer>>,
     call_id: &str,
     session_ref: &Arc<Session>,
     turn_ref: &Arc<TurnContext>,
@@ -153,7 +152,7 @@ async fn process_chunk(
     while let Some(prefix) = split_valid_utf8_prefix(pending) {
         {
             let mut guard = transcript.lock().await;
-            guard.append(&prefix);
+            guard.push_chunk(prefix.to_vec());
         }
 
         if *emitted_deltas >= MAX_EXEC_OUTPUT_DELTAS_PER_CALL {
@@ -183,7 +182,7 @@ pub(crate) async fn emit_exec_end_for_unified_exec(
     command: Vec<String>,
     cwd: PathBuf,
     process_id: Option<String>,
-    transcript: Arc<Mutex<CommandTranscript>>,
+    transcript: Arc<Mutex<HeadTailBuffer>>,
     fallback_output: String,
     exit_code: i32,
     duration: Duration,
@@ -240,15 +239,15 @@ fn split_valid_utf8_prefix_with_max(buffer: &mut Vec<u8>, max_bytes: usize) -> O
 }
 
 async fn resolve_aggregated_output(
-    transcript: &Arc<Mutex<CommandTranscript>>,
+    transcript: &Arc<Mutex<HeadTailBuffer>>,
     fallback: String,
 ) -> String {
     let guard = transcript.lock().await;
-    if guard.data.is_empty() {
+    if guard.retained_bytes() == 0 {
         return fallback;
     }
 
-    String::from_utf8_lossy(&guard.data).to_string()
+    String::from_utf8_lossy(&guard.to_bytes()).to_string()
 }
 
 #[cfg(test)]
diff --git a/codex-rs/core/src/unified_exec/head_tail_buffer.rs b/codex-rs/core/src/unified_exec/head_tail_buffer.rs
new file mode 100644
index 00000000000..85244660483
--- /dev/null
+++ b/codex-rs/core/src/unified_exec/head_tail_buffer.rs
@@ -0,0 +1,272 @@
+use crate::unified_exec::UNIFIED_EXEC_OUTPUT_MAX_BYTES;
+use std::collections::VecDeque;
+
+/// A capped buffer that preserves a stable prefix ("head") and suffix ("tail"),
+/// dropping the middle once it exceeds the configured maximum. The buffer is
+/// symmetric, meaning 50% of the capacity is allocated to the head and 50% is
+/// allocated to the tail.
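+///
+/// # Example
+///
+/// Illustrative sketch only (kept out of doctests via `ignore`; the 8-byte cap
+/// below is made up for the example):
+///
+/// ```ignore
+/// let mut buf = HeadTailBuffer::new(8); // head budget 4, tail budget 4
+/// buf.push_chunk(b"0123456789".to_vec());
+/// // "0123" fills the head, the tail keeps the last 4 bytes, and "45" is dropped.
+/// assert_eq!(buf.to_bytes(), b"01236789".to_vec());
+/// assert_eq!(buf.omitted_bytes(), 2);
+/// ```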
+#[derive(Debug)]
+pub(crate) struct HeadTailBuffer {
+    max_bytes: usize,
+    head_budget: usize,
+    tail_budget: usize,
+    head: VecDeque<Vec<u8>>,
+    tail: VecDeque<Vec<u8>>,
+    head_bytes: usize,
+    tail_bytes: usize,
+    omitted_bytes: usize,
+}
+
+impl Default for HeadTailBuffer {
+    fn default() -> Self {
+        Self::new(UNIFIED_EXEC_OUTPUT_MAX_BYTES)
+    }
+}
+
+impl HeadTailBuffer {
+    /// Create a new buffer that retains at most `max_bytes` of output.
+    ///
+    /// The retained output is split across a prefix ("head") and suffix ("tail")
+    /// budget, dropping bytes from the middle once the limit is exceeded.
+    pub(crate) fn new(max_bytes: usize) -> Self {
+        let head_budget = max_bytes / 2;
+        let tail_budget = max_bytes.saturating_sub(head_budget);
+        Self {
+            max_bytes,
+            head_budget,
+            tail_budget,
+            head: VecDeque::new(),
+            tail: VecDeque::new(),
+            head_bytes: 0,
+            tail_bytes: 0,
+            omitted_bytes: 0,
+        }
+    }
+
+    // Used for tests.
+    #[allow(dead_code)]
+    /// Total bytes currently retained by the buffer (head + tail).
+    pub(crate) fn retained_bytes(&self) -> usize {
+        self.head_bytes.saturating_add(self.tail_bytes)
+    }
+
+    // Used for tests.
+    #[allow(dead_code)]
+    /// Total bytes that were dropped from the middle due to the size cap.
+    pub(crate) fn omitted_bytes(&self) -> usize {
+        self.omitted_bytes
+    }
+
+    /// Append a chunk of bytes to the buffer.
+    ///
+    /// Bytes are first added to the head until the head budget is full; any
+    /// remaining bytes are added to the tail, with older tail bytes being
+    /// dropped to preserve the tail budget.
+    pub(crate) fn push_chunk(&mut self, chunk: Vec<u8>) {
+        if self.max_bytes == 0 {
+            self.omitted_bytes = self.omitted_bytes.saturating_add(chunk.len());
+            return;
+        }
+
+        // Fill the head budget first, then keep a capped tail.
+        if self.head_bytes < self.head_budget {
+            let remaining_head = self.head_budget.saturating_sub(self.head_bytes);
+            if chunk.len() <= remaining_head {
+                self.head_bytes = self.head_bytes.saturating_add(chunk.len());
+                self.head.push_back(chunk);
+                return;
+            }
+
+            // Split the chunk: part goes to head, remainder goes to tail.
+            let (head_part, tail_part) = chunk.split_at(remaining_head);
+            if !head_part.is_empty() {
+                self.head_bytes = self.head_bytes.saturating_add(head_part.len());
+                self.head.push_back(head_part.to_vec());
+            }
+            self.push_to_tail(tail_part.to_vec());
+            return;
+        }
+
+        self.push_to_tail(chunk);
+    }
+
+    /// Snapshot the retained output as a list of chunks.
+    ///
+    /// The returned chunks are ordered as: head chunks first, then tail chunks.
+    /// Omitted bytes are not represented in the snapshot.
+    pub(crate) fn snapshot_chunks(&self) -> Vec<Vec<u8>> {
+        let mut out = Vec::new();
+        out.extend(self.head.iter().cloned());
+        out.extend(self.tail.iter().cloned());
+        out
+    }
+
+    /// Return the retained output as a single byte vector.
+    ///
+    /// The output is formed by concatenating head chunks, then tail chunks.
+    /// Omitted bytes are not represented in the returned value.
+    pub(crate) fn to_bytes(&self) -> Vec<u8> {
+        let mut out = Vec::with_capacity(self.retained_bytes());
+        for chunk in self.head.iter() {
+            out.extend_from_slice(chunk);
+        }
+        for chunk in self.tail.iter() {
+            out.extend_from_slice(chunk);
+        }
+        out
+    }
+
+    /// Drain all retained chunks from the buffer and reset its state.
+    ///
+    /// The drained chunks are returned in head-then-tail order. Omitted bytes
+    /// are discarded along with the retained content.
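+    ///
+    /// Illustrative sketch only (kept out of doctests via `ignore`; the 4-byte
+    /// cap is made up for the example):
+    ///
+    /// ```ignore
+    /// let mut buf = HeadTailBuffer::new(4);
+    /// buf.push_chunk(b"ab".to_vec());
+    /// buf.push_chunk(b"cd".to_vec());
+    /// // Head chunk first, then tail chunk; the buffer is empty afterwards.
+    /// assert_eq!(buf.drain_chunks(), vec![b"ab".to_vec(), b"cd".to_vec()]);
+    /// assert_eq!(buf.retained_bytes(), 0);
+    /// ```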
+    pub(crate) fn drain_chunks(&mut self) -> Vec<Vec<u8>> {
+        let mut out: Vec<Vec<u8>> = self.head.drain(..).collect();
+        out.extend(self.tail.drain(..));
+        self.head_bytes = 0;
+        self.tail_bytes = 0;
+        self.omitted_bytes = 0;
+        out
+    }
+
+    fn push_to_tail(&mut self, chunk: Vec<u8>) {
+        if self.tail_budget == 0 {
+            self.omitted_bytes = self.omitted_bytes.saturating_add(chunk.len());
+            return;
+        }
+
+        if chunk.len() >= self.tail_budget {
+            // This single chunk is larger than the whole tail budget. Keep only the last
+            // tail_budget bytes and drop everything else.
+            let start = chunk.len().saturating_sub(self.tail_budget);
+            let kept = chunk[start..].to_vec();
+            let dropped = chunk.len().saturating_sub(kept.len());
+            self.omitted_bytes = self
+                .omitted_bytes
+                .saturating_add(self.tail_bytes)
+                .saturating_add(dropped);
+            self.tail.clear();
+            self.tail_bytes = kept.len();
+            self.tail.push_back(kept);
+            return;
+        }
+
+        self.tail_bytes = self.tail_bytes.saturating_add(chunk.len());
+        self.tail.push_back(chunk);
+        self.trim_tail_to_budget();
+    }
+
+    fn trim_tail_to_budget(&mut self) {
+        let mut excess = self.tail_bytes.saturating_sub(self.tail_budget);
+        while excess > 0 {
+            match self.tail.front_mut() {
+                Some(front) if excess >= front.len() => {
+                    excess -= front.len();
+                    self.tail_bytes = self.tail_bytes.saturating_sub(front.len());
+                    self.omitted_bytes = self.omitted_bytes.saturating_add(front.len());
+                    self.tail.pop_front();
+                }
+                Some(front) => {
+                    front.drain(..excess);
+                    self.tail_bytes = self.tail_bytes.saturating_sub(excess);
+                    self.omitted_bytes = self.omitted_bytes.saturating_add(excess);
+                    break;
+                }
+                None => break,
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::HeadTailBuffer;
+
+    use pretty_assertions::assert_eq;
+
+    #[test]
+    fn keeps_prefix_and_suffix_when_over_budget() {
+        let mut buf = HeadTailBuffer::new(10);
+
+        buf.push_chunk(b"0123456789".to_vec());
+        assert_eq!(buf.omitted_bytes(), 0);
+
+        // Exceeds max by 2; we should keep head+tail and omit the middle.
+        buf.push_chunk(b"ab".to_vec());
+        assert!(buf.omitted_bytes() > 0);
+
+        let rendered = String::from_utf8_lossy(&buf.to_bytes()).to_string();
+        assert!(rendered.starts_with("01234"));
+        assert!(rendered.ends_with("89ab"));
+    }
+
+    #[test]
+    fn max_bytes_zero_drops_everything() {
+        let mut buf = HeadTailBuffer::new(0);
+        buf.push_chunk(b"abc".to_vec());
+
+        assert_eq!(buf.retained_bytes(), 0);
+        assert_eq!(buf.omitted_bytes(), 3);
+        assert_eq!(buf.to_bytes(), b"".to_vec());
+        assert_eq!(buf.snapshot_chunks(), Vec::<Vec<u8>>::new());
+    }
+
+    #[test]
+    fn head_budget_zero_keeps_only_last_byte_in_tail() {
+        let mut buf = HeadTailBuffer::new(1);
+        buf.push_chunk(b"abc".to_vec());
+
+        assert_eq!(buf.retained_bytes(), 1);
+        assert_eq!(buf.omitted_bytes(), 2);
+        assert_eq!(buf.to_bytes(), b"c".to_vec());
+    }
+
+    #[test]
+    fn draining_resets_state() {
+        let mut buf = HeadTailBuffer::new(10);
+        buf.push_chunk(b"0123456789".to_vec());
+        buf.push_chunk(b"ab".to_vec());
+
+        let drained = buf.drain_chunks();
+        assert!(!drained.is_empty());
+
+        assert_eq!(buf.retained_bytes(), 0);
+        assert_eq!(buf.omitted_bytes(), 0);
+        assert_eq!(buf.to_bytes(), b"".to_vec());
+    }
+
+    #[test]
+    fn chunk_larger_than_tail_budget_keeps_only_tail_end() {
+        let mut buf = HeadTailBuffer::new(10);
+        buf.push_chunk(b"0123456789".to_vec());
+
+        // Tail budget is 5 bytes. This chunk should replace the tail and keep only its last 5 bytes.
+        buf.push_chunk(b"ABCDEFGHIJK".to_vec());
+
+        let out = String::from_utf8_lossy(&buf.to_bytes()).to_string();
+        assert!(out.starts_with("01234"));
+        assert!(out.ends_with("GHIJK"));
+        assert!(buf.omitted_bytes() > 0);
+    }
+
+    #[test]
+    fn fills_head_then_tail_across_multiple_chunks() {
+        let mut buf = HeadTailBuffer::new(10);
+
+        // Fill the 5-byte head budget across multiple chunks.
+        buf.push_chunk(b"01".to_vec());
+        buf.push_chunk(b"234".to_vec());
+        assert_eq!(buf.to_bytes(), b"01234".to_vec());
+
+        // Then fill the 5-byte tail budget.
+        buf.push_chunk(b"567".to_vec());
+        buf.push_chunk(b"89".to_vec());
+        assert_eq!(buf.to_bytes(), b"0123456789".to_vec());
+        assert_eq!(buf.omitted_bytes(), 0);
+
+        // One more byte causes the tail to drop its oldest byte.
+        buf.push_chunk(b"a".to_vec());
+        assert_eq!(buf.to_bytes(), b"012346789a".to_vec());
+        assert_eq!(buf.omitted_bytes(), 1);
+    }
+}
diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs
index 2cb30e5aa39..c88238fe5e1 100644
--- a/codex-rs/core/src/unified_exec/mod.rs
+++ b/codex-rs/core/src/unified_exec/mod.rs
@@ -37,6 +37,7 @@ use crate::sandboxing::SandboxPermissions;
 
 mod async_watcher;
 mod errors;
+mod head_tail_buffer;
 mod session;
 mod session_manager;
 
@@ -53,24 +54,6 @@ pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
 // Send a warning message to the models when it reaches this number of sessions.
 pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60;
 
-#[derive(Debug, Default)]
-pub(crate) struct CommandTranscript {
-    pub data: Vec<u8>,
-}
-
-impl CommandTranscript {
-    pub fn append(&mut self, bytes: &[u8]) {
-        self.data.extend_from_slice(bytes);
-        if self.data.len() > UNIFIED_EXEC_OUTPUT_MAX_BYTES {
-            let excess = self
-                .data
-                .len()
-                .saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES);
-            self.data.drain(..excess);
-        }
-    }
-}
-
 pub(crate) struct UnifiedExecContext {
     pub session: Arc<Session>,
     pub turn: Arc<TurnContext>,
@@ -173,6 +156,7 @@ pub(crate) fn generate_chunk_id() -> String {
 #[cfg(test)]
 #[cfg(unix)]
 mod tests {
+    use super::head_tail_buffer::HeadTailBuffer;
     use super::*;
     use crate::codex::Session;
     use crate::codex::TurnContext;
@@ -185,8 +169,6 @@ mod tests {
     use std::sync::Arc;
     use tokio::time::Duration;
 
-    use super::session::OutputBufferState;
-
     async fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
         let (session, mut turn) = make_session_and_context().await;
         turn.approval_policy = AskForApproval::Never;
@@ -245,21 +227,36 @@ mod tests {
     }
 
     #[test]
-    fn push_chunk_trims_only_excess_bytes() {
-        let mut buffer = OutputBufferState::default();
+    fn push_chunk_preserves_prefix_and_suffix() {
+        let mut buffer = HeadTailBuffer::default();
         buffer.push_chunk(vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES]);
         buffer.push_chunk(vec![b'b']);
         buffer.push_chunk(vec![b'c']);
 
-        assert_eq!(buffer.total_bytes, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
-        let snapshot = buffer.snapshot();
-        assert_eq!(snapshot.len(), 3);
+        assert_eq!(buffer.retained_bytes(), UNIFIED_EXEC_OUTPUT_MAX_BYTES);
+        let snapshot = buffer.snapshot_chunks();
+
+        let first = snapshot.first().expect("expected at least one chunk");
+        assert_eq!(first.first(), Some(&b'a'));
+        assert!(snapshot.iter().any(|chunk| chunk.as_slice() == b"b"));
         assert_eq!(
-            snapshot.first().unwrap().len(),
-            UNIFIED_EXEC_OUTPUT_MAX_BYTES - 2
+            snapshot
+                .last()
+                .expect("expected at least one chunk")
+                .as_slice(),
+            b"c"
         );
-        assert_eq!(snapshot.get(2).unwrap(), &vec![b'c']);
-        assert_eq!(snapshot.get(1).unwrap(), &vec![b'b']);
+    }
+
+    #[test]
+    fn head_tail_buffer_default_preserves_prefix_and_suffix() {
+        let mut buffer = HeadTailBuffer::default();
+        buffer.push_chunk(vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES]);
+        buffer.push_chunk(b"bc".to_vec());
+
+        let rendered = buffer.to_bytes();
+        assert_eq!(rendered.first(), Some(&b'a'));
+        assert!(rendered.ends_with(b"bc"));
     }
 
     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
diff --git a/codex-rs/core/src/unified_exec/session.rs b/codex-rs/core/src/unified_exec/session.rs
index 4973a1a6417..0e08c599031 100644
--- a/codex-rs/core/src/unified_exec/session.rs
+++ b/codex-rs/core/src/unified_exec/session.rs
@@ -1,6 +1,5 @@
 #![allow(clippy::module_inception)]
 
-use std::collections::VecDeque;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 use tokio::sync::Notify;
@@ -19,54 +18,11 @@ use crate::truncate::formatted_truncate_text;
 use codex_utils_pty::ExecCommandSession;
 use codex_utils_pty::SpawnedPty;
 
-use super::UNIFIED_EXEC_OUTPUT_MAX_BYTES;
 use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
 use super::UnifiedExecError;
+use super::head_tail_buffer::HeadTailBuffer;
 
-#[derive(Debug, Default)]
-pub(crate) struct OutputBufferState {
-    chunks: VecDeque<Vec<u8>>,
-    pub(crate) total_bytes: usize,
-}
-
-impl OutputBufferState {
-    pub(super) fn push_chunk(&mut self, chunk: Vec<u8>) {
-        self.total_bytes = self.total_bytes.saturating_add(chunk.len());
-        self.chunks.push_back(chunk);
-
-        let mut excess = self
-            .total_bytes
-            .saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES);
-
-        while excess > 0 {
-            match self.chunks.front_mut() {
-                Some(front) if excess >= front.len() => {
-                    excess -= front.len();
-                    self.total_bytes = self.total_bytes.saturating_sub(front.len());
-                    self.chunks.pop_front();
-                }
-                Some(front) => {
-                    front.drain(..excess);
-                    self.total_bytes = self.total_bytes.saturating_sub(excess);
-                    break;
-                }
-                None => break,
-            }
-        }
-    }
-
-    pub(super) fn drain(&mut self) -> Vec<Vec<u8>> {
-        let drained: Vec<Vec<u8>> = self.chunks.drain(..).collect();
-        self.total_bytes = 0;
-        drained
-    }
-
-    pub(super) fn snapshot(&self) -> Vec<Vec<u8>> {
-        self.chunks.iter().cloned().collect()
-    }
-}
-
-pub(crate) type OutputBuffer = Arc<Mutex<OutputBufferState>>;
+pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
 pub(crate) struct OutputHandles {
     pub(crate) output_buffer: OutputBuffer,
     pub(crate) output_notify: Arc<Notify>,
@@ -90,7 +46,7 @@ impl UnifiedExecSession {
         initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
         sandbox_type: SandboxType,
     ) -> Self {
-        let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
+        let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
         let output_notify = Arc::new(Notify::new());
         let cancellation_token = CancellationToken::new();
         let output_drained = Arc::new(Notify::new());
@@ -163,7 +119,7 @@ impl UnifiedExecSession {
 
     async fn snapshot_output(&self) -> Vec<Vec<u8>> {
         let guard = self.output_buffer.lock().await;
-        guard.snapshot()
+        guard.snapshot_chunks()
     }
 
     pub(crate) fn sandbox_type(&self) -> SandboxType {
diff --git a/codex-rs/core/src/unified_exec/session_manager.rs b/codex-rs/core/src/unified_exec/session_manager.rs
index df473959d2c..1d524c23d47 100644
--- a/codex-rs/core/src/unified_exec/session_manager.rs
+++ b/codex-rs/core/src/unified_exec/session_manager.rs
@@ -29,27 +29,26 @@ use crate::tools::sandboxing::ToolCtx;
 use crate::truncate::TruncationPolicy;
 use crate::truncate::approx_token_count;
 use crate::truncate::formatted_truncate_text;
-
-use super::CommandTranscript;
-use super::ExecCommandRequest;
-use super::MAX_UNIFIED_EXEC_SESSIONS;
-use super::SessionEntry;
-use super::SessionStore;
-use super::UnifiedExecContext;
-use super::UnifiedExecError;
-use super::UnifiedExecResponse;
-use super::UnifiedExecSessionManager;
-use super::WARNING_UNIFIED_EXEC_SESSIONS;
-use super::WriteStdinRequest;
-use super::async_watcher::emit_exec_end_for_unified_exec;
-use super::async_watcher::spawn_exit_watcher;
-use super::async_watcher::start_streaming_output;
-use super::clamp_yield_time;
-use super::generate_chunk_id;
-use super::resolve_max_tokens;
-use super::session::OutputBuffer;
-use super::session::OutputHandles;
-use super::session::UnifiedExecSession;
+use crate::unified_exec::ExecCommandRequest;
+use crate::unified_exec::MAX_UNIFIED_EXEC_SESSIONS;
+use crate::unified_exec::SessionEntry;
+use crate::unified_exec::SessionStore;
+use crate::unified_exec::UnifiedExecContext;
+use crate::unified_exec::UnifiedExecError;
+use crate::unified_exec::UnifiedExecResponse;
+use crate::unified_exec::UnifiedExecSessionManager;
+use crate::unified_exec::WARNING_UNIFIED_EXEC_SESSIONS;
+use crate::unified_exec::WriteStdinRequest;
+use crate::unified_exec::async_watcher::emit_exec_end_for_unified_exec;
+use crate::unified_exec::async_watcher::spawn_exit_watcher;
+use crate::unified_exec::async_watcher::start_streaming_output;
+use crate::unified_exec::clamp_yield_time;
+use crate::unified_exec::generate_chunk_id;
+use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
+use crate::unified_exec::resolve_max_tokens;
+use crate::unified_exec::session::OutputBuffer;
+use crate::unified_exec::session::OutputHandles;
+use crate::unified_exec::session::UnifiedExecSession;
 
 const UNIFIED_EXEC_ENV: [(&str, &str); 9] = [
     ("NO_COLOR", "1"),
@@ -144,7 +143,7 @@ impl UnifiedExecSessionManager {
             }
         };
 
-        let transcript = Arc::new(tokio::sync::Mutex::new(CommandTranscript::default()));
+        let transcript = Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default()));
         let event_ctx = ToolEventCtx::new(
             context.session.as_ref(),
             context.turn.as_ref(),
@@ -158,6 +157,7 @@ impl UnifiedExecSessionManager {
             Some(request.process_id.clone()),
         );
         emitter.emit(event_ctx, ToolEventStage::Begin).await;
+        start_streaming_output(&session, context, Arc::clone(&transcript));
 
         let max_tokens = resolve_max_tokens(request.max_output_tokens);
 
@@ -408,7 +408,7 @@ impl UnifiedExecSessionManager {
         cwd: PathBuf,
         started_at: Instant,
         process_id: String,
-        transcript: Arc<tokio::sync::Mutex<CommandTranscript>>,
+        transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
     ) {
         let entry = SessionEntry {
             session: Arc::clone(&session),
@@ -550,11 +550,11 @@ impl UnifiedExecSessionManager {
         let mut collected: Vec<u8> = Vec::with_capacity(4096);
         let mut exit_signal_received = cancellation_token.is_cancelled();
         loop {
-            let drained_chunks;
+            let drained_chunks: Vec<Vec<u8>>;
             let mut wait_for_output = None;
            {
                 let mut guard = output_buffer.lock().await;
-                drained_chunks = guard.drain();
+                drained_chunks = guard.drain_chunks();
                 if drained_chunks.is_empty() {
                     wait_for_output = Some(output_notify.notified());
                 }