Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/app-server-test-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl CodexClient {
print!("{}", event.delta);
std::io::stdout().flush().ok();
}
EventMsg::TaskComplete(event) => {
EventMsg::TurnComplete(event) => {
println!("\n[task complete: {event:?}]");
break;
}
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub(crate) async fn apply_bespoke_event_handling(
msg,
} = event;
match msg {
EventMsg::TaskComplete(_ev) => {
EventMsg::TurnComplete(_ev) => {
handle_turn_complete(
conversation_id,
event_turn_id,
Expand Down
8 changes: 6 additions & 2 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3573,7 +3573,11 @@ impl CodexMessageProcessor {
// JSON-serializing the `Event` as-is, but these should
// be migrated to be variants of `ServerNotification`
// instead.
let method = format!("codex/event/{}", event.msg);
let event_formatted = match &event.msg {
EventMsg::TurnStarted(_) => "task_started",
EventMsg::TurnComplete(_) => "task_complete",
_ => &event.msg.to_string(),
};
let mut params = match serde_json::to_value(event.clone()) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
Expand All @@ -3592,7 +3596,7 @@ impl CodexMessageProcessor {

outgoing_for_task
.send_notification(OutgoingNotification {
method,
method: format!("codex/event/{event_formatted}"),
params: Some(params.into()),
})
.await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> {
)
.await?;

// Wait for first TaskComplete
// Wait for first TurnComplete
let _ = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
Expand Down
8 changes: 4 additions & 4 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ mod tests {
use super::*;
use crate::agent::agent_status_from_event;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::TaskCompleteEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use pretty_assertions::assert_eq;

#[tokio::test]
Expand All @@ -144,15 +144,15 @@ mod tests {

#[tokio::test]
async fn on_event_updates_status_from_task_started() {
let status = agent_status_from_event(&EventMsg::TaskStarted(TaskStartedEvent {
let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
}));
assert_eq!(status, Some(AgentStatus::Running));
}

#[tokio::test]
async fn on_event_updates_status_from_task_complete() {
let status = agent_status_from_event(&EventMsg::TaskComplete(TaskCompleteEvent {
let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: Some("done".to_string()),
}));
let expected = AgentStatus::Completed(Some("done".to_string()));
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/agent/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use codex_protocol::protocol::EventMsg;
/// Returns `None` when the event does not affect status tracking.
pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
match msg {
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
EventMsg::TaskComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
EventMsg::TurnStarted(_) => Some(AgentStatus::Running),
EventMsg::TurnComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
Expand Down
12 changes: 6 additions & 6 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ use codex_protocol::protocol::RawResponseItemEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rmcp_client::ElicitationResponse;
use futures::future::BoxFuture;
use futures::prelude::*;
Expand Down Expand Up @@ -2324,9 +2324,9 @@ fn errors_to_info(errors: &[SkillError]) -> Vec<SkillErrorInfo> {
/// - If the model requests a function call, we execute it and send the output
/// back to the model in the next turn.
/// - If the model sends only an assistant message, we record it in the
/// conversation history and consider the task complete.
/// conversation history and consider the turn complete.
///
pub(crate) async fn run_task(
pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
Expand All @@ -2342,7 +2342,7 @@ pub(crate) async fn run_task(
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
}
let event = EventMsg::TaskStarted(TaskStartedEvent {
let event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, event).await;
Expand Down Expand Up @@ -2407,7 +2407,7 @@ pub(crate) async fn run_task(
})
.map(|user_message| user_message.message())
.collect::<Vec<String>>();
match run_turn(
match run_model_turn(
Arc::clone(&sess),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
Expand Down Expand Up @@ -2484,7 +2484,7 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
cwd = %turn_context.cwd.display()
)
)]
async fn run_turn(
async fn run_model_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/codex_delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub(crate) async fn run_codex_thread_one_shot(
while let Ok(event) = io_for_bridge.next_event().await {
let should_shutdown = matches!(
event.msg,
EventMsg::TaskComplete(_) | EventMsg::TurnAborted(_)
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)
);
let _ = tx_bridge.send(event).await;
if should_shutdown {
Expand Down Expand Up @@ -253,7 +253,7 @@ async fn shutdown_delegate(codex: &Codex) {
while let Ok(event) = codex.next_event().await {
if matches!(
event.msg,
EventMsg::TurnAborted(_) | EventMsg::TaskComplete(_)
EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)
) {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::features::Feature;
use crate::protocol::CompactedItem;
use crate::protocol::ContextCompactedEvent;
use crate::protocol::EventMsg;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnContextItem;
use crate::protocol::TurnStartedEvent;
use crate::protocol::WarningEvent;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub(crate) async fn run_compact_task(
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/compact_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::protocol::CompactedItem;
use crate::protocol::ContextCompactedEvent;
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnStartedEvent;
use codex_protocol::models::ResponseItem;

pub(crate) async fn run_inline_remote_auto_compact_task(
Expand All @@ -19,7 +19,7 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
}

pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/rollout/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::TurnAborted(_) => true,
EventMsg::Error(_)
| EventMsg::Warning(_)
| EventMsg::TaskStarted(_)
| EventMsg::TaskComplete(_)
| EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use crate::codex::Session;
use crate::codex::TurnContext;
use crate::models_manager::manager::ModelsManager;
use crate::protocol::EventMsg;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
Expand Down Expand Up @@ -180,7 +180,7 @@ impl Session {
if should_close_processes {
self.close_unified_exec_processes().await;
}
let event = EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message });
let event = EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message });
self.send_event(turn_context.as_ref(), event).await;
}

Expand Down
10 changes: 5 additions & 5 deletions codex-rs/core/src/tasks/regular.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use crate::codex::TurnContext;
use crate::codex::run_task;
use crate::codex::run_turn;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::user_input::UserInput;
Expand Down Expand Up @@ -29,10 +29,10 @@ impl SessionTask for RegularTask {
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();
let run_task_span =
trace_span!(parent: sess.services.otel_manager.current_span(), "run_task");
run_task(sess, ctx, input, cancellation_token)
.instrument(run_task_span)
let run_turn_span =
trace_span!(parent: sess.services.otel_manager.current_span(), "run_turn");
run_turn(sess, ctx, input, cancellation_token)
.instrument(run_turn_span)
.await
}
}
4 changes: 2 additions & 2 deletions codex-rs/core/src/tasks/review.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn process_review_events(
})
| EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { .. })
| EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent { .. }) => {}
EventMsg::TaskComplete(task_complete) => {
EventMsg::TurnComplete(task_complete) => {
// Parse review output from the last agent message (if present).
let out = task_complete
.last_agent_message
Expand All @@ -154,7 +154,7 @@ async fn process_review_events(
}
}
}
// Channel closed without TaskComplete: treat as interrupted.
// Channel closed without TurnComplete: treat as interrupted.
None
}

Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/tasks/user_shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::protocol::ExecCommandBeginEvent;
use crate::protocol::ExecCommandEndEvent;
use crate::protocol::ExecCommandSource;
use crate::protocol::SandboxPolicy;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnStartedEvent;
use crate::sandboxing::ExecEnv;
use crate::sandboxing::SandboxPermissions;
use crate::state::TaskKind;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl SessionTask for UserShellCommandTask {
.otel_manager
.counter("codex.task.user_shell", 1, &[]);

let event = EventMsg::TaskStarted(TaskStartedEvent {
let event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
let session = session.clone_session();
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/common/test_codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl TestCodex {
.await?;

wait_for_event(&self.codex, |event| {
matches!(event, EventMsg::TaskComplete(_))
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/suite/abort_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn interrupt_tool_records_history_entries() {
.await
.unwrap();

wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;

let requests = response_mock.requests();
assert!(
Expand Down
14 changes: 7 additions & 7 deletions codex-rs/core/tests/suite/apply_patch_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async fn apply_patch_cli_move_without_content_change_has_no_turn_diff(
saw_turn_diff = true;
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down Expand Up @@ -917,7 +917,7 @@ async fn apply_patch_shell_command_heredoc_with_cd_emits_turn_diff() -> Result<(
saw_turn_diff = Some(ev.unified_diff.clone());
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down Expand Up @@ -982,7 +982,7 @@ async fn apply_patch_shell_command_failure_propagates_error_and_skips_diff() ->
saw_turn_diff = true;
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down Expand Up @@ -1129,7 +1129,7 @@ async fn apply_patch_emits_turn_diff_event_with_unified_diff(
saw_turn_diff = Some(ev.unified_diff.clone());
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down Expand Up @@ -1189,7 +1189,7 @@ async fn apply_patch_turn_diff_for_rename_with_content_change(
last_diff = Some(ev.unified_diff.clone());
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down Expand Up @@ -1257,7 +1257,7 @@ async fn apply_patch_aggregates_diff_across_multiple_tool_calls() -> Result<()>
last_diff = Some(ev.unified_diff.clone());
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down Expand Up @@ -1325,7 +1325,7 @@ async fn apply_patch_aggregates_diff_preserves_success_after_failure() -> Result
last_diff = Some(ev.unified_diff.clone());
false
}
EventMsg::TaskComplete(_) => true,
EventMsg::TurnComplete(_) => true,
_ => false,
})
.await;
Expand Down
Loading
Loading