Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the use of Rayon iterators #139011

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3154,17 +3154,6 @@ dependencies = [
"tikv-jemalloc-sys",
]

[[package]]
name = "rustc-rayon"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cd9fb077db982d7ceb42a90471e5a69a990b58f71e06f0d8340bb2cf35eb751"
dependencies = [
"either",
"indexmap",
"rustc-rayon-core",
]

[[package]]
name = "rustc-rayon-core"
version = "0.5.0"
Expand Down Expand Up @@ -3536,7 +3525,7 @@ dependencies = [
"parking_lot",
"portable-atomic",
"rustc-hash 2.1.1",
"rustc-rayon",
"rustc-rayon-core",
"rustc-stable-hash",
"rustc_arena",
"rustc_graphviz",
Expand Down Expand Up @@ -3882,7 +3871,6 @@ dependencies = [
name = "rustc_interface"
version = "0.0.0"
dependencies = [
"rustc-rayon",
"rustc-rayon-core",
"rustc_abi",
"rustc_ast",
Expand Down
29 changes: 15 additions & 14 deletions compiler/rustc_codegen_cranelift/src/driver/aot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,26 +728,27 @@ pub(crate) fn run_aot(

let concurrency_limiter = IntoDynSyncSend(ConcurrencyLimiter::new(todo_cgus.len()));

let modules = tcx.sess.time("codegen mono items", || {
let mut modules: Vec<_> = par_map(todo_cgus, |(_, cgu)| {
let dep_node = cgu.codegen_dep_node(tcx);
tcx.dep_graph
.with_task(
let modules: Vec<_> =
tcx.sess.time("codegen mono items", || {
let modules: Vec<_> = par_map(todo_cgus, |(_, cgu)| {
let dep_node = cgu.codegen_dep_node(tcx);
let (module, _) = tcx.dep_graph.with_task(
dep_node,
tcx,
(global_asm_config.clone(), cgu.name(), concurrency_limiter.acquire(tcx.dcx())),
module_codegen,
Some(rustc_middle::dep_graph::hash_result),
)
.0
});
modules.extend(
done_cgus
);
IntoDynSyncSend(module)
});
modules
.into_iter()
.map(|(_, cgu)| OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, cgu))),
);
modules
});
.map(|module| module.0)
.chain(done_cgus.into_iter().map(|(_, cgu)| {
OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, cgu))
}))
.collect()
});

let allocator_module = emit_allocator_module(tcx);

Expand Down
6 changes: 3 additions & 3 deletions compiler/rustc_codegen_ssa/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use rustc_ast::expand::allocator::{ALLOCATOR_METHODS, AllocatorKind, global_fn_n
use rustc_attr_parsing::OptimizeAttr;
use rustc_data_structures::fx::{FxHashMap, FxIndexSet};
use rustc_data_structures::profiling::{get_resident_set_size, print_time_passes_entry};
use rustc_data_structures::sync::par_map;
use rustc_data_structures::sync::{IntoDynSyncSend, par_map};
use rustc_data_structures::unord::UnordMap;
use rustc_hir::def_id::{DefId, LOCAL_CRATE};
use rustc_hir::lang_items::LangItem;
Expand Down Expand Up @@ -757,7 +757,7 @@ pub fn codegen_crate<B: ExtraBackendMethods>(

let pre_compiled_cgus = par_map(cgus, |(i, _)| {
let module = backend.compile_codegen_unit(tcx, codegen_units[i].name());
(i, module)
(i, IntoDynSyncSend(module))
});

total_codegen_time += start_time.elapsed();
Expand All @@ -777,7 +777,7 @@ pub fn codegen_crate<B: ExtraBackendMethods>(
match cgu_reuse {
CguReuse::No => {
let (module, cost) = if let Some(cgu) = pre_compiled_cgus.remove(&i) {
cgu
cgu.0
} else {
let start_time = Instant::now();
let module = backend.compile_codegen_unit(tcx, cgu.name());
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_data_structures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ indexmap = "2.4.0"
jobserver_crate = { version = "0.1.28", package = "jobserver" }
measureme = "12.0.1"
rustc-hash = "2.0.0"
rustc-rayon = { version = "0.5.1", features = ["indexmap"] }
rustc-rayon-core = { version = "0.5.0" }
rustc-stable-hash = { version = "0.1.0", features = ["nightly"] }
rustc_arena = { path = "../rustc_arena" }
rustc_graphviz = { path = "../rustc_graphviz" }
Expand Down
13 changes: 13 additions & 0 deletions compiler/rustc_data_structures/src/marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ impl<T> FromDyn<T> {
FromDyn(val)
}

#[inline(always)]
pub fn derive<O>(&self, val: O) -> FromDyn<O> {
// We already did the check for `sync::is_dyn_thread_safe()` when creating `Self`
FromDyn(val)
}

#[inline(always)]
pub fn into_inner(self) -> T {
self.0
Expand All @@ -200,6 +206,13 @@ impl<T> std::ops::Deref for FromDyn<T> {
}
}

impl<T> std::ops::DerefMut for FromDyn<T> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

// A wrapper to convert a struct that is already a `Send` or `Sync` into
// an instance of `DynSend` and `DynSync`, since the compiler cannot infer
// it automatically in some cases. (e.g. Box<dyn Send / Sync>)
Expand Down
104 changes: 75 additions & 29 deletions compiler/rustc_data_structures/src/sync/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::any::Any;
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};

use parking_lot::Mutex;
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};

use crate::FatalErrorMarker;
use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
Expand Down Expand Up @@ -97,11 +96,11 @@ macro_rules! parallel {
// This function only works when `mode::is_dyn_thread_safe()`.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
R: DynSend,
{
let op = FromDyn::from(op);
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
}

#[inline]
Expand All @@ -114,7 +113,7 @@ where
let oper_a = FromDyn::from(oper_a);
let oper_b = FromDyn::from(oper_b);
let (a, b) = parallel_guard(|guard| {
rayon::join(
rayon_core::join(
move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
)
Expand All @@ -125,56 +124,103 @@ where
}
}

pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
fn par_slice<I: DynSend>(
items: &mut [I],
guard: &ParallelGuard,
for_each: impl Fn(&mut I) + DynSync + DynSend,
) {
struct State<'a, F> {
for_each: FromDyn<F>,
guard: &'a ParallelGuard,
group: usize,
}

fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
items: &mut [I],
state: &State<'_, F>,
) {
if items.len() <= state.group {
for item in items {
state.guard.run(|| (state.for_each)(item));
}
} else {
let (left, right) = items.split_at_mut(items.len() / 2);
let mut left = state.for_each.derive(left);
let mut right = state.for_each.derive(right);
rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
}
}

let state = State {
for_each: FromDyn::from(for_each),
guard,
group: std::cmp::max(items.len() / 128, 1),
};
par_rec(items, &state)
}

pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
t: T,
for_each: impl Fn(I) + DynSync + DynSend,
for_each: impl Fn(&I) + DynSync + DynSend,
) {
parallel_guard(|guard| {
if mode::is_dyn_thread_safe() {
let for_each = FromDyn::from(for_each);
t.into_par_iter().for_each(|i| {
guard.run(|| for_each(i));
});
let mut items: Vec<_> = t.into_iter().collect();
par_slice(&mut items, guard, |i| for_each(&*i))
} else {
t.into_iter().for_each(|i| {
guard.run(|| for_each(i));
guard.run(|| for_each(&i));
});
}
});
}

pub fn try_par_for_each_in<
T: IntoIterator + IntoParallelIterator<Item = <T as IntoIterator>::Item>,
E: Send,
>(
/// This runs `for_each` in parallel for each iterator item. If one or more of the
/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned
/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which
/// are all equivalent.
pub fn try_par_for_each_in<T: IntoIterator, E: DynSend>(
t: T,
for_each: impl Fn(<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
) -> Result<(), E> {
for_each: impl Fn(&<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
) -> Result<(), E>
where
<T as IntoIterator>::Item: DynSend,
{
parallel_guard(|guard| {
if mode::is_dyn_thread_safe() {
let for_each = FromDyn::from(for_each);
t.into_par_iter()
.filter_map(|i| guard.run(|| for_each(i)))
.reduce(|| Ok(()), Result::and)
let mut items: Vec<_> = t.into_iter().collect();

let error = Mutex::new(None);

par_slice(&mut items, guard, |i| {
if let Err(err) = for_each(&*i) {
*error.lock() = Some(err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s pre-existing and maybe benign but somewhat surprising, is it expected that we only handle a single error in this function? Either way, this looks to now return a random error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. It's only used with ErrorGuaranteed. All the errors are equivalent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Let's add that explanation as a comment.

}
});

if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) }
} else {
t.into_iter().filter_map(|i| guard.run(|| for_each(i))).fold(Ok(()), Result::and)
t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and)
}
})
}

pub fn par_map<
I,
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
R: std::marker::Send,
C: FromIterator<R> + FromParallelIterator<R>,
>(
pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
t: T,
map: impl Fn(I) -> R + DynSync + DynSend,
) -> C {
parallel_guard(|guard| {
if mode::is_dyn_thread_safe() {
let map = FromDyn::from(map);
t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()

let mut items: Vec<(Option<I>, Option<R>)> =
t.into_iter().map(|i| (Some(i), None)).collect();

par_slice(&mut items, guard, |i| {
i.1 = Some(map(i.0.take().unwrap()));
});

items.into_iter().filter_map(|i| i.1).collect()
} else {
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
}
Expand Down
1 change: 0 additions & 1 deletion compiler/rustc_interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ edition = "2024"

[dependencies]
# tidy-alphabetical-start
rustc-rayon = { version = "0.5.0" }
rustc-rayon-core = { version = "0.5.0" }
rustc_ast = { path = "../rustc_ast" }
rustc_ast_lowering = { path = "../rustc_ast_lowering" }
Expand Down
8 changes: 5 additions & 3 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
let current_gcx = FromDyn::from(CurrentGcx::new());
let current_gcx2 = current_gcx.clone();

let builder = rayon::ThreadPoolBuilder::new()
let builder = rayon_core::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.acquire_thread_handler(jobserver::acquire_thread)
.release_thread_handler(jobserver::release_thread)
Expand Down Expand Up @@ -223,7 +223,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
builder
.build_scoped(
// Initialize each new worker thread when created.
move |thread: rayon::ThreadBuilder| {
move |thread: rayon_core::ThreadBuilder| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

Expand All @@ -232,7 +232,9 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
})
},
// Run `f` on the first thread in the thread pool.
move |pool: &rayon::ThreadPool| pool.install(|| f(current_gcx.into_inner())),
move |pool: &rayon_core::ThreadPool| {
pool.install(|| f(current_gcx.into_inner()))
},
)
.unwrap()
})
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_metadata/src/rmeta/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2192,7 +2192,7 @@ fn prefetch_mir(tcx: TyCtxt<'_>) {
}

let reachable_set = tcx.reachable_set(());
par_for_each_in(tcx.mir_keys(()), |&def_id| {
par_for_each_in(tcx.mir_keys(()), |&&def_id| {
let (encode_const, encode_opt) = should_encode_mir(tcx, reachable_set, def_id);

if encode_const {
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_middle/src/hir/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl<'tcx> TyCtxt<'tcx> {

#[inline]
pub fn par_hir_body_owners(self, f: impl Fn(LocalDefId) + DynSend + DynSync) {
par_for_each_in(&self.hir_crate_items(()).body_owners[..], |&def_id| f(def_id));
par_for_each_in(&self.hir_crate_items(()).body_owners[..], |&&def_id| f(def_id));
}

pub fn hir_ty_param_owner(self, def_id: LocalDefId) -> LocalDefId {
Expand Down
10 changes: 5 additions & 5 deletions compiler/rustc_middle/src/hir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,35 @@ impl ModuleItems {
&self,
f: impl Fn(ItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
) -> Result<(), ErrorGuaranteed> {
try_par_for_each_in(&self.free_items[..], |&id| f(id))
try_par_for_each_in(&self.free_items[..], |&&id| f(id))
}

pub fn par_trait_items(
&self,
f: impl Fn(TraitItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
) -> Result<(), ErrorGuaranteed> {
try_par_for_each_in(&self.trait_items[..], |&id| f(id))
try_par_for_each_in(&self.trait_items[..], |&&id| f(id))
}

pub fn par_impl_items(
&self,
f: impl Fn(ImplItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
) -> Result<(), ErrorGuaranteed> {
try_par_for_each_in(&self.impl_items[..], |&id| f(id))
try_par_for_each_in(&self.impl_items[..], |&&id| f(id))
}

pub fn par_foreign_items(
&self,
f: impl Fn(ForeignItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
) -> Result<(), ErrorGuaranteed> {
try_par_for_each_in(&self.foreign_items[..], |&id| f(id))
try_par_for_each_in(&self.foreign_items[..], |&&id| f(id))
}

pub fn par_opaques(
&self,
f: impl Fn(LocalDefId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
) -> Result<(), ErrorGuaranteed> {
try_par_for_each_in(&self.opaques[..], |&id| f(id))
try_par_for_each_in(&self.opaques[..], |&&id| f(id))
}
}

Expand Down
Loading
Loading