diff --git a/src/lib.rs b/src/lib.rs index 1fc01ea..1b101ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ use std::thread::{self, JoinHandle}; use std::sync::mpsc::{channel, Sender, Receiver, SyncSender, sync_channel, RecvError}; use std::sync::{Arc, Mutex}; use std::marker::PhantomData; -use std::mem; +use std::mem::{self, ManuallyDrop}; enum Message { NewJob(Thunk<'static>), @@ -76,7 +76,11 @@ type Thunk<'a> = Box; impl Drop for Pool { fn drop(&mut self) { - self.job_sender = None; + // Explicitly drop job_sender before the threads, to ensure + // channel is already closed by the time threads are joined. + unsafe { + ManuallyDrop::drop(&mut self.job_sender); + } } } @@ -84,7 +88,7 @@ impl Drop for Pool { /// of threads spawned at construction. pub struct Pool { threads: Vec, - job_sender: Option> + job_sender: ManuallyDrop>, } struct ThreadData { @@ -162,7 +166,7 @@ impl Pool { Pool { threads: threads, - job_sender: Some(job_sender), + job_sender: ManuallyDrop::new(job_sender), } } @@ -210,13 +214,13 @@ impl<'pool, 'scope> Scope<'pool, 'scope> { let b = unsafe { mem::transmute::, Thunk<'static>>(Box::new(f)) }; - self.pool.job_sender.as_ref().unwrap().send(Message::NewJob(b)).unwrap(); + self.pool.job_sender.send(Message::NewJob(b)).unwrap(); } /// Blocks until all currently queued jobs have run to completion. pub fn join_all(&self) { for _ in 0..self.pool.threads.len() { - self.pool.job_sender.as_ref().unwrap().send(Message::Join).unwrap(); + self.pool.job_sender.send(Message::Join).unwrap(); } // Synchronize/Join with threads