Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use std::thread::{self, JoinHandle};
use std::sync::mpsc::{channel, Sender, Receiver, SyncSender, sync_channel, RecvError};
use std::sync::{Arc, Mutex};
use std::marker::PhantomData;
use std::mem;
use std::mem::{self, ManuallyDrop};

enum Message {
NewJob(Thunk<'static>),
Expand All @@ -76,15 +76,19 @@ type Thunk<'a> = Box<FnBox + Send + 'a>;

impl Drop for Pool {
fn drop(&mut self) {
self.job_sender = None;
// Explicitly drop job_sender before the threads, to ensure
// channel is already closed by the time threads are joined.
unsafe {
ManuallyDrop::drop(&mut self.job_sender);
}
}
}

/// A threadpool that acts as a handle to a number
/// of threads spawned at construction.
pub struct Pool {
threads: Vec<ThreadData>,
job_sender: Option<Sender<Message>>
job_sender: ManuallyDrop<Sender<Message>>,
}

struct ThreadData {
Expand Down Expand Up @@ -162,7 +166,7 @@ impl Pool {

Pool {
threads: threads,
job_sender: Some(job_sender),
job_sender: ManuallyDrop::new(job_sender),
}
}

Expand Down Expand Up @@ -210,13 +214,13 @@ impl<'pool, 'scope> Scope<'pool, 'scope> {
let b = unsafe {
mem::transmute::<Thunk<'scope>, Thunk<'static>>(Box::new(f))
};
self.pool.job_sender.as_ref().unwrap().send(Message::NewJob(b)).unwrap();
self.pool.job_sender.send(Message::NewJob(b)).unwrap();
}

/// Blocks until all currently queued jobs have run to completion.
pub fn join_all(&self) {
for _ in 0..self.pool.threads.len() {
self.pool.job_sender.as_ref().unwrap().send(Message::Join).unwrap();
self.pool.job_sender.send(Message::Join).unwrap();
}

// Synchronize/Join with threads
Expand Down