diff --git a/crates/iii-worker/src/cli/app.rs b/crates/iii-worker/src/cli/app.rs index b09ef49ec..072ce168c 100644 --- a/crates/iii-worker/src/cli/app.rs +++ b/crates/iii-worker/src/cli/app.rs @@ -94,37 +94,6 @@ pub enum Commands { port: u16, }, - /// Run a worker project in an isolated environment for development. - /// - /// Auto-detects the project type (package.json, Cargo.toml, pyproject.toml) - /// and runs it inside a VM (libkrun) connected - /// to the engine. - Dev { - /// Path to the worker project directory - #[arg(value_name = "PATH")] - path: String, - - /// Sandbox name (defaults to directory name) - #[arg(long)] - name: Option, - - /// Runtime to use (auto-detected if not set) - #[arg(long, value_parser = ["libkrun"])] - runtime: Option, - - /// Force rebuild: re-run setup and install scripts (libkrun only) - #[arg(long)] - rebuild: bool, - - /// Engine host address - #[arg(long, default_value = "localhost")] - address: String, - - /// Engine WebSocket port - #[arg(long, default_value_t = DEFAULT_PORT)] - port: u16, - }, - /// List all workers and their status List, diff --git a/crates/iii-worker/src/cli/config_file.rs b/crates/iii-worker/src/cli/config_file.rs index 3d30daeaa..85389bd13 100644 --- a/crates/iii-worker/src/cli/config_file.rs +++ b/crates/iii-worker/src/cli/config_file.rs @@ -11,6 +11,22 @@ use std::path::Path; const CONFIG_FILE: &str = "config.yaml"; +/// Canonical worker type resolved from config.yaml + filesystem. +#[derive(Debug)] +pub enum ResolvedWorkerType { + /// OCI worker — has `image:` in config.yaml + Oci { + image: String, + env: std::collections::HashMap, + }, + /// Local-path worker — has `worker_path:` in config.yaml + Local { worker_path: String }, + /// Binary worker — executable at ~/.iii/workers/{name} + Binary { binary_path: std::path::PathBuf }, + /// Config-only / builtin worker — no image, path, or binary + Config, +} + // ────────────────────────────────────────────────────────────────────────────── // Private helpers (operate on string content, making them easily testable) // ────────────────────────────────────────────────────────────────────────────── @@ -199,6 +215,66 @@ fn extract_worker_config(content: &str, name: &str) -> Option { Some(stripped.join("\n")) } +/// Extract the `worker_path:` value for a named worker from file content. +fn extract_worker_path(content: &str, name: &str) -> Option { + let target = format!("- name: {}", name); + let lines: Vec<&str> = content.lines().collect(); + let mut i = 0; + + // Find the entry + while i < lines.len() { + if lines[i].trim() == target { + i += 1; + break; + } + i += 1; + } + + // Look for `worker_path:` in the entry's indented lines + while i < lines.len() { + let trimmed = lines[i].trim(); + if trimmed.starts_with("- name:") || (!lines[i].starts_with(' ') && !lines[i].is_empty()) { + break; // hit next entry or top-level key + } + if let Some(rest) = trimmed.strip_prefix("worker_path:") { + return Some(rest.trim().to_string()); + } + i += 1; + } + + None +} + +/// Extract the `image:` value for a named worker from file content. +fn extract_image(content: &str, name: &str) -> Option { + let target = format!("- name: {}", name); + let lines: Vec<&str> = content.lines().collect(); + let mut i = 0; + + // Find the entry + while i < lines.len() { + if lines[i].trim() == target { + i += 1; + break; + } + i += 1; + } + + // Look for `image:` in the entry's indented lines + while i < lines.len() { + let trimmed = lines[i].trim(); + if trimmed.starts_with("- name:") || (!lines[i].starts_with(' ') && !lines[i].is_empty()) { + break; // hit next entry or top-level key + } + if let Some(rest) = trimmed.strip_prefix("image:") { + return Some(rest.trim().to_string()); + } + i += 1; + } + + None +} + /// Deep-merge two YAML config strings. `base` provides defaults, `overrides` /// takes precedence. Both are parsed as serde_json::Value and merged. fn merge_yaml_configs(base: &str, overrides: &str) -> String { @@ -263,6 +339,67 @@ pub fn append_worker_with_image( append_worker_impl(name, Some(image), config_yaml) } +/// Same as [`append_worker`] but writes a `worker_path: {worker_path}` field +/// instead of `image:`. Used for local directory-based workers. +pub fn append_worker_with_path( + name: &str, + worker_path: &str, + config_yaml: Option<&str>, +) -> Result<(), String> { + super::registry::validate_worker_name(name)?; + let path = Path::new(CONFIG_FILE); + + let mut content = if path.exists() { + std::fs::read_to_string(path) + .map_err(|e| format!("failed to read {}: {}", CONFIG_FILE, e))? + } else { + String::new() + }; + + if worker_exists_in(&content, name) { + let existing_config = extract_worker_config(&content, name); + content = remove_worker_from(&content, name); + + if let Some(existing) = existing_config { + if let Some(incoming) = config_yaml { + let merged = merge_yaml_configs(incoming, &existing); + return append_to_content_with_fields( + &mut content, + path, + name, + None, + Some(worker_path), + Some(&merged), + ); + } + return append_to_content_with_fields( + &mut content, + path, + name, + None, + Some(worker_path), + Some(&existing), + ); + } + } + + append_to_content_with_fields( + &mut content, + path, + name, + None, + Some(worker_path), + config_yaml, + ) +} + +/// Returns the `worker_path:` value for a named worker in `config.yaml`, if present. +pub fn get_worker_path(name: &str) -> Option { + let path = Path::new(CONFIG_FILE); + let content = std::fs::read_to_string(path).ok()?; + extract_worker_path(&content, name) +} + fn append_worker_impl( name: &str, image: Option<&str>, @@ -308,6 +445,19 @@ fn append_to_content( name: &str, image: Option<&str>, config_yaml: Option<&str>, +) -> Result<(), String> { + append_to_content_with_fields(content, path, name, image, None, config_yaml) +} + +/// Low-level: appends a worker entry with optional `image` and `worker_path` +/// fields to `content` and writes to `path`. +fn append_to_content_with_fields( + content: &mut String, + path: &Path, + name: &str, + image: Option<&str>, + worker_path: Option<&str>, + config_yaml: Option<&str>, ) -> Result<(), String> { // Ensure there is a `workers:` key. if !content.contains("workers:") { @@ -322,6 +472,9 @@ fn append_to_content( if let Some(img) = image { entry.push_str(&format!(" image: {}\n", img)); } + if let Some(wp) = worker_path { + entry.push_str(&format!(" worker_path: {}\n", wp)); + } if let Some(cfg) = config_yaml { let cfg = cfg.trim_end_matches('\n'); if !cfg.is_empty() { @@ -348,6 +501,8 @@ fn append_to_content( std::fs::write(path, &new_content) .map_err(|e| format!("failed to write {}: {}", CONFIG_FILE, e))?; + *content = new_content; + Ok(()) } @@ -355,33 +510,58 @@ fn append_to_content( pub fn get_worker_image(name: &str) -> Option { let path = Path::new(CONFIG_FILE); let content = std::fs::read_to_string(path).ok()?; + extract_image(&content, name) +} - let target = format!("- name: {}", name); - let lines: Vec<&str> = content.lines().collect(); - let mut i = 0; +/// Resolve the worker type from config.yaml content (no filesystem access for binary check). +/// Used by tests and by `resolve_worker_type`. +fn resolve_worker_type_from_content(content: &str, name: &str) -> ResolvedWorkerType { + // Check worker_path first (local), then image (OCI). + // Consistent ordering: local > OCI > binary > config. + if let Some(worker_path) = extract_worker_path(content, name) { + return ResolvedWorkerType::Local { worker_path }; + } - // Find the entry - while i < lines.len() { - if lines[i].trim() == target { - i += 1; - break; + if let Some(image) = extract_image(content, name) { + let config_str = extract_worker_config(content, name); + let mut env = std::collections::HashMap::new(); + if let Some(cfg) = config_str { + if let Ok(val) = serde_yaml::from_str::(&cfg) { + flatten_value_to_env(&val, "", &mut env); + } } - i += 1; + return ResolvedWorkerType::Oci { image, env }; } - // Look for `image:` in the entry's indented lines - while i < lines.len() { - let trimmed = lines[i].trim(); - if trimmed.starts_with("- name:") || (!lines[i].starts_with(' ') && !lines[i].is_empty()) { - break; // hit next entry or top-level key - } - if let Some(rest) = trimmed.strip_prefix("image:") { - return Some(rest.trim().to_string()); - } - i += 1; + ResolvedWorkerType::Config +} + +/// Resolve the canonical worker type for a named worker. +/// Reads config.yaml once and checks the filesystem for binary workers. +/// Priority: local (worker_path) > OCI (image) > binary (~/.iii/workers/{name}) > config. +pub fn resolve_worker_type(name: &str) -> ResolvedWorkerType { + let path = std::path::Path::new(CONFIG_FILE); + let content = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => return check_binary_fallback(name), + }; + + match resolve_worker_type_from_content(&content, name) { + ResolvedWorkerType::Config => check_binary_fallback(name), + other => other, } +} - None +fn check_binary_fallback(name: &str) -> ResolvedWorkerType { + let binary_path = dirs::home_dir() + .unwrap_or_default() + .join(".iii/workers") + .join(name); + if binary_path.exists() { + ResolvedWorkerType::Binary { binary_path } + } else { + ResolvedWorkerType::Config + } } /// Returns the `config:` block for a named worker as a flat `HashMap`. @@ -776,6 +956,57 @@ mod tests { assert_eq!(end, content.len()); } + #[test] + fn test_extract_image_found() { + let content = "workers:\n - name: pdfkit\n image: ghcr.io/iii-hq/pdfkit:1.0\n"; + let image = extract_image(content, "pdfkit"); + assert_eq!(image, Some("ghcr.io/iii-hq/pdfkit:1.0".to_string())); + } + + #[test] + fn test_extract_image_not_found() { + let content = "workers:\n - name: local-w\n worker_path: /tmp/w\n"; + let image = extract_image(content, "local-w"); + assert!(image.is_none()); + } + + #[test] + fn test_resolve_worker_type_local() { + let content = "workers:\n - name: my-local\n worker_path: /home/user/proj\n"; + let resolved = resolve_worker_type_from_content(content, "my-local"); + assert!( + matches!(resolved, ResolvedWorkerType::Local { worker_path } if worker_path == "/home/user/proj") + ); + } + + #[test] + fn test_resolve_worker_type_oci() { + let content = "workers:\n - name: pdfkit\n image: ghcr.io/iii-hq/pdfkit:1.0\n config:\n timeout: 30\n"; + let resolved = resolve_worker_type_from_content(content, "pdfkit"); + match resolved { + ResolvedWorkerType::Oci { image, env } => { + assert_eq!(image, "ghcr.io/iii-hq/pdfkit:1.0"); + assert_eq!(env.get("TIMEOUT"), Some(&"30".to_string())); + } + other => panic!("expected Oci, got {:?}", other), + } + } + + #[test] + fn test_resolve_worker_type_config_fallback() { + let content = "workers:\n - name: builtin\n"; + let resolved = resolve_worker_type_from_content(content, "builtin"); + assert!(matches!(resolved, ResolvedWorkerType::Config)); + } + + #[test] + fn test_resolve_worker_type_local_takes_precedence_over_image() { + let content = + "workers:\n - name: weird\n worker_path: /tmp/proj\n image: ghcr.io/org/w:1\n"; + let resolved = resolve_worker_type_from_content(content, "weird"); + assert!(matches!(resolved, ResolvedWorkerType::Local { .. })); + } + #[test] fn test_remove_worker_from_first_entry() { let content = "workers:\n - name: first\n config:\n x: 1\n - name: second\n"; @@ -792,6 +1023,39 @@ mod tests { assert!(result.contains("workers:")); } + #[test] + fn test_append_worker_with_path_field() { + let mut content = "workers:\n".to_string(); + let path = std::path::Path::new("/tmp/test-config.yaml"); + let _ = std::fs::write(path, &content); + append_to_content_with_fields( + &mut content, + path, + "local-worker", + None, + Some("/absolute/path/to/worker"), + Some("timeout: 30"), + ) + .unwrap(); + assert!(content.contains("- name: local-worker")); + assert!(content.contains("worker_path: /absolute/path/to/worker")); + assert!(content.contains("timeout: 30")); + } + + #[test] + fn test_get_worker_path_found() { + let content = "workers:\n - name: my-worker\n worker_path: /home/user/my-worker\n config:\n timeout: 30\n"; + let path = extract_worker_path(content, "my-worker"); + assert_eq!(path, Some("/home/user/my-worker".to_string())); + } + + #[test] + fn test_get_worker_path_not_found() { + let content = "workers:\n - name: oci-worker\n image: ghcr.io/org/worker:tag\n"; + let path = extract_worker_path(content, "oci-worker"); + assert!(path.is_none()); + } + #[test] fn test_get_worker_start_info_with_image_and_config() { let content = "workers:\n - name: pdfkit\n image: ghcr.io/iii-hq/pdfkit:1.0\n config:\n timeout: 30\n"; diff --git a/crates/iii-worker/src/cli/dev.rs b/crates/iii-worker/src/cli/dev.rs deleted file mode 100644 index 23dd934e8..000000000 --- a/crates/iii-worker/src/cli/dev.rs +++ /dev/null @@ -1,577 +0,0 @@ -// Copyright Motia LLC and/or licensed to Motia LLC under one or more -// contributor license agreements. Licensed under the Elastic License 2.0; -// you may not use this file except in compliance with the Elastic License 2.0. -// This software is patent protected. We welcome discussions - reach out at support@motia.dev -// See LICENSE and PATENTS files for details. - -//! Dev workflow orchestration for `iii worker dev`. - -use colored::Colorize; -use std::collections::HashMap; - -use super::project::{ProjectInfo, WORKER_MANIFEST, load_project_info}; -use super::rootfs::clone_rootfs; - -async fn detect_lan_ip() -> Option { - use tokio::process::Command; - let route = Command::new("route") - .args(["-n", "get", "default"]) - .output() - .await - .ok()?; - let route_out = String::from_utf8_lossy(&route.stdout); - let iface = route_out - .lines() - .find(|l| l.contains("interface:"))? - .split(':') - .nth(1)? - .trim() - .to_string(); - - let ifconfig = Command::new("ifconfig").arg(&iface).output().await.ok()?; - let ifconfig_out = String::from_utf8_lossy(&ifconfig.stdout); - let ip = ifconfig_out - .lines() - .find(|l| l.contains("inet ") && !l.contains("127.0.0.1"))? - .split_whitespace() - .nth(1)? - .to_string(); - - Some(ip) -} - -pub fn engine_url_for_runtime( - _runtime: &str, - _address: &str, - port: u16, - _lan_ip: &Option, -) -> String { - format!("ws://localhost:{}", port) -} - -/// Ensure the terminal is in cooked mode with proper NL→CRNL translation. -#[cfg(unix)] -pub fn restore_terminal_cooked_mode() { - let stderr = std::io::stderr(); - if let Ok(mut termios) = nix::sys::termios::tcgetattr(&stderr) { - termios - .output_flags - .insert(nix::sys::termios::OutputFlags::OPOST); - termios - .output_flags - .insert(nix::sys::termios::OutputFlags::ONLCR); - let _ = nix::sys::termios::tcsetattr(&stderr, nix::sys::termios::SetArg::TCSANOW, &termios); - } -} - -pub async fn handle_worker_dev( - path: &str, - name: Option<&str>, - runtime: Option<&str>, - rebuild: bool, - address: &str, - port: u16, -) -> i32 { - #[cfg(unix)] - restore_terminal_cooked_mode(); - - let project_path = match std::fs::canonicalize(path) { - Ok(p) => p, - Err(e) => { - eprintln!("{} Invalid path '{}': {}", "error:".red(), path, e); - return 1; - } - }; - - if let Err(e) = super::firmware::download::ensure_libkrunfw().await { - tracing::warn!(error = %e, "failed to ensure libkrunfw"); - } - - let selected_runtime = match detect_dev_runtime(runtime).await { - Some(rt) => rt, - None => { - eprintln!( - "{} No dev runtime available.\n \ - Rebuild with --features embed-libkrunfw or place libkrunfw in ~/.iii/lib/", - "error:".red() - ); - return 1; - } - }; - - let project = match load_project_info(&project_path) { - Some(p) => p, - None => { - eprintln!( - "{} Could not detect project type in '{}'. Add iii.worker.yaml or use package.json/Cargo.toml/pyproject.toml.", - "error:".red(), - project_path.display() - ); - return 1; - } - }; - - let has_manifest = project_path.join(WORKER_MANIFEST).exists(); - - let dir_name = project_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("worker"); - let sb_name = name - .map(|n| n.to_string()) - .unwrap_or_else(|| format!("iii-dev-{}", dir_name)); - let project_str = project_path.to_string_lossy(); - - let lan_ip = detect_lan_ip().await; - let engine_url = engine_url_for_runtime(&selected_runtime, address, port, &lan_ip); - - tracing::debug!(runtime = %selected_runtime, "selected dev runtime"); - - eprintln!(); - if has_manifest { - eprintln!( - " {} loaded from {}", - "Config".cyan().bold(), - WORKER_MANIFEST.bold() - ); - } - let lang_suffix = project - .language - .as_deref() - .map(|l| format!(" ({})", l.dimmed())) - .unwrap_or_default(); - eprintln!( - " {} {}{}", - "Project".cyan().bold(), - project.name.bold(), - lang_suffix - ); - eprintln!(" {} {}", "Sandbox".cyan().bold(), sb_name.bold()); - eprintln!(" {} {}", "Engine".cyan().bold(), engine_url.bold()); - eprintln!(); - - run_dev_worker( - &selected_runtime, - &sb_name, - &project_str, - &project, - &engine_url, - rebuild, - ) - .await -} - -async fn detect_dev_runtime(explicit: Option<&str>) -> Option { - if let Some(rt) = explicit { - return Some(rt.to_string()); - } - - { - if super::worker_manager::libkrun::libkrun_available() { - return Some("libkrun".to_string()); - } - } - - None -} - -async fn run_dev_worker( - runtime: &str, - sb_name: &str, - project_str: &str, - project: &ProjectInfo, - engine_url: &str, - rebuild: bool, -) -> i32 { - match runtime { - "libkrun" => { - let language = project.language.as_deref().unwrap_or("typescript"); - let mut env = build_dev_env(engine_url, &project.env); - - let base_rootfs = match super::worker_manager::oci::prepare_rootfs(language).await { - Ok(p) => p, - Err(e) => { - eprintln!("{} {}", "error:".red(), e); - return 1; - } - }; - - let oci_env = super::worker_manager::oci::read_oci_env(&base_rootfs); - for (key, value) in oci_env { - env.entry(key).or_insert(value); - } - - let dev_dir = match dirs::home_dir() { - Some(h) => h.join(".iii").join("dev").join(sb_name), - None => { - eprintln!("{} Cannot determine home directory", "error:".red()); - return 1; - } - }; - let prepared_marker = dev_dir.join("var").join(".iii-prepared"); - - if rebuild && dev_dir.exists() { - eprintln!(" Rebuilding: clearing cached sandbox..."); - let _ = std::fs::remove_dir_all(&dev_dir); - } - - if !dev_dir.exists() { - eprintln!(" Preparing sandbox..."); - if let Err(e) = clone_rootfs(&base_rootfs, &dev_dir) { - eprintln!("{} Failed to create project rootfs: {}", "error:".red(), e); - return 1; - } - } - - let is_prepared = prepared_marker.exists(); - if is_prepared { - eprintln!( - " {} Using cached deps {}", - "✓".green(), - "(use --rebuild to reinstall)".dimmed() - ); - } - - let script = build_libkrun_dev_script(project, is_prepared); - - let script_path = dev_dir.join("tmp").join("iii-dev-run.sh"); - if let Err(e) = std::fs::write(&script_path, &script) { - eprintln!("{} Failed to write dev script: {}", "error:".red(), e); - return 1; - } - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let _ = - std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)); - } - - let workspace = dev_dir.join("workspace"); - std::fs::create_dir_all(&workspace).ok(); - if let Err(e) = copy_dir_contents(std::path::Path::new(project_str), &workspace) { - eprintln!("{} Failed to copy project to rootfs: {}", "error:".red(), e); - return 1; - } - - let init_path = match super::firmware::download::ensure_init_binary().await { - Ok(p) => p, - Err(e) => { - eprintln!("{} Failed to provision iii-init: {}", "error:".red(), e); - return 1; - } - }; - - if !iii_filesystem::init::has_init() { - let dest = dev_dir.join("init.krun"); - if let Err(e) = std::fs::copy(&init_path, &dest) { - eprintln!( - "{} Failed to copy iii-init to rootfs: {}", - "error:".red(), - e - ); - return 1; - } - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let _ = std::fs::set_permissions(&dest, std::fs::Permissions::from_mode(0o755)); - } - } - - let exec_path = "/bin/sh"; - let args = vec![ - "-c".to_string(), - "cd /workspace && exec bash /tmp/iii-dev-run.sh".to_string(), - ]; - let manifest_path = std::path::Path::new(project_str).join(WORKER_MANIFEST); - let (vcpus, ram) = parse_manifest_resources(&manifest_path); - - super::worker_manager::libkrun::run_dev( - language, - project_str, - exec_path, - &args, - env, - vcpus, - ram, - dev_dir, - ) - .await - } - _ => { - eprintln!("{} Unknown runtime: {}", "error:".red(), runtime); - 1 - } - } -} - -pub fn parse_manifest_resources(manifest_path: &std::path::Path) -> (u32, u32) { - let default = (2, 2048); - let content = match std::fs::read_to_string(manifest_path) { - Ok(c) => c, - Err(_) => return default, - }; - let yaml: serde_yml::Value = match serde_yml::from_str(&content) { - Ok(v) => v, - Err(_) => return default, - }; - let cpus = yaml - .get("resources") - .and_then(|r| r.get("cpus")) - .and_then(|v| v.as_u64()) - .unwrap_or(2) as u32; - let memory = yaml - .get("resources") - .and_then(|r| r.get("memory")) - .and_then(|v| v.as_u64()) - .unwrap_or(2048) as u32; - (cpus, memory) -} - -pub fn copy_dir_contents(src: &std::path::Path, dst: &std::path::Path) -> Result<(), String> { - let skip = [ - "node_modules", - ".git", - "target", - "__pycache__", - ".venv", - "dist", - ]; - for entry in - std::fs::read_dir(src).map_err(|e| format!("Failed to read {}: {}", src.display(), e))? - { - let entry = entry.map_err(|e| e.to_string())?; - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - if skip.iter().any(|s| *s == name_str.as_ref()) { - continue; - } - let src_path = entry.path(); - let dst_path = dst.join(&name); - if src_path.is_dir() { - std::fs::create_dir_all(&dst_path).map_err(|e| e.to_string())?; - copy_dir_contents(&src_path, &dst_path)?; - } else { - std::fs::copy(&src_path, &dst_path).map_err(|e| e.to_string())?; - } - } - Ok(()) -} - -pub fn build_libkrun_dev_script(project: &ProjectInfo, prepared: bool) -> String { - let env_exports = build_env_exports(&project.env); - let mut parts: Vec = Vec::new(); - - parts.push("export PATH=/usr/local/bin:/usr/bin:/bin:$PATH".to_string()); - parts.push("export LANG=${LANG:-C.UTF-8}".to_string()); - parts.push("echo $$ > /sys/fs/cgroup/worker/cgroup.procs 2>/dev/null || true".to_string()); - - if !prepared { - if !project.setup_cmd.is_empty() { - parts.push(project.setup_cmd.clone()); - } - if !project.install_cmd.is_empty() { - parts.push(project.install_cmd.clone()); - } - parts.push("mkdir -p /var && touch /var/.iii-prepared".to_string()); - } - - parts.push(format!("{} && {}", env_exports, project.run_cmd)); - parts.join("\n") -} - -pub fn build_dev_env( - engine_url: &str, - project_env: &HashMap, -) -> HashMap { - let mut env = HashMap::new(); - env.insert("III_ENGINE_URL".to_string(), engine_url.to_string()); - env.insert("III_URL".to_string(), engine_url.to_string()); - for (key, value) in project_env { - if key != "III_ENGINE_URL" && key != "III_URL" { - env.insert(key.clone(), value.clone()); - } - } - env -} - -pub fn build_env_exports(env: &HashMap) -> String { - let mut parts: Vec = Vec::new(); - for (k, v) in env { - if k == "III_ENGINE_URL" || k == "III_URL" { - continue; - } - if !k.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_') || k.is_empty() { - continue; - } - parts.push(format!("export {}='{}'", k, shell_escape(v))); - } - if parts.is_empty() { - "true".to_string() - } else { - parts.join(" && ") - } -} - -pub fn shell_escape(s: &str) -> String { - s.replace('\'', "'\\''") -} - -#[cfg(test)] -mod tests { - use super::*; - - fn managed_engine_url(bind_addr: &str) -> String { - let (_host, port) = match bind_addr.rsplit_once(':') { - Some((h, p)) => (h, p), - None => (bind_addr, "49134"), - }; - format!("ws://localhost:{}", port) - } - - #[test] - fn managed_engine_url_uses_localhost() { - let url = managed_engine_url("0.0.0.0:49134"); - assert_eq!(url, "ws://localhost:49134"); - } - - #[test] - fn build_env_exports_excludes_engine_urls() { - let mut env = HashMap::new(); - env.insert( - "III_ENGINE_URL".to_string(), - "ws://localhost:49134".to_string(), - ); - env.insert("III_URL".to_string(), "ws://localhost:49134".to_string()); - env.insert("CUSTOM_VAR".to_string(), "custom-val".to_string()); - - let exports = build_env_exports(&env); - assert!(!exports.contains("III_ENGINE_URL")); - assert!(!exports.contains("III_URL")); - assert!(exports.contains("CUSTOM_VAR='custom-val'")); - } - - #[test] - fn build_env_exports_empty_env() { - let env = HashMap::new(); - let exports = build_env_exports(&env); - assert_eq!(exports, "true"); - } - - #[test] - fn engine_url_for_runtime_libkrun_uses_localhost() { - let url = engine_url_for_runtime("libkrun", "0.0.0.0", 49134, &None); - assert_eq!(url, "ws://localhost:49134"); - } - - #[test] - fn build_libkrun_dev_script_first_run() { - let project = ProjectInfo { - name: "test".to_string(), - language: Some("typescript".to_string()), - setup_cmd: "apt-get install nodejs".to_string(), - install_cmd: "npm install".to_string(), - run_cmd: "node server.js".to_string(), - env: HashMap::new(), - }; - let script = build_libkrun_dev_script(&project, false); - assert!(script.contains("apt-get install nodejs")); - assert!(script.contains("npm install")); - assert!(script.contains("node server.js")); - assert!(script.contains(".iii-prepared")); - } - - #[test] - fn build_libkrun_dev_script_prepared() { - let project = ProjectInfo { - name: "test".to_string(), - language: Some("typescript".to_string()), - setup_cmd: "apt-get install nodejs".to_string(), - install_cmd: "npm install".to_string(), - run_cmd: "node server.js".to_string(), - env: HashMap::new(), - }; - let script = build_libkrun_dev_script(&project, true); - assert!(!script.contains("apt-get install nodejs")); - assert!(!script.contains("npm install")); - assert!(script.contains("node server.js")); - } - - #[test] - fn build_dev_env_sets_engine_urls() { - let env = build_dev_env("ws://localhost:49134", &HashMap::new()); - assert_eq!(env.get("III_ENGINE_URL").unwrap(), "ws://localhost:49134"); - assert_eq!(env.get("III_URL").unwrap(), "ws://localhost:49134"); - } - - #[test] - fn build_dev_env_preserves_custom_env() { - let mut project_env = HashMap::new(); - project_env.insert("CUSTOM".to_string(), "value".to_string()); - let env = build_dev_env("ws://localhost:49134", &project_env); - assert_eq!(env.get("CUSTOM").unwrap(), "value"); - assert_eq!(env.get("III_ENGINE_URL").unwrap(), "ws://localhost:49134"); - assert_eq!(env.get("III_URL").unwrap(), "ws://localhost:49134"); - } - - #[test] - fn build_dev_env_does_not_override_engine_urls() { - let mut project_env = HashMap::new(); - project_env.insert("III_URL".to_string(), "custom".to_string()); - let env = build_dev_env("ws://localhost:49134", &project_env); - assert_eq!(env.get("III_URL").unwrap(), "ws://localhost:49134"); - } - - #[test] - fn parse_manifest_resources_defaults() { - let dir = tempfile::tempdir().unwrap(); - let nonexistent = dir.path().join("nonexistent.yaml"); - let (cpus, memory) = parse_manifest_resources(&nonexistent); - assert_eq!(cpus, 2); - assert_eq!(memory, 2048); - } - - #[test] - fn parse_manifest_resources_custom() { - let dir = tempfile::tempdir().unwrap(); - let manifest_path = dir.path().join("iii.worker.yaml"); - let yaml = r#" -name: resource-test -resources: - cpus: 4 - memory: 4096 -"#; - std::fs::write(&manifest_path, yaml).unwrap(); - let (cpus, memory) = parse_manifest_resources(&manifest_path); - assert_eq!(cpus, 4); - assert_eq!(memory, 4096); - } - - #[test] - fn shell_escape_single_quote() { - let result = shell_escape("it's"); - assert_eq!(result, "it'\\''s"); - } - - #[test] - fn copy_dir_contents_skips_ignored_dirs() { - let src = tempfile::tempdir().unwrap(); - let dst = tempfile::tempdir().unwrap(); - - std::fs::create_dir_all(src.path().join("src")).unwrap(); - std::fs::write(src.path().join("src/main.rs"), "fn main() {}").unwrap(); - std::fs::create_dir_all(src.path().join("node_modules/pkg")).unwrap(); - std::fs::write(src.path().join("node_modules/pkg/index.js"), "").unwrap(); - std::fs::create_dir_all(src.path().join(".git")).unwrap(); - std::fs::write(src.path().join(".git/config"), "").unwrap(); - std::fs::create_dir_all(src.path().join("target/debug")).unwrap(); - std::fs::write(src.path().join("target/debug/bin"), "").unwrap(); - - copy_dir_contents(src.path(), dst.path()).unwrap(); - - assert!(dst.path().join("src/main.rs").exists()); - assert!(!dst.path().join("node_modules").exists()); - assert!(!dst.path().join(".git").exists()); - assert!(!dst.path().join("target").exists()); - } -} diff --git a/crates/iii-worker/src/cli/local_worker.rs b/crates/iii-worker/src/cli/local_worker.rs new file mode 100644 index 000000000..86a26519b --- /dev/null +++ b/crates/iii-worker/src/cli/local_worker.rs @@ -0,0 +1,811 @@ +// Copyright Motia LLC and/or licensed to Motia LLC under one or more +// contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. +// This software is patent protected. We welcome discussions - reach out at support@motia.dev +// See LICENSE and PATENTS files for details. + +//! Local-path worker helpers: extracted shared functions from `dev.rs` plus +//! `handle_local_add` and `start_local_worker` for directory-based workers. + +use colored::Colorize; +use std::collections::HashMap; +use std::path::Path; + +use super::project::{ProjectInfo, WORKER_MANIFEST, load_project_info}; +use super::rootfs::clone_rootfs; + +// ────────────────────────────────────────────────────────────────────────────── +// Shared helpers (extracted from dev.rs) +// ────────────────────────────────────────────────────────────────────────────── + +pub async fn detect_lan_ip() -> Option { + use tokio::process::Command; + let route = Command::new("route") + .args(["-n", "get", "default"]) + .output() + .await + .ok()?; + let route_out = String::from_utf8_lossy(&route.stdout); + let iface = route_out + .lines() + .find(|l| l.contains("interface:"))? + .split(':') + .nth(1)? + .trim() + .to_string(); + + let ifconfig = Command::new("ifconfig").arg(&iface).output().await.ok()?; + let ifconfig_out = String::from_utf8_lossy(&ifconfig.stdout); + let ip = ifconfig_out + .lines() + .find(|l| l.contains("inet ") && !l.contains("127.0.0.1"))? + .split_whitespace() + .nth(1)? + .to_string(); + + Some(ip) +} + +pub fn engine_url_for_runtime( + _runtime: &str, + _address: &str, + port: u16, + _lan_ip: &Option, +) -> String { + format!("ws://localhost:{}", port) +} + +/// Ensure the terminal is in cooked (canonical) mode with proper input and +/// output processing. Restores both output flags (NL→CRNL) and input flags +/// (canonical buffering, echo, CR→NL translation) so that interactive prompts +/// and line-oriented I/O work correctly after a raw-mode session (e.g. VM boot). +#[cfg(unix)] +pub fn restore_terminal_cooked_mode() { + let stderr = std::io::stderr(); + if let Ok(mut termios) = nix::sys::termios::tcgetattr(&stderr) { + // Output: enable post-processing and NL→CRNL + termios + .output_flags + .insert(nix::sys::termios::OutputFlags::OPOST); + termios + .output_flags + .insert(nix::sys::termios::OutputFlags::ONLCR); + // Input: canonical mode, echo, CR→NL translation + termios + .local_flags + .insert(nix::sys::termios::LocalFlags::ICANON); + termios + .local_flags + .insert(nix::sys::termios::LocalFlags::ECHO); + termios + .input_flags + .insert(nix::sys::termios::InputFlags::ICRNL); + let _ = nix::sys::termios::tcsetattr(&stderr, nix::sys::termios::SetArg::TCSANOW, &termios); + } +} + +pub fn parse_manifest_resources(manifest_path: &Path) -> (u32, u32) { + let default = (2, 2048); + let content = match std::fs::read_to_string(manifest_path) { + Ok(c) => c, + Err(_) => return default, + }; + let yaml: serde_yml::Value = match serde_yml::from_str(&content) { + Ok(v) => v, + Err(_) => return default, + }; + let cpus = yaml + .get("resources") + .and_then(|r| r.get("cpus")) + .and_then(|v| v.as_u64()) + .unwrap_or(2) as u32; + let memory = yaml + .get("resources") + .and_then(|r| r.get("memory")) + .and_then(|v| v.as_u64()) + .unwrap_or(2048) as u32; + (cpus, memory) +} + +/// Remove workspace contents except installed dependency directories. +/// This lets us re-copy source files without losing `npm install` artifacts. +fn clean_workspace_preserving_deps(workspace: &Path) { + let preserve = ["node_modules", "target", ".venv", "__pycache__"]; + if let Ok(entries) = std::fs::read_dir(workspace) { + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if preserve.iter().any(|s| *s == name_str.as_ref()) { + continue; + } + let path = entry.path(); + if path.is_dir() { + let _ = std::fs::remove_dir_all(&path); + } else { + let _ = std::fs::remove_file(&path); + } + } + } +} + +pub fn copy_dir_contents(src: &Path, dst: &Path) -> Result<(), String> { + let skip = [ + "node_modules", + ".git", + "target", + "__pycache__", + ".venv", + "dist", + ]; + for entry in + std::fs::read_dir(src).map_err(|e| format!("Failed to read {}: {}", src.display(), e))? + { + let entry = entry.map_err(|e| e.to_string())?; + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if skip.iter().any(|s| *s == name_str.as_ref()) { + continue; + } + let src_path = entry.path(); + let dst_path = dst.join(&name); + let meta = std::fs::symlink_metadata(&src_path) + .map_err(|e| format!("Failed to read metadata {}: {}", src_path.display(), e))?; + if meta.file_type().is_symlink() { + continue; + } + if meta.file_type().is_dir() { + std::fs::create_dir_all(&dst_path).map_err(|e| e.to_string())?; + copy_dir_contents(&src_path, &dst_path)?; + } else { + std::fs::copy(&src_path, &dst_path).map_err(|e| e.to_string())?; + } + } + Ok(()) +} + +pub fn build_libkrun_local_script(project: &ProjectInfo, prepared: bool) -> String { + let env_exports = build_env_exports(&project.env); + let mut parts: Vec = Vec::new(); + + parts.push("export PATH=/usr/local/bin:/usr/bin:/bin:$PATH".to_string()); + parts.push("export LANG=${LANG:-C.UTF-8}".to_string()); + parts.push("echo $$ > /sys/fs/cgroup/worker/cgroup.procs 2>/dev/null || true".to_string()); + + if !prepared { + if !project.setup_cmd.is_empty() { + parts.push(project.setup_cmd.clone()); + } + if !project.install_cmd.is_empty() { + parts.push(project.install_cmd.clone()); + } + parts.push("mkdir -p /var && touch /var/.iii-prepared".to_string()); + } + + parts.push(format!("{} && {}", env_exports, project.run_cmd)); + parts.join("\n") +} + +pub fn build_env_exports(env: &HashMap) -> String { + let mut parts: Vec = Vec::new(); + for (k, v) in env { + if k == "III_ENGINE_URL" || k == "III_URL" { + continue; + } + if !k.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_') || k.is_empty() { + continue; + } + parts.push(format!("export {}='{}'", k, shell_escape(v))); + } + if parts.is_empty() { + "true".to_string() + } else { + parts.join(" && ") + } +} + +pub fn shell_escape(s: &str) -> String { + s.replace('\'', "'\\''") +} + +pub fn build_local_env( + engine_url: &str, + project_env: &HashMap, +) -> HashMap { + let mut env = HashMap::new(); + env.insert("III_ENGINE_URL".to_string(), engine_url.to_string()); + env.insert("III_URL".to_string(), engine_url.to_string()); + for (key, value) in project_env { + if key != "III_ENGINE_URL" && key != "III_URL" { + env.insert(key.clone(), value.clone()); + } + } + env +} + +// ────────────────────────────────────────────────────────────────────────────── +// New functions for local-path worker support +// ────────────────────────────────────────────────────────────────────────────── + +/// Returns `true` if `input` looks like a local filesystem path rather than +/// a registry name or OCI reference. +pub fn is_local_path(input: &str) -> bool { + input.starts_with('.') || input.starts_with('/') || input.starts_with('~') +} + +/// Reads the worker `name` from `iii.worker.yaml` inside `project_path`. +/// Falls back to the directory name if no manifest or no `name` field is found. +pub fn resolve_worker_name(project_path: &Path) -> String { + let manifest_path = project_path.join(WORKER_MANIFEST); + if manifest_path.exists() { + if let Ok(content) = std::fs::read_to_string(&manifest_path) { + if let Ok(doc) = serde_yaml::from_str::(&content) { + if let Some(name) = doc.get("name").and_then(|n| n.as_str()) { + if !name.is_empty() { + return name.to_string(); + } + } + } + } + } + project_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("worker") + .to_string() +} + +/// Full flow for adding a local-path worker. +/// +/// 1. Resolve path, validate, detect language, resolve name +/// 2. Check config.yaml for duplicates (--force to override) +/// 3. Prepare base rootfs, clone, copy project files +/// 4. Run setup+install scripts inside a libkrun VM +/// 5. Extract default config from iii.worker.yaml +/// 6. Append to config.yaml with `worker_path` +pub async fn handle_local_add(path: &str, force: bool, reset_config: bool, brief: bool) -> i32 { + // 1. Resolve path to absolute + let project_path = match std::fs::canonicalize(path) { + Ok(p) => p, + Err(e) => { + eprintln!("{} Invalid path '{}': {}", "error:".red(), path, e); + return 1; + } + }; + + // 2. Validate directory exists + if !project_path.is_dir() { + eprintln!( + "{} '{}' is not a directory", + "error:".red(), + project_path.display() + ); + return 1; + } + + // 3. Detect language / project type + let _project = match load_project_info(&project_path) { + Some(p) => p, + None => { + eprintln!( + "{} Could not detect project type in '{}'. \ + Add iii.worker.yaml or use package.json/Cargo.toml/pyproject.toml.", + "error:".red(), + project_path.display() + ); + return 1; + } + }; + + // 4. Resolve worker name + let worker_name = resolve_worker_name(&project_path); + + if !brief { + eprintln!(" Adding local worker {}...", worker_name.bold()); + } + + // 5. Check if already exists in config.yaml + if super::config_file::worker_exists(&worker_name) { + if !force { + eprintln!( + "{} Worker '{}' already exists in config.yaml. Use --force to replace.", + "error:".red(), + worker_name + ); + return 1; + } + // --force: stop if running, clear artifacts + if super::managed::is_worker_running(&worker_name) { + eprintln!(" Stopping running worker {}...", worker_name.bold()); + super::managed::handle_managed_stop(&worker_name, "0.0.0.0", 49134).await; + } + let freed = super::managed::delete_worker_artifacts(&worker_name); + if freed > 0 { + eprintln!( + " Cleared {:.1} MB of artifacts", + freed as f64 / 1_048_576.0 + ); + } + if reset_config { + let _ = super::config_file::remove_worker(&worker_name); + } + } + + // 6. Extract default config from iii.worker.yaml + let manifest_path = project_path.join(WORKER_MANIFEST); + let config_yaml = if manifest_path.exists() { + std::fs::read_to_string(&manifest_path) + .ok() + .and_then(|content| serde_yaml::from_str::(&content).ok()) + .and_then(|doc| doc.get("config").cloned()) + .and_then(|v| serde_yaml::to_string(&v).ok()) + } else { + None + }; + + // 7. Append to config.yaml with worker_path + let abs_path_str = project_path.to_string_lossy(); + if let Err(e) = super::config_file::append_worker_with_path( + &worker_name, + &abs_path_str, + config_yaml.as_deref(), + ) { + eprintln!("{} {}", "error:".red(), e); + return 1; + } + + // 8. Print success + if brief { + eprintln!(" {} {}", "\u{2713}".green(), worker_name.bold()); + } else { + eprintln!( + "\n {} Worker {} added to {}", + "\u{2713}".green(), + worker_name.bold(), + "config.yaml".dimmed(), + ); + eprintln!(" {} {}", "Path".cyan().bold(), abs_path_str.bold()); + + // Auto-start if engine is running (skip if already running) + if super::managed::is_engine_running() { + if super::managed::is_worker_running(&worker_name) { + eprintln!(" {} Worker already running", "\u{2713}".green()); + } else { + let port = super::app::DEFAULT_PORT; + let result = start_local_worker(&worker_name, &abs_path_str, port).await; + if result == 0 { + eprintln!(" {} Worker auto-started", "\u{2713}".green()); + } else { + eprintln!( + " {} Could not auto-start worker. Run `iii worker start {}` manually.", + "\u{26a0}".yellow(), + worker_name + ); + } + } + } else { + eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + } + } + + 0 +} + +/// Start a local-path worker VM. +/// +/// Re-copies project files, builds env, and runs via libkrun. +pub async fn start_local_worker(worker_name: &str, worker_path: &str, port: u16) -> i32 { + // Kill any stale process from a previous engine run + super::managed::kill_stale_worker(worker_name).await; + + #[cfg(unix)] + restore_terminal_cooked_mode(); + + // 1. Validate worker_path directory exists + let project_path = Path::new(worker_path); + if !project_path.is_dir() { + eprintln!( + "{} Worker path '{}' does not exist or is not a directory", + "error:".red(), + worker_path + ); + return 1; + } + + // 2. Detect language + let project = match load_project_info(project_path) { + Some(p) => p, + None => { + eprintln!( + "{} Could not detect project type in '{}'", + "error:".red(), + worker_path + ); + return 1; + } + }; + + let language = project.language.as_deref().unwrap_or("typescript"); + + // 3. Ensure libkrunfw available + if let Err(e) = super::firmware::download::ensure_libkrunfw().await { + tracing::warn!(error = %e, "failed to ensure libkrunfw"); + } + + if !super::worker_manager::libkrun::libkrun_available() { + eprintln!( + "{} No runtime available.\n \ + Rebuild with --features embed-libkrunfw or place libkrunfw in ~/.iii/lib/", + "error:".red() + ); + return 1; + } + + // 4. Prepare managed dir — clone rootfs on first start + let managed_dir = match dirs::home_dir() { + Some(h) => h.join(".iii").join("managed").join(worker_name), + None => { + eprintln!("{} Cannot determine home directory", "error:".red()); + return 1; + } + }; + + if !managed_dir.exists() { + eprintln!(" Preparing sandbox..."); + let base_rootfs = match super::worker_manager::oci::prepare_rootfs(language).await { + Ok(p) => p, + Err(e) => { + eprintln!("{} {}", "error:".red(), e); + return 1; + } + }; + if let Err(e) = clone_rootfs(&base_rootfs, &managed_dir) { + eprintln!("{} Failed to create project rootfs: {}", "error:".red(), e); + return 1; + } + } + + // 5. Re-copy project files to workspace (fresh source each start) + // Preserve installed dependency dirs (node_modules, target, .venv) + // so setup/install doesn't re-run every time. + let workspace = managed_dir.join("workspace"); + let ws = workspace.clone(); + let pp = project_path.to_path_buf(); + let copy_result = tokio::task::spawn_blocking(move || { + if ws.exists() { + clean_workspace_preserving_deps(&ws); + } + std::fs::create_dir_all(&ws).ok(); + copy_dir_contents(&pp, &ws) + }) + .await; + + match copy_result { + Ok(Ok(())) => {} + Ok(Err(e)) => { + eprintln!("{} Failed to copy project to rootfs: {}", "error:".red(), e); + return 1; + } + Err(e) => { + eprintln!("{} Copy task panicked: {}", "error:".red(), e); + return 1; + } + } + + // 5. Check .iii-prepared marker + let prepared_marker = managed_dir.join("var").join(".iii-prepared"); + let is_prepared = prepared_marker.exists(); + + if is_prepared { + eprintln!( + " {} Using cached deps {}", + "\u{2713}".green(), + "(use --force to reinstall)".dimmed() + ); + } + + // 6. Build env with engine URL + OCI env + config.yaml env + let engine_url = engine_url_for_runtime("libkrun", "0.0.0.0", port, &None); + let config_env = super::config_file::get_worker_config_as_env(worker_name); + + let mut combined_project_env = project.env.clone(); + for (k, v) in &config_env { + combined_project_env.insert(k.clone(), v.clone()); + } + + let mut env = build_local_env(&engine_url, &combined_project_env); + + let base_rootfs = match super::worker_manager::oci::prepare_rootfs(language).await { + Ok(p) => p, + Err(e) => { + eprintln!("{} {}", "error:".red(), e); + return 1; + } + }; + let oci_env = super::worker_manager::oci::read_oci_env(&base_rootfs); + for (key, value) in oci_env { + env.entry(key).or_insert(value); + } + + // 7. Build script + let script = build_libkrun_local_script(&project, is_prepared); + + let script_path = managed_dir.join("tmp").join("iii-dev-run.sh"); + std::fs::create_dir_all(managed_dir.join("tmp")).ok(); + if let Err(e) = std::fs::write(&script_path, &script) { + eprintln!("{} Failed to write run script: {}", "error:".red(), e); + return 1; + } + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let _ = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)); + } + + // 8. Copy iii-init if needed + let init_path = match super::firmware::download::ensure_init_binary().await { + Ok(p) => p, + Err(e) => { + eprintln!("{} Failed to provision iii-init: {}", "error:".red(), e); + return 1; + } + }; + + if !iii_filesystem::init::has_init() { + let dest = managed_dir.join("init.krun"); + if let Err(e) = std::fs::copy(&init_path, &dest) { + eprintln!( + "{} Failed to copy iii-init to rootfs: {}", + "error:".red(), + e + ); + return 1; + } + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let _ = std::fs::set_permissions(&dest, std::fs::Permissions::from_mode(0o755)); + } + } + + // 9. Run via libkrun + let manifest_path = project_path.join(WORKER_MANIFEST); + let (vcpus, ram) = parse_manifest_resources(&manifest_path); + + let exec_path = "/bin/sh"; + let args = vec![ + "-c".to_string(), + "cd /workspace && exec bash /tmp/iii-dev-run.sh".to_string(), + ]; + + super::worker_manager::libkrun::run_dev( + language, + worker_path, + exec_path, + &args, + env, + vcpus, + ram, + managed_dir, + true, + worker_name, + ) + .await +} + +// ────────────────────────────────────────────────────────────────────────────── +// Tests +// ────────────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_local_path_detects_relative() { + assert!(is_local_path(".")); + assert!(is_local_path("..")); + assert!(is_local_path("./my-worker")); + assert!(is_local_path("../sibling")); + assert!(is_local_path("/absolute/path")); + assert!(is_local_path("~/projects/worker")); + } + + #[test] + fn is_local_path_rejects_names_and_oci() { + assert!(!is_local_path("pdfkit")); + assert!(!is_local_path("pdfkit@1.0.0")); + assert!(!is_local_path("ghcr.io/org/worker:tag")); + } + + #[test] + fn resolve_worker_name_from_manifest() { + let dir = tempfile::tempdir().unwrap(); + let yaml = "name: my-cool-worker\nruntime:\n language: typescript\n"; + std::fs::write(dir.path().join(WORKER_MANIFEST), yaml).unwrap(); + let name = resolve_worker_name(dir.path()); + assert_eq!(name, "my-cool-worker"); + } + + #[test] + fn resolve_worker_name_falls_back_to_dir_name() { + let dir = tempfile::tempdir().unwrap(); + // No iii.worker.yaml — should fall back to directory name + let name = resolve_worker_name(dir.path()); + let expected = dir + .path() + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_string(); + assert_eq!(name, expected); + } + + #[test] + fn build_libkrun_local_script_first_run() { + let project = ProjectInfo { + name: "test".to_string(), + language: Some("typescript".to_string()), + setup_cmd: "apt-get install nodejs".to_string(), + install_cmd: "npm install".to_string(), + run_cmd: "node server.js".to_string(), + env: HashMap::new(), + }; + let script = build_libkrun_local_script(&project, false); + assert!(script.contains("apt-get install nodejs")); + assert!(script.contains("npm install")); + assert!(script.contains("node server.js")); + assert!(script.contains(".iii-prepared")); + } + + #[test] + fn build_libkrun_local_script_prepared() { + let project = ProjectInfo { + name: "test".to_string(), + language: Some("typescript".to_string()), + setup_cmd: "apt-get install nodejs".to_string(), + install_cmd: "npm install".to_string(), + run_cmd: "node server.js".to_string(), + env: HashMap::new(), + }; + let script = build_libkrun_local_script(&project, true); + assert!(!script.contains("apt-get install nodejs")); + assert!(!script.contains("npm install")); + assert!(script.contains("node server.js")); + } + + #[test] + fn build_local_env_sets_engine_urls() { + let env = build_local_env("ws://localhost:49134", &HashMap::new()); + assert_eq!(env.get("III_ENGINE_URL").unwrap(), "ws://localhost:49134"); + assert_eq!(env.get("III_URL").unwrap(), "ws://localhost:49134"); + } + + #[test] + fn build_local_env_preserves_custom_env() { + let mut project_env = HashMap::new(); + project_env.insert("CUSTOM".to_string(), "value".to_string()); + let env = build_local_env("ws://localhost:49134", &project_env); + assert_eq!(env.get("CUSTOM").unwrap(), "value"); + assert_eq!(env.get("III_ENGINE_URL").unwrap(), "ws://localhost:49134"); + assert_eq!(env.get("III_URL").unwrap(), "ws://localhost:49134"); + } + + #[test] + fn build_env_exports_excludes_engine_urls() { + let mut env = HashMap::new(); + env.insert( + "III_ENGINE_URL".to_string(), + "ws://localhost:49134".to_string(), + ); + env.insert("III_URL".to_string(), "ws://localhost:49134".to_string()); + env.insert("CUSTOM_VAR".to_string(), "custom-val".to_string()); + + let exports = build_env_exports(&env); + assert!(!exports.contains("III_ENGINE_URL")); + assert!(!exports.contains("III_URL")); + assert!(exports.contains("CUSTOM_VAR='custom-val'")); + } + + #[test] + fn shell_escape_single_quote() { + let result = shell_escape("it's"); + assert_eq!(result, "it'\\''s"); + } + + #[test] + fn copy_dir_contents_skips_ignored_dirs() { + let src = tempfile::tempdir().unwrap(); + let dst = tempfile::tempdir().unwrap(); + + std::fs::create_dir_all(src.path().join("src")).unwrap(); + std::fs::write(src.path().join("src/main.rs"), "fn main() {}").unwrap(); + std::fs::create_dir_all(src.path().join("node_modules/pkg")).unwrap(); + std::fs::write(src.path().join("node_modules/pkg/index.js"), "").unwrap(); + std::fs::create_dir_all(src.path().join(".git")).unwrap(); + std::fs::write(src.path().join(".git/config"), "").unwrap(); + std::fs::create_dir_all(src.path().join("target/debug")).unwrap(); + std::fs::write(src.path().join("target/debug/bin"), "").unwrap(); + + copy_dir_contents(src.path(), dst.path()).unwrap(); + + assert!(dst.path().join("src/main.rs").exists()); + assert!(!dst.path().join("node_modules").exists()); + assert!(!dst.path().join(".git").exists()); + assert!(!dst.path().join("target").exists()); + } + + #[test] + fn clean_workspace_preserving_deps_keeps_node_modules() { + let dir = tempfile::tempdir().unwrap(); + let ws = dir.path(); + + // Create dep dirs that should be preserved + std::fs::create_dir_all(ws.join("node_modules/pkg")).unwrap(); + std::fs::write(ws.join("node_modules/pkg/index.js"), "mod").unwrap(); + std::fs::create_dir_all(ws.join("target/debug")).unwrap(); + std::fs::write(ws.join("target/debug/bin"), "elf").unwrap(); + std::fs::create_dir_all(ws.join(".venv/lib")).unwrap(); + std::fs::write(ws.join(".venv/lib/site.py"), "py").unwrap(); + std::fs::create_dir_all(ws.join("__pycache__")).unwrap(); + std::fs::write(ws.join("__pycache__/mod.pyc"), "pyc").unwrap(); + + // Create source files/dirs that should be removed + std::fs::write(ws.join("main.ts"), "console.log()").unwrap(); + std::fs::create_dir_all(ws.join("src")).unwrap(); + std::fs::write(ws.join("src/lib.ts"), "export {}").unwrap(); + + clean_workspace_preserving_deps(ws); + + // Dep dirs preserved + assert!(ws.join("node_modules/pkg/index.js").exists()); + assert!(ws.join("target/debug/bin").exists()); + assert!(ws.join(".venv/lib/site.py").exists()); + assert!(ws.join("__pycache__/mod.pyc").exists()); + + // Source files/dirs removed + assert!(!ws.join("main.ts").exists()); + assert!(!ws.join("src").exists()); + } + + #[test] + fn clean_workspace_preserving_deps_handles_empty_dir() { + let dir = tempfile::tempdir().unwrap(); + // Should not panic on empty directory + clean_workspace_preserving_deps(dir.path()); + assert!(dir.path().exists()); + } + + #[test] + fn clean_workspace_preserving_deps_handles_nonexistent() { + let dir = tempfile::tempdir().unwrap(); + let gone = dir.path().join("nope"); + // Should not panic on nonexistent directory + clean_workspace_preserving_deps(&gone); + } + + #[test] + fn parse_manifest_resources_defaults() { + let dir = tempfile::tempdir().unwrap(); + let nonexistent = dir.path().join("nonexistent.yaml"); + let (cpus, memory) = parse_manifest_resources(&nonexistent); + assert_eq!(cpus, 2); + assert_eq!(memory, 2048); + } + + #[test] + fn parse_manifest_resources_custom() { + let dir = tempfile::tempdir().unwrap(); + let manifest_path = dir.path().join("iii.worker.yaml"); + let yaml = r#" +name: resource-test +resources: + cpus: 4 + memory: 4096 +"#; + std::fs::write(&manifest_path, yaml).unwrap(); + let (cpus, memory) = parse_manifest_resources(&manifest_path); + assert_eq!(cpus, 4); + assert_eq!(memory, 4096); + } +} diff --git a/crates/iii-worker/src/cli/managed.rs b/crates/iii-worker/src/cli/managed.rs index ea163a8f2..a09cb0970 100644 --- a/crates/iii-worker/src/cli/managed.rs +++ b/crates/iii-worker/src/cli/managed.rs @@ -10,13 +10,14 @@ use colored::Colorize; use super::binary_download; use super::builtin_defaults::get_builtin_default; +use super::config_file::ResolvedWorkerType; use super::lifecycle::build_container_spec; use super::registry::{ MANIFEST_PATH, RegistryV2, WorkerType, fetch_registry, parse_worker_input, resolve_image, }; use super::worker_manager::state::WorkerDef; -pub use super::dev::handle_worker_dev; +pub use super::local_worker::{handle_local_add, is_local_path, start_local_worker}; pub async fn handle_binary_add( input: &str, @@ -86,6 +87,21 @@ pub async fn handle_binary_add( repo.to_string().dimmed(), version ); + } + + // If the worker is already running, skip download entirely + if is_worker_running(&worker_name) { + if !brief { + eprintln!( + "\n {} Worker {} already running, skipping download", + "✓".green(), + worker_name.bold(), + ); + } + return 0; + } + + if !brief { eprintln!(" Downloading {}...", worker_name.bold()); } let install_path = match binary_download::download_and_install_binary( @@ -144,7 +160,26 @@ pub async fn handle_binary_add( worker_name.bold(), "config.yaml".dimmed(), ); - eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + + // Auto-start if engine is running (skip if already running) + if is_engine_running() { + if is_worker_running(&worker_name) { + eprintln!(" {} Worker already running", "✓".green()); + } else { + let result = start_binary_worker(&worker_name, &install_path).await; + if result == 0 { + eprintln!(" {} Worker auto-started", "✓".green()); + } else { + eprintln!( + " {} Could not auto-start worker. Run `iii worker start {}` manually.", + "⚠".yellow(), + worker_name + ); + } + } + } else { + eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + } } 0 } @@ -189,6 +224,13 @@ pub async fn handle_managed_add( force: bool, reset_config: bool, ) -> i32 { + // Local path workers: starts with '.', '/', or '~' + // Must be checked before force-mode processing since validate_worker_name rejects paths. + if super::local_worker::is_local_path(image_or_name) { + return super::local_worker::handle_local_add(image_or_name, force, reset_config, brief) + .await; + } + // --force: delete existing artifacts before re-downloading if force { // Extract plain name (strip @version if present) @@ -279,7 +321,27 @@ pub async fn handle_managed_add( "config.yaml".dimmed(), ); } - eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + + // Auto-start if engine is running (skip if already running) + if is_engine_running() { + if is_worker_running(image_or_name) { + eprintln!(" {} Worker already running", "✓".green()); + } else { + let port = super::app::DEFAULT_PORT; + let result = handle_managed_start(image_or_name, "0.0.0.0", port).await; + if result == 0 { + eprintln!(" {} Worker auto-started", "✓".green()); + } else { + eprintln!( + " {} Could not auto-start worker. Run `iii worker start {}` manually.", + "⚠".yellow(), + image_or_name + ); + } + } + } else { + eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + } } return 0; } @@ -422,7 +484,32 @@ async fn handle_oci_pull_and_add(name: &str, image_ref: &str, brief: bool) -> i3 name.bold(), "config.yaml".dimmed(), ); - eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + + // Auto-start if engine is running (skip if already running) + if is_engine_running() { + if is_worker_running(name) { + eprintln!(" {} Worker already running", "✓".green()); + } else { + let port = super::app::DEFAULT_PORT; + let worker_def = WorkerDef::Managed { + image: image_ref.to_string(), + env: oci_env.into_iter().collect(), + resources: None, + }; + let result = start_oci_worker(name, &worker_def, port).await; + if result == 0 { + eprintln!(" {} Worker auto-started", "✓".green()); + } else { + eprintln!( + " {} Could not auto-start worker. Run `iii worker start {}` manually.", + "⚠".yellow(), + name + ); + } + } + } else { + eprintln!(" Start the engine to run it, or edit config.yaml to customize."); + } } 0 } @@ -519,9 +606,25 @@ fn clear_single_worker(worker_name: &str) -> i32 { /// Prompts the user for confirmation before clearing all artifacts. /// Returns `true` if the user confirms with "y". fn confirm_clear() -> bool { - eprint!(" This will remove all downloaded workers and images. Continue? [y/N] "); - let mut input = String::new(); - std::io::stdin().read_line(&mut input).is_ok() && input.trim().eq_ignore_ascii_case("y") + use std::io::{Read, Write}; + + // Restore canonical terminal mode in case a previous VM/worker session + // left the terminal in raw mode (no ICANON, no ECHO, no ICRNL). + #[cfg(unix)] + super::local_worker::restore_terminal_cooked_mode(); + + let _ = std::io::stderr() + .write_all(b" This will remove all downloaded workers and images. Continue? [y/N] "); + let _ = std::io::stderr().flush(); + + // Use read() instead of read_line() so that both CR (\r) and LF (\n) + // terminate input. read_line() only recognises LF, so if the terminal is + // in raw/non-canonical mode pressing Enter sends only CR and read_line + // blocks forever. + let mut buf = [0u8; 64]; + let n = std::io::stdin().read(&mut buf).unwrap_or(0); + let input = std::str::from_utf8(&buf[..n]).unwrap_or(""); + input.trim().eq_ignore_ascii_case("y") } fn clear_all_workers(skip_confirm: bool) -> i32 { @@ -621,14 +724,48 @@ fn clear_all_workers(skip_confirm: bool) -> i32 { 0 } +/// Kill any stale worker process from a previous engine run. +/// Checks OCI/local (vm.pid) and binary (pids/{name}.pid) PID files, +/// sends SIGTERM+SIGKILL, and removes the PID file. +pub async fn kill_stale_worker(worker_name: &str) { + let home = dirs::home_dir().unwrap_or_default(); + let pid_files = [ + home.join(".iii/managed").join(worker_name).join("vm.pid"), + home.join(".iii/pids").join(format!("{}.pid", worker_name)), + ]; + + for pid_file in &pid_files { + if let Ok(pid_str) = tokio::fs::read_to_string(pid_file).await { + if let Ok(pid) = pid_str.trim().parse::() { + #[cfg(unix)] + { + use nix::sys::signal::{Signal, kill}; + use nix::unistd::Pid; + let p = Pid::from_raw(pid); + // Only kill if process is still alive + if kill(p, None).is_ok() { + tracing::info!(worker = %worker_name, pid, "Killing stale worker process"); + let _ = kill(p, Signal::SIGTERM); + // Brief wait then force-kill + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + let _ = kill(p, Signal::SIGKILL); + } + } + #[cfg(not(unix))] + { + let _ = pid; + } + } + let _ = tokio::fs::remove_file(pid_file).await; + } + } +} + /// Returns `true` if the worker has a valid PID file and the process is alive. pub fn is_worker_running(worker_name: &str) -> bool { let home = dirs::home_dir().unwrap_or_default(); let oci_pid = home.join(".iii/managed").join(worker_name).join("vm.pid"); - let bin_pid = home - .join(".iii/workers") - .join(worker_name) - .join("worker.pid"); + let bin_pid = home.join(".iii/pids").join(format!("{}.pid", worker_name)); for pid_file in [oci_pid, bin_pid] { if let Ok(pid_str) = std::fs::read_to_string(&pid_file) { @@ -654,6 +791,16 @@ pub fn is_worker_running(worker_name: &str) -> bool { false } +/// Probes `127.0.0.1:DEFAULT_PORT` to check whether the engine is listening. +/// Uses a 200ms timeout to avoid blocking the CLI. +pub fn is_engine_running() -> bool { + std::net::TcpStream::connect_timeout( + &std::net::SocketAddr::from(([127, 0, 0, 1], super::app::DEFAULT_PORT)), + std::time::Duration::from_millis(200), + ) + .is_ok() +} + /// Deletes local artifacts for a worker (binary dir or OCI image dir). /// Returns the number of bytes freed, or 0 if nothing was found. pub fn delete_worker_artifacts(worker_name: &str) -> u64 { @@ -701,6 +848,23 @@ pub fn delete_worker_artifacts(worker_name: &str) -> u64 { } } + // Local-path worker: ~/.iii/managed/{name}/ (same as OCI) + let managed_dir = home.join(".iii/managed").join(worker_name); + if managed_dir.is_dir() { + // Only count if we haven't already freed anything (avoid double-counting with OCI) + if freed == 0 { + freed += dir_size(&managed_dir); + if let Err(e) = std::fs::remove_dir_all(&managed_dir) { + eprintln!( + " {} Failed to remove {}: {}", + "warning:".yellow(), + managed_dir.display(), + e + ); + } + } + } + freed } @@ -742,21 +906,36 @@ pub async fn handle_managed_stop(worker_name: &str, _address: &str, _port: u16) } let home = dirs::home_dir().unwrap_or_default(); - // Check OCI worker PID file - let oci_pid_file = home.join(".iii/managed").join(worker_name).join("vm.pid"); - // Check binary worker PID file - let binary_pid_file = home - .join(".iii/workers") - .join(worker_name) - .join("worker.pid"); - - let (pid_file, is_oci) = if oci_pid_file.exists() { - (oci_pid_file, true) - } else if binary_pid_file.exists() { - (binary_pid_file, false) - } else { - eprintln!("{} Worker '{}' is not running", "error:".red(), worker_name); - return 1; + let (pid_file, is_oci) = match super::config_file::resolve_worker_type(worker_name) { + ResolvedWorkerType::Oci { .. } | ResolvedWorkerType::Local { .. } => { + let f = home.join(".iii/managed").join(worker_name).join("vm.pid"); + if !f.exists() { + eprintln!("{} Worker '{}' is not running", "error:".red(), worker_name); + return 1; + } + (f, true) + } + ResolvedWorkerType::Config => { + eprintln!( + "{} Cannot stop '{}': config workers run inside the engine and cannot be stopped individually", + "error:".red(), + worker_name + ); + return 1; + } + ResolvedWorkerType::Binary { .. } => { + let f = home.join(".iii/pids").join(format!("{}.pid", worker_name)); + if !f.exists() { + eprintln!( + "{} Worker '{}' is not running. Start it with 'iii worker start {}'", + "error:".red(), + worker_name, + worker_name + ); + return 1; + } + (f, false) + } }; match std::fs::read_to_string(&pid_file) { @@ -804,24 +983,24 @@ pub async fn handle_managed_start(worker_name: &str, _address: &str, port: u16) eprintln!("{} {}", "error:".red(), e); return 1; } - // Check if this is an OCI worker (has image: in config.yaml) - if let Some((image_ref, env)) = super::config_file::get_worker_start_info(worker_name) { - let worker_def = WorkerDef::Managed { - image: image_ref, - env, - resources: None, - }; - return start_oci_worker(worker_name, &worker_def, port).await; - } - - // Check if this is a binary worker (~/.iii/workers/{name} exists) - let binary_path = dirs::home_dir() - .unwrap_or_default() - .join(".iii/workers") - .join(worker_name); - - if binary_path.exists() { - return start_binary_worker(worker_name, &binary_path).await; + match super::config_file::resolve_worker_type(worker_name) { + ResolvedWorkerType::Oci { image, env } => { + let worker_def = WorkerDef::Managed { + image, + env, + resources: None, + }; + return start_oci_worker(worker_name, &worker_def, port).await; + } + ResolvedWorkerType::Local { worker_path } => { + return super::local_worker::start_local_worker(worker_name, &worker_path, port).await; + } + ResolvedWorkerType::Binary { binary_path } => { + return start_binary_worker(worker_name, &binary_path).await; + } + ResolvedWorkerType::Config => { + // Fall through to registry lookup below + } } // Not found locally — try remote registry for auto-install @@ -964,6 +1143,9 @@ async fn start_oci_worker(worker_name: &str, worker_def: &WorkerDef, port: u16) } async fn start_binary_worker(worker_name: &str, binary_path: &std::path::Path) -> i32 { + // Kill any stale process from a previous engine run + kill_stale_worker(worker_name).await; + // Create log directory: ~/.iii/logs/{name}/ let logs_dir = dirs::home_dir() .unwrap_or_default() @@ -1005,11 +1187,10 @@ async fn start_binary_worker(worker_name: &str, binary_path: &std::path::Path) - match cmd.spawn() { Ok(child) => { - // Write PID file for stop/status tracking - let pid_dir = dirs::home_dir() - .unwrap_or_default() - .join(".iii/workers") - .join(worker_name); + // Write PID file for stop/status tracking. + // Use ~/.iii/pids/{name}.pid — binary workers occupy ~/.iii/workers/{name} + // as a file (the executable), so we cannot create a subdirectory there. + let pid_dir = dirs::home_dir().unwrap_or_default().join(".iii/pids"); let _ = std::fs::create_dir_all(&pid_dir); #[cfg(unix)] { @@ -1017,7 +1198,7 @@ async fn start_binary_worker(worker_name: &str, binary_path: &std::path::Path) - let _ = std::fs::set_permissions(&pid_dir, std::fs::Permissions::from_mode(0o700)); } if let Some(pid) = child.id() { - let pid_path = pid_dir.join("worker.pid"); + let pid_path = pid_dir.join(format!("{}.pid", worker_name)); let _ = std::fs::write(&pid_path, pid.to_string()); #[cfg(unix)] { @@ -1049,21 +1230,39 @@ pub async fn handle_worker_list() -> i32 { return 0; } + let engine_running = is_engine_running(); + eprintln!(); - eprintln!(" {:25} {}", "NAME".bold(), "STATUS".bold()); - eprintln!(" {:25} {}", "----".dimmed(), "------".dimmed()); + eprintln!( + " {:25} {:10} {}", + "NAME".bold(), + "TYPE".bold(), + "STATUS".bold() + ); + eprintln!( + " {:25} {:10} {}", + "----".dimmed(), + "----".dimmed(), + "------".dimmed() + ); for name in &names { - let binary_path = dirs::home_dir() - .unwrap_or_default() - .join(".iii/workers") - .join(name); - let status = if binary_path.exists() { - "binary (installed)".green().to_string() + let worker_type = match super::config_file::resolve_worker_type(name) { + ResolvedWorkerType::Local { .. } => "local", + ResolvedWorkerType::Oci { .. } => "oci", + ResolvedWorkerType::Binary { .. } => "binary", + ResolvedWorkerType::Config => "config", + }; + + let running = if is_worker_running(name) { + "running".green().to_string() + } else if worker_type == "config" && engine_running { + "running".green().to_string() } else { - "configured".dimmed().to_string() + "stopped".dimmed().to_string() }; - eprintln!(" {:25} {}", name, status); + + eprintln!(" {:25} {:10} {}", name, worker_type.dimmed(), running); } eprintln!(); 0 @@ -1297,6 +1496,32 @@ pub async fn handle_managed_logs( mod tests { use super::*; use std::io::Write; + use std::sync::Mutex; + + static CWD_LOCK: Mutex<()> = Mutex::new(()); + + /// Run an async closure with CWD set to a temp dir, then restore. + /// Uses a drop guard so CWD is restored even if the closure panics. + async fn in_temp_dir_async(f: F) + where + F: FnOnce(std::path::PathBuf) -> Fut, + Fut: std::future::Future, + { + struct CwdGuard(std::path::PathBuf); + impl Drop for CwdGuard { + fn drop(&mut self) { + let _ = std::env::set_current_dir(&self.0); + } + } + + let _guard = CWD_LOCK.lock().unwrap(); + let dir = tempfile::tempdir().unwrap(); + let original = std::env::current_dir().unwrap(); + let dir_path = dir.path().to_path_buf(); + std::env::set_current_dir(&dir_path).unwrap(); + let _cwd_guard = CwdGuard(original); + f(dir_path).await; + } #[tokio::test] async fn read_new_bytes_picks_up_appended_content() { @@ -1507,7 +1732,7 @@ mod tests { #[test] fn confirm_clear_returns_false_on_empty_stdin() { // confirm_clear reads from stdin — in test context stdin is closed/empty, - // so read_line returns Ok("") which should not match "y" + // so read returns 0 bytes which should not match "y" // We can't easily call confirm_clear (it blocks on stdin), but we can // verify the logic inline: let input = ""; @@ -1518,6 +1743,15 @@ mod tests { assert!(input.trim().eq_ignore_ascii_case("y")); let input = "Y\n"; assert!(input.trim().eq_ignore_ascii_case("y")); + // CR-only line endings (raw/non-canonical terminal mode) + let input = "y\r"; + assert!(input.trim().eq_ignore_ascii_case("y")); + let input = "Y\r"; + assert!(input.trim().eq_ignore_ascii_case("y")); + let input = "y\r\n"; + assert!(input.trim().eq_ignore_ascii_case("y")); + let input = "\r"; + assert!(!input.trim().eq_ignore_ascii_case("y")); } #[test] @@ -1577,6 +1811,78 @@ mod tests { assert!(content.trim().parse::().is_err()); } + #[test] + fn kill_stale_worker_removes_pid_files() { + // Create fake PID files with a dead PID + let dir = tempfile::tempdir().unwrap(); + let managed_dir = dir.path().join("managed").join("test-worker"); + std::fs::create_dir_all(&managed_dir).unwrap(); + std::fs::write(managed_dir.join("vm.pid"), "2000000000").unwrap(); + + let pids_dir = dir.path().join("pids"); + std::fs::create_dir_all(&pids_dir).unwrap(); + std::fs::write(pids_dir.join("test-worker.pid"), "2000000000").unwrap(); + + // Verify files exist + assert!(managed_dir.join("vm.pid").exists()); + assert!(pids_dir.join("test-worker.pid").exists()); + + // kill_stale_worker uses real ~/.iii paths, so we test the logic directly: + // dead PID → signal-0 fails → no kill attempt → file removed + for pid_file in [managed_dir.join("vm.pid"), pids_dir.join("test-worker.pid")] { + if let Ok(pid_str) = std::fs::read_to_string(&pid_file) { + if let Ok(pid) = pid_str.trim().parse::() { + #[cfg(unix)] + { + use nix::sys::signal::kill; + use nix::unistd::Pid; + // PID 2000000000 should not be alive + assert!(kill(Pid::from_raw(pid), None).is_err()); + } + } + let _ = std::fs::remove_file(&pid_file); + } + } + + // Files should be cleaned up + assert!(!managed_dir.join("vm.pid").exists()); + assert!(!pids_dir.join("test-worker.pid").exists()); + } + + #[tokio::test] + async fn kill_stale_worker_no_op_when_no_pid_files() { + // Should not panic when no PID files exist + kill_stale_worker("__iii_test_nonexistent_99999__").await; + } + + #[tokio::test] + async fn kill_stale_worker_handles_invalid_pid_content() { + // Use real function with a worker name that won't collide + // The function should handle garbage content gracefully + let home = dirs::home_dir().unwrap_or_default(); + let pids_dir = home.join(".iii/pids"); + let _ = std::fs::create_dir_all(&pids_dir); + let pid_file = pids_dir.join("__iii_test_garbage_pid__.pid"); + std::fs::write(&pid_file, "not-a-number").unwrap(); + + kill_stale_worker("__iii_test_garbage_pid__").await; + + // File should still be removed even with garbage content + assert!(!pid_file.exists()); + } + + #[test] + fn binary_pid_path_uses_pids_dir() { + // Verify the PID path for binary workers doesn't conflict with the binary file + let home = dirs::home_dir().unwrap_or_default(); + let binary_path = home.join(".iii/workers/some-worker"); + let pid_path = home.join(".iii/pids/some-worker.pid"); + + // These should be different paths — binary at workers/{name}, PID at pids/{name}.pid + assert_ne!(binary_path.parent().unwrap(), pid_path.parent().unwrap()); + assert!(pid_path.to_string_lossy().ends_with(".pid")); + } + #[test] fn image_cache_dir_deterministic_hash() { // Same ref always produces same path @@ -1596,4 +1902,74 @@ mod tests { let path_str = dir.to_string_lossy(); assert!(path_str.contains(".iii/images/") || path_str.contains(".iii\\images\\")); } + + #[tokio::test] + async fn handle_managed_add_routes_local_path() { + in_temp_dir_async(|dir| async move { + // Create a minimal project directory with package.json + let proj = dir.join("test-local-worker"); + std::fs::create_dir_all(&proj).unwrap(); + std::fs::write(proj.join("package.json"), r#"{"name":"test"}"#).unwrap(); + + let path_str = proj.to_string_lossy().to_string(); + let exit_code = handle_managed_add(&path_str, false, None, false, false).await; + assert_eq!(exit_code, 0, "should succeed for valid local path"); + + let content = std::fs::read_to_string("config.yaml").unwrap(); + assert!( + content.contains("worker_path:"), + "should write worker_path field, got:\n{}", + content + ); + assert!( + !content.contains("image:"), + "should not have image field, got:\n{}", + content + ); + }) + .await; + } + + #[tokio::test] + async fn handle_managed_add_local_path_rejects_nonexistent() { + in_temp_dir_async(|_dir| async move { + let exit_code = + handle_managed_add("./nonexistent-path-12345", false, None, false, false).await; + assert_eq!(exit_code, 1, "should fail for nonexistent local path"); + }) + .await; + } + + #[tokio::test] + async fn handle_managed_add_local_path_force_replaces() { + in_temp_dir_async(|dir| async move { + // Create project directory + let proj = dir.join("force-worker"); + std::fs::create_dir_all(&proj).unwrap(); + std::fs::write(proj.join("package.json"), r#"{"name":"force-test"}"#).unwrap(); + + let path_str = proj.to_string_lossy().to_string(); + + // First add + let exit_code = handle_managed_add(&path_str, false, None, false, false).await; + assert_eq!(exit_code, 0); + assert!( + std::fs::read_to_string("config.yaml") + .unwrap() + .contains("worker_path:") + ); + + // Force re-add + let exit_code = handle_managed_add(&path_str, false, None, true, false).await; + assert_eq!(exit_code, 0, "force re-add should succeed"); + + let content = std::fs::read_to_string("config.yaml").unwrap(); + assert!( + content.contains("worker_path:"), + "should still have worker_path after force, got:\n{}", + content + ); + }) + .await; + } } diff --git a/crates/iii-worker/src/cli/mod.rs b/crates/iii-worker/src/cli/mod.rs index 2a2f111c6..6d2a90736 100644 --- a/crates/iii-worker/src/cli/mod.rs +++ b/crates/iii-worker/src/cli/mod.rs @@ -8,9 +8,9 @@ pub mod app; pub mod binary_download; pub mod builtin_defaults; pub mod config_file; -pub mod dev; pub mod firmware; pub mod lifecycle; +pub mod local_worker; pub mod managed; pub mod project; pub mod registry; diff --git a/crates/iii-worker/src/cli/worker_manager/libkrun.rs b/crates/iii-worker/src/cli/worker_manager/libkrun.rs index ae5aea351..578fb53fd 100644 --- a/crates/iii-worker/src/cli/worker_manager/libkrun.rs +++ b/crates/iii-worker/src/cli/worker_manager/libkrun.rs @@ -11,6 +11,7 @@ //! for crash isolation. use anyhow::{Context, Result}; +use colored::Colorize; use std::collections::HashMap; use std::path::PathBuf; @@ -39,6 +40,8 @@ pub async fn run_dev( vcpus: u32, ram_mib: u32, rootfs: PathBuf, + background: bool, + worker_name: &str, ) -> i32 { let self_exe = match std::env::current_exe() { Ok(p) => p, @@ -91,8 +94,61 @@ pub async fn run_dev( cmd.stdin(std::process::Stdio::null()); + if background { + let logs_dir = dirs::home_dir() + .unwrap_or_else(|| PathBuf::from("/tmp")) + .join(".iii/logs") + .join(worker_name); + if let Err(e) = std::fs::create_dir_all(&logs_dir) { + eprintln!("{} Failed to create logs dir: {}", "error:".red(), e); + return 1; + } + let stdout_file = match std::fs::File::create(logs_dir.join("stdout.log")) { + Ok(f) => f, + Err(e) => { + eprintln!("{} Failed to create stdout log: {}", "error:".red(), e); + return 1; + } + }; + let stderr_file = match std::fs::File::create(logs_dir.join("stderr.log")) { + Ok(f) => f, + Err(e) => { + eprintln!("{} Failed to create stderr log: {}", "error:".red(), e); + return 1; + } + }; + cmd.stdout(stdout_file).stderr(stderr_file); + } + match cmd.spawn() { Ok(mut child) => { + // Write PID file so is_worker_running / stop / kill_stale_worker can find us + let pid_file = rootfs.join("vm.pid"); + let pid = child.id().unwrap_or(0); + if pid > 0 { + if let Err(e) = std::fs::write(&pid_file, pid.to_string()) { + eprintln!( + "{} Failed to write PID file {}: {}", + "error:".red(), + pid_file.display(), + e + ); + // Kill the child so we don't leave an untracked VM running + let _ = child.kill().await; + return 1; + } + } + + if background { + eprintln!( + " {} {} started (pid: {})", + "✓".green(), + worker_name.bold(), + pid + ); + return 0; + } + let exit_code = tokio::select! { result = child.wait() => { match result { @@ -112,8 +168,11 @@ pub async fn run_dev( } }; + // Clean up PID file on exit + let _ = std::fs::remove_file(&pid_file); + #[cfg(unix)] - super::super::dev::restore_terminal_cooked_mode(); + super::super::local_worker::restore_terminal_cooked_mode(); exit_code } diff --git a/crates/iii-worker/src/main.rs b/crates/iii-worker/src/main.rs index 55ef74bd4..457068a95 100644 --- a/crates/iii-worker/src/main.rs +++ b/crates/iii-worker/src/main.rs @@ -74,24 +74,6 @@ async fn main() -> anyhow::Result<()> { address, port, } => iii_worker::cli::managed::handle_managed_stop(&worker_name, &address, port).await, - Commands::Dev { - path, - name, - runtime, - rebuild, - address, - port, - } => { - iii_worker::cli::managed::handle_worker_dev( - &path, - name.as_deref(), - runtime.as_deref(), - rebuild, - &address, - port, - ) - .await - } Commands::List => iii_worker::cli::managed::handle_worker_list().await, Commands::Logs { worker_name, diff --git a/crates/iii-worker/tests/config_file_integration.rs b/crates/iii-worker/tests/config_file_integration.rs index 85ce6bbc4..352048c17 100644 --- a/crates/iii-worker/tests/config_file_integration.rs +++ b/crates/iii-worker/tests/config_file_integration.rs @@ -588,3 +588,164 @@ async fn handle_managed_clear_all_no_artifacts() { }) .await; } + +// ────────────────────────────────────────────────────────────────────────────── +// append_worker_with_path tests +// ────────────────────────────────────────────────────────────────────────────── + +#[test] +fn append_worker_with_path_creates_entry() { + in_temp_dir(|| { + iii_worker::cli::config_file::append_worker_with_path("local-w", "/abs/path", None) + .unwrap(); + let content = std::fs::read_to_string("config.yaml").unwrap(); + assert!(content.contains("- name: local-w")); + assert!(content.contains("worker_path: /abs/path")); + }); +} + +#[test] +fn append_worker_with_path_with_config() { + in_temp_dir(|| { + iii_worker::cli::config_file::append_worker_with_path( + "local-w", + "/abs/path", + Some("timeout: 30"), + ) + .unwrap(); + let content = std::fs::read_to_string("config.yaml").unwrap(); + assert!(content.contains("- name: local-w")); + assert!(content.contains("worker_path: /abs/path")); + assert!(content.contains("timeout: 30")); + }); +} + +#[test] +fn append_worker_with_path_merges_existing() { + in_temp_dir(|| { + std::fs::write( + "config.yaml", + "workers:\n - name: local-w\n worker_path: /old\n config:\n custom_key: preserved\n", + ) + .unwrap(); + iii_worker::cli::config_file::append_worker_with_path( + "local-w", + "/new/path", + Some("new_key: added"), + ) + .unwrap(); + let content = std::fs::read_to_string("config.yaml").unwrap(); + assert!( + content.contains("worker_path: /new/path"), + "path should be updated, got:\n{}", + content + ); + assert!( + content.contains("custom_key"), + "user config should be preserved, got:\n{}", + content + ); + assert!( + content.contains("new_key"), + "incoming config should be merged, got:\n{}", + content + ); + }); +} + +#[test] +fn append_worker_with_path_replaces_image_with_path() { + in_temp_dir(|| { + std::fs::write( + "config.yaml", + "workers:\n - name: my-worker\n image: ghcr.io/org/w:1\n", + ) + .unwrap(); + iii_worker::cli::config_file::append_worker_with_path("my-worker", "/local/path", None) + .unwrap(); + let content = std::fs::read_to_string("config.yaml").unwrap(); + assert!( + content.contains("worker_path: /local/path"), + "should have worker_path, got:\n{}", + content + ); + assert!( + !content.contains("image:"), + "image should be removed, got:\n{}", + content + ); + }); +} + +// ────────────────────────────────────────────────────────────────────────────── +// get_worker_path tests +// ────────────────────────────────────────────────────────────────────────────── + +#[test] +fn get_worker_path_present() { + in_temp_dir(|| { + std::fs::write( + "config.yaml", + "workers:\n - name: my-worker\n worker_path: /home/user/proj\n", + ) + .unwrap(); + let path = iii_worker::cli::config_file::get_worker_path("my-worker"); + assert_eq!(path, Some("/home/user/proj".to_string())); + }); +} + +#[test] +fn get_worker_path_absent() { + in_temp_dir(|| { + std::fs::write( + "config.yaml", + "workers:\n - name: oci-worker\n image: ghcr.io/org/w:1\n", + ) + .unwrap(); + let path = iii_worker::cli::config_file::get_worker_path("oci-worker"); + assert!(path.is_none()); + }); +} + +// ────────────────────────────────────────────────────────────────────────────── +// resolve_worker_type tests +// ────────────────────────────────────────────────────────────────────────────── + +#[test] +fn resolve_worker_type_from_config_file() { + use iii_worker::cli::config_file::ResolvedWorkerType; + + in_temp_dir(|| { + std::fs::write( + "config.yaml", + "workers:\n - name: local-w\n worker_path: /home/user/proj\n - name: oci-w\n image: ghcr.io/org/w:1\n - name: config-w\n", + ) + .unwrap(); + + let local = iii_worker::cli::config_file::resolve_worker_type("local-w"); + assert!( + matches!(local, ResolvedWorkerType::Local { ref worker_path } if worker_path == "/home/user/proj"), + "expected Local, got {:?}", + local + ); + + let oci = iii_worker::cli::config_file::resolve_worker_type("oci-w"); + assert!( + matches!(oci, ResolvedWorkerType::Oci { ref image, .. } if image == "ghcr.io/org/w:1"), + "expected Oci, got {:?}", + oci + ); + + // config-only worker resolves to Config (or Binary if ~/.iii/workers/config-w exists, + // but that's unlikely in test environments) + let config = iii_worker::cli::config_file::resolve_worker_type("config-w"); + assert!( + matches!( + config, + ResolvedWorkerType::Config | ResolvedWorkerType::Binary { .. } + ), + "expected Config or Binary fallback, got {:?}", + config + ); + }); +} diff --git a/crates/iii-worker/tests/worker_integration.rs b/crates/iii-worker/tests/worker_integration.rs index 431d2522a..4fb9efd1b 100644 --- a/crates/iii-worker/tests/worker_integration.rs +++ b/crates/iii-worker/tests/worker_integration.rs @@ -22,9 +22,6 @@ fn cli_parses_all_subcommands() { (&["iii-worker", "stop", "pdfkit"], |c| { assert!(matches!(c, Commands::Stop { .. })) }), - (&["iii-worker", "dev", "."], |c| { - assert!(matches!(c, Commands::Dev { .. })) - }), (&["iii-worker", "list"], |c| { assert!(matches!(c, Commands::List)) }), @@ -80,43 +77,6 @@ fn add_subcommand_multiple_workers() { } } -/// `dev` subcommand requires a path and supports all optional flags. -#[test] -fn dev_subcommand_all_flags() { - let cli = Cli::parse_from([ - "iii-worker", - "dev", - "/tmp/project", - "--rebuild", - "--name", - "my-worker", - "--port", - "5000", - ]); - match cli.command { - Commands::Dev { - path, - name, - rebuild, - port, - .. - } => { - assert_eq!(path, "/tmp/project"); - assert_eq!(name, Some("my-worker".to_string())); - assert!(rebuild); - assert_eq!(port, 5000); - } - _ => panic!("expected Dev"), - } -} - -/// `dev` without a path argument fails (path is required). -#[test] -fn dev_requires_path() { - let result = Cli::try_parse_from(["iii-worker", "dev"]); - assert!(result.is_err(), "dev without PATH should fail"); -} - /// `logs` subcommand parses worker name and --follow flag. #[test] fn logs_subcommand_with_follow() { @@ -282,6 +242,45 @@ fn add_force_short_flag() { } } +/// `add ./path` accepts relative local paths as worker names. +#[test] +fn add_subcommand_accepts_local_path() { + let cli = Cli::parse_from(["iii-worker", "add", "./my-worker"]); + match cli.command { + Commands::Add { args, force } => { + assert_eq!(args.worker_names, vec!["./my-worker"]); + assert!(!force); + } + _ => panic!("expected Add"), + } +} + +/// `add /absolute/path` accepts absolute local paths. +#[test] +fn add_subcommand_accepts_absolute_path() { + let cli = Cli::parse_from(["iii-worker", "add", "/tmp/my-worker"]); + match cli.command { + Commands::Add { args, force } => { + assert_eq!(args.worker_names, vec!["/tmp/my-worker"]); + assert!(!force); + } + _ => panic!("expected Add"), + } +} + +/// `add ./path --force` parses both path and force flag. +#[test] +fn add_subcommand_local_path_with_force() { + let cli = Cli::parse_from(["iii-worker", "add", "./my-worker", "--force"]); + match cli.command { + Commands::Add { args, force } => { + assert_eq!(args.worker_names, vec!["./my-worker"]); + assert!(force); + } + _ => panic!("expected Add"), + } +} + /// `reinstall` parses as expected and shares AddArgs with Add. #[test] fn reinstall_subcommand() {