diff --git a/src/commands/list/collect.rs b/src/commands/list/collect.rs index ef2f46d87..b394a3405 100644 --- a/src/commands/list/collect.rs +++ b/src/commands/list/collect.rs @@ -1,9 +1,9 @@ //! Worktree data collection with parallelized git operations. //! //! This module provides an efficient approach to collecting worktree data: -//! - Parallel collection across worktrees (using Rayon) -//! - Parallel operations within each worktree (using scoped threads) -//! - Progressive updates via channels (update UI as each worktree completes) +//! - All tasks flattened into a single Rayon work queue +//! - Network tasks (CI, URL) sorted to run last +//! - Progressive updates via channels (update UI as each task completes) //! //! ## Skeleton Performance (IMPORTANT) //! @@ -37,11 +37,9 @@ //! //! Both modes render the final table in `collect()`, ensuring a single canonical rendering path. //! -//! **Parallelism at two levels**: -//! - Across worktrees: Multiple worktrees collected concurrently via Rayon -//! - Within worktrees: Git operations (ahead/behind, diffs, CI) run concurrently via scoped threads -//! -//! This ensures fast operations don't wait for slow ones (e.g., CI doesn't block ahead/behind counts) +//! **Flat parallelism**: All tasks (for all worktrees and branches) are collected into a single +//! work queue and processed via Rayon's thread pool. This avoids nested parallelism and ensures +//! optimal CPU utilization (~8 threads) regardless of worktree count. use anyhow::Context; use color_print::cformat; use crossbeam_channel as chan; @@ -225,6 +223,13 @@ impl TaskResult { } } +impl TaskKind { + /// Whether this task requires network I/O (slow). + fn is_network(self) -> bool { + matches!(self, TaskKind::CiStatus | TaskKind::UrlStatus) + } +} + /// Detect if a worktree is in the middle of a git operation (rebase/merge). pub(super) fn detect_git_operation(repo: &Repository) -> GitOperationState { if repo.is_rebasing().unwrap_or(false) { @@ -944,36 +949,20 @@ pub fn collect( // Collect errors for display after rendering let mut errors: Vec = Vec::new(); - // Spawn worktree collection in background thread + // Collect all work items upfront, then execute in a single Rayon pool. + // This avoids nested parallelism (Rayon par_iter → thread::scope per worktree) + // which could create 100+ threads. Instead, we have one pool with ~8 threads. let sorted_worktrees_clone = sorted_worktrees.clone(); - let tx_worktrees = tx.clone(); + let tx_worker = tx.clone(); let default_branch_clone = default_branch.clone(); let target_clone = integration_target.clone(); - let expected_results_wt = expected_results.clone(); - let options_wt = options.clone(); - std::thread::spawn(move || { - sorted_worktrees_clone - .par_iter() - .enumerate() - .for_each(|(idx, wt)| { - // Pass default_branch (local default) for stable informational stats, - // and target (effective target) for integration checks. 
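// Illustrative sketch: the ordering property that `is_network()` is used for
// further down. `sort_by_key` is stable and orders `false` before `true`, so
// local git tasks keep their relative order and network tasks (CI, URL) land at
// the end of the queue. `Kind` is a toy stand-in for the real `TaskKind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Kind {
    AheadBehind,
    BranchDiff,
    CiStatus,
    UrlStatus,
}

impl Kind {
    fn is_network(self) -> bool {
        matches!(self, Kind::CiStatus | Kind::UrlStatus)
    }
}

fn main() {
    let mut queue = vec![Kind::CiStatus, Kind::AheadBehind, Kind::UrlStatus, Kind::BranchDiff];
    queue.sort_by_key(|kind| kind.is_network());
    assert_eq!(
        queue,
        vec![Kind::AheadBehind, Kind::BranchDiff, Kind::CiStatus, Kind::UrlStatus]
    );
}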
- super::collect_progressive_impl::collect_worktree_progressive( - wt, - idx, - &default_branch_clone, - &target_clone, - &options_wt, - tx_worktrees.clone(), - &expected_results_wt, - ); - }); - }); + let expected_results_clone = expected_results.clone(); + let options_clone = options.clone(); + let main_path = main_worktree.path.clone(); - // Spawn branch collection in background thread (local + remote) - if show_branches || show_remotes { - // Combine local and remote branches with their item indices - let mut all_branches: Vec<(usize, String, String)> = Vec::new(); + // Prepare branch data if needed (before moving into closure) + let branch_data: Vec<(usize, String, String)> = if show_branches || show_remotes { + let mut all_branches = Vec::new(); if show_branches { all_branches.extend( branches_without_worktrees @@ -990,31 +979,54 @@ pub fn collect( .map(|(idx, (name, sha))| (remote_start_idx + idx, name.clone(), sha.clone())), ); } + all_branches + } else { + Vec::new() + }; + + std::thread::spawn(move || { + use super::collect_progressive_impl::{work_items_for_branch, work_items_for_worktree}; + + // Phase 1: Generate all work items (sequential, fast) + // Work items are collected upfront so we can process them all in a single par_iter. + let mut all_work_items = Vec::new(); + + // Worktree work items + for (idx, wt) in sorted_worktrees_clone.iter().enumerate() { + all_work_items.extend(work_items_for_worktree( + wt, + idx, + &default_branch_clone, + &target_clone, + &options_clone, + &expected_results_clone, + &tx_worker, + )); + } + + // Branch work items (local + remote) + for (item_idx, branch_name, commit_sha) in &branch_data { + all_work_items.extend(work_items_for_branch( + branch_name, + commit_sha, + &main_path, + *item_idx, + &default_branch_clone, + &target_clone, + &options_clone, + &expected_results_clone, + )); + } + + // Sort: local git ops first, network ops (CI, URL) last. 
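// Illustrative sketch of the flat-queue shape described above, with toy items
// (assumes the `rayon` and `crossbeam-channel` crates; the real code queues
// `WorkItem`s and sends task results over the channel).
use crossbeam_channel as chan;
use rayon::prelude::*;

struct ToyItem {
    item_idx: usize,
    network: bool,
}

impl ToyItem {
    fn execute(self) -> (usize, bool) {
        // A real work item runs a git or network operation here.
        (self.item_idx, self.network)
    }
}

fn main() {
    // Phase 1: gather items from every source (worktrees, branches) into one queue.
    let mut queue: Vec<ToyItem> = (0..4usize)
        .flat_map(|idx| {
            [
                ToyItem { item_idx: idx, network: false },
                ToyItem { item_idx: idx, network: true },
            ]
        })
        .collect();

    // Local work first, network work last.
    queue.sort_by_key(|item| item.network);

    // Phase 2: one Rayon pool executes everything; each result goes over the channel.
    let (tx, rx) = chan::unbounded();
    queue.into_par_iter().for_each(|item| {
        let _ = tx.send(item.execute());
    });
    drop(tx);

    assert_eq!(rx.iter().count(), 8);
}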
+ all_work_items.sort_by_key(|item| item.kind.is_network()); - let main_path = main_worktree.path.clone(); - let tx_branches = tx.clone(); - let default_branch_clone = default_branch.clone(); - let target_clone = integration_target.clone(); - let expected_results_br = expected_results.clone(); - let options_br = options.clone(); - std::thread::spawn(move || { - all_branches - .par_iter() - .for_each(|(item_idx, branch_name, commit_sha)| { - super::collect_progressive_impl::collect_branch_progressive( - branch_name, - commit_sha, - &main_path, - *item_idx, - &default_branch_clone, - &target_clone, - &options_br, - tx_branches.clone(), - &expected_results_br, - ); - }); + // Phase 2: Execute all work items in Rayon's thread pool + all_work_items.into_par_iter().for_each(|item| { + let result = item.execute(); + let _ = tx_worker.send(result); }); - } + }); // Drop the original sender so drain_results knows when all spawned threads are done drop(tx); @@ -1368,15 +1380,27 @@ pub fn populate_item( // Spawn collection in background thread std::thread::spawn(move || { - super::collect_progressive_impl::collect_worktree_progressive( + use super::collect_progressive_impl::work_items_for_worktree; + + // Generate work items for this single worktree + let mut work_items = work_items_for_worktree( &wt, 0, // Single item, always index 0 &default_branch_clone, &target_clone, &options, - tx, &expected_results_clone, + &tx, ); + + // Sort: local git ops first, network ops last + work_items.sort_by_key(|item| item.kind.is_network()); + + // Execute all work items in Rayon's thread pool + work_items.into_par_iter().for_each(|item| { + let result = item.execute(); + let _ = tx.send(result); + }); }); // Drain task results (blocking until complete) diff --git a/src/commands/list/collect_progressive_impl.rs b/src/commands/list/collect_progressive_impl.rs index 272de2ecc..4b71bf78b 100644 --- a/src/commands/list/collect_progressive_impl.rs +++ b/src/commands/list/collect_progressive_impl.rs @@ -5,13 +5,17 @@ //! //! ## Architecture //! -//! The framework guarantees that every spawned task is registered in `ExpectedResults` -//! and sends exactly one `TaskResult`: +//! All tasks are executed in a single Rayon thread pool (flat parallelism): //! -//! - `Task` trait: Each task type implements `compute()` returning a `TaskResult` -//! - `TaskSpawner`: Ties together registration + spawn + send in a single operation +//! 1. **Work item generation**: `work_items_for_worktree()` and `work_items_for_branch()` +//! generate `WorkItem` instances for each task, registering expected results upfront. //! -//! This eliminates the "spawn but forget to register" failure mode from the old design. +//! 2. **Parallel execution**: All work items are collected into a `Vec` and processed +//! via `into_par_iter()`. Rayon schedules optimally across its thread pool (~8 threads). +//! +//! 3. **Result delivery**: Each `WorkItem::execute()` returns a result; the caller sends it. +//! +//! This avoids nested parallelism (Rayon → thread::scope) which could create 100+ threads. use crossbeam_channel::Sender; use std::fmt::Display; @@ -38,7 +42,7 @@ pub struct CollectOptions { /// Tasks to skip (not compute). Empty set means compute everything. 
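// Why `drop(tx)` matters for the drain loop: a crossbeam receiver only reports
// "disconnected" once every `Sender` clone is gone. The worker thread owns one
// clone; the original must be dropped by the caller or the drain would block
// forever. Minimal sketch (assumes the `crossbeam-channel` crate):
use crossbeam_channel as chan;

fn main() {
    let (tx, rx) = chan::unbounded::<u32>();

    let tx_worker = tx.clone();
    let worker = std::thread::spawn(move || {
        for n in 0..3 {
            let _ = tx_worker.send(n);
        }
        // `tx_worker` is dropped when this thread finishes.
    });

    // Without this, `rx` would still see one live sender (the original `tx`)
    // and the loop below would never terminate.
    drop(tx);

    let received: Vec<u32> = rx.iter().collect(); // ends once all senders are dropped
    worker.join().unwrap();
    assert_eq!(received, vec![0, 1, 2]);
}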
/// /// This controls both: - /// - Task spawning (in `collect_worktree_progressive`/`collect_branch_progressive`) + /// - Work item generation (in `work_items_for_worktree`/`work_items_for_branch`) /// - Column visibility (layout filters columns via `ColumnSpec::requires_task`) pub skip_tasks: std::collections::HashSet, @@ -110,98 +114,204 @@ pub trait Task: Send + Sync + 'static { fn compute(ctx: TaskContext) -> Result; } -/// Spawner that ties together registration + spawn + send. +// ============================================================================ +// Work Item Dispatch (for flat parallelism) +// ============================================================================ + +/// A unit of work for the thread pool. /// -/// Using `TaskSpawner::spawn()` is the only way to run a task, and it -/// automatically registers the expected result kind before spawning. -pub struct TaskSpawner { - tx: Sender>, - expected: Arc, +/// Each work item represents a single task to be executed. Work items are +/// collected upfront and then processed in parallel via Rayon's thread pool, +/// avoiding nested parallelism (Rayon par_iter → thread::scope). +#[derive(Clone)] +pub struct WorkItem { + pub ctx: TaskContext, + pub kind: TaskKind, } -impl TaskSpawner { - pub fn new(tx: Sender>, expected: Arc) -> Self { - Self { tx, expected } +impl WorkItem { + /// Execute this work item, returning the task result. + pub fn execute(self) -> Result { + let result = dispatch_task(self.kind, self.ctx); + if let Ok(ref task_result) = result { + debug_assert_eq!(TaskKind::from(task_result), self.kind); + } + result } +} - /// Spawn a task, registering its expected result and sending on completion. - /// - /// This is the only way to run a `Task`. It guarantees: - /// 1. The expected result is registered before the task runs - /// 2. Exactly one result (Ok or Err) is sent when the task completes - pub fn spawn<'scope, T: Task>( - &self, - scope: &'scope std::thread::Scope<'scope, '_>, - ctx: &TaskContext, - ) { - // 1. Register expectation - self.expected.expect(ctx.item_idx, T::KIND); - - // 2. Clone for the spawned thread - let tx = self.tx.clone(); - let ctx = ctx.clone(); - - // 3. Spawn the work - scope.spawn(move || { - let result = T::compute(ctx); - if let Ok(ref task_result) = result { - debug_assert_eq!(TaskKind::from(task_result), T::KIND); - } - let _ = tx.send(result); - }); +/// Dispatch a task by kind, calling the appropriate Task::compute(). 
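// The dispatch shape in miniature: one enum of kinds, one compute path per
// kind, and a debug assertion that the produced result matches the requested
// kind. Toy types only; the real `Task` trait, `TaskKind`, and `TaskResult`
// carry far more data.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ToyKind {
    AheadBehind,
    CiStatus,
}

#[derive(Debug)]
enum ToyResult {
    AheadBehind { ahead: usize, behind: usize },
    CiStatus { passed: bool },
}

impl From<&ToyResult> for ToyKind {
    fn from(result: &ToyResult) -> Self {
        match result {
            ToyResult::AheadBehind { .. } => ToyKind::AheadBehind,
            ToyResult::CiStatus { .. } => ToyKind::CiStatus,
        }
    }
}

fn dispatch(kind: ToyKind) -> ToyResult {
    match kind {
        ToyKind::AheadBehind => ToyResult::AheadBehind { ahead: 1, behind: 0 },
        ToyKind::CiStatus => ToyResult::CiStatus { passed: true },
    }
}

fn execute(kind: ToyKind) -> ToyResult {
    let result = dispatch(kind);
    // Cheap sanity check in debug builds: each compute path must return the
    // result variant its kind promises.
    debug_assert_eq!(ToyKind::from(&result), kind);
    result
}

fn main() {
    println!("{:?}", execute(ToyKind::CiStatus));
}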
+fn dispatch_task(kind: TaskKind, ctx: TaskContext) -> Result { + match kind { + TaskKind::CommitDetails => CommitDetailsTask::compute(ctx), + TaskKind::AheadBehind => AheadBehindTask::compute(ctx), + TaskKind::CommittedTreesMatch => CommittedTreesMatchTask::compute(ctx), + TaskKind::HasFileChanges => HasFileChangesTask::compute(ctx), + TaskKind::WouldMergeAdd => WouldMergeAddTask::compute(ctx), + TaskKind::IsAncestor => IsAncestorTask::compute(ctx), + TaskKind::BranchDiff => BranchDiffTask::compute(ctx), + TaskKind::WorkingTreeDiff => WorkingTreeDiffTask::compute(ctx), + TaskKind::MergeTreeConflicts => MergeTreeConflictsTask::compute(ctx), + TaskKind::WorkingTreeConflicts => WorkingTreeConflictsTask::compute(ctx), + TaskKind::GitOperation => GitOperationTask::compute(ctx), + TaskKind::UserMarker => UserMarkerTask::compute(ctx), + TaskKind::Upstream => UpstreamTask::compute(ctx), + TaskKind::CiStatus => CiStatusTask::compute(ctx), + TaskKind::UrlStatus => UrlStatusTask::compute(ctx), } +} - fn spawn_core_tasks<'scope>( - &self, - scope: &'scope std::thread::Scope<'scope, '_>, - ctx: &TaskContext, - ) { - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - } - - fn spawn_worktree_only_tasks<'scope>( - &self, - scope: &'scope std::thread::Scope<'scope, '_>, - ctx: &TaskContext, - skip: &std::collections::HashSet, - ) { - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - self.spawn::(scope, ctx); - // Working tree conflict check only with --full - if !skip.contains(&TaskKind::WorkingTreeConflicts) { - self.spawn::(scope, ctx); - } +/// Generate work items for a worktree. +/// +/// Returns a list of work items representing all tasks that should run for this +/// worktree. Expected results are registered internally as each work item is added. +/// The caller is responsible for executing the work items. +pub fn work_items_for_worktree( + wt: &Worktree, + item_idx: usize, + default_branch: &str, + target: &str, + options: &CollectOptions, + expected_results: &Arc, + tx: &Sender>, +) -> Vec { + // Skip git operations for prunable worktrees (directory missing). + if wt.is_prunable() { + return vec![]; } - fn spawn_optional_tasks<'scope>( - &self, - scope: &'scope std::thread::Scope<'scope, '_>, - ctx: &TaskContext, - skip: &std::collections::HashSet, - ) { - if !skip.contains(&TaskKind::BranchDiff) { - self.spawn::(scope, ctx); - } - if !skip.contains(&TaskKind::MergeTreeConflicts) { - self.spawn::(scope, ctx); - } - if !skip.contains(&TaskKind::CiStatus) { - self.spawn::(scope, ctx); - } - if !skip.contains(&TaskKind::WouldMergeAdd) { - self.spawn::(scope, ctx); + // Expand URL template for this item + let item_url = options.url_template.as_ref().and_then(|template| { + wt.branch.as_ref().and_then(|branch| { + let mut vars = std::collections::HashMap::new(); + vars.insert("branch", branch.as_str()); + worktrunk::config::expand_template(template, &vars, false).ok() + }) + }); + + // Send URL immediately (before health check) so it appears right away. + // The UrlStatusTask will later update with active status. 
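// Why the old shape over-subscribed: each worktree handled by Rayon also opened
// a `thread::scope` and spawned one OS thread per task, so live threads scaled
// with (concurrent worktrees x tasks per worktree), while the flat queue stays
// at the pool size. A rough, runnable illustration with fake tasks (assumes the
// `rayon` crate; the numbers are illustrative, not a benchmark).
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

static LIVE: AtomicUsize = AtomicUsize::new(0);
static PEAK: AtomicUsize = AtomicUsize::new(0);

fn fake_task() {
    let live = LIVE.fetch_add(1, Ordering::SeqCst) + 1;
    PEAK.fetch_max(live, Ordering::SeqCst);
    std::thread::sleep(std::time::Duration::from_millis(20));
    LIVE.fetch_sub(1, Ordering::SeqCst);
}

fn main() {
    let worktrees = 16usize;
    let tasks_per_worktree = 12usize;

    // Old shape: nested parallelism (par_iter over worktrees, scoped thread per task).
    // Peak is roughly min(worktrees, pool size) * tasks_per_worktree.
    (0..worktrees).into_par_iter().for_each(|_| {
        std::thread::scope(|s| {
            for _ in 0..tasks_per_worktree {
                s.spawn(fake_task);
            }
        });
    });
    println!("nested: peak {} live task threads", PEAK.load(Ordering::SeqCst));

    // New shape: one flat queue on the same pool; concurrency is capped by the
    // pool size regardless of worktree count.
    PEAK.store(0, Ordering::SeqCst);
    (0..worktrees * tasks_per_worktree)
        .into_par_iter()
        .for_each(|_| fake_task());
    println!(
        "flat queue: peak {} (pool size {})",
        PEAK.load(Ordering::SeqCst),
        rayon::current_num_threads()
    );
}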
+ if let Some(ref url) = item_url { + expected_results.expect(item_idx, TaskKind::UrlStatus); + let _ = tx.send(Ok(TaskResult::UrlStatus { + item_idx, + url: Some(url.clone()), + active: None, + })); + } + + let ctx = TaskContext { + repo_path: wt.path.clone(), + commit_sha: wt.head.clone(), + branch: wt.branch.clone(), + default_branch: Some(default_branch.to_string()), + target: Some(target.to_string()), + item_idx, + item_url, + }; + + let skip = &options.skip_tasks; + let mut items = Vec::with_capacity(15); + + // Helper to add a work item and register the expected result + let mut add_item = |kind: TaskKind| { + expected_results.expect(item_idx, kind); + items.push(WorkItem { + ctx: ctx.clone(), + kind, + }); + }; + + for kind in [ + TaskKind::CommitDetails, + TaskKind::AheadBehind, + TaskKind::CommittedTreesMatch, + TaskKind::HasFileChanges, + TaskKind::IsAncestor, + TaskKind::Upstream, + TaskKind::WorkingTreeDiff, + TaskKind::GitOperation, + TaskKind::UserMarker, + TaskKind::WorkingTreeConflicts, + TaskKind::BranchDiff, + TaskKind::MergeTreeConflicts, + TaskKind::CiStatus, + TaskKind::WouldMergeAdd, + ] { + if !skip.contains(&kind) { + add_item(kind); } - // URL status only runs if this item has a URL - if !skip.contains(&TaskKind::UrlStatus) && ctx.item_url.is_some() { - self.spawn::(scope, ctx); + } + // URL status health check task (if we have a URL). + // Note: We already registered and sent an immediate UrlStatus above with url + active=None. + // This work item will send a second UrlStatus with active=Some(bool) after health check. + // Both results must be registered and expected. + if !skip.contains(&TaskKind::UrlStatus) && ctx.item_url.is_some() { + expected_results.expect(item_idx, TaskKind::UrlStatus); + items.push(WorkItem { + ctx: ctx.clone(), + kind: TaskKind::UrlStatus, + }); + } + + items +} + +/// Generate work items for a branch (no worktree). +/// +/// Returns a list of work items representing all tasks that should run for this +/// branch. Branches have fewer tasks than worktrees (no working tree operations). 
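// The immediate send above plus the later health-check task mean two UrlStatus
// results arrive for the same item, and both are registered via `expect()`
// beforehand. `ExpectedResults` itself is not shown in this diff; the toy
// count-based stand-in below only sketches the accounting that the comment
// above implies.
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ToyKind {
    UrlStatus,
}

#[derive(Default)]
struct ToyExpected {
    pending: Mutex<HashMap<(usize, ToyKind), usize>>,
}

impl ToyExpected {
    fn expect(&self, item_idx: usize, kind: ToyKind) {
        *self.pending.lock().unwrap().entry((item_idx, kind)).or_insert(0) += 1;
    }

    fn fulfill(&self, item_idx: usize, kind: ToyKind) {
        let mut pending = self.pending.lock().unwrap();
        if let Some(count) = pending.get_mut(&(item_idx, kind)) {
            *count -= 1;
            if *count == 0 {
                pending.remove(&(item_idx, kind));
            }
        }
    }

    fn done(&self) -> bool {
        self.pending.lock().unwrap().is_empty()
    }
}

fn main() {
    let expected = ToyExpected::default();

    // Registered twice: once for the immediate URL send, once for the health check.
    expected.expect(0, ToyKind::UrlStatus);
    expected.expect(0, ToyKind::UrlStatus);

    expected.fulfill(0, ToyKind::UrlStatus); // immediate result (active: None)
    assert!(!expected.done()); // still waiting on the health-check result

    expected.fulfill(0, ToyKind::UrlStatus); // health-check result (active: Some(_))
    assert!(expected.done());
}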
+#[allow(clippy::too_many_arguments)] +pub fn work_items_for_branch( + branch_name: &str, + commit_sha: &str, + repo_path: &std::path::Path, + item_idx: usize, + default_branch: &str, + target: &str, + options: &CollectOptions, + expected_results: &Arc, +) -> Vec { + let ctx = TaskContext { + repo_path: repo_path.to_path_buf(), + commit_sha: commit_sha.to_string(), + branch: Some(branch_name.to_string()), + default_branch: Some(default_branch.to_string()), + target: Some(target.to_string()), + item_idx, + item_url: None, // Branches without worktrees don't have URLs + }; + + let skip = &options.skip_tasks; + let mut items = Vec::with_capacity(11); + + // Helper to add a work item and register the expected result + let mut add_item = |kind: TaskKind| { + expected_results.expect(item_idx, kind); + items.push(WorkItem { + ctx: ctx.clone(), + kind, + }); + }; + + for kind in [ + TaskKind::CommitDetails, + TaskKind::AheadBehind, + TaskKind::CommittedTreesMatch, + TaskKind::HasFileChanges, + TaskKind::IsAncestor, + TaskKind::Upstream, + TaskKind::BranchDiff, + TaskKind::MergeTreeConflicts, + TaskKind::CiStatus, + TaskKind::WouldMergeAdd, + ] { + if !skip.contains(&kind) { + add_item(kind); } } + + items } // ============================================================================ @@ -682,122 +792,6 @@ pub(crate) fn parse_port_from_url(url: &str) -> Option { port_str.parse().ok() } -// ============================================================================ -// Collection Entry Points -// ============================================================================ - -fn collect_progressive( - ctx: TaskContext, - include_worktree_tasks: bool, - options: &CollectOptions, - tx: Sender>, - expected_results: &Arc, -) { - let spawner = TaskSpawner::new(tx, expected_results.clone()); - let skip = &options.skip_tasks; - - std::thread::scope(|s| { - // Core tasks (always run) - spawner.spawn_core_tasks(s, &ctx); - if include_worktree_tasks { - spawner.spawn_worktree_only_tasks(s, &ctx, skip); - } - spawner.spawn_optional_tasks(s, &ctx, skip); - }); -} - -/// Collect worktree data progressively, sending results as each task completes. -/// -/// Spawns parallel git operations (up to 10). Each task sends a TaskResult when it -/// completes, enabling progressive UI updates. Tasks in `options.skip_tasks` are not spawned. -/// -/// # Parameters -/// - `default_branch`: Local default branch for informational stats (ahead/behind, branch diff) -/// - `target`: Effective target for integration checks (may be upstream if ahead) -pub fn collect_worktree_progressive( - wt: &Worktree, - item_idx: usize, - default_branch: &str, - target: &str, - options: &CollectOptions, - tx: Sender>, - expected_results: &Arc, -) { - // Skip git operations for prunable worktrees (directory missing). - // Git operations would fail anyway since the directory doesn't exist. - if wt.is_prunable() { - return; - } - - // Expand URL template for this item (deferred from pre-skeleton) - let item_url = options.url_template.as_ref().and_then(|template| { - wt.branch.as_ref().and_then(|branch| { - let mut vars = std::collections::HashMap::new(); - vars.insert("branch", branch.as_str()); - worktrunk::config::expand_template(template, &vars, false).ok() - }) - }); - - // Send URL immediately (before health check) so it appears in normal styling right away. - // The health check task will later send url_active to dim if inactive. 
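// Which tasks are worktree-only: the difference between the two kind arrays
// above, spelled out with toy string names purely for illustration (the real
// lists are the `TaskKind` arrays in `work_items_for_worktree` and
// `work_items_for_branch`). UrlStatus is also worktree-only, but it is added
// separately and only when the item has a URL.
use std::collections::HashSet;

fn main() {
    // Same order as the array in `work_items_for_worktree`.
    let worktree_kinds = [
        "CommitDetails", "AheadBehind", "CommittedTreesMatch", "HasFileChanges",
        "IsAncestor", "Upstream", "WorkingTreeDiff", "GitOperation", "UserMarker",
        "WorkingTreeConflicts", "BranchDiff", "MergeTreeConflicts", "CiStatus",
        "WouldMergeAdd",
    ];
    // Same order as the array in `work_items_for_branch`.
    let branch_kinds: HashSet<&str> = [
        "CommitDetails", "AheadBehind", "CommittedTreesMatch", "HasFileChanges",
        "IsAncestor", "Upstream", "BranchDiff", "MergeTreeConflicts", "CiStatus",
        "WouldMergeAdd",
    ]
    .into_iter()
    .collect();

    let worktree_only: Vec<&str> = worktree_kinds
        .iter()
        .copied()
        .filter(|kind| !branch_kinds.contains(kind))
        .collect();

    // Exactly the tasks that need a checked-out working tree.
    assert_eq!(
        worktree_only,
        ["WorkingTreeDiff", "GitOperation", "UserMarker", "WorkingTreeConflicts"]
    );
}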
- if let Some(ref url) = item_url { - expected_results.expect(item_idx, TaskKind::UrlStatus); - let _ = tx.send(Ok(TaskResult::UrlStatus { - item_idx, - url: Some(url.clone()), - active: None, - })); - } - - let ctx = TaskContext { - repo_path: wt.path.clone(), - commit_sha: wt.head.clone(), - branch: wt.branch.clone(), - default_branch: Some(default_branch.to_string()), - target: Some(target.to_string()), - item_idx, - item_url, - }; - - collect_progressive(ctx, true, options, tx, expected_results); -} - -/// Collect branch data progressively, sending results as each task completes. -/// -/// Spawns parallel git operations (up to 7, similar to worktrees but without working -/// tree operations). Tasks in `options.skip_tasks` are not spawned. -/// -/// # Parameters -/// - `default_branch`: Local default branch for informational stats (ahead/behind, branch diff) -/// - `target`: Effective target for integration checks (may be upstream if ahead) -#[allow(clippy::too_many_arguments)] -pub fn collect_branch_progressive( - branch_name: &str, - commit_sha: &str, - repo_path: &std::path::Path, - item_idx: usize, - default_branch: &str, - target: &str, - options: &CollectOptions, - tx: Sender>, - expected_results: &Arc, -) { - // Branches without worktrees don't have URLs - there's no directory to run a dev server from. - // URL templates only make sense for worktrees where post-start hooks can start servers. - - let ctx = TaskContext { - repo_path: repo_path.to_path_buf(), - commit_sha: commit_sha.to_string(), - branch: Some(branch_name.to_string()), - default_branch: Some(default_branch.to_string()), - target: Some(target.to_string()), - item_idx, - item_url: None, - }; - - collect_progressive(ctx, false, options, tx, expected_results); -} - // ============================================================================ // Helper Functions // ============================================================================ diff --git a/src/main.rs b/src/main.rs index 54b1bd653..64c566994 100644 --- a/src/main.rs +++ b/src/main.rs @@ -709,6 +709,20 @@ fn enhance_and_exit_error(err: clap::Error) -> ! { } fn main() { + // Configure Rayon's global thread pool for mixed I/O workloads. + // The `wt list` command runs git operations (CPU + disk I/O) and network + // requests (CI status, URL health checks) in parallel. Using 2x CPU cores + // allows threads blocked on I/O to overlap with compute work. + // + // TODO: Benchmark different thread counts to find optimal value. + // Test with `RAYON_NUM_THREADS=N wt list` on repos with many worktrees. + let num_threads = std::thread::available_parallelism() + .map(|n| n.get() * 2) + .unwrap_or(8); + let _ = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build_global(); + // Tell crossterm to always emit ANSI sequences crossterm::style::force_color_output(true);
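// Note: an explicit `num_threads(n)` takes precedence over Rayon's
// `RAYON_NUM_THREADS` environment variable, so benchmarking via
// `RAYON_NUM_THREADS=N wt list` only takes effect if the explicit override
// yields to the variable. One possible shape for that (a sketch, not what the
// diff above does; `configure_rayon_pool` is a hypothetical helper name):
fn configure_rayon_pool() {
    let num_threads = std::env::var("RAYON_NUM_THREADS")
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .unwrap_or_else(|| {
            // Default: 2x cores, so threads blocked on network/disk I/O overlap
            // with compute-bound git work.
            std::thread::available_parallelism()
                .map(|n| n.get() * 2)
                .unwrap_or(8)
        });

    let _ = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build_global();
}

fn main() {
    configure_rayon_pool();
    println!("rayon pool size: {}", rayon::current_num_threads());
}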