-
Notifications
You must be signed in to change notification settings - Fork 577
Closed as not planned
Labels
Description
I'm spawning some tasks using rayon, where each task is partially dependent on the previous task. futures::sync::mpsc together with rayon::spawn_future seemed like a good solution to manage this dependency but I'm having problems with deadlocks.
Running the code provided below with two threads in the rayon threadpool works as expected
$ RAYON_NUM_THREADS=2 RUSTFLAGS='--cfg rayon_unstable' cargo run
4 blocked
3 blocked
2 blocked
1 blocked
0 sent
1 unblocked
1 sent
2 unblocked
2 sent
3 unblocked
3 sent
4 unblocked
4 sent
but with three threads it deadlocks quite reliably
$ RAYON_NUM_THREADS=3 RUSTFLAGS='--cfg rayon_unstable' cargo run
1 blocked
4 blocked
2 blocked
3 blocked
0 sent
extern crate futures;
extern crate rayon;
use futures::{Sink, Stream};
use futures::sync::mpsc as futures_mpsc;
use std::{thread, time};
fn main() {
let size = 5;
rayon::scope(|scope| {
let mut prev_receiver = None;
for i in 0..size {
let (sender, receiver) = futures_mpsc::channel(1);
scope.spawn(move |_| {
do_work(i, prev_receiver.take(), sender);
});
prev_receiver = Some(receiver);
}
});
}
fn do_work(
number: usize,
mut receiver: Option<futures_mpsc::Receiver<()>>,
mut sender: futures_mpsc::Sender<()>,
) {
if let Some(receiver) = receiver.take() {
let receiver_future = receiver.into_future();
println!("{} blocked", number);
let _ = rayon::spawn_future(receiver_future).rayon_wait();
println!("{} unblocked", number);
}
thread::sleep(time::Duration::from_millis(10));
let _ = sender.start_send(());
println!("{} sent", number);
}Reactions are currently unavailable