Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ use crate::protocol::ReviewDecision;
use crate::protocol::SandboxCommandAssessment;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::SkillErrorInfo;
use crate::protocol::SkillInfo;
use crate::protocol::SkillLoadOutcomeInfo;
use crate::protocol::StreamErrorEvent;
use crate::protocol::Submission;
use crate::protocol::TokenCountEvent;
Expand All @@ -110,6 +113,10 @@ use crate::rollout::RolloutRecorderParams;
use crate::rollout::map_session_init_error;
use crate::shell;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillInjections;
use crate::skills::SkillLoadOutcome;
use crate::skills::build_skill_injections;
use crate::skills::load_skills;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
Expand Down Expand Up @@ -174,7 +181,31 @@ impl Codex {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();

let user_instructions = get_user_instructions(&config).await;
let loaded_skills = if config.features.enabled(Feature::Skills) {
Some(load_skills(&config))
} else {
None
};

if let Some(outcome) = &loaded_skills {
for err in &outcome.errors {
error!(
"failed to load skill {}: {}",
err.path.display(),
err.message
);
}
}

let skills_outcome = loaded_skills.clone();

let user_instructions = get_user_instructions(
&config,
skills_outcome
.as_ref()
.map(|outcome| outcome.skills.as_slice()),
)
.await;

let exec_policy = load_exec_policy_for_features(&config.features, &config.codex_home)
.await
Expand Down Expand Up @@ -202,6 +233,7 @@ impl Codex {

// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();

let session = Session::new(
session_configuration,
config.clone(),
Expand All @@ -210,6 +242,7 @@ impl Codex {
tx_event.clone(),
conversation_history,
session_source_clone,
skills_outcome.clone(),
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -466,6 +499,7 @@ impl Session {
}
}

#[allow(clippy::too_many_arguments)]
async fn new(
session_configuration: SessionConfiguration,
config: Arc<Config>,
Expand All @@ -474,6 +508,7 @@ impl Session {
tx_event: Sender<Event>,
initial_history: InitialHistory,
session_source: SessionSource,
skills: Option<SkillLoadOutcome>,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
Expand Down Expand Up @@ -594,6 +629,7 @@ impl Session {
otel_event_manager,
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
skills: skills.clone(),
};

let sess = Arc::new(Session {
Expand All @@ -609,6 +645,7 @@ impl Session {
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = initial_history.get_event_msgs();
let skill_load_outcome = skill_load_outcome_for_client(skills.as_ref());

let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
Expand All @@ -623,6 +660,7 @@ impl Session {
history_log_id,
history_entry_count,
initial_messages,
skill_load_outcome,
rollout_path,
}),
})
Expand Down Expand Up @@ -2017,6 +2055,30 @@ async fn spawn_review_thread(
.await;
}

fn skill_load_outcome_for_client(
outcome: Option<&SkillLoadOutcome>,
) -> Option<SkillLoadOutcomeInfo> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we can move SkillLoadOutcome to the protocol directly and avoid double copies?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the protocol type inside core would couple the loader/cache to the external serde/TS model. The current split keeps boundaries clean, and the mapping is trivial. wdyt?

outcome.map(|outcome| SkillLoadOutcomeInfo {
skills: outcome
.skills
.iter()
.map(|skill| SkillInfo {
name: skill.name.clone(),
description: skill.description.clone(),
path: skill.path.clone(),
})
.collect(),
errors: outcome
.errors
.iter()
.map(|err| SkillErrorInfo {
path: err.path.clone(),
message: err.message.clone(),
})
.collect(),
})
}

/// Takes a user message as input and runs a loop where, at each turn, the model
/// replies with either:
///
Expand Down Expand Up @@ -2045,11 +2107,26 @@ pub(crate) async fn run_task(
});
sess.send_event(&turn_context, event).await;

let SkillInjections {
items: skill_items,
warnings: skill_warnings,
} = build_skill_injections(&input, sess.services.skills.as_ref()).await;

for message in skill_warnings {
sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message }))
.await;
}

let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_response_item_and_emit_turn_item(turn_context.as_ref(), response_item)
.await;

if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)
.await;
}

sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
let mut last_agent_message: Option<String> = None;
Expand Down Expand Up @@ -2894,6 +2971,7 @@ mod tests {
otel_event_manager: otel_event_manager.clone(),
models_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
skills: None,
};

let turn_context = Session::make_turn_context(
Expand Down Expand Up @@ -2976,6 +3054,7 @@ mod tests {
otel_event_manager: otel_event_manager.clone(),
models_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
skills: None,
};

let turn_context = Arc::new(Session::make_turn_context(
Expand Down
29 changes: 20 additions & 9 deletions codex-rs/core/src/event_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use codex_protocol::user_input::UserInput;
use tracing::warn;
use uuid::Uuid;

use crate::user_instructions::SkillInstructions;
use crate::user_instructions::UserInstructions;
use crate::user_shell_command::is_user_shell_command_text;

Expand All @@ -23,7 +24,9 @@ fn is_session_prefix(text: &str) -> bool {
}

fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
if UserInstructions::is_user_instructions(message) {
if UserInstructions::is_user_instructions(message)
|| SkillInstructions::is_skill_instructions(message)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs a test. see below in this file

{
return None;
}

Expand Down Expand Up @@ -198,14 +201,22 @@ mod tests {
text: "# AGENTS.md instructions for test_directory\n\n<INSTRUCTIONS>\ntest_text\n</INSTRUCTIONS>".to_string(),
}],
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "<user_shell_command>echo 42</user_shell_command>".to_string(),
}],
},
];
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>"
.to_string(),
}],
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "<user_shell_command>echo 42</user_shell_command>".to_string(),
}],
},
];

for item in items {
let turn_item = parse_turn_item(&item);
Expand Down
Loading
Loading