From 0623c6a6c8e5f182e5fba44078adc971564f1a68 Mon Sep 17 00:00:00 2001 From: luis-moneda Date: Sat, 25 Apr 2026 07:32:40 -0300 Subject: [PATCH 1/3] Expose Codex skills as ACP commands --- src/thread.rs | 507 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 410 insertions(+), 97 deletions(-) diff --git a/src/thread.rs b/src/thread.rs index 143206a..7813bbb 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -4,7 +4,6 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::{Arc, LazyLock, Mutex}, - time::Duration, }; use agent_client_protocol::{ @@ -59,15 +58,16 @@ use codex_protocol::{ ErrorEvent, Event, EventMsg, ExecApprovalRequestEvent, ExecCommandBeginEvent, ExecCommandEndEvent, ExecCommandOutputDeltaEvent, ExecCommandStatus, ExitedReviewModeEvent, FileChange, GuardianAssessmentEvent, GuardianAssessmentStatus, ImageGenerationBeginEvent, - ImageGenerationEndEvent, ItemCompletedEvent, ItemStartedEvent, McpInvocation, - McpStartupCompleteEvent, McpStartupUpdateEvent, McpToolCallBeginEvent, McpToolCallEndEvent, - ModelRerouteEvent, NetworkApprovalContext, NetworkPolicyRuleAction, Op, - PatchApplyBeginEvent, PatchApplyEndEvent, PatchApplyStatus, PatchApplyUpdatedEvent, + ImageGenerationEndEvent, ItemCompletedEvent, ItemStartedEvent, ListSkillsResponseEvent, + McpInvocation, McpStartupCompleteEvent, McpStartupUpdateEvent, McpToolCallBeginEvent, + McpToolCallEndEvent, ModelRerouteEvent, NetworkApprovalContext, NetworkPolicyRuleAction, + Op, PatchApplyBeginEvent, PatchApplyEndEvent, PatchApplyStatus, PatchApplyUpdatedEvent, ReasoningContentDeltaEvent, ReasoningRawContentDeltaEvent, ReviewDecision, - ReviewOutputEvent, ReviewRequest, ReviewTarget, RolloutItem, StreamErrorEvent, - TerminalInteractionEvent, ThreadGoalStatus, ThreadGoalUpdatedEvent, TokenCountEvent, - TurnAbortedEvent, TurnCompleteEvent, TurnStartedEvent, UserMessageEvent, - ViewImageToolCallEvent, WarningEvent, WebSearchBeginEvent, WebSearchEndEvent, + ReviewOutputEvent, ReviewRequest, ReviewTarget, RolloutItem, SkillMetadata, + SkillsListEntry, StreamErrorEvent, TerminalInteractionEvent, ThreadGoalStatus, + ThreadGoalUpdatedEvent, TokenCountEvent, TurnAbortedEvent, TurnCompleteEvent, + TurnStartedEvent, UserMessageEvent, ViewImageToolCallEvent, WarningEvent, + WebSearchBeginEvent, WebSearchEndEvent, }, request_permissions::{ PermissionGrantScope, RequestPermissionProfile, RequestPermissionsEvent, @@ -208,6 +208,39 @@ fn mode_trusts_project(mode_id: &str) -> bool { matches!(mode_id, "auto" | "full-access") } +fn skill_commands(skills: &[SkillMetadata]) -> Vec { + skills + .iter() + .map(|skill| { + AvailableCommand::new( + format!("skills:{}", skill.name), + skill + .short_description + .clone() + .or_else(|| { + skill + .interface + .as_ref() + .and_then(|interface| interface.short_description.clone()) + }) + .unwrap_or_else(|| skill.description.clone()), + ) + .input(AvailableCommandInput::Unstructured( + UnstructuredCommandInput::new("optional additional instructions"), + )) + }) + .collect() +} + +fn skills_for_cwd(cwd: &Path, entries: &[SkillsListEntry]) -> Vec { + entries + .iter() + .find(|entry| entry.cwd.as_path() == cwd) + .or_else(|| entries.first()) + .map(|entry| entry.skills.clone()) + .unwrap_or_default() +} + /// Trait for abstracting over the `CodexThread` to make testing easier. pub trait CodexThreadImpl: Send + Sync { fn submit(&self, op: Op) @@ -272,6 +305,9 @@ enum ThreadMessage { Load { response_tx: oneshot::Sender>, }, + SkillsLoaded { + skills: Option>, + }, GetConfigOptions { response_tx: oneshot::Sender, Error>>, }, @@ -793,20 +829,25 @@ fn format_thread_goal_update(event: &ThreadGoalUpdatedEvent) -> String { } } +#[expect(clippy::large_enum_variant)] enum SubmissionState { - /// User prompts, including slash commands like /init, /review, /compact. + /// Loading skills for the current workspace. + Skills(SkillsState), + /// User prompts, including slash commands like /init, /review, /compact, /undo. Prompt(PromptState), } impl SubmissionState { fn is_active(&self) -> bool { match self { + Self::Skills(state) => state.is_active(), Self::Prompt(state) => state.is_active(), } } async fn handle_event(&mut self, client: &SessionClient, event: EventMsg) { match self { + Self::Skills(state) => state.handle_event(event), Self::Prompt(state) => state.handle_event(client, event).await, } } @@ -818,6 +859,7 @@ impl SubmissionState { response: Result, ) -> Result<(), Error> { match self { + Self::Skills(..) => Ok(()), Self::Prompt(state) => { state .handle_permission_request_resolved(client, request_key, response) @@ -827,10 +869,8 @@ impl SubmissionState { } fn abort_pending_interactions(&mut self) { - match self { - Self::Prompt(state) => { - state.abort_pending_interactions(); - } + if let Self::Prompt(state) = self { + state.abort_pending_interactions(); } } @@ -843,6 +883,38 @@ impl SubmissionState { } } +struct SkillsState { + response_tx: Option, Error>>>, +} + +impl SkillsState { + fn new(response_tx: oneshot::Sender, Error>>) -> Self { + Self { + response_tx: Some(response_tx), + } + } + + fn is_active(&self) -> bool { + let Some(response_tx) = &self.response_tx else { + return false; + }; + !response_tx.is_closed() + } + + fn handle_event(&mut self, event: EventMsg) { + match event { + EventMsg::ListSkillsResponse(ListSkillsResponseEvent { skills }) => { + if let Some(tx) = self.response_tx.take() { + drop(tx.send(Ok(skills))); + } + } + event => { + warn!("Unexpected event: {event:?}"); + } + } + } +} + struct ActiveCommand { tool_call_id: ToolCallId, terminal_output: bool, @@ -2751,6 +2823,8 @@ struct ThreadActor { resolution_rx: mpsc::UnboundedReceiver, /// Last config options state we emitted to the client, used for deduping updates. last_sent_config_options: Option>, + /// Skills discovered for the current working directory. + skills: Vec, } impl ThreadActor { @@ -2776,6 +2850,7 @@ impl ThreadActor { message_rx, resolution_rx, last_sent_config_options: None, + skills: Vec::new(), } } @@ -2814,15 +2889,13 @@ impl ThreadActor { ThreadMessage::Load { response_tx } => { let result = self.handle_load().await; drop(response_tx.send(result)); - let client = self.client.clone(); - // Have this happen after the session is loaded by putting it - // in a separate task - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(200)).await; - client.send_notification(SessionUpdate::AvailableCommandsUpdate( - AvailableCommandsUpdate::new(Self::builtin_commands()), - )); - }); + self.refresh_skills(false).await; + } + ThreadMessage::SkillsLoaded { skills } => { + if let Some(skills) = skills { + self.skills = skills; + } + self.send_available_commands_update(); } ThreadMessage::GetConfigOptions { response_tx } => { let result = self.config_options().await; @@ -2924,6 +2997,77 @@ impl ThreadActor { ] } + fn available_commands(&self) -> Vec { + let mut commands = Self::builtin_commands(); + commands.extend(skill_commands(&self.skills)); + commands + } + + fn send_available_commands_update(&self) { + self.client + .send_notification(SessionUpdate::AvailableCommandsUpdate( + AvailableCommandsUpdate::new(self.available_commands()), + )); + } + + async fn load_skills( + &mut self, + force_reload: bool, + ) -> oneshot::Receiver, Error>> { + let (response_tx, response_rx) = oneshot::channel(); + let submission_id = match self + .thread + .submit(Op::ListSkills { + cwds: Vec::new(), + force_reload, + }) + .await + { + Ok(id) => id, + Err(error) => { + drop(response_tx.send(Err(Error::internal_error().data(error.to_string())))); + return response_rx; + } + }; + + self.submissions.insert( + submission_id, + SubmissionState::Skills(SkillsState::new(response_tx)), + ); + + response_rx + } + + async fn refresh_skills(&mut self, force_reload: bool) { + let load_skills = self.load_skills(force_reload).await; + let resolution_tx = self.resolution_tx.clone(); + let cwd = self.config.cwd.clone(); + + tokio::spawn(async move { + let skills = match load_skills.await { + Ok(Ok(entries)) => Some(skills_for_cwd(cwd.as_path(), &entries)), + Ok(Err(error)) => { + error!("Failed to refresh skills: {error:?}"); + None + } + Err(error) => { + error!("Failed to receive skills response: {error:?}"); + None + } + }; + + drop(resolution_tx.send(ThreadMessage::SkillsLoaded { skills })); + }); + } + + fn resolve_skill_command(&self, name: &str) -> Option { + let skill_name = name.strip_prefix("skills:")?; + self.skills + .iter() + .find(|skill| skill.name == skill_name) + .cloned() + } + fn modes(&self) -> Option { let current_mode_id = current_session_mode_id(&self.config)?; @@ -3250,69 +3394,90 @@ impl ThreadActor { let items = build_prompt_items(request.prompt); let op; if let Some((name, rest)) = extract_slash_command(&items) { - match name { - "compact" => op = Op::Compact, - "init" => { - op = Op::UserInput { - items: vec![UserInput::Text { - text: INIT_COMMAND_PROMPT.into(), - text_elements: vec![], - }], - final_output_json_schema: None, - environments: None, - responsesapi_client_metadata: None, - } + if let Some(skill) = self.resolve_skill_command(name) { + let mut skill_items = vec![UserInput::Skill { + name: skill.name, + path: skill.path.to_path_buf(), + }]; + let instructions = rest.trim(); + if !instructions.is_empty() { + skill_items.push(UserInput::Text { + text: instructions.to_owned(), + text_elements: vec![], + }); } - "review" => { - let instructions = rest.trim(); - let target = if instructions.is_empty() { - ReviewTarget::UncommittedChanges - } else { - ReviewTarget::Custom { - instructions: instructions.to_owned(), + op = Op::UserInput { + items: skill_items, + final_output_json_schema: None, + environments: None, + responsesapi_client_metadata: None, + }; + } else { + match name { + "compact" => op = Op::Compact, + "undo" => op = Op::Undo, + "init" => { + op = Op::UserInput { + items: vec![UserInput::Text { + text: INIT_COMMAND_PROMPT.into(), + text_elements: vec![], + }], + final_output_json_schema: None, + environments: None, + responsesapi_client_metadata: None, } - }; + } + "review" => { + let instructions = rest.trim(); + let target = if instructions.is_empty() { + ReviewTarget::UncommittedChanges + } else { + ReviewTarget::Custom { + instructions: instructions.to_owned(), + } + }; - op = Op::Review { - review_request: ReviewRequest { - user_facing_hint: Some(user_facing_hint(&target)), - target, - }, + op = Op::Review { + review_request: ReviewRequest { + user_facing_hint: Some(user_facing_hint(&target)), + target, + }, + } } - } - "review-branch" if !rest.is_empty() => { - let target = ReviewTarget::BaseBranch { - branch: rest.trim().to_owned(), - }; - op = Op::Review { - review_request: ReviewRequest { - user_facing_hint: Some(user_facing_hint(&target)), - target, - }, + "review-branch" if !rest.is_empty() => { + let target = ReviewTarget::BaseBranch { + branch: rest.trim().to_owned(), + }; + op = Op::Review { + review_request: ReviewRequest { + user_facing_hint: Some(user_facing_hint(&target)), + target, + }, + } } - } - "review-commit" if !rest.is_empty() => { - let target = ReviewTarget::Commit { - sha: rest.trim().to_owned(), - title: None, - }; - op = Op::Review { - review_request: ReviewRequest { - user_facing_hint: Some(user_facing_hint(&target)), - target, - }, + "review-commit" if !rest.is_empty() => { + let target = ReviewTarget::Commit { + sha: rest.trim().to_owned(), + title: None, + }; + op = Op::Review { + review_request: ReviewRequest { + user_facing_hint: Some(user_facing_hint(&target)), + target, + }, + } } - } - "logout" => { - self.auth.logout().await?; - return Err(Error::auth_required()); - } - _ => { - op = Op::UserInput { - items, - final_output_json_schema: None, - environments: None, - responsesapi_client_metadata: None, + "logout" => { + self.auth.logout().await?; + return Err(Error::auth_required()); + } + _ => { + op = Op::UserInput { + items, + final_output_json_schema: None, + environments: None, + responsesapi_client_metadata: None, + } } } } @@ -3801,6 +3966,11 @@ impl ThreadActor { } async fn handle_event(&mut self, Event { id, msg }: Event) { + if matches!(msg, EventMsg::SkillsUpdateAvailable) { + self.refresh_skills(true).await; + return; + } + if let Some(submission) = self.submissions.get_mut(&id) { submission.handle_event(&self.client, msg).await; } else { @@ -4717,33 +4887,142 @@ mod tests { } #[tokio::test] - async fn test_delta_deduplication() -> anyhow::Result<()> { - let (session_id, client, _, message_tx, _handle) = setup().await?; - let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); + async fn test_load_publishes_skills_as_namespaced_commands() -> anyhow::Result<()> { + let skill = SkillMetadata { + name: "demo".to_string(), + description: "Demo skill".to_string(), + short_description: None, + interface: None, + dependencies: None, + path: PathBuf::from("/tmp/demo/SKILL.md").try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + enabled: true, + }; + let (_session_id, client, _thread, message_tx, handle) = + setup_with_skills(vec![skill]).await?; + let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); + message_tx.send(ThreadMessage::Load { + response_tx: load_response_tx, + })?; + + drop(load_response_rx.await??); + tokio::time::sleep(Duration::from_millis(20)).await; + drop(message_tx); + handle.await?; + + let notifications = client.notifications.lock().unwrap(); + assert!(notifications.iter().any(|notification| matches!( + ¬ification.update, + SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate { available_commands, .. }) + if available_commands.iter().any(|command| command.name == "skills:demo") + ))); + + Ok(()) + } + + #[tokio::test] + async fn test_skill_command_creates_skill_user_input() -> anyhow::Result<()> { + let skill = SkillMetadata { + name: "demo".to_string(), + description: "Demo skill".to_string(), + short_description: None, + interface: None, + dependencies: None, + path: PathBuf::from("/tmp/demo/SKILL.md").try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + enabled: true, + }; + let (session_id, _client, thread, message_tx, handle) = + setup_with_skills(vec![skill.clone()]).await?; + let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); + message_tx.send(ThreadMessage::Load { + response_tx: load_response_tx, + })?; + drop(load_response_rx.await??); + tokio::time::sleep(Duration::from_millis(20)).await; + + let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { - request: PromptRequest::new(session_id.clone(), vec!["test delta".into()]), + request: PromptRequest::new( + session_id.clone(), + vec!["/skills:demo use it well".into()], + ), response_tx: prompt_response_tx, })?; let stop_reason = prompt_response_rx.await??.await??; assert_eq!(stop_reason, StopReason::EndTurn); drop(message_tx); + handle.await?; + + let ops = thread.ops.lock().unwrap(); + let skill_input = ops + .iter() + .find_map(|op| match op { + Op::UserInput { items, .. } => Some(items), + _ => None, + }) + .expect("expected user input op"); + + assert!(skill_input.iter().any(|item| matches!( + item, + UserInput::Skill { name, path } if name == "demo" && path == &skill.path.to_path_buf() + ))); + assert!(skill_input.iter().any(|item| matches!( + item, + UserInput::Text { text, .. } if text == "use it well" + ))); + + Ok(()) + } + + #[tokio::test] + async fn test_skills_update_available_refreshes_commands() -> anyhow::Result<()> { + let initial_skill = SkillMetadata { + name: "demo".to_string(), + description: "Demo skill".to_string(), + short_description: None, + interface: None, + dependencies: None, + path: PathBuf::from("/tmp/demo/SKILL.md").try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + enabled: true, + }; + let refreshed_skill = SkillMetadata { + name: "second".to_string(), + description: "Second skill".to_string(), + short_description: None, + interface: None, + dependencies: None, + path: PathBuf::from("/tmp/second/SKILL.md").try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + enabled: true, + }; + let (_session_id, client, thread, message_tx, handle) = + setup_with_skills(vec![initial_skill]).await?; + let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ThreadMessage::Load { + response_tx: load_response_tx, + })?; + + drop(load_response_rx.await??); + thread.skills_entries.lock().unwrap()[0].skills = vec![refreshed_skill]; + thread.op_tx.send(Event { + id: "skill-update".to_string(), + msg: EventMsg::SkillsUpdateAvailable, + })?; + tokio::time::sleep(Duration::from_millis(20)).await; + drop(message_tx); + handle.await?; - // We should only get ONE notification, not duplicates from both delta and non-delta let notifications = client.notifications.lock().unwrap(); - assert_eq!( - notifications.len(), - 1, - "Should only receive delta event, not duplicate non-delta. Got: {notifications:?}" - ); - assert!(matches!( - ¬ifications[0].update, - SessionUpdate::AgentMessageChunk(ContentChunk { - content: ContentBlock::Text(TextContent { text, .. }), - .. - }) if text == "test delta" - )); + assert!(notifications.iter().any(|notification| matches!( + ¬ification.update, + SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate { available_commands, .. }) + if available_commands.iter().any(|command| command.name == "skills:second") + ))); Ok(()) } @@ -4754,6 +5033,18 @@ mod tests { Arc, UnboundedSender, tokio::task::JoinHandle<()>, + )> { + setup_with_skills(Vec::new()).await + } + + async fn setup_with_skills( + skills: Vec, + ) -> anyhow::Result<( + SessionId, + Arc, + Arc, + UnboundedSender, + tokio::task::JoinHandle<()>, )> { let session_id = SessionId::new("test"); let client = Arc::new(StubClient::new()); @@ -4766,6 +5057,15 @@ mod tests { ConfigOverrides::default(), ) .await?; + conversation + .skills_entries + .lock() + .unwrap() + .push(SkillsListEntry { + cwd: config.cwd.clone().to_path_buf(), + skills, + errors: Vec::new(), + }); let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); let (resolution_tx, resolution_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -4811,6 +5111,7 @@ mod tests { current_id: AtomicUsize, active_prompt_id: std::sync::Mutex>, ops: std::sync::Mutex>, + skills_entries: std::sync::Mutex>, op_tx: mpsc::UnboundedSender, op_rx: Mutex>, } @@ -4822,6 +5123,7 @@ mod tests { current_id: AtomicUsize::new(0), active_prompt_id: std::sync::Mutex::default(), ops: std::sync::Mutex::default(), + skills_entries: std::sync::Mutex::default(), op_tx, op_rx: Mutex::new(op_rx), } @@ -4847,6 +5149,7 @@ mod tests { .into_iter() .map(|i| match i { UserInput::Text { text, .. } => text, + UserInput::Skill { name, .. } => format!("${name}"), _ => unimplemented!(), }) .join("\n"); @@ -5129,6 +5432,16 @@ mod tests { }) .unwrap(); } + Op::ListSkills { .. } => { + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::ListSkillsResponse(ListSkillsResponseEvent { + skills: self.skills_entries.lock().unwrap().clone(), + }), + }) + .unwrap(); + } Op::ExecApproval { .. } | Op::ResolveElicitation { .. } | Op::RequestPermissionsResponse { .. } From 841344091ef28e20b51af597974f1eba699f66ee Mon Sep 17 00:00:00 2001 From: luis-moneda Date: Sat, 25 Apr 2026 14:42:26 -0300 Subject: [PATCH 2/3] Match Codex skill availability semantics --- src/thread.rs | 95 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 2 deletions(-) diff --git a/src/thread.rs b/src/thread.rs index 7813bbb..a253d10 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -211,6 +211,7 @@ fn mode_trusts_project(mode_id: &str) -> bool { fn skill_commands(skills: &[SkillMetadata]) -> Vec { skills .iter() + .filter(|skill| skill.enabled) .map(|skill| { AvailableCommand::new( format!("skills:{}", skill.name), @@ -236,7 +237,6 @@ fn skills_for_cwd(cwd: &Path, entries: &[SkillsListEntry]) -> Vec entries .iter() .find(|entry| entry.cwd.as_path() == cwd) - .or_else(|| entries.first()) .map(|entry| entry.skills.clone()) .unwrap_or_default() } @@ -2889,7 +2889,7 @@ impl ThreadActor { ThreadMessage::Load { response_tx } => { let result = self.handle_load().await; drop(response_tx.send(result)); - self.refresh_skills(false).await; + self.refresh_skills(true).await; } ThreadMessage::SkillsLoaded { skills } => { if let Some(skills) = skills { @@ -4921,6 +4921,97 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_load_does_not_publish_disabled_skills() -> anyhow::Result<()> { + let skill = SkillMetadata { + name: "disabled-demo".to_string(), + description: "Disabled demo skill".to_string(), + short_description: None, + interface: None, + dependencies: None, + path: PathBuf::from("/tmp/disabled-demo/SKILL.md").try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + enabled: false, + }; + let (_session_id, client, _thread, message_tx, handle) = + setup_with_skills(vec![skill]).await?; + let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ThreadMessage::Load { + response_tx: load_response_tx, + })?; + + drop(load_response_rx.await??); + tokio::time::sleep(Duration::from_millis(20)).await; + drop(message_tx); + handle.await?; + + let notifications = client.notifications.lock().unwrap(); + assert!( + notifications + .iter() + .all(|notification| match ¬ification.update { + SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate { + available_commands, + .. + }) => { + !available_commands + .iter() + .any(|command| command.name == "skills:disabled-demo") + } + _ => true, + }) + ); + + Ok(()) + } + + #[tokio::test] + async fn test_load_does_not_publish_skills_for_other_cwd() -> anyhow::Result<()> { + let skill = SkillMetadata { + name: "other-cwd".to_string(), + description: "Other cwd skill".to_string(), + short_description: None, + interface: None, + dependencies: None, + path: PathBuf::from("/tmp/other-cwd/SKILL.md").try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + enabled: true, + }; + let (_session_id, client, thread, message_tx, handle) = + setup_with_skills(vec![skill]).await?; + thread.skills_entries.lock().unwrap()[0].cwd = PathBuf::from("/tmp/not-the-session-cwd"); + let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ThreadMessage::Load { + response_tx: load_response_tx, + })?; + + drop(load_response_rx.await??); + tokio::time::sleep(Duration::from_millis(20)).await; + drop(message_tx); + handle.await?; + + let notifications = client.notifications.lock().unwrap(); + assert!( + notifications + .iter() + .all(|notification| match ¬ification.update { + SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate { + available_commands, + .. + }) => { + !available_commands + .iter() + .any(|command| command.name == "skills:other-cwd") + } + _ => true, + }) + ); + + Ok(()) + } + #[tokio::test] async fn test_skill_command_creates_skill_user_input() -> anyhow::Result<()> { let skill = SkillMetadata { From 11721ae920d295b57bdc39be39c214180932524c Mon Sep 17 00:00:00 2001 From: luis-moneda Date: Sat, 9 May 2026 07:04:29 -0300 Subject: [PATCH 3/3] Update skills refresh for Codex 0.129 --- Cargo.lock | 1 + Cargo.toml | 1 + src/codex_agent.rs | 10 +- src/thread.rs | 490 +++++++++++++++++++++++++-------------------- 4 files changed, 282 insertions(+), 220 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f5ee7b..3bbbfb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1528,6 +1528,7 @@ dependencies = [ "codex-arg0", "codex-config", "codex-core", + "codex-core-plugins", "codex-exec-server", "codex-login", "codex-mcp-server", diff --git a/Cargo.toml b/Cargo.toml index e68ef0c..bd1f1dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ codex-apply-patch = { git = "https://github.com/openai/codex", tag = "rust-v0.12 codex-arg0 = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } codex-config = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } codex-core = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } +codex-core-plugins = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } codex-exec-server = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } codex-mcp-server = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } codex-models-manager = { git = "https://github.com/openai/codex", tag = "rust-v0.129.0" } diff --git a/src/codex_agent.rs b/src/codex_agent.rs index bae7687..436ca5d 100644 --- a/src/codex_agent.rs +++ b/src/codex_agent.rs @@ -35,7 +35,7 @@ use std::{ use tracing::{debug, info}; use unicode_segmentation::UnicodeSegmentation; -use crate::thread::Thread; +use crate::thread::{CodexSkillsProvider, Thread}; /// The Codex implementation of the ACP Agent. /// @@ -571,6 +571,10 @@ impl CodexAgent { thread, self.auth_manager.clone(), Arc::new(self.thread_manager.get_models_manager()), + Arc::new(CodexSkillsProvider::new( + self.thread_manager.skills_manager(), + self.thread_manager.plugins_manager(), + )), self.client_capabilities.clone(), config.clone(), cx, @@ -645,6 +649,10 @@ impl CodexAgent { thread, self.auth_manager.clone(), Arc::new(self.thread_manager.get_models_manager()), + Arc::new(CodexSkillsProvider::new( + self.thread_manager.skills_manager(), + self.thread_manager.plugins_manager(), + )), self.client_capabilities.clone(), config.clone(), cx, diff --git a/src/thread.rs b/src/thread.rs index a253d10..5ef755f 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -29,7 +29,9 @@ use codex_core::{ config::{Config, set_project_trust_level}, review_format::format_review_findings_block, review_prompts::user_facing_hint, + skills::{SkillLoadOutcome, SkillMetadata, SkillsLoadInput, SkillsManager}, }; +use codex_core_plugins::PluginsManager; use codex_login::auth::AuthManager; use codex_models_manager::manager::{ModelsManager, RefreshStrategy}; use codex_protocol::{ @@ -58,13 +60,13 @@ use codex_protocol::{ ErrorEvent, Event, EventMsg, ExecApprovalRequestEvent, ExecCommandBeginEvent, ExecCommandEndEvent, ExecCommandOutputDeltaEvent, ExecCommandStatus, ExitedReviewModeEvent, FileChange, GuardianAssessmentEvent, GuardianAssessmentStatus, ImageGenerationBeginEvent, - ImageGenerationEndEvent, ItemCompletedEvent, ItemStartedEvent, ListSkillsResponseEvent, - McpInvocation, McpStartupCompleteEvent, McpStartupUpdateEvent, McpToolCallBeginEvent, - McpToolCallEndEvent, ModelRerouteEvent, NetworkApprovalContext, NetworkPolicyRuleAction, - Op, PatchApplyBeginEvent, PatchApplyEndEvent, PatchApplyStatus, PatchApplyUpdatedEvent, + ImageGenerationEndEvent, ItemCompletedEvent, ItemStartedEvent, McpInvocation, + McpStartupCompleteEvent, McpStartupUpdateEvent, McpToolCallBeginEvent, McpToolCallEndEvent, + ModelRerouteEvent, NetworkApprovalContext, NetworkPolicyRuleAction, Op, + PatchApplyBeginEvent, PatchApplyEndEvent, PatchApplyStatus, PatchApplyUpdatedEvent, ReasoningContentDeltaEvent, ReasoningRawContentDeltaEvent, ReviewDecision, - ReviewOutputEvent, ReviewRequest, ReviewTarget, RolloutItem, SkillMetadata, - SkillsListEntry, StreamErrorEvent, TerminalInteractionEvent, ThreadGoalStatus, + ReviewOutputEvent, ReviewRequest, ReviewTarget, RolloutItem, StreamErrorEvent, + TerminalInteractionEvent, ThreadGoalStatus, ThreadGoalUpdatedEvent, TokenCountEvent, TurnAbortedEvent, TurnCompleteEvent, TurnStartedEvent, UserMessageEvent, ViewImageToolCallEvent, WarningEvent, WebSearchBeginEvent, WebSearchEndEvent, @@ -208,23 +210,108 @@ fn mode_trusts_project(mode_id: &str) -> bool { matches!(mode_id, "auto" | "full-access") } -fn skill_commands(skills: &[SkillMetadata]) -> Vec { +#[derive(Clone, Debug)] +pub struct LoadedSkill { + metadata: SkillMetadata, + enabled: bool, +} + +impl LoadedSkill { + fn new(metadata: SkillMetadata, enabled: bool) -> Self { + Self { metadata, enabled } + } +} + +pub trait SkillsProvider: Send + Sync { + fn load_skills( + &self, + config: Config, + force_reload: bool, + ) -> Pin> + Send + '_>>; +} + +pub struct CodexSkillsProvider { + skills_manager: Arc, + plugins_manager: Arc, +} + +impl CodexSkillsProvider { + pub fn new(skills_manager: Arc, plugins_manager: Arc) -> Self { + Self { + skills_manager, + plugins_manager, + } + } + + async fn load_skills_for_config( + skills_manager: Arc, + plugins_manager: Arc, + config: Config, + force_reload: bool, + ) -> Vec { + if force_reload { + skills_manager.clear_cache(); + } + + let plugins_input = config.plugins_config_input(); + let effective_skill_roots = plugins_manager + .effective_skill_roots_for_layer_stack(&config.config_layer_stack, &plugins_input) + .await; + let input = SkillsLoadInput::new( + config.cwd.clone(), + effective_skill_roots, + config.config_layer_stack.clone(), + config.bundled_skills_enabled(), + ); + let outcome = skills_manager.skills_for_config(&input, None).await; + loaded_skills_from_outcome(outcome) + } +} + +impl SkillsProvider for CodexSkillsProvider { + fn load_skills( + &self, + config: Config, + force_reload: bool, + ) -> Pin> + Send + '_>> { + let skills_manager = self.skills_manager.clone(); + let plugins_manager = self.plugins_manager.clone(); + + Box::pin(Self::load_skills_for_config( + skills_manager, + plugins_manager, + config, + force_reload, + )) + } +} + +fn loaded_skills_from_outcome(outcome: SkillLoadOutcome) -> Vec { + outcome + .skills + .iter() + .map(|skill| LoadedSkill::new(skill.clone(), outcome.is_skill_enabled(skill))) + .collect() +} + +fn skill_commands(skills: &[LoadedSkill]) -> Vec { skills .iter() .filter(|skill| skill.enabled) .map(|skill| { + let metadata = &skill.metadata; AvailableCommand::new( - format!("skills:{}", skill.name), - skill + format!("skills:{}", metadata.name), + metadata .short_description .clone() .or_else(|| { - skill + metadata .interface .as_ref() .and_then(|interface| interface.short_description.clone()) }) - .unwrap_or_else(|| skill.description.clone()), + .unwrap_or_else(|| metadata.description.clone()), ) .input(AvailableCommandInput::Unstructured( UnstructuredCommandInput::new("optional additional instructions"), @@ -233,14 +320,6 @@ fn skill_commands(skills: &[SkillMetadata]) -> Vec { .collect() } -fn skills_for_cwd(cwd: &Path, entries: &[SkillsListEntry]) -> Vec { - entries - .iter() - .find(|entry| entry.cwd.as_path() == cwd) - .map(|entry| entry.skills.clone()) - .unwrap_or_default() -} - /// Trait for abstracting over the `CodexThread` to make testing easier. pub trait CodexThreadImpl: Send + Sync { fn submit(&self, op: Op) @@ -306,7 +385,7 @@ enum ThreadMessage { response_tx: oneshot::Sender>, }, SkillsLoaded { - skills: Option>, + skills: Vec, }, GetConfigOptions { response_tx: oneshot::Sender, Error>>, @@ -355,11 +434,13 @@ pub struct Thread { } impl Thread { + #[expect(clippy::too_many_arguments)] pub fn new( session_id: SessionId, thread: Arc, auth: Arc, models_manager: Arc, + skills_provider: Arc, client_capabilities: Arc>, config: Config, cx: ConnectionTo, @@ -372,6 +453,7 @@ impl Thread { SessionClient::new(session_id, cx, client_capabilities), thread.clone(), models_manager, + skills_provider, config, message_rx, resolution_tx, @@ -829,10 +911,7 @@ fn format_thread_goal_update(event: &ThreadGoalUpdatedEvent) -> String { } } -#[expect(clippy::large_enum_variant)] enum SubmissionState { - /// Loading skills for the current workspace. - Skills(SkillsState), /// User prompts, including slash commands like /init, /review, /compact, /undo. Prompt(PromptState), } @@ -840,14 +919,12 @@ enum SubmissionState { impl SubmissionState { fn is_active(&self) -> bool { match self { - Self::Skills(state) => state.is_active(), Self::Prompt(state) => state.is_active(), } } async fn handle_event(&mut self, client: &SessionClient, event: EventMsg) { match self { - Self::Skills(state) => state.handle_event(event), Self::Prompt(state) => state.handle_event(client, event).await, } } @@ -859,7 +936,6 @@ impl SubmissionState { response: Result, ) -> Result<(), Error> { match self { - Self::Skills(..) => Ok(()), Self::Prompt(state) => { state .handle_permission_request_resolved(client, request_key, response) @@ -869,48 +945,18 @@ impl SubmissionState { } fn abort_pending_interactions(&mut self) { - if let Self::Prompt(state) = self { - state.abort_pending_interactions(); + match self { + Self::Prompt(state) => state.abort_pending_interactions(), } } fn fail(&mut self, err: Error) { - if let Self::Prompt(state) = self - && let Some(response_tx) = state.response_tx.take() - { - drop(response_tx.send(Err(err))); - } - } -} - -struct SkillsState { - response_tx: Option, Error>>>, -} - -impl SkillsState { - fn new(response_tx: oneshot::Sender, Error>>) -> Self { - Self { - response_tx: Some(response_tx), - } - } - - fn is_active(&self) -> bool { - let Some(response_tx) = &self.response_tx else { - return false; - }; - !response_tx.is_closed() - } - - fn handle_event(&mut self, event: EventMsg) { - match event { - EventMsg::ListSkillsResponse(ListSkillsResponseEvent { skills }) => { - if let Some(tx) = self.response_tx.take() { - drop(tx.send(Ok(skills))); + match self { + Self::Prompt(state) => { + if let Some(response_tx) = state.response_tx.take() { + drop(response_tx.send(Err(err))); } } - event => { - warn!("Unexpected event: {event:?}"); - } } } } @@ -1193,7 +1239,7 @@ impl PromptState { ))); } } - EventMsg::ItemStarted(ItemStartedEvent { thread_id, turn_id, item , started_at_ms: _}) => { + EventMsg::ItemStarted(ItemStartedEvent { thread_id, turn_id, item, .. }) => { info!("Item started with thread_id: {thread_id}, turn_id: {turn_id}, item: {item:?}"); } EventMsg::UserMessage(UserMessageEvent { @@ -1327,7 +1373,7 @@ impl PromptState { ); self.terminal_interaction(client, event); } - EventMsg::DynamicToolCallRequest(DynamicToolCallRequest { call_id, turn_id, namespace, tool, arguments, started_at_ms: _ }) => { + EventMsg::DynamicToolCallRequest(DynamicToolCallRequest { call_id, turn_id, namespace, tool, arguments, .. }) => { info!("Dynamic tool call request: call_id={call_id}, turn_id={turn_id}, namespace={namespace:?}, tool={tool}"); self.start_dynamic_tool_call(client, call_id, tool, arguments); } @@ -1399,7 +1445,7 @@ impl PromptState { thread_id, turn_id, item, - completed_at_ms: _, + .. }) => { info!("Item completed: thread_id={}, turn_id={}, item={:?}", thread_id, turn_id, item); } @@ -1413,6 +1459,13 @@ impl PromptState { response_tx.send(Ok(StopReason::EndTurn)).ok(); } } + EventMsg::ThreadRolledBack(event) => { + if event.num_turns == 1 { + client.send_agent_text("Undo completed."); + } else { + client.send_agent_text(format!("Rolled back {} turns.", event.num_turns)); + } + } EventMsg::StreamError(StreamErrorEvent { message, codex_error_info, @@ -1529,13 +1582,12 @@ impl PromptState { // Ignore these events EventMsg::AgentReasoningRawContent(..) - | EventMsg::ThreadRolledBack(..) | EventMsg::HookStarted(..) | EventMsg::HookCompleted(..) // we already have a way to diff the turn, so ignore | EventMsg::TurnDiff(..) + // Revisit when we can emit status updates | EventMsg::SkillsUpdateAvailable - // Old events | EventMsg::RawResponseItem(..) | EventMsg::SessionConfigured(..) // TODO: Subagent UI? @@ -1823,6 +1875,7 @@ impl PromptState { success, error, duration: _, + .. } = event; client.send_tool_call_update(ToolCallUpdate::new( @@ -2028,6 +2081,7 @@ impl PromptState { cwd, parsed_cmd, process_id: _, + .. } = event; // Create a new tool call for the command execution let tool_call_id = ToolCallId::new(call_id.clone()); @@ -2128,6 +2182,7 @@ impl PromptState { process_id: _, completed_at_ms: _, status, + .. } = event; if let Some(active_command) = self.active_commands.remove(&call_id) { let is_success = exit_code == 0; @@ -2813,6 +2868,8 @@ struct ThreadActor { config: Config, /// The models available for this thread. models_manager: Arc, + /// Loads skills using Codex's shared skills and plugins managers. + skills_provider: Arc, /// Internal message sender used to route spawned interaction results back to the actor. resolution_tx: mpsc::UnboundedSender, /// A sender for each interested `Op` submission that needs events routed. @@ -2824,7 +2881,7 @@ struct ThreadActor { /// Last config options state we emitted to the client, used for deduping updates. last_sent_config_options: Option>, /// Skills discovered for the current working directory. - skills: Vec, + skills: Vec, } impl ThreadActor { @@ -2834,6 +2891,7 @@ impl ThreadActor { client: SessionClient, thread: Arc, models_manager: Arc, + skills_provider: Arc, config: Config, message_rx: mpsc::UnboundedReceiver, resolution_tx: mpsc::UnboundedSender, @@ -2845,6 +2903,7 @@ impl ThreadActor { thread, config, models_manager, + skills_provider, resolution_tx, submissions: HashMap::new(), message_rx, @@ -2892,9 +2951,7 @@ impl ThreadActor { self.refresh_skills(true).await; } ThreadMessage::SkillsLoaded { skills } => { - if let Some(skills) = skills { - self.skills = skills; - } + self.skills = skills; self.send_available_commands_update(); } ThreadMessage::GetConfigOptions { response_tx } => { @@ -3010,52 +3067,13 @@ impl ThreadActor { )); } - async fn load_skills( - &mut self, - force_reload: bool, - ) -> oneshot::Receiver, Error>> { - let (response_tx, response_rx) = oneshot::channel(); - let submission_id = match self - .thread - .submit(Op::ListSkills { - cwds: Vec::new(), - force_reload, - }) - .await - { - Ok(id) => id, - Err(error) => { - drop(response_tx.send(Err(Error::internal_error().data(error.to_string())))); - return response_rx; - } - }; - - self.submissions.insert( - submission_id, - SubmissionState::Skills(SkillsState::new(response_tx)), - ); - - response_rx - } - async fn refresh_skills(&mut self, force_reload: bool) { - let load_skills = self.load_skills(force_reload).await; let resolution_tx = self.resolution_tx.clone(); - let cwd = self.config.cwd.clone(); + let config = self.config.clone(); + let skills_provider = self.skills_provider.clone(); tokio::spawn(async move { - let skills = match load_skills.await { - Ok(Ok(entries)) => Some(skills_for_cwd(cwd.as_path(), &entries)), - Ok(Err(error)) => { - error!("Failed to refresh skills: {error:?}"); - None - } - Err(error) => { - error!("Failed to receive skills response: {error:?}"); - None - } - }; - + let skills = skills_provider.load_skills(config, force_reload).await; drop(resolution_tx.send(ThreadMessage::SkillsLoaded { skills })); }); } @@ -3064,8 +3082,8 @@ impl ThreadActor { let skill_name = name.strip_prefix("skills:")?; self.skills .iter() - .find(|skill| skill.name == skill_name) - .cloned() + .find(|skill| skill.enabled && skill.metadata.name == skill_name) + .map(|skill| skill.metadata.clone()) } fn modes(&self) -> Option { @@ -3397,7 +3415,7 @@ impl ThreadActor { if let Some(skill) = self.resolve_skill_command(name) { let mut skill_items = vec![UserInput::Skill { name: skill.name, - path: skill.path.to_path_buf(), + path: skill.path_to_skills_md.to_path_buf(), }]; let instructions = rest.trim(); if !instructions.is_empty() { @@ -3415,7 +3433,7 @@ impl ThreadActor { } else { match name { "compact" => op = Op::Compact, - "undo" => op = Op::Undo, + "undo" => op = Op::ThreadRollback { num_turns: 1 }, "init" => { op = Op::UserInput { items: vec![UserInput::Text { @@ -4404,7 +4422,7 @@ mod tests { #[tokio::test] async fn test_prompt() -> anyhow::Result<()> { - let (session_id, client, _, message_tx, _handle) = setup().await?; + let (session_id, client, _, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4431,7 +4449,7 @@ mod tests { #[tokio::test] async fn test_thread_goal_updated_is_sent_as_agent_message() -> anyhow::Result<()> { - let (session_id, client, _, message_tx, _handle) = setup().await?; + let (session_id, client, _, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4459,7 +4477,7 @@ mod tests { #[tokio::test] async fn test_image_generation_emits_image_content() -> anyhow::Result<()> { - let (session_id, client, _, message_tx, _handle) = setup().await?; + let (session_id, client, _, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); let expected_uri = image_generation_test_saved_path() .to_string_lossy() @@ -4536,7 +4554,7 @@ mod tests { #[tokio::test] async fn test_compact() -> anyhow::Result<()> { - let (session_id, client, thread, message_tx, _handle) = setup().await?; + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4657,9 +4675,43 @@ mod tests { assert!(mode_trusts_project("full-access")); } + #[tokio::test] + async fn test_undo() -> anyhow::Result<()> { + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; + let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); + + message_tx.send(ThreadMessage::Prompt { + request: PromptRequest::new(session_id.clone(), vec!["/undo".into()]), + response_tx: prompt_response_tx, + })?; + + let stop_reason = prompt_response_rx.await??.await??; + assert_eq!(stop_reason, StopReason::EndTurn); + drop(message_tx); + + let notifications = client.notifications.lock().unwrap(); + assert_eq!( + notifications.len(), + 1, + "notifications don't match {notifications:?}" + ); + assert!(matches!( + ¬ifications[0].update, + SessionUpdate::AgentMessageChunk(ContentChunk { + content: ContentBlock::Text(TextContent { text, .. }), + .. + }) if text == "Undo completed." + )); + + let ops = thread.ops.lock().unwrap(); + assert_eq!(ops.as_slice(), &[Op::ThreadRollback { num_turns: 1 }]); + + Ok(()) + } + #[tokio::test] async fn test_init() -> anyhow::Result<()> { - let (session_id, client, thread, message_tx, _handle) = setup().await?; + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4702,7 +4754,7 @@ mod tests { #[tokio::test] async fn test_review() -> anyhow::Result<()> { - let (session_id, client, thread, message_tx, _handle) = setup().await?; + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4744,7 +4796,7 @@ mod tests { #[tokio::test] async fn test_custom_review() -> anyhow::Result<()> { - let (session_id, client, thread, message_tx, _handle) = setup().await?; + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); let instructions = "Review what we did in agents.md"; @@ -4794,7 +4846,7 @@ mod tests { #[tokio::test] async fn test_commit_review() -> anyhow::Result<()> { - let (session_id, client, thread, message_tx, _handle) = setup().await?; + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4842,7 +4894,7 @@ mod tests { #[tokio::test] async fn test_branch_review() -> anyhow::Result<()> { - let (session_id, client, thread, message_tx, _handle) = setup().await?; + let (session_id, client, thread, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -4888,17 +4940,8 @@ mod tests { #[tokio::test] async fn test_load_publishes_skills_as_namespaced_commands() -> anyhow::Result<()> { - let skill = SkillMetadata { - name: "demo".to_string(), - description: "Demo skill".to_string(), - short_description: None, - interface: None, - dependencies: None, - path: PathBuf::from("/tmp/demo/SKILL.md").try_into()?, - scope: codex_protocol::protocol::SkillScope::Repo, - enabled: true, - }; - let (_session_id, client, _thread, message_tx, handle) = + let skill = test_skill("demo", "Demo skill", true)?; + let (_session_id, client, _thread, _, message_tx, handle) = setup_with_skills(vec![skill]).await?; let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); @@ -4923,17 +4966,8 @@ mod tests { #[tokio::test] async fn test_load_does_not_publish_disabled_skills() -> anyhow::Result<()> { - let skill = SkillMetadata { - name: "disabled-demo".to_string(), - description: "Disabled demo skill".to_string(), - short_description: None, - interface: None, - dependencies: None, - path: PathBuf::from("/tmp/disabled-demo/SKILL.md").try_into()?, - scope: codex_protocol::protocol::SkillScope::Repo, - enabled: false, - }; - let (_session_id, client, _thread, message_tx, handle) = + let skill = test_skill("disabled-demo", "Disabled demo skill", false)?; + let (_session_id, client, _thread, _, message_tx, handle) = setup_with_skills(vec![skill]).await?; let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); @@ -4968,19 +5002,8 @@ mod tests { #[tokio::test] async fn test_load_does_not_publish_skills_for_other_cwd() -> anyhow::Result<()> { - let skill = SkillMetadata { - name: "other-cwd".to_string(), - description: "Other cwd skill".to_string(), - short_description: None, - interface: None, - dependencies: None, - path: PathBuf::from("/tmp/other-cwd/SKILL.md").try_into()?, - scope: codex_protocol::protocol::SkillScope::Repo, - enabled: true, - }; - let (_session_id, client, thread, message_tx, handle) = - setup_with_skills(vec![skill]).await?; - thread.skills_entries.lock().unwrap()[0].cwd = PathBuf::from("/tmp/not-the-session-cwd"); + let (_session_id, client, _thread, _, message_tx, handle) = + setup_with_skills(vec![]).await?; let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Load { @@ -5014,17 +5037,9 @@ mod tests { #[tokio::test] async fn test_skill_command_creates_skill_user_input() -> anyhow::Result<()> { - let skill = SkillMetadata { - name: "demo".to_string(), - description: "Demo skill".to_string(), - short_description: None, - interface: None, - dependencies: None, - path: PathBuf::from("/tmp/demo/SKILL.md").try_into()?, - scope: codex_protocol::protocol::SkillScope::Repo, - enabled: true, - }; - let (session_id, _client, thread, message_tx, handle) = + let skill = test_skill("demo", "Demo skill", true)?; + let expected_path = skill.metadata.path_to_skills_md.to_path_buf(); + let (session_id, _client, thread, _, message_tx, handle) = setup_with_skills(vec![skill.clone()]).await?; let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Load { @@ -5058,7 +5073,7 @@ mod tests { assert!(skill_input.iter().any(|item| matches!( item, - UserInput::Skill { name, path } if name == "demo" && path == &skill.path.to_path_buf() + UserInput::Skill { name, path } if name == "demo" && path == &expected_path ))); assert!(skill_input.iter().any(|item| matches!( item, @@ -5070,27 +5085,9 @@ mod tests { #[tokio::test] async fn test_skills_update_available_refreshes_commands() -> anyhow::Result<()> { - let initial_skill = SkillMetadata { - name: "demo".to_string(), - description: "Demo skill".to_string(), - short_description: None, - interface: None, - dependencies: None, - path: PathBuf::from("/tmp/demo/SKILL.md").try_into()?, - scope: codex_protocol::protocol::SkillScope::Repo, - enabled: true, - }; - let refreshed_skill = SkillMetadata { - name: "second".to_string(), - description: "Second skill".to_string(), - short_description: None, - interface: None, - dependencies: None, - path: PathBuf::from("/tmp/second/SKILL.md").try_into()?, - scope: codex_protocol::protocol::SkillScope::Repo, - enabled: true, - }; - let (_session_id, client, thread, message_tx, handle) = + let initial_skill = test_skill("demo", "Demo skill", true)?; + let refreshed_skill = test_skill("second", "Second skill", true)?; + let (_session_id, client, thread, skills_provider, message_tx, handle) = setup_with_skills(vec![initial_skill]).await?; let (load_response_tx, load_response_rx) = tokio::sync::oneshot::channel(); @@ -5099,7 +5096,7 @@ mod tests { })?; drop(load_response_rx.await??); - thread.skills_entries.lock().unwrap()[0].skills = vec![refreshed_skill]; + *skills_provider.skills.lock().unwrap() = vec![refreshed_skill]; thread.op_tx.send(Event { id: "skill-update".to_string(), msg: EventMsg::SkillsUpdateAvailable, @@ -5122,6 +5119,7 @@ mod tests { SessionId, Arc, Arc, + Arc, UnboundedSender, tokio::task::JoinHandle<()>, )> { @@ -5129,11 +5127,12 @@ mod tests { } async fn setup_with_skills( - skills: Vec, + skills: Vec, ) -> anyhow::Result<( SessionId, Arc, Arc, + Arc, UnboundedSender, tokio::task::JoinHandle<()>, )> { @@ -5142,21 +5141,13 @@ mod tests { let session_client = SessionClient::with_client(session_id.clone(), client.clone(), Arc::default()); let conversation = Arc::new(StubCodexThread::new()); + let skills_provider = Arc::new(StubSkillsProvider::new(skills)); let models_manager = Arc::new(StubModelsManager); let config = Config::load_with_cli_overrides_and_harness_overrides( vec![], ConfigOverrides::default(), ) .await?; - conversation - .skills_entries - .lock() - .unwrap() - .push(SkillsListEntry { - cwd: config.cwd.clone().to_path_buf(), - skills, - errors: Vec::new(), - }); let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); let (resolution_tx, resolution_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -5165,6 +5156,7 @@ mod tests { session_client, conversation.clone(), models_manager, + skills_provider.clone(), config, message_rx, resolution_tx, @@ -5172,7 +5164,31 @@ mod tests { ); let handle = tokio::spawn(actor.spawn()); - Ok((session_id, client, conversation, message_tx, handle)) + Ok(( + session_id, + client, + conversation, + skills_provider, + message_tx, + handle, + )) + } + + fn test_skill(name: &str, description: &str, enabled: bool) -> anyhow::Result { + Ok(LoadedSkill::new( + SkillMetadata { + name: name.to_string(), + description: description.to_string(), + short_description: None, + interface: None, + dependencies: None, + policy: None, + path_to_skills_md: PathBuf::from(format!("/tmp/{name}/SKILL.md")).try_into()?, + scope: codex_protocol::protocol::SkillScope::Repo, + plugin_id: None, + }, + enabled, + )) } struct StubAuth; @@ -5198,11 +5214,32 @@ mod tests { } } + struct StubSkillsProvider { + skills: std::sync::Mutex>, + } + + impl StubSkillsProvider { + fn new(skills: Vec) -> Self { + Self { + skills: std::sync::Mutex::new(skills), + } + } + } + + impl SkillsProvider for StubSkillsProvider { + fn load_skills( + &self, + _config: Config, + _force_reload: bool, + ) -> Pin> + Send + '_>> { + Box::pin(async { self.skills.lock().unwrap().clone() }) + } + } + struct StubCodexThread { current_id: AtomicUsize, active_prompt_id: std::sync::Mutex>, ops: std::sync::Mutex>, - skills_entries: std::sync::Mutex>, op_tx: mpsc::UnboundedSender, op_rx: Mutex>, } @@ -5214,7 +5251,6 @@ mod tests { current_id: AtomicUsize::new(0), active_prompt_id: std::sync::Mutex::default(), ops: std::sync::Mutex::default(), - skills_entries: std::sync::Mutex::default(), op_tx, op_rx: Mutex::new(op_rx), } @@ -5487,6 +5523,30 @@ mod tests { }) .unwrap(); } + Op::ThreadRollback { .. } => { + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::ThreadRolledBack( + codex_protocol::protocol::ThreadRolledBackEvent { + num_turns: 1, + }, + ), + }) + .unwrap(); + self.op_tx + .send(Event { + id: id.to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + last_agent_message: None, + turn_id: id.to_string(), + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }), + }) + .unwrap(); + } Op::Review { review_request } => { self.op_tx .send(Event { @@ -5523,16 +5583,6 @@ mod tests { }) .unwrap(); } - Op::ListSkills { .. } => { - self.op_tx - .send(Event { - id: id.to_string(), - msg: EventMsg::ListSkillsResponse(ListSkillsResponseEvent { - skills: self.skills_entries.lock().unwrap().clone(), - }), - }) - .unwrap(); - } Op::ExecApproval { .. } | Op::ResolveElicitation { .. } | Op::RequestPermissionsResponse { .. } @@ -5642,7 +5692,7 @@ mod tests { #[tokio::test] async fn test_parallel_exec_commands() -> anyhow::Result<()> { - let (session_id, client, _, message_tx, _handle) = setup().await?; + let (session_id, client, _, _, message_tx, _handle) = setup().await?; let (prompt_response_tx, prompt_response_rx) = tokio::sync::oneshot::channel(); message_tx.send(ThreadMessage::Prompt { @@ -6038,6 +6088,7 @@ mod tests { SessionClient::with_client(session_id.clone(), client.clone(), Arc::default()); let conversation = Arc::new(StubCodexThread::new()); let models_manager = Arc::new(StubModelsManager); + let skills_provider = Arc::new(StubSkillsProvider::new(Vec::new())); let config = Config::load_with_cli_overrides_and_harness_overrides( vec![], ConfigOverrides::default(), @@ -6050,6 +6101,7 @@ mod tests { session_client, conversation.clone(), models_manager, + skills_provider, config, message_rx, resolution_tx,