diff --git a/.github/workflows/copilot-review-notify.yml b/.github/workflows/copilot-review-notify.yml new file mode 100644 index 00000000000..698c25504a4 --- /dev/null +++ b/.github/workflows/copilot-review-notify.yml @@ -0,0 +1,36 @@ +name: copilot-review-notify + +on: + pull_request_review: + types: [submitted] + +jobs: + comment: + if: github.event.review.user.login == 'github-copilot[bot]' + runs-on: ubuntu-latest + permissions: + pull-requests: write + contents: read + steps: + - name: Add comment noting Copilot review + uses: actions/github-script@v7 + with: + script: | + const owner = context.repo.owner; + const repo = context.repo.repo; + const number = context.payload.pull_request.number; + const url = context.payload.pull_request.html_url; + await github.rest.issues.createComment({ + owner, + repo, + issue_number: number, + body: `Automated: GitHub Copilot has submitted a review for ${url}.` + }); + - name: Optional Slack notify + if: ${{ secrets.SLACK_WEBHOOK_URL != '' }} + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + run: | + pr_url="${{ github.event.pull_request.html_url }}" + payload=$(jq -n --arg text "Copilot reviewed $pr_url" '{text: $text}') + curl -X POST -H 'Content-type: application/json' --data "$payload" "$SLACK_WEBHOOK_URL" diff --git a/.github/workflows/wait-for-copilot.yml b/.github/workflows/wait-for-copilot.yml new file mode 100644 index 00000000000..b6f8f250904 --- /dev/null +++ b/.github/workflows/wait-for-copilot.yml @@ -0,0 +1,48 @@ +name: wait-for-copilot + +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + wait: + name: Wait for Copilot review + runs-on: ubuntu-latest + permissions: + pull-requests: read + contents: read + concurrency: + group: wait-for-copilot-${{ github.event.pull_request.number }} + cancel-in-progress: true + steps: + - name: Poll for Copilot review (max 15m) + uses: actions/github-script@v7 + with: + script: | + const pr = context.payload.pull_request; + const owner = context.repo.owner; + const repo = context.repo.repo; + const number = pr.number; + const deadline = Date.now() + 15 * 60 * 1000; // 15 minutes + const botLogin = 'github-copilot[bot]'; + + async function hasCopilotFeedback() { + const [reviews, comments] = await Promise.all([ + github.rest.pulls.listReviews({ owner, repo, pull_number: number, per_page: 100 }), + github.rest.issues.listComments({ owner, repo, issue_number: number, per_page: 100 }), + ]); + const anyReview = reviews.data.some(r => (r.user?.login || '') === botLogin); + const anyComment = comments.data.some(c => (c.user?.login || '') === botLogin); + core.info(`Copilot review detected: ${anyReview}, comment detected: ${anyComment}`); + return anyReview || anyComment; + } + + while (Date.now() < deadline) { + if (await hasCopilotFeedback()) { + core.notice('Copilot review found.'); + return; + } + await new Promise(r => setTimeout(r, 20_000)); + } + core.setFailed('Copilot review not found within 15 minutes.'); + diff --git a/AGENTS.md b/AGENTS.md index 288719447e4..023422b6c15 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,3 +65,53 @@ If you don’t have the tool: ### Test assertions - Tests should use pretty_assertions::assert_eq for clearer diffs. Import this at the top of the test module if it isn't already. + +## PR Autopilot (agent policy) + +The agent may prepare and update PRs when explicitly asked. It follows these guardrails: + +- Preconditions + - The user says: “prepare PR”, “update PR”, “trigger Codex review”, “post PR comment”, or “generate review bundle”. + - `gh` is authenticated and SSO‑authorized for the target org/repo. + - Remotes `origin` (fork) and `upstream` (main) are configured. + +- Steps (prepare PR) + - Scope check: `git diff --name-only origin/main...HEAD` and confirm only intended files are touched. + - Formatting + lint: `cd codex-rs && just fmt` and `just fix -p ` for touched crates. + - Targeted tests for touched crates (e.g., `cargo test -p codex-tui`). + - Create/checkout a branch, `git add -A`, `git commit`, `git push --set-upstream origin `. + - Open/update PR with `gh pr create`/`gh pr edit` using a local markdown body file. + +- Steps (trigger Codex Review) + - `git fetch upstream && git rebase upstream/main`. + - `git commit --allow-empty -m "chore: re-run Codex Review" && git push --force-with-lease`. + +- Steps (comments/body) + - `gh pr comment -R --body-file `. + - `gh pr edit -R --body-file `. + +- Guardrails + - Never push unrelated diffs; trim PRs to the intended files. + - Always prefer `--force-with-lease` over `--force`. + - For larger changes, generate a single‑file review bundle (see scripts below) and include a short “How to verify” section. + +## PR helpers (scripts + just targets) + +We keep a few lightweight commands so the agent (and humans) can perform routine PR tasks reliably. + +- `scripts/pr.sh` (invoked by just targets) + - `prepare `: run fmt/fix/tests, push branch, and create/update PR. + - `comment `: post a PR comment from a file. + - `body `: update PR body from a file. + - `trigger`: create an empty commit and push to re‑run Codex Review. + +- `codex-rs/justfile` (targets) + - `pr-fmt`: `cd codex-rs && just fmt` + - `pr-fix-`: `cd codex-rs && just fix -p ` + - `pr-tests-`: e.g., `cd codex-rs && cargo test -p codex-tui` + - `pr-trigger`: empty commit + push with lease + - `pr-body PR= FILE=`: update PR body + - `pr-comment PR= FILE=`: post PR comment + - `review-bundle FILE=`: generate a single‑file gist for code review (diffs + excerpts + test plan) + +These helpers are safe defaults the agent can call when asked; they also make human review runs reproducible. diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 198c986d87f..be867f8be82 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -671,7 +671,7 @@ dependencies = [ "serde_json", "tempfile", "tokio", - "toml", + "toml 0.9.7", "tracing", "tracing-subscriber", "uuid", @@ -839,7 +839,7 @@ dependencies = [ "codex-core", "codex-protocol", "serde", - "toml", + "toml 0.9.7", ] [[package]] @@ -862,7 +862,7 @@ dependencies = [ "codex-protocol", "codex-rmcp-client", "core_test_support", - "dirs", + "dirs 6.0.0", "env-flags", "escargot", "eventsource-stream", @@ -893,8 +893,8 @@ dependencies = [ "tokio", "tokio-test", "tokio-util", - "toml", - "toml_edit", + "toml 0.9.7", + "toml_edit 0.23.6", "tracing", "tracing-test", "tree-sitter", @@ -1047,10 +1047,12 @@ version = "0.0.0" dependencies = [ "anyhow", "assert_cmd", + "async-trait", "codex-arg0", "codex-common", "codex-core", "codex-protocol", + "codex-scheduler", "codex-utils-json-to-toml", "core_test_support", "mcp-types", @@ -1176,6 +1178,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "codex-scheduler" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "cron", + "dirs 5.0.1", + "hostname", + "once_cell", + "rand 0.9.2", + "reqwest", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "toml 0.8.23", + "tracing", + "uuid", + "which", +] + [[package]] name = "codex-tui" version = "0.0.0" @@ -1199,7 +1224,7 @@ dependencies = [ "color-eyre", "crossterm", "diffy", - "dirs", + "dirs 6.0.0", "image", "insta", "itertools 0.14.0", @@ -1239,7 +1264,7 @@ version = "0.0.0" dependencies = [ "pretty_assertions", "serde_json", - "toml", + "toml 0.9.7", ] [[package]] @@ -1396,6 +1421,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -1692,13 +1728,22 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys 0.4.1", +] + [[package]] name = "dirs" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" dependencies = [ - "dirs-sys", + "dirs-sys 0.5.0", ] [[package]] @@ -1711,6 +1756,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users 0.4.6", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys" version = "0.5.0" @@ -2390,6 +2447,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if", + "libc", + "windows-link 0.1.3", +] + [[package]] name = "http" version = "1.3.1" @@ -4064,7 +4132,7 @@ dependencies = [ "once_cell", "socket2 0.5.10", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -4729,6 +4797,15 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + [[package]] name = "serde_spanned" version = "1.0.2" @@ -5519,6 +5596,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned 0.6.9", + "toml_datetime 0.6.11", + "toml_edit 0.22.27", +] + [[package]] name = "toml" version = "0.9.7" @@ -5527,13 +5616,22 @@ checksum = "00e5e5d9bf2475ac9d4f0d9edab68cc573dc2fd644b0dba36b0c30a92dd9eaa0" dependencies = [ "indexmap 2.11.4", "serde_core", - "serde_spanned", - "toml_datetime", + "serde_spanned 1.0.2", + "toml_datetime 0.7.2", "toml_parser", "toml_writer", "winnow", ] +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] + [[package]] name = "toml_datetime" version = "0.7.2" @@ -5543,6 +5641,20 @@ dependencies = [ "serde_core", ] +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap 2.11.4", + "serde", + "serde_spanned 0.6.9", + "toml_datetime 0.6.11", + "toml_write", + "winnow", +] + [[package]] name = "toml_edit" version = "0.23.6" @@ -5550,7 +5662,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3effe7c0e86fdff4f69cdd2ccc1b96f933e24811c5441d44904e8683e27184b" dependencies = [ "indexmap 2.11.4", - "toml_datetime", + "toml_datetime 0.7.2", "toml_parser", "toml_writer", "winnow", @@ -5565,6 +5677,12 @@ dependencies = [ "winnow", ] +[[package]] +name = "toml_write" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" + [[package]] name = "toml_writer" version = "1.0.3" @@ -6356,6 +6474,15 @@ dependencies = [ "windows-targets 0.42.2", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -6407,6 +6534,21 @@ dependencies = [ "windows_x86_64_msvc 0.42.2", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -6455,6 +6597,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -6473,6 +6621,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -6491,6 +6645,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -6521,6 +6681,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -6539,6 +6705,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -6557,6 +6729,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -6575,6 +6753,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 01dc47a70de..8162e25651c 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -32,6 +32,7 @@ members = [ "git-apply", "utils/json-to-toml", "utils/readiness", + "scheduler", ] resolver = "2" diff --git a/codex-rs/justfile b/codex-rs/justfile index 9ddc4a37aac..bafc7110b23 100644 --- a/codex-rs/justfile +++ b/codex-rs/justfile @@ -45,3 +45,20 @@ test: # Run the MCP server mcp-server-run *args: cargo run -p codex-mcp-server -- "$@" + +# --- PR helpers --- + +pr-fmt: + cd .. && cd codex-rs && just fmt + +pr-trigger: + cd .. && git commit --allow-empty -m "chore: re-run Codex Review" && git push --force-with-lease + +pr-body PR FILE: + cd .. && gh pr edit {{PR}} -F {{FILE}} + +pr-comment PR FILE: + cd .. && gh pr comment {{PR}} -F {{FILE}} + +review-bundle OUT='local/review_bundles/single/review.md': + cd .. && OUTDIR=$(dirname {{OUT}}) && mkdir -p "$OUTDIR" && OUT={{OUT}} && DATE=$(date +%F) && WT=$(git diff || true) && printf "# Codex Review Bundle (Single File)\n\n- Date: %s\n\n## Diffs (working tree)\n\n\`\`\`diff\n%s\n\`\`\`\n\n## Test Plan\n- cd codex-rs && just fmt\n- cargo test -p codex-tui\n" "$DATE" "$WT" > "$OUT" && gh gist create -p -d "Codex Review Bundle $DATE" "$OUT" | tail -n1 diff --git a/codex-rs/mcp-server/Cargo.toml b/codex-rs/mcp-server/Cargo.toml index 484af6d8e83..a7b9dcfa71b 100644 --- a/codex-rs/mcp-server/Cargo.toml +++ b/codex-rs/mcp-server/Cargo.toml @@ -35,6 +35,8 @@ tokio = { workspace = true, features = [ ] } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } +codex-scheduler = { path = "../scheduler", optional = true } +async-trait = { version = "0.1", optional = true } [dev-dependencies] assert_cmd = { workspace = true } @@ -44,3 +46,9 @@ os_info = { workspace = true } pretty_assertions = { workspace = true } tempfile = { workspace = true } wiremock = { workspace = true } + +[features] +# Do not enable scheduler by default. +default = [] +# Opt-in scheduler integration +scheduler = ["dep:codex-scheduler", "dep:async-trait"] diff --git a/codex-rs/mcp-server/src/context_injector.rs b/codex-rs/mcp-server/src/context_injector.rs new file mode 100644 index 00000000000..a90c54fceff --- /dev/null +++ b/codex-rs/mcp-server/src/context_injector.rs @@ -0,0 +1,11 @@ +#![allow(dead_code)] +//! Optional context injection hooks from notifications into live conversations. + +#[derive(Clone, Default)] +pub struct ContextInjector; + +impl ContextInjector { + pub fn new() -> Self { Self } + pub fn inject(&self, _conversation_id: &str, _context: &str) {} +} + diff --git a/codex-rs/mcp-server/src/main.rs b/codex-rs/mcp-server/src/main.rs index 314944fab57..a7a4a612cf6 100644 --- a/codex-rs/mcp-server/src/main.rs +++ b/codex-rs/mcp-server/src/main.rs @@ -4,7 +4,16 @@ use codex_mcp_server::run_main; fn main() -> anyhow::Result<()> { arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move { - run_main(codex_linux_sandbox_exe, CliConfigOverrides::default()).await?; + // Launch the MCP server. + let res = run_main(codex_linux_sandbox_exe, CliConfigOverrides::default()).await; + // Opportunistically start scheduler when feature is enabled; it is no-op otherwise. + #[cfg(feature = "scheduler")] + { + // Note: this call does not change behavior unless [scheduler] is enabled in ~/.codex/config.toml + // and Arango config is present. + super::scheduler_bootstrap::start_if_enabled(); + } + res?; Ok(()) }) } diff --git a/codex-rs/mcp-server/src/notify_watcher.rs b/codex-rs/mcp-server/src/notify_watcher.rs new file mode 100644 index 00000000000..f63d68f5d36 --- /dev/null +++ b/codex-rs/mcp-server/src/notify_watcher.rs @@ -0,0 +1,21 @@ +#![allow(dead_code)] +//! Notification watcher bridge (feature-gated). + +#[cfg(feature = "scheduler")] +pub struct NotificationWatcher; + +#[cfg(feature = "scheduler")] +impl NotificationWatcher { + pub fn new() -> Self { Self } + pub fn spawn(self) {} +} + +#[cfg(not(feature = "scheduler"))] +pub struct NotificationWatcher; + +#[cfg(not(feature = "scheduler"))] +impl NotificationWatcher { + pub fn new() -> Self { Self } + pub fn spawn(self) {} +} + diff --git a/codex-rs/mcp-server/src/scheduler_bootstrap.rs b/codex-rs/mcp-server/src/scheduler_bootstrap.rs new file mode 100644 index 00000000000..d78c265cb51 --- /dev/null +++ b/codex-rs/mcp-server/src/scheduler_bootstrap.rs @@ -0,0 +1,18 @@ +#![allow(dead_code)] +//! Feature-gated bootstrap for the config-driven scheduler. + +#[cfg(feature = "scheduler")] +pub fn start_if_enabled() { + // Fire-and-forget task; the scheduler handles its own config gating. + tokio::spawn(async move { + if let Err(e) = codex_scheduler::start_scheduler_if_configured().await { + tracing::warn!("scheduler bootstrap failed: {e:#}"); + } + }); +} + +#[cfg(not(feature = "scheduler"))] +pub fn start_if_enabled() { + // no-op when feature is disabled +} + diff --git a/codex-rs/scheduler/Cargo.toml b/codex-rs/scheduler/Cargo.toml new file mode 100644 index 00000000000..acf79c7e093 --- /dev/null +++ b/codex-rs/scheduler/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "codex-scheduler" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +async-trait = "0.1" +chrono = { version = "0.4", features = ["serde"] } +cron = "0.12" +once_cell = "1.19" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +thiserror = "1" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "time", "fs", "io-util"] } +tracing = "0.1" +uuid = { version = "1", features = ["v4", "serde"] } +which = "6" +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +dirs = "5" +toml = "0.8" +hostname = "0.4" +rand = { workspace = true } diff --git a/codex-rs/scheduler/src/config.rs b/codex-rs/scheduler/src/config.rs new file mode 100644 index 00000000000..30c36061f43 --- /dev/null +++ b/codex-rs/scheduler/src/config.rs @@ -0,0 +1,212 @@ +use anyhow::anyhow; +use anyhow::Context; +use anyhow::Result; +use dirs::home_dir; +use serde::Deserialize; +use serde::Serialize; +use std::path::PathBuf; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SchedulerJob { + pub id: String, + pub cron: String, + pub prompt: String, + pub cwd: Option, + pub model: Option, + pub profile: Option, + pub sandbox: Option, + pub approval_policy: Option, + pub max_duration_seconds: Option, + pub retry_limit: Option, + pub retry_backoff_seconds: Option, + pub resume_conversation_id: Option, + pub tags: Option>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SchedulerNotifications { + pub enabled: Option, + pub on: Option>, // kinds to inject on + pub default_conversation_id: Option, + pub routes: Option>, // job_id -> conv_id +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SchedulerConfig { + pub enabled: bool, + pub max_concurrency: Option, + pub poll_interval_seconds: Option, + pub lock_key: Option, + pub lock_ttl_seconds: Option, + + pub default_model: Option, + pub default_profile: Option, + pub default_sandbox: Option, + pub default_approval_policy: Option, + pub default_cwd: Option, + pub default_max_duration_seconds: Option, + pub default_retry_limit: Option, + pub default_retry_backoff_seconds: Option, + + /// Enforce HTTPS and rustls; when true, http:// URLs will be rejected + /// even if `allow_insecure=true` on the ArangoDB config. + #[serde(default)] + pub strict_tls: bool, + + /// Proximity window (ms) within which a cron is considered due on a poll tick. + #[serde(default = "default_due_window_ms")] + pub due_window_ms: u64, + + /// Max write retries for notify/events batch. + #[serde(default = "default_max_retries")] + pub max_write_retries: usize, + /// Base backoff in ms (jittered exponential). + #[serde(default = "default_backoff_base_ms")] + pub backoff_base_ms: u64, + /// Jitter ratio (0.0 - 1.0) applied to backoff (default 0.25). + #[serde(default = "default_backoff_jitter")] + pub backoff_jitter: f32, + + pub notifications: Option, + #[serde(default)] + pub jobs: Vec, +} + +#[derive(Clone, Deserialize, Serialize)] +pub struct ArangoConfig { + pub url: String, + pub database: String, + pub username: String, + pub password: String, + pub runs_collection: String, + pub events_collection: String, + pub notifications_collection: String, + pub state_collection: String, + /// Allow http:// URLs only when explicitly set. Default: false + #[serde(default)] + pub allow_insecure: bool, +} + +impl std::fmt::Debug for ArangoConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArangoConfig") + .field("url", &self.url) + .field("database", &self.database) + .field("username", &self.username) + .field("password", &"") + .field("runs_collection", &self.runs_collection) + .field("events_collection", &self.events_collection) + .field("notifications_collection", &self.notifications_collection) + .field("state_collection", &self.state_collection) + .field("allow_insecure", &self.allow_insecure) + .finish() + } +} + +#[derive(Debug, Clone, Deserialize)] +struct RootToml { + pub scheduler: Option, + #[serde(rename = "database.arango")] + pub database_arango: Option, +} + +pub async fn load_scheduler_config_from_toml( + path: Option, +) -> Result> { + let cfg_path = if let Some(p) = path { + p + } else { + let mut p = home_dir().context("cannot locate home dir")?; + p.push(".codex"); + p.push("config.toml"); + p + }; + let data = tokio::fs::read(&cfg_path) + .await + .with_context(|| format!("failed reading {:?}", cfg_path))?; + let toml_str = String::from_utf8_lossy(&data); + let root: RootToml = + toml::from_str(&toml_str).with_context(|| format!("failed parsing TOML {:?}", cfg_path))?; + let Some(sched) = root.scheduler else { + return Ok(None); + }; + if !sched.enabled { + return Ok(None); + } + let arango = root + .database_arango + .context("missing [database.arango] config while scheduler is enabled")?; + Ok(Some((sched, arango))) +} + +#[allow(dead_code)] +pub fn now_iso() -> String { + chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true) +} +#[allow(dead_code)] +pub fn plus_seconds_iso(secs: i64) -> String { + (chrono::Utc::now() + chrono::Duration::seconds(secs)) + .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) +} + +impl SchedulerConfig { + /// Parse from a TOML string and validate URL scheme according to + /// `strict_tls` and `database.arango.allow_insecure`. + pub fn from_toml(src: &str) -> Result<(Self, ArangoConfig)> { + #[derive(Deserialize)] + struct DbBlock { + arango: Option, + } + #[derive(Deserialize)] + struct Root2 { + scheduler: Option, + database: Option, + } + + let root: Root2 = toml::from_str(src)?; + let sched = root + .scheduler + .ok_or_else(|| anyhow!("missing [scheduler] section"))?; + if !sched.enabled { + return Err(anyhow!("scheduler disabled")); + } + let arango = root + .database + .and_then(|d| d.arango) + .ok_or_else(|| anyhow!("missing [database.arango] section"))?; + + // URL scheme validation + let strict = sched.strict_tls; + if strict && !arango.url.starts_with("https://") { + return Err(anyhow!( + "strict_tls=true requires Arango URL to be https://" + )); + } + if !(arango.url.starts_with("https://") + || (arango.allow_insecure && arango.url.starts_with("http://"))) + { + return Err(anyhow!( + "Arango URL must be https:// (set allow_insecure=true to permit http:// for local dev)" + )); + } + + if !(0.0..=1.0).contains(&sched.backoff_jitter) { + return Err(anyhow!("backoff_jitter must be between 0.0 and 1.0")); + } + + Ok((sched, arango)) + } +} + +fn default_due_window_ms() -> u64 { + 5_000 +} +fn default_max_retries() -> usize { + 5 +} +fn default_backoff_base_ms() -> u64 { + 200 +} +fn default_backoff_jitter() -> f32 { + 0.25 +} diff --git a/codex-rs/scheduler/src/cronloop.rs b/codex-rs/scheduler/src/cronloop.rs new file mode 100644 index 00000000000..9a5e4ef6720 --- /dev/null +++ b/codex-rs/scheduler/src/cronloop.rs @@ -0,0 +1,172 @@ +use crate::config::SchedulerConfig; +use crate::config::SchedulerJob; +use crate::db::Db; +use crate::runner::execute_run; +use crate::runner::RunRequest; +use anyhow::Context; +use anyhow::Result; +use chrono::DateTime; +use chrono::Utc; +use cron::Schedule; +use hostname; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Semaphore; +use tokio::time::sleep; +use tracing::warn; +use uuid::Uuid; + +pub async fn run(cfg: SchedulerConfig, db: Db) -> Result<()> { + let owner = format!( + "{}-{}", + hostname::get().unwrap_or_default().to_string_lossy(), + Uuid::new_v4() + ); + let lock_key = cfg + .lock_key + .clone() + .unwrap_or_else(|| "codex-scheduler".to_string()); + let lock_ttl = cfg.lock_ttl_seconds.unwrap_or(60); + let poll_secs = cfg.poll_interval_seconds.unwrap_or(5); + let due_window = std::time::Duration::from_millis(cfg.due_window_ms.max(1)); + + let sem = Arc::new(Semaphore::new(cfg.max_concurrency.unwrap_or(2))); + let mut compiled: Vec<(SchedulerJob, Schedule)> = vec![]; + for j in &cfg.jobs { + let sched = Schedule::from_str(&j.cron) + .with_context(|| format!("invalid cron for job {}", j.id))?; + compiled.push((j.clone(), sched)); + } + + loop { + if !try_acquire_lock(&db, &lock_key, &owner, lock_ttl).await? { + sleep(Duration::from_secs(poll_secs)).await; + continue; + } + let _ = renew_lock(&db, &lock_key, &owner, lock_ttl).await; + + let now = chrono::Utc::now(); + for (job, schedule) in &compiled { + if let Some(next) = schedule.after(&now).next() { + if is_due_within(next, now, due_window) { + if let Ok(permit) = sem.clone().try_acquire_owned() { + let dbc = db.clone(); + let jobc = job.clone(); + tokio::spawn(async move { + let _g = permit; + let req = RunRequest { + job_id: jobc.id.clone(), + prompt: jobc.prompt.clone(), + cwd: jobc.cwd.clone(), + resume_conversation_id: jobc.resume_conversation_id.clone(), + max_duration_seconds: jobc.max_duration_seconds.unwrap_or(900), + tags: jobc.tags.clone(), + }; + if let Err(e) = execute_run(req, dbc).await { + warn!("scheduler: job {} failed: {e:#}", jobc.id); + } + }); + } + } + } + } + + sleep(Duration::from_secs(poll_secs)).await; + } +} + +/// Returns true if `next_due` is non-negative and within `window` from `now`. +/// This isolates the proximity decision for unit testing and consistent behavior. +pub(crate) fn is_due_within( + next_due: DateTime, + now: DateTime, + window: std::time::Duration, +) -> bool { + let delta = next_due - now; + if delta.num_milliseconds() < 0 { + return false; + } + match delta.to_std() { + Ok(d) => d <= window, + Err(_) => false, + } +} + +async fn try_acquire_lock(db: &Db, key: &str, owner: &str, ttl_secs: u64) -> Result { + let q = r#" +LET now = DATE_NOW() +LET expiry = DATE_ADD(DATE_NOW(), @ttl_ms, 'millisecond') +LET doc = DOCUMENT(@@state, @key) +LET can_acquire = doc == null OR doc.expiresAt < DATE_ISO8601(now) OR doc.owner_id == @owner +IF can_acquire + UPSERT { _key: @key } + INSERT { _key: @key, owner_id: @owner, heartbeat: DATE_ISO8601(now), expiresAt: DATE_ISO8601(expiry) } + UPDATE { owner_id: @owner, heartbeat: DATE_ISO8601(now), expiresAt: DATE_ISO8601(expiry) } + IN @@state + RETURN { acquired: true } +ELSE + RETURN { acquired: false } +"#; + let ttl_ms = (ttl_secs as i64) * 1000; + let resp = db + .aql_public( + q, + serde_json::json!({ + "@state": db.state_collection(), + "key": key, + "owner": owner, + "ttl_ms": ttl_ms + }), + ) + .await?; + let acquired = resp + .pointer("/result/0/acquired") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + Ok(acquired) +} + +async fn renew_lock(db: &Db, key: &str, owner: &str, ttl_secs: u64) -> Result<()> { + let q = r#" +LET now = DATE_NOW() +LET expiry = DATE_ADD(DATE_NOW(), @ttl_ms, 'millisecond') +UPDATE { _key: @key } WITH { owner_id: @owner, heartbeat: DATE_ISO8601(now), expiresAt: DATE_ISO8601(expiry) } IN @@state +"#; + let ttl_ms = (ttl_secs as i64) * 1000; + let _ = db + .aql_public( + q, + serde_json::json!({ + "@state": db.state_collection(), + "key": key, + "owner": owner, + "ttl_ms": ttl_ms + }), + ) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + #[test] + fn proximity_detection_basic() { + let now = Utc.with_ymd_and_hms(2025, 1, 1, 12, 0, 0).unwrap(); + let within = Utc.with_ymd_and_hms(2025, 1, 1, 12, 0, 2).unwrap(); + let outside = Utc.with_ymd_and_hms(2025, 1, 1, 12, 0, 6).unwrap(); + assert!(is_due_within( + within, + now, + std::time::Duration::from_secs(5) + )); + assert!(!is_due_within( + outside, + now, + std::time::Duration::from_secs(5) + )); + } +} diff --git a/codex-rs/scheduler/src/db.rs b/codex-rs/scheduler/src/db.rs new file mode 100644 index 00000000000..84dc14e807d --- /dev/null +++ b/codex-rs/scheduler/src/db.rs @@ -0,0 +1,322 @@ +use crate::config::ArangoConfig; +use crate::config::SchedulerConfig; +use anyhow::bail; +use anyhow::Result; +use reqwest::Client; +use reqwest::ClientBuilder; +use serde::Deserialize; +use serde::Serialize; +use serde_json::json; +use serde_json::Value; + +#[derive(Clone)] +pub struct Db { + pub(crate) base_url: String, + pub(crate) database: String, + runs: String, + events: String, + notes: String, + state: String, + client: Client, + auth: (String, String), + // Tunables from SchedulerConfig + max_retries: usize, + backoff_base_ms: u64, + backoff_jitter: f32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RunDoc { + pub run_id: String, + pub job_id: String, + pub created_at: String, + pub started_at: Option, + pub finished_at: Option, + pub status: String, + pub conversation_id: Option, + pub submission_id: Option, + pub prompt: String, + pub model: Option, + pub sandbox: Option, + pub approval_policy: Option, + pub cwd: Option, + pub error_message: Option, + pub tags: Option>, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct NotificationDoc { + pub _key: Option, + pub run_id: String, + pub job_id: String, + pub kind: String, + pub created_at: String, +} + +impl Db { + pub async fn new(cfg: &ArangoConfig) -> Result { + let client = ClientBuilder::new() + .use_rustls_tls() + .https_only(true) + .user_agent(concat!("codex-scheduler/", env!("CARGO_PKG_VERSION"))) + .build()?; + Ok(Self { + base_url: cfg.url.clone(), + database: cfg.database.clone(), + runs: cfg.runs_collection.clone(), + events: cfg.events_collection.clone(), + notes: cfg.notifications_collection.clone(), + state: cfg.state_collection.clone(), + client, + auth: (cfg.username.clone(), cfg.password.clone()), + max_retries: 5, + backoff_base_ms: 200, + backoff_jitter: 0.25, + }) + } + + /// Construct Db with TLS + retry/backoff from SchedulerConfig and ArangoConfig. + pub fn from_config(s: &SchedulerConfig, cfg: &ArangoConfig) -> Result { + let https_only = !(cfg.allow_insecure && !s.strict_tls); + let client = ClientBuilder::new() + .use_rustls_tls() + .https_only(https_only) + .user_agent(concat!("codex-scheduler/", env!("CARGO_PKG_VERSION"))) + .build()?; + Ok(Self { + base_url: cfg.url.clone(), + database: cfg.database.clone(), + runs: cfg.runs_collection.clone(), + events: cfg.events_collection.clone(), + notes: cfg.notifications_collection.clone(), + state: cfg.state_collection.clone(), + client, + auth: (cfg.username.clone(), cfg.password.clone()), + max_retries: s.max_write_retries, + backoff_base_ms: s.backoff_base_ms, + backoff_jitter: s.backoff_jitter, + }) + } + + pub fn state_collection(&self) -> &str { + &self.state + } + + fn col_url(&self, col: &str) -> String { + format!( + "{}/_db/{}/_api/document/{}", + self.base_url, self.database, col + ) + } + fn cursor_url(&self) -> String { + format!("{}/_db/{}/_api/cursor", self.base_url, self.database) + } + + async fn post_json(&self, url: &str, body: &Value) -> Result { + let res = self + .client + .post(url) + .basic_auth(self.auth.0.clone(), Some(self.auth.1.clone())) + .json(body) + .send() + .await?; + let status = res.status(); + let val: Value = res.json().await.unwrap_or(json!({"error":"invalid json"})); + if !status.is_success() { + bail!("arangodb POST {} failed: {} body={}", url, status, val); + } + Ok(val) + } + + /// Retry wrapper with jittered exponential backoff for POSTs that insert documents. + async fn post_json_with_retry(&self, url: &str, body: &Value) -> Result<()> { + use rand::rng; + use rand::Rng; + use tokio::time::sleep; + use tokio::time::Duration; + + let mut attempt = 0usize; + loop { + match self.post_json(url, body).await { + Ok(_) => return Ok(()), + Err(e) => { + tracing::warn!("Arango POST retryable error: {e}"); + } + } + if attempt >= self.max_retries { + break; + } + // backoff base * 2^attempt with +/- jitter + let base_ms = self.backoff_base_ms.saturating_mul(1u64 << attempt.min(8)); + let variance = (base_ms as f32 * self.backoff_jitter) as u64; + let jitter = { + let mut r = rng(); + let offset: i64 = r.random_range(-(variance as i64)..=(variance as i64)); + (base_ms as i64 + offset).max(50) as u64 + }; + sleep(Duration::from_millis(jitter)).await; + attempt += 1; + } + bail!("arangodb POST exhausted retries for {}", url) + } + + pub async fn aql_public(&self, query: &str, bind_vars: Value) -> Result { + let body = json!({"query": query, "bindVars": bind_vars}); + self.post_json(&self.cursor_url(), &body).await + } + + pub async fn ensure_collections_and_indexes(&self) -> Result<()> { + // Create collections if not exist (best-effort) + for col in [&self.runs, &self.events, &self.notes, &self.state] { + let url = format!("{}/_db/{}/_api/collection", self.base_url, self.database); + let body = json!({"name": col, "type": 2}); + let _ = self + .client + .post(&url) + .basic_auth(self.auth.0.clone(), Some(self.auth.1.clone())) + .json(&body) + .send() + .await; + } + + // Indexes (best-effort) + let _ = self + .aql_public( + "RETURN ENSURE_INDEX(@col, { type: 'persistent', fields: ['job_id'] })", + json!({"col": self.runs}), + ) + .await; + let _ = self + .aql_public( + "RETURN ENSURE_INDEX(@col, { type: 'persistent', fields: ['created_at'] })", + json!({"col": self.runs}), + ) + .await; + let _ = self + .aql_public( + "RETURN ENSURE_INDEX(@col, { type: 'persistent', fields: ['status'] })", + json!({"col": self.runs}), + ) + .await; + let _ = self.aql_public( + "RETURN ENSURE_INDEX(@col, { type: 'persistent', fields: ['run_id','seq'], unique: true })", + json!({"col": self.events}), + ).await; + let _ = self + .aql_public( + "RETURN ENSURE_INDEX(@col, { type: 'persistent', fields: ['created_at'] })", + json!({"col": self.notes}), + ) + .await; + Ok(()) + } + + pub async fn insert_run_started(&self, doc: &RunDoc) -> Result<()> { + let body = json!({ + "run_id": doc.run_id, "job_id": doc.job_id, "created_at": doc.created_at, + "started_at": doc.started_at, "status": doc.status, "prompt": doc.prompt, + "conversation_id": doc.conversation_id, "submission_id": doc.submission_id, + "model": doc.model, "sandbox": doc.sandbox, "approval_policy": doc.approval_policy, + "cwd": doc.cwd, "error_message": doc.error_message, "tags": doc.tags + }); + let _ = self.post_json(&self.col_url(&self.runs), &body).await?; + Ok(()) + } + + pub async fn update_run_finished( + &self, + run_id: &str, + status: &str, + err: Option<&str>, + ) -> Result<()> { + let q = format!( + "FOR r IN {} FILTER r.run_id == @run_id UPDATE r WITH {{ status: @status, finished_at: @finished_at, error_message: @error }} IN {}", + self.runs, self.runs + ); + let _ = self + .aql_public( + &q, + json!({ + "run_id": run_id, + "status": status, + "finished_at": now_iso(), + "error": err + }), + ) + .await?; + Ok(()) + } + + pub async fn insert_events_batch( + &self, + run_id: &str, + batch: &[(i64, String, Value)], + ) -> Result<()> { + if batch.is_empty() { + return Ok(()); + } + let docs: Vec = batch.iter().map(|(seq, typ, payload)| { + json!({"run_id": run_id, "seq": seq, "ts": now_iso(), "type": typ, "payload": payload}) + }).collect(); + let url = self.col_url(&self.events); + let body = Value::from(docs); + self.post_json_with_retry(&url, &body).await + } + + pub async fn notify( + &self, + run_id: &str, + job_id: &str, + kind: &str, + ttl_secs: u64, + ) -> Result<()> { + let created_at = now_iso(); + let expires_at = plus_seconds_iso(ttl_secs as i64); + let body = json!({"run_id": run_id, "job_id": job_id, "kind": kind, "created_at": created_at, "expiresAt": expires_at}); + let url = self.col_url(&self.notes); + self.post_json_with_retry(&url, &body).await + } + + pub async fn fetch_notifications_since(&self, since_iso: &str) -> Result> { + let q = format!( + "FOR n IN {} FILTER n.created_at > @since SORT n.created_at ASC LIMIT 1000 RETURN n", + self.notes + ); + let resp = self.aql_public(&q, json!({"since": since_iso})).await?; + let arr = resp + .get("result") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default(); + let mut out = Vec::with_capacity(arr.len()); + for v in arr { + out.push(serde_json::from_value(v)?); + } + Ok(out) + } + + pub async fn fetch_run(&self, run_id: &str) -> Result { + let q = format!( + "FOR r IN {} FILTER r.run_id == @run_id LIMIT 1 RETURN r", + self.runs + ); + let resp = self.aql_public(&q, json!({"run_id": run_id})).await?; + let arr = resp + .get("result") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default(); + if arr.is_empty() { + bail!("run not found"); + } + Ok(serde_json::from_value(arr[0].clone())?) + } +} + +fn now_iso() -> String { + chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true) +} +fn plus_seconds_iso(secs: i64) -> String { + (chrono::Utc::now() + chrono::Duration::seconds(secs)) + .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) +} diff --git a/codex-rs/scheduler/src/lib.rs b/codex-rs/scheduler/src/lib.rs new file mode 100644 index 00000000000..395e186e1c8 --- /dev/null +++ b/codex-rs/scheduler/src/lib.rs @@ -0,0 +1,35 @@ +pub mod config; +pub mod cronloop; +pub mod db; +pub mod runner; + +use anyhow::Result; +use config::load_scheduler_config_from_toml; +use db::Db; +use tokio::task::JoinHandle; +use tracing::info; + +pub struct SchedulerHandles { + pub cron_handle: JoinHandle<()>, +} + +/// Start the scheduler if `[scheduler]` is enabled in `~/.codex/config.toml`. +/// Returns `Ok(None)` when disabled or config is missing. +pub async fn start_scheduler_if_configured() -> Result> { + let maybe_cfg = load_scheduler_config_from_toml(None).await?; + let Some((sched_cfg, arango_cfg)) = maybe_cfg else { + info!("scheduler: not configured/enabled; skipping start"); + return Ok(None); + }; + + let db = Db::from_config(&sched_cfg, &arango_cfg)?; + db.ensure_collections_and_indexes().await?; + + let cron_handle = tokio::spawn(async move { + if let Err(e) = cronloop::run(sched_cfg, db).await { + tracing::error!("scheduler: cron loop exited with error: {e:#}"); + } + }); + + Ok(Some(SchedulerHandles { cron_handle })) +} diff --git a/codex-rs/scheduler/src/runner.rs b/codex-rs/scheduler/src/runner.rs new file mode 100644 index 00000000000..ad79d83e2a0 --- /dev/null +++ b/codex-rs/scheduler/src/runner.rs @@ -0,0 +1,185 @@ +use crate::db::Db; +use crate::db::RunDoc; +use anyhow::Context; +use anyhow::Result; +use chrono::Utc; +use serde_json::Value; +use std::process::Stdio; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Command; +use tokio::time::timeout; +use tokio::time::Duration; +use tracing::info; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct RunRequest { + pub job_id: String, + pub prompt: String, + pub cwd: Option, + pub resume_conversation_id: Option, + pub max_duration_seconds: u64, + pub tags: Option>, +} + +pub async fn execute_run(req: RunRequest, db: Db) -> Result<()> { + let run_id = Uuid::new_v4().to_string(); + let created_at = Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + + db.insert_run_started(&RunDoc { + run_id: run_id.clone(), + job_id: req.job_id.clone(), + created_at: created_at.clone(), + started_at: Some(created_at.clone()), + finished_at: None, + status: "running".into(), + conversation_id: req.resume_conversation_id.clone(), + submission_id: None, + prompt: req.prompt.clone(), + model: None, + sandbox: None, + approval_policy: None, + cwd: req.cwd.clone(), + error_message: None, + tags: req.tags.clone(), + }) + .await?; + + let _ = db.notify(&run_id, &req.job_id, "job_started", 120).await; + + let mut args: Vec = vec![ + "exec".into(), + "--experimental-json".into(), + req.prompt.clone(), + ]; + if let Some(thread) = &req.resume_conversation_id { + args = vec![ + "exec".into(), + "--experimental-json".into(), + "resume".into(), + thread.clone(), + req.prompt.clone(), + ]; + } + + let mut cmd = Command::new(which::which("codex").unwrap_or_else(|_| "codex".into())); + cmd.args(&args); + if let Some(cwd) = &req.cwd { + cmd.current_dir(cwd); + } + cmd.stdout(Stdio::piped()).stderr(Stdio::inherit()); + + let mut child = cmd.spawn().context("spawn codex exec")?; + let stdout = child.stdout.take().context("no stdout from codex exec")?; + let mut reader = BufReader::new(stdout).lines(); + + let run_future = async { + let mut seq: i64 = 0; + let mut batch: Vec<(i64, String, Value)> = Vec::with_capacity(64); + while let Some(line) = reader.next_line().await? { + let (typ, payload) = match serde_json::from_str::(&line) { + Ok(v) => ( + v.get("type") + .and_then(|t| t.as_str()) + .unwrap_or("event") + .to_string(), + v, + ), + Err(_) => ("raw_line".to_string(), serde_json::json!({"line": line})), + }; + batch.push((seq, typ, payload)); + seq += 1; + if batch.len() >= 50 { + db.insert_events_batch(&run_id, &batch).await.ok(); + batch.clear(); + } + } + if !batch.is_empty() { + db.insert_events_batch(&run_id, &batch).await.ok(); + } + Ok::<(), anyhow::Error>(()) + }; + + let status = match timeout(Duration::from_secs(req.max_duration_seconds), run_future).await { + Ok(Ok(())) => { + let exit = child.wait().await?; + if exit.success() { + "succeeded" + } else { + "failed" + } + } + Ok(Err(e)) => { + db.update_run_finished(&run_id, "failed", Some(&e.to_string())) + .await + .ok(); + "failed" + } + Err(_) => { + let _ = child.kill().await; + "cancelled" + } + }; + + db.update_run_finished(&run_id, status, None).await.ok(); + let _ = db + .notify(&run_id, &req.job_id, &format!("job_{}", status), 300) + .await; + info!( + "scheduler: job {} finished with status {}", + req.job_id, status + ); + Ok(()) +} + +/// Internal helper to run a prepared Command with a hard timeout. +/// Returns true if the process timed out and was killed, false if it exited on its own. +/// Exposed for test scaffolding of the timeout path. +#[allow(dead_code)] +pub(crate) async fn run_child_with_timeout( + mut cmd: Command, + timeout: Duration, +) -> std::io::Result { + let mut child = cmd.spawn()?; + let timed_out = match tokio::time::timeout(timeout, child.wait()).await { + Ok(Ok(_status)) => false, + Ok(Err(e)) => { + let _ = child.kill().await; + return Err(e); + } + Err(_) => { + let _ = child.kill().await; + let _ = child.wait().await; + true + } + }; + Ok(timed_out) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(unix)] + #[tokio::test] + async fn timeout_helper_kills_long_running_unix() { + let mut cmd = Command::new("sh"); + cmd.arg("-c").arg("sleep 2"); + let timed_out = run_child_with_timeout(cmd, Duration::from_millis(300)) + .await + .unwrap(); + assert!(timed_out, "expected the child to be killed after timeout"); + } + + #[cfg(windows)] + #[tokio::test] + async fn timeout_helper_kills_long_running_windows() { + let mut cmd = Command::new("powershell"); + cmd.args(&["-NoProfile", "-Command", "Start-Sleep -Seconds 2"]); + let timed_out = run_child_with_timeout(cmd, Duration::from_millis(300)) + .await + .unwrap(); + assert!(timed_out, "expected the child to be killed after timeout"); + } +} diff --git a/codex-rs/scheduler/tests/config_redaction.rs b/codex-rs/scheduler/tests/config_redaction.rs new file mode 100644 index 00000000000..e1b786cd3db --- /dev/null +++ b/codex-rs/scheduler/tests/config_redaction.rs @@ -0,0 +1,70 @@ +use codex_scheduler::config::ArangoConfig; +use codex_scheduler::config::SchedulerConfig; + +#[test] +fn arango_password_is_redacted_in_debug() { + let a = ArangoConfig { + url: "https://localhost:8529".into(), + database: "codex".into(), + username: "root".into(), + password: "super-secret".into(), + runs_collection: "runs".into(), + events_collection: "events".into(), + notifications_collection: "notifications".into(), + state_collection: "state".into(), + allow_insecure: false, + }; + let s = format!("{:?}", a); + assert!(s.contains("")); + assert!(!s.contains("super-secret")); +} + +#[test] +fn https_required_by_default() { + let src = r#" +[scheduler] +enabled = true + +[database.arango] +url = "http://localhost:8529" +database = "codex" +username = "root" +password = "pw" +runs_collection = "runs" +events_collection = "events" +notifications_collection = "notifications" +state_collection = "state" +"#; + let res = SchedulerConfig::from_toml(src); + assert!( + res.is_err(), + "http:// should be rejected unless allow_insecure=true" + ); +} + +#[test] +fn http_allowed_when_allow_insecure_true() { + let src = r#" +[scheduler] +enabled = true + +[database.arango] +url = "http://localhost:8529" +database = "codex" +username = "root" +password = "pw" +runs_collection = "runs" +events_collection = "events" +notifications_collection = "notifications" +state_collection = "state" +allow_insecure = true +"#; + let res = SchedulerConfig::from_toml(src); + if let Err(e) = &res { + eprintln!("parse error: {}", e); + } + assert!( + res.is_ok(), + "allow_insecure=true should permit http:// for local dev" + ); +} diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index d6297528cec..67fededb1da 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -19,6 +19,7 @@ use ratatui::layout::Rect; use ratatui::style::Stylize as _; use ratatui::text::Line; use ratatui::text::Span; +use std::env; use tokio::sync::mpsc; use tokio_stream::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -30,6 +31,7 @@ use crate::tui::TuiEvent; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InputMessageKind; +use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::USER_MESSAGE_BEGIN; const PAGE_SIZE: usize = 25; @@ -64,6 +66,41 @@ enum BackgroundEvent { /// search and pagination. Shows the first user input as the preview, relative /// time (e.g., "5 seconds ago"), and the absolute path. pub async fn run_resume_picker(tui: &mut Tui, codex_home: &Path) -> Result { + // Plain, non-interactive verification mode: print rows and exit. + // Enables checking the '[project]' tag & preview without onboarding/TUI. + if env::var("CODEX_TUI_PLAIN").as_deref() == Ok("1") { + match RolloutRecorder::list_conversations(codex_home, PAGE_SIZE, None).await { + Ok(page) => { + let rows: Vec = page.items.iter().map(|it| head_to_row(it)).collect(); + let no_color = env::var("NO_COLOR").is_ok(); + let dumb = env::var("TERM").unwrap_or_default() == "dumb"; + let use_color = !no_color && !dumb; + for (i, r) in rows.iter().enumerate() { + let mark = if i == 0 { "> " } else { " " }; + let ts = + r.ts.as_ref() + .map(|dt| human_time_ago(dt.clone())) + .unwrap_or_else(|| "-".to_string()); + let tag = r.project.as_deref().unwrap_or(""); + // Sanitize preview to a single line, limited length similar to TUI + let mut pv = r.preview.replace('\n', " "); + if pv.len() > 80 { + pv.truncate(79); + pv.push('…'); + } + if use_color { + println!("{mark}{ts:<12} \x1b[36;1m[{tag}]\x1b[0m {pv}"); + } else { + println!("{mark}{ts:<12} [{tag}] {pv}"); + } + } + } + Err(e) => { + eprintln!("Failed to list conversations: {e}"); + } + } + return Ok(ResumeSelection::StartFresh); + } let alt = AltScreenGuard::enter(tui); let (bg_tx, bg_rx) = mpsc::unbounded_channel(); @@ -91,6 +128,12 @@ pub async fn run_resume_picker(tui: &mut Tui, codex_home: &Path) -> Result>, + project: Option, } impl PickerState { @@ -565,6 +609,7 @@ fn rows_from_items(items: Vec) -> Vec { fn head_to_row(item: &ConversationItem) -> Row { let mut ts: Option> = None; + let mut project: Option = None; if let Some(first) = item.head.first() && let Some(t) = first.get("timestamp").and_then(|v| v.as_str()) && let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(t) @@ -572,6 +617,19 @@ fn head_to_row(item: &ConversationItem) -> Row { ts = Some(parsed.with_timezone(&Utc)); } + // Attempt to derive the project tag from the SessionMeta line (cwd basename). + for value in &item.head { + if let Ok(meta_line) = serde_json::from_value::(value.clone()) { + let cwd = meta_line.meta.cwd; + if let Some(name) = cwd.file_name().and_then(|s| s.to_str()) { + if !name.is_empty() { + project = Some(name.to_string()); + } + } + break; + } + } + let preview = preview_from_head(&item.head) .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) @@ -581,6 +639,7 @@ fn head_to_row(item: &ConversationItem) -> Row { path: item.path.clone(), preview, ts, + project, } } @@ -696,10 +755,21 @@ fn render_list(frame: &mut crate::custom_terminal::Frame, area: Rect, state: &Pi .map(human_time_ago) .unwrap_or_else(|| "".to_string()) .dim(); - let max_cols = area.width.saturating_sub(6) as usize; + // Calculate remaining width for preview text after fixed columns. + let mut max_cols = area.width.saturating_sub(6) as usize; + if let Some(tag) = &row.project { + max_cols = max_cols.saturating_sub(tag.len() + 4); + } let preview = truncate_text(&row.preview, max_cols); - let line: Line = vec![marker, ts, " ".into(), preview.into()].into(); + // Build line: marker, time, optional [project], preview + let mut spans: Vec> = vec![marker, ts, " ".into()]; + if let Some(tag) = &row.project { + spans.push(format!("[{}]", tag).cyan().bold()); + spans.push(" ".into()); + } + spans.push(preview.into()); + let line: Line = spans.into(); let rect = Rect::new(area.x, y, area.width, 1); frame.render_widget_ref(line, rect); y = y.saturating_add(1); diff --git a/scripts/pr.sh b/scripts/pr.sh new file mode 100755 index 00000000000..f6d09635187 --- /dev/null +++ b/scripts/pr.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'USAGE' +Usage: scripts/pr.sh [args] +Commands: + prepare Format, fix, tests; push branch; create/update PR body + comment Post PR comment from a markdown file + body Update PR body from a markdown file + trigger Create empty commit and push to retrigger Codex Review +USAGE +} + +cmd=${1:-} +case "$cmd" in + prepare) + branch=${2:?branch}; body=${3:?body_md} + echo "Formatting & targeted fixes..."; (cd codex-rs && just fmt || true) + # Optional: add targeted crate fixes here if desired. + echo "Pushing branch $branch..." + if git show-ref --verify --quiet "refs/heads/$branch"; then + git checkout -q "$branch" + else + git checkout -q -b "$branch" + fi + git add -A && git commit -m "chore: prepare PR $branch" || true + git push --set-upstream origin "$branch" || true + echo "Creating/updating PR body..." + gh pr view --json number >/dev/null 2>&1 || gh pr create -F "$body" || true + gh pr edit -F "$body" || true + ;; + comment) + pr=${2:?pr}; md=${3:?md} + gh pr comment "$pr" -F "$md" + ;; + body) + pr=${2:?pr}; md=${3:?md} + gh pr edit "$pr" -F "$md" + ;; + trigger) + git commit --allow-empty -m "chore: re-run Codex Review" && git push --force-with-lease + ;; + *) + usage; exit 1;; +esac +