diff --git a/Cargo.lock b/Cargo.lock index 651895714320a..3c2a947ee4081 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3154,17 +3154,6 @@ dependencies = [ "tikv-jemalloc-sys", ] -[[package]] -name = "rustc-rayon" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cd9fb077db982d7ceb42a90471e5a69a990b58f71e06f0d8340bb2cf35eb751" -dependencies = [ - "either", - "indexmap", - "rustc-rayon-core", -] - [[package]] name = "rustc-rayon-core" version = "0.5.0" @@ -3536,7 +3525,7 @@ dependencies = [ "parking_lot", "portable-atomic", "rustc-hash 2.1.1", - "rustc-rayon", + "rustc-rayon-core", "rustc-stable-hash", "rustc_arena", "rustc_graphviz", @@ -3882,7 +3871,6 @@ dependencies = [ name = "rustc_interface" version = "0.0.0" dependencies = [ - "rustc-rayon", "rustc-rayon-core", "rustc_abi", "rustc_ast", diff --git a/compiler/rustc_codegen_cranelift/src/driver/aot.rs b/compiler/rustc_codegen_cranelift/src/driver/aot.rs index fb7864ae6124b..e33814060f653 100644 --- a/compiler/rustc_codegen_cranelift/src/driver/aot.rs +++ b/compiler/rustc_codegen_cranelift/src/driver/aot.rs @@ -728,26 +728,27 @@ pub(crate) fn run_aot( let concurrency_limiter = IntoDynSyncSend(ConcurrencyLimiter::new(todo_cgus.len())); - let modules = tcx.sess.time("codegen mono items", || { - let mut modules: Vec<_> = par_map(todo_cgus, |(_, cgu)| { - let dep_node = cgu.codegen_dep_node(tcx); - tcx.dep_graph - .with_task( + let modules: Vec<_> = + tcx.sess.time("codegen mono items", || { + let modules: Vec<_> = par_map(todo_cgus, |(_, cgu)| { + let dep_node = cgu.codegen_dep_node(tcx); + let (module, _) = tcx.dep_graph.with_task( dep_node, tcx, (global_asm_config.clone(), cgu.name(), concurrency_limiter.acquire(tcx.dcx())), module_codegen, Some(rustc_middle::dep_graph::hash_result), - ) - .0 - }); - modules.extend( - done_cgus + ); + IntoDynSyncSend(module) + }); + modules .into_iter() - .map(|(_, cgu)| OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, cgu))), - ); - modules - }); + .map(|module| module.0) + .chain(done_cgus.into_iter().map(|(_, cgu)| { + OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, cgu)) + })) + .collect() + }); let allocator_module = emit_allocator_module(tcx); diff --git a/compiler/rustc_codegen_ssa/src/base.rs b/compiler/rustc_codegen_ssa/src/base.rs index 1985b3b717063..bf49bac1736ce 100644 --- a/compiler/rustc_codegen_ssa/src/base.rs +++ b/compiler/rustc_codegen_ssa/src/base.rs @@ -10,7 +10,7 @@ use rustc_ast::expand::allocator::{ALLOCATOR_METHODS, AllocatorKind, global_fn_n use rustc_attr_parsing::OptimizeAttr; use rustc_data_structures::fx::{FxHashMap, FxIndexSet}; use rustc_data_structures::profiling::{get_resident_set_size, print_time_passes_entry}; -use rustc_data_structures::sync::par_map; +use rustc_data_structures::sync::{IntoDynSyncSend, par_map}; use rustc_data_structures::unord::UnordMap; use rustc_hir::def_id::{DefId, LOCAL_CRATE}; use rustc_hir::lang_items::LangItem; @@ -757,7 +757,7 @@ pub fn codegen_crate( let pre_compiled_cgus = par_map(cgus, |(i, _)| { let module = backend.compile_codegen_unit(tcx, codegen_units[i].name()); - (i, module) + (i, IntoDynSyncSend(module)) }); total_codegen_time += start_time.elapsed(); @@ -777,7 +777,7 @@ pub fn codegen_crate( match cgu_reuse { CguReuse::No => { let (module, cost) = if let Some(cgu) = pre_compiled_cgus.remove(&i) { - cgu + cgu.0 } else { let start_time = Instant::now(); let module = backend.compile_codegen_unit(tcx, cgu.name()); diff --git a/compiler/rustc_data_structures/Cargo.toml b/compiler/rustc_data_structures/Cargo.toml index fcaf2750507dd..f48c73b13b961 100644 --- a/compiler/rustc_data_structures/Cargo.toml +++ b/compiler/rustc_data_structures/Cargo.toml @@ -14,7 +14,7 @@ indexmap = "2.4.0" jobserver_crate = { version = "0.1.28", package = "jobserver" } measureme = "12.0.1" rustc-hash = "2.0.0" -rustc-rayon = { version = "0.5.1", features = ["indexmap"] } +rustc-rayon-core = { version = "0.5.0" } rustc-stable-hash = { version = "0.1.0", features = ["nightly"] } rustc_arena = { path = "../rustc_arena" } rustc_graphviz = { path = "../rustc_graphviz" } diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index 64c64bfa3c296..744ae9b6fe2ac 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -179,6 +179,12 @@ impl FromDyn { FromDyn(val) } + #[inline(always)] + pub fn derive(&self, val: O) -> FromDyn { + // We already did the check for `sync::is_dyn_thread_safe()` when creating `Self` + FromDyn(val) + } + #[inline(always)] pub fn into_inner(self) -> T { self.0 @@ -200,6 +206,13 @@ impl std::ops::Deref for FromDyn { } } +impl std::ops::DerefMut for FromDyn { + #[inline(always)] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + // A wrapper to convert a struct that is already a `Send` or `Sync` into // an instance of `DynSend` and `DynSync`, since the compiler cannot infer // it automatically in some cases. (e.g. Box) diff --git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs index 8ef8a3f358569..ba3c85ef5b155 100644 --- a/compiler/rustc_data_structures/src/sync/parallel.rs +++ b/compiler/rustc_data_structures/src/sync/parallel.rs @@ -7,7 +7,6 @@ use std::any::Any; use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind}; use parking_lot::Mutex; -use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator}; use crate::FatalErrorMarker; use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode}; @@ -97,11 +96,11 @@ macro_rules! parallel { // This function only works when `mode::is_dyn_thread_safe()`. pub fn scope<'scope, OP, R>(op: OP) -> R where - OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend, + OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend, R: DynSend, { let op = FromDyn::from(op); - rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() + rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() } #[inline] @@ -114,7 +113,7 @@ where let oper_a = FromDyn::from(oper_a); let oper_b = FromDyn::from(oper_b); let (a, b) = parallel_guard(|guard| { - rayon::join( + rayon_core::join( move || guard.run(move || FromDyn::from(oper_a.into_inner()())), move || guard.run(move || FromDyn::from(oper_b.into_inner()())), ) @@ -125,56 +124,103 @@ where } } -pub fn par_for_each_in + IntoParallelIterator>( +fn par_slice( + items: &mut [I], + guard: &ParallelGuard, + for_each: impl Fn(&mut I) + DynSync + DynSend, +) { + struct State<'a, F> { + for_each: FromDyn, + guard: &'a ParallelGuard, + group: usize, + } + + fn par_rec( + items: &mut [I], + state: &State<'_, F>, + ) { + if items.len() <= state.group { + for item in items { + state.guard.run(|| (state.for_each)(item)); + } + } else { + let (left, right) = items.split_at_mut(items.len() / 2); + let mut left = state.for_each.derive(left); + let mut right = state.for_each.derive(right); + rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state)); + } + } + + let state = State { + for_each: FromDyn::from(for_each), + guard, + group: std::cmp::max(items.len() / 128, 1), + }; + par_rec(items, &state) +} + +pub fn par_for_each_in>( t: T, - for_each: impl Fn(I) + DynSync + DynSend, + for_each: impl Fn(&I) + DynSync + DynSend, ) { parallel_guard(|guard| { if mode::is_dyn_thread_safe() { - let for_each = FromDyn::from(for_each); - t.into_par_iter().for_each(|i| { - guard.run(|| for_each(i)); - }); + let mut items: Vec<_> = t.into_iter().collect(); + par_slice(&mut items, guard, |i| for_each(&*i)) } else { t.into_iter().for_each(|i| { - guard.run(|| for_each(i)); + guard.run(|| for_each(&i)); }); } }); } -pub fn try_par_for_each_in< - T: IntoIterator + IntoParallelIterator::Item>, - E: Send, ->( +/// This runs `for_each` in parallel for each iterator item. If one or more of the +/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned +/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which +/// are all equivalent. +pub fn try_par_for_each_in( t: T, - for_each: impl Fn(::Item) -> Result<(), E> + DynSync + DynSend, -) -> Result<(), E> { + for_each: impl Fn(&::Item) -> Result<(), E> + DynSync + DynSend, +) -> Result<(), E> +where + ::Item: DynSend, +{ parallel_guard(|guard| { if mode::is_dyn_thread_safe() { - let for_each = FromDyn::from(for_each); - t.into_par_iter() - .filter_map(|i| guard.run(|| for_each(i))) - .reduce(|| Ok(()), Result::and) + let mut items: Vec<_> = t.into_iter().collect(); + + let error = Mutex::new(None); + + par_slice(&mut items, guard, |i| { + if let Err(err) = for_each(&*i) { + *error.lock() = Some(err); + } + }); + + if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) } } else { - t.into_iter().filter_map(|i| guard.run(|| for_each(i))).fold(Ok(()), Result::and) + t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and) } }) } -pub fn par_map< - I, - T: IntoIterator + IntoParallelIterator, - R: std::marker::Send, - C: FromIterator + FromParallelIterator, ->( +pub fn par_map, R: DynSend, C: FromIterator>( t: T, map: impl Fn(I) -> R + DynSync + DynSend, ) -> C { parallel_guard(|guard| { if mode::is_dyn_thread_safe() { let map = FromDyn::from(map); - t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect() + + let mut items: Vec<(Option, Option)> = + t.into_iter().map(|i| (Some(i), None)).collect(); + + par_slice(&mut items, guard, |i| { + i.1 = Some(map(i.0.take().unwrap())); + }); + + items.into_iter().filter_map(|i| i.1).collect() } else { t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() } diff --git a/compiler/rustc_interface/Cargo.toml b/compiler/rustc_interface/Cargo.toml index e36739356648a..ff28dbeaee698 100644 --- a/compiler/rustc_interface/Cargo.toml +++ b/compiler/rustc_interface/Cargo.toml @@ -5,7 +5,6 @@ edition = "2024" [dependencies] # tidy-alphabetical-start -rustc-rayon = { version = "0.5.0" } rustc-rayon-core = { version = "0.5.0" } rustc_ast = { path = "../rustc_ast" } rustc_ast_lowering = { path = "../rustc_ast_lowering" } diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index 5cccab893bb35..b355d8c1a9cd2 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -179,7 +179,7 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, let current_gcx = FromDyn::from(CurrentGcx::new()); let current_gcx2 = current_gcx.clone(); - let builder = rayon::ThreadPoolBuilder::new() + let builder = rayon_core::ThreadPoolBuilder::new() .thread_name(|_| "rustc".to_string()) .acquire_thread_handler(jobserver::acquire_thread) .release_thread_handler(jobserver::release_thread) @@ -223,7 +223,7 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, builder .build_scoped( // Initialize each new worker thread when created. - move |thread: rayon::ThreadBuilder| { + move |thread: rayon_core::ThreadBuilder| { // Register the thread for use with the `WorkerLocal` type. registry.register(); @@ -232,7 +232,9 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, }) }, // Run `f` on the first thread in the thread pool. - move |pool: &rayon::ThreadPool| pool.install(|| f(current_gcx.into_inner())), + move |pool: &rayon_core::ThreadPool| { + pool.install(|| f(current_gcx.into_inner())) + }, ) .unwrap() }) diff --git a/compiler/rustc_metadata/src/rmeta/encoder.rs b/compiler/rustc_metadata/src/rmeta/encoder.rs index 7ab3d432bdf81..b20f5dbc38561 100644 --- a/compiler/rustc_metadata/src/rmeta/encoder.rs +++ b/compiler/rustc_metadata/src/rmeta/encoder.rs @@ -2192,7 +2192,7 @@ fn prefetch_mir(tcx: TyCtxt<'_>) { } let reachable_set = tcx.reachable_set(()); - par_for_each_in(tcx.mir_keys(()), |&def_id| { + par_for_each_in(tcx.mir_keys(()), |&&def_id| { let (encode_const, encode_opt) = should_encode_mir(tcx, reachable_set, def_id); if encode_const { diff --git a/compiler/rustc_middle/src/hir/map.rs b/compiler/rustc_middle/src/hir/map.rs index 52f155a16b868..86f2706b3b914 100644 --- a/compiler/rustc_middle/src/hir/map.rs +++ b/compiler/rustc_middle/src/hir/map.rs @@ -342,7 +342,7 @@ impl<'tcx> TyCtxt<'tcx> { #[inline] pub fn par_hir_body_owners(self, f: impl Fn(LocalDefId) + DynSend + DynSync) { - par_for_each_in(&self.hir_crate_items(()).body_owners[..], |&def_id| f(def_id)); + par_for_each_in(&self.hir_crate_items(()).body_owners[..], |&&def_id| f(def_id)); } pub fn hir_ty_param_owner(self, def_id: LocalDefId) -> LocalDefId { diff --git a/compiler/rustc_middle/src/hir/mod.rs b/compiler/rustc_middle/src/hir/mod.rs index 68b9a4f56b96d..fa2b1d45da313 100644 --- a/compiler/rustc_middle/src/hir/mod.rs +++ b/compiler/rustc_middle/src/hir/mod.rs @@ -83,35 +83,35 @@ impl ModuleItems { &self, f: impl Fn(ItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync, ) -> Result<(), ErrorGuaranteed> { - try_par_for_each_in(&self.free_items[..], |&id| f(id)) + try_par_for_each_in(&self.free_items[..], |&&id| f(id)) } pub fn par_trait_items( &self, f: impl Fn(TraitItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync, ) -> Result<(), ErrorGuaranteed> { - try_par_for_each_in(&self.trait_items[..], |&id| f(id)) + try_par_for_each_in(&self.trait_items[..], |&&id| f(id)) } pub fn par_impl_items( &self, f: impl Fn(ImplItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync, ) -> Result<(), ErrorGuaranteed> { - try_par_for_each_in(&self.impl_items[..], |&id| f(id)) + try_par_for_each_in(&self.impl_items[..], |&&id| f(id)) } pub fn par_foreign_items( &self, f: impl Fn(ForeignItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync, ) -> Result<(), ErrorGuaranteed> { - try_par_for_each_in(&self.foreign_items[..], |&id| f(id)) + try_par_for_each_in(&self.foreign_items[..], |&&id| f(id)) } pub fn par_opaques( &self, f: impl Fn(LocalDefId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync, ) -> Result<(), ErrorGuaranteed> { - try_par_for_each_in(&self.opaques[..], |&id| f(id)) + try_par_for_each_in(&self.opaques[..], |&&id| f(id)) } } diff --git a/compiler/rustc_monomorphize/src/collector.rs b/compiler/rustc_monomorphize/src/collector.rs index 67fca1d7c2947..437f2777cfd60 100644 --- a/compiler/rustc_monomorphize/src/collector.rs +++ b/compiler/rustc_monomorphize/src/collector.rs @@ -1683,7 +1683,7 @@ pub(crate) fn collect_crate_mono_items<'tcx>( let mut recursion_depths = DefIdMap::default(); collect_items_rec( tcx, - dummy_spanned(root), + dummy_spanned(*root), &state, &mut recursion_depths, recursion_limit, diff --git a/src/tools/miri/src/bin/miri.rs b/src/tools/miri/src/bin/miri.rs index 56ee96502b3e7..4e8fe0ca8adf7 100644 --- a/src/tools/miri/src/bin/miri.rs +++ b/src/tools/miri/src/bin/miri.rs @@ -185,7 +185,7 @@ impl rustc_driver::Callbacks for MiriCompilerCalls { let num_failed = sync::IntoDynSyncSend(AtomicU32::new(0)); sync::par_for_each_in(many_seeds.seeds.clone(), |seed| { let mut config = config.clone(); - config.seed = Some(seed.into()); + config.seed = Some((*seed).into()); eprintln!("Trying seed: {seed}"); let return_code = miri::eval_entry(tcx, entry_def_id, entry_type, config) .unwrap_or(rustc_driver::EXIT_FAILURE); diff --git a/src/tools/tidy/src/deps.rs b/src/tools/tidy/src/deps.rs index 81c55ecaa7a29..6924e8256de48 100644 --- a/src/tools/tidy/src/deps.rs +++ b/src/tools/tidy/src/deps.rs @@ -360,7 +360,6 @@ const PERMITTED_RUSTC_DEPENDENCIES: &[&str] = &[ "regex-syntax", "rustc-demangle", "rustc-hash", - "rustc-rayon", "rustc-rayon-core", "rustc-stable-hash", "rustc_apfloat",