Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 83 additions & 59 deletions src/commands/list/collect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Worktree data collection with parallelized git operations.
//!
//! This module provides an efficient approach to collecting worktree data:
//! - Parallel collection across worktrees (using Rayon)
//! - Parallel operations within each worktree (using scoped threads)
//! - Progressive updates via channels (update UI as each worktree completes)
//! - All tasks flattened into a single Rayon work queue
//! - Network tasks (CI, URL) sorted to run last
//! - Progressive updates via channels (update UI as each task completes)
//!
//! ## Skeleton Performance (IMPORTANT)
//!
Expand Down Expand Up @@ -37,11 +37,9 @@
//!
//! Both modes render the final table in `collect()`, ensuring a single canonical rendering path.
//!
//! **Parallelism at two levels**:
//! - Across worktrees: Multiple worktrees collected concurrently via Rayon
//! - Within worktrees: Git operations (ahead/behind, diffs, CI) run concurrently via scoped threads
//!
//! This ensures fast operations don't wait for slow ones (e.g., CI doesn't block ahead/behind counts)
//! **Flat parallelism**: All tasks (for all worktrees and branches) are collected into a single
//! work queue and processed via Rayon's thread pool. This avoids nested parallelism and ensures
//! optimal CPU utilization (~8 threads) regardless of worktree count.
use anyhow::Context;
use color_print::cformat;
use crossbeam_channel as chan;
Expand Down Expand Up @@ -225,6 +223,13 @@ impl TaskResult {
}
}

impl TaskKind {
/// Whether this task requires network I/O (slow).
fn is_network(self) -> bool {
matches!(self, TaskKind::CiStatus | TaskKind::UrlStatus)
}
}

/// Detect if a worktree is in the middle of a git operation (rebase/merge).
pub(super) fn detect_git_operation(repo: &Repository) -> GitOperationState {
if repo.is_rebasing().unwrap_or(false) {
Expand Down Expand Up @@ -944,36 +949,20 @@ pub fn collect(
// Collect errors for display after rendering
let mut errors: Vec<TaskError> = Vec::new();

// Spawn worktree collection in background thread
// Collect all work items upfront, then execute in a single Rayon pool.
// This avoids nested parallelism (Rayon par_iter → thread::scope per worktree)
// which could create 100+ threads. Instead, we have one pool with ~8 threads.
let sorted_worktrees_clone = sorted_worktrees.clone();
let tx_worktrees = tx.clone();
let tx_worker = tx.clone();
let default_branch_clone = default_branch.clone();
let target_clone = integration_target.clone();
let expected_results_wt = expected_results.clone();
let options_wt = options.clone();
std::thread::spawn(move || {
sorted_worktrees_clone
.par_iter()
.enumerate()
.for_each(|(idx, wt)| {
// Pass default_branch (local default) for stable informational stats,
// and target (effective target) for integration checks.
super::collect_progressive_impl::collect_worktree_progressive(
wt,
idx,
&default_branch_clone,
&target_clone,
&options_wt,
tx_worktrees.clone(),
&expected_results_wt,
);
});
});
let expected_results_clone = expected_results.clone();
let options_clone = options.clone();
let main_path = main_worktree.path.clone();

// Spawn branch collection in background thread (local + remote)
if show_branches || show_remotes {
// Combine local and remote branches with their item indices
let mut all_branches: Vec<(usize, String, String)> = Vec::new();
// Prepare branch data if needed (before moving into closure)
let branch_data: Vec<(usize, String, String)> = if show_branches || show_remotes {
let mut all_branches = Vec::new();
if show_branches {
all_branches.extend(
branches_without_worktrees
Expand All @@ -990,31 +979,54 @@ pub fn collect(
.map(|(idx, (name, sha))| (remote_start_idx + idx, name.clone(), sha.clone())),
);
}
all_branches
} else {
Vec::new()
};

std::thread::spawn(move || {
use super::collect_progressive_impl::{work_items_for_branch, work_items_for_worktree};

// Phase 1: Generate all work items (sequential, fast)
// Work items are collected upfront so we can process them all in a single par_iter.
let mut all_work_items = Vec::new();

// Worktree work items
for (idx, wt) in sorted_worktrees_clone.iter().enumerate() {
all_work_items.extend(work_items_for_worktree(
wt,
idx,
&default_branch_clone,
&target_clone,
&options_clone,
&expected_results_clone,
&tx_worker,
));
}

// Branch work items (local + remote)
for (item_idx, branch_name, commit_sha) in &branch_data {
all_work_items.extend(work_items_for_branch(
branch_name,
commit_sha,
&main_path,
*item_idx,
&default_branch_clone,
&target_clone,
&options_clone,
&expected_results_clone,
));
}

// Sort: local git ops first, network ops (CI, URL) last.
all_work_items.sort_by_key(|item| item.kind.is_network());

let main_path = main_worktree.path.clone();
let tx_branches = tx.clone();
let default_branch_clone = default_branch.clone();
let target_clone = integration_target.clone();
let expected_results_br = expected_results.clone();
let options_br = options.clone();
std::thread::spawn(move || {
all_branches
.par_iter()
.for_each(|(item_idx, branch_name, commit_sha)| {
super::collect_progressive_impl::collect_branch_progressive(
branch_name,
commit_sha,
&main_path,
*item_idx,
&default_branch_clone,
&target_clone,
&options_br,
tx_branches.clone(),
&expected_results_br,
);
});
// Phase 2: Execute all work items in Rayon's thread pool
all_work_items.into_par_iter().for_each(|item| {
let result = item.execute();
let _ = tx_worker.send(result);
});
}
});

// Drop the original sender so drain_results knows when all spawned threads are done
drop(tx);
Expand Down Expand Up @@ -1368,15 +1380,27 @@ pub fn populate_item(

// Spawn collection in background thread
std::thread::spawn(move || {
super::collect_progressive_impl::collect_worktree_progressive(
use super::collect_progressive_impl::work_items_for_worktree;

// Generate work items for this single worktree
let mut work_items = work_items_for_worktree(
&wt,
0, // Single item, always index 0
&default_branch_clone,
&target_clone,
&options,
tx,
&expected_results_clone,
&tx,
);

// Sort: local git ops first, network ops last
work_items.sort_by_key(|item| item.kind.is_network());

// Execute all work items in Rayon's thread pool
work_items.into_par_iter().for_each(|item| {
let result = item.execute();
let _ = tx.send(result);
});
});

// Drain task results (blocking until complete)
Expand Down
Loading