diff --git a/Cargo.toml b/Cargo.toml
index 94f7903..37272b7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,9 +1,8 @@
 [package]
 name = "scoped_threadpool"
 version = "0.1.6"
-authors = ["Marvin Löbel "]
+authors = ["Marvin Löbel ", "Popog"]
 license = "MIT"
-build = "build.rs"
 
 description = "A library for scoped and cached threadpools."
 readme = "README.md"
@@ -12,11 +11,8 @@ documentation = "http://kimundi.github.io/scoped-threadpool-rs/scoped_threadpool
 repository = "https://github.com/Kimundi/scoped-threadpool-rs"
 keywords = ["thread", "scoped", "pool", "cached", "threadpool"]
 
-[build-dependencies]
-rustc_version = "0.1"
-
-[dev-dependencies]
-lazy_static = "*"
+[dependencies]
+crossbeam = "0.2.4"
 
 [features]
 nightly = []
diff --git a/README.md b/README.md
index 9ac8f2a..70b7470 100644
--- a/README.md
+++ b/README.md
@@ -32,7 +32,7 @@ use scoped_threadpool::Pool;
 
 fn main() {
     // Create a threadpool holding 4 threads
-    let mut pool = Pool::new(4);
+    let pool = Pool::new(4);
 
     let mut vec = vec![0, 1, 2, 3, 4, 5, 6, 7];
 
@@ -41,13 +41,11 @@ fn main() {
     pool.scoped(|scoped| {
         // Create references to each element in the vector ...
         for e in &mut vec {
-            // ... and add 1 to it in a seperate thread
+            // ... and add 1 to it in a separate thread
-            // (execute() is safe to call in nightly)
-            unsafe {
-                scoped.execute(move || {
-                    *e += 1;
-                });
-            }
+            scoped.execute(move || {
+                *e += 1;
+            });
         }
     });
diff --git a/benches/lib.rs b/benches/lib.rs
new file mode 100644
index 0000000..bfc2133
--- /dev/null
+++ b/benches/lib.rs
@@ -0,0 +1,125 @@
+#![feature(test)]
+#![feature(const_fn)]
+
+extern crate test;
+#[macro_use]
+extern crate scoped_threadpool;
+
+use self::test::{Bencher, black_box};
+use scoped_threadpool::{Pool, Anchored};
+
+// const MS_SLEEP_PER_OP: u32 = 1;
+
+fn fib(n: u64) -> u64 {
+    let mut prev_prev: u64 = 1;
+    let mut prev = 1;
+    let mut current = 1;
+    for _ in 2..(n+1) {
+        current = prev_prev.wrapping_add(prev);
+        prev_prev = prev;
+        prev = current;
+    }
+    current
+}
+
+fn threads_interleaved_n(pool: &Pool) {
+    let size = 1024; // 1kiB
+
+    let mut data = vec![1u8; size];
+    pool.scoped(|s| {
+        for e in data.iter_mut() {
+            s.execute(move || {
+                *e += fib(black_box(1000 * (*e as u64))) as u8;
+                for i in 0..10000 { black_box(i); }
+                //thread::sleep_ms(MS_SLEEP_PER_OP);
+            });
+        }
+    });
+}
+
+#[bench]
+fn threads_interleaved_1(b: &mut Bencher) {
+    let pool = Pool::new(1);
+    b.iter(|| threads_interleaved_n(&pool))
+}
+
+#[bench]
+fn threads_interleaved_2(b: &mut Bencher) {
+    let pool = Pool::new(2);
+    b.iter(|| threads_interleaved_n(&pool))
+}
+
+#[bench]
+fn threads_interleaved_4(b: &mut Bencher) {
+    let pool = Pool::new(4);
+    b.iter(|| threads_interleaved_n(&pool))
+}
+
+#[bench]
+fn threads_interleaved_8(b: &mut Bencher) {
+    let pool = Pool::new(8);
+    b.iter(|| threads_interleaved_n(&pool))
+}
+
+fn threads_chunked_n(pool: &Pool) {
+    // Set this to 1GB and 40 to get good but slooow results
+    let size = 1024 * 1024 * 10 / 4; // 10MiB
+    let bb_repeat = 50;
+
+    let mut data = vec![0u32; size];
+    let n = pool.threads();
+    pool.scoped(|s| {
+        let l = (data.len() - 1) / n as usize + 1;
+        for es in data.chunks_mut(l) {
+            s.execute(move || {
+                if es.len() > 1 {
+                    es[0] = 1;
+                    es[1] = 1;
+                    for i in 2..es.len() {
+                        // Fibonacci gets big fast,
+                        // so just wrap around all the time
+                        es[i] = black_box(es[i-1].wrapping_add(es[i-2]));
+                        for i in 0..bb_repeat { black_box(i); }
+                    }
+                }
+                //thread::sleep_ms(MS_SLEEP_PER_OP);
+            });
+        }
+    });
+}
+
+#[bench]
+fn threads_chunked_1(b: &mut Bencher) {
+    let pool = Pool::new(1);
+    b.iter(|| threads_chunked_n(&pool))
+}
+
+#[bench]
+fn threads_chunked_2(b: &mut Bencher) {
+    let pool = Pool::new(2);
+    b.iter(|| threads_chunked_n(&pool))
+}
+
+#[bench]
+fn threads_chunked_3(b: &mut Bencher) {
+    let pool = Pool::new(3);
+    b.iter(|| threads_chunked_n(&pool))
+}
+
+#[bench]
+fn threads_chunked_4(b: &mut Bencher) {
+    let pool = Pool::new(4);
+    b.iter(|| threads_chunked_n(&pool))
+}
+
+#[bench]
+fn threads_chunked_5(b: &mut Bencher) {
+    let pool = Pool::new(5);
+    b.iter(|| threads_chunked_n(&pool))
+}
+
+#[bench]
+fn threads_chunked_8(b: &mut Bencher) {
+    let pool = Pool::new(8);
+    b.iter(|| threads_chunked_n(&pool))
+}
diff --git a/build.rs b/build.rs
deleted file mode 100644
index d7788f4..0000000
--- a/build.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-extern crate rustc_version;
-
-fn main() {
-    if rustc_version::version_matches(">= 1.4.0") {
-        println!("cargo:rustc-cfg=compiler_has_scoped_bugfix");
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index afa1b83..636da82 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,7 +19,7 @@
 //!
 //! fn main() {
 //!     // Create a threadpool holding 4 threads
-//!     let mut pool = Pool::new(4);
+//!     let pool = Pool::new(4);
 //!
 //!     let mut vec = vec![0, 1, 2, 3, 4, 5, 6, 7];
 //!
@@ -28,13 +28,10 @@
 //!     pool.scoped(|scope| {
 //!         // Create references to each element in the vector ...
 //!         for e in &mut vec {
-//!             // ... and add 1 to it in a seperate thread
-//!             // (execute() is safe to call in nightly)
-//!             unsafe {
-//!                 scope.execute(move || {
-//!                     *e += 1;
-//!                 });
-//!             }
+//!             // ... and add 1 to it in a separate thread
+//!             scope.execute(move || {
+//!                 *e += 1;
+//!             });
 //!         }
 //!     });
 //!
@@ -42,240 +39,524 @@
 //! }
 //! ```
 
-#![cfg_attr(all(feature="nightly", test), feature(test))]
-#![cfg_attr(feature="nightly", feature(const_fn))]
-
 #![warn(missing_docs)]
 
-#[macro_use]
-#[cfg(test)]
-extern crate lazy_static;
+extern crate crossbeam;
 
-use std::thread::{self, JoinHandle};
-use std::sync::mpsc::{channel, Sender, Receiver, SyncSender, sync_channel};
-use std::sync::{Arc, Mutex};
+use std::borrow::{Borrow, BorrowMut};
 use std::marker::PhantomData;
 use std::mem;
+use std::ops::{Deref, DerefMut};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::{Arc, Mutex, Condvar};
+use std::thread::{self};
+use std::usize;
+
+use crossbeam::sync::MsQueue;
+
+struct Semaphore {
+    lock: Mutex<(usize, bool)>,
+    cvar: Condvar,
+}
 
-enum Message {
-    NewJob(Thunk<'static>),
-    Join,
+impl Semaphore {
+    fn new(count: usize) -> Semaphore {
+        Semaphore {
+            lock: Mutex::new((count, false)),
+            cvar: Condvar::new(),
+        }
+    }
+
+    fn panicked(&self) -> bool {
+        self.lock.lock().unwrap().1
+    }
+
+    fn acquire(&self, count: usize) -> bool {
+        let mut v = self.lock.lock().unwrap();
+        while v.0 < count {
+            v = self.cvar.wait(v).unwrap();
+        }
+        v.0 -= count;
+        v.1 |= thread::panicking();
+        v.1
+    }
+
+    fn acquire_all(&self) -> (usize, bool) {
+        let mut v = self.lock.lock().unwrap();
+        v.1 |= thread::panicking();
+        (mem::replace(&mut v.0, 0), v.1)
+    }
+
+    fn release(&self) {
+        let mut v = self.lock.lock().unwrap();
+        v.0 += 1;
+        v.1 |= thread::panicking();
+        self.cvar.notify_one();
+    }
 }
 
-trait FnBox {
-    fn call_box(self: Box<Self>);
+// d88b .d88b. d8888b.
+// `8P' .8P Y8. 88 `8D
+// 88 88 88 88oooY'
+// 88 88 88 88~~~b.
+// db. 88 `8b d8' 88 8D
+// Y8888P `Y88P' Y8888P'
+
+/// TODO: Convert to std::boxed::FnBox. Blocked by rust-lang/rust#28796
+trait FnBox<A, R> {
+    fn call_box(self: Box<Self>, a: A) -> R;
 }
 
-impl<F: FnOnce()> FnBox for F {
-    fn call_box(self: Box<F>) {
+impl<F: FnOnce()> FnBox<(), ()> for F {
+    fn call_box(self: Box<Self>, _: ()) {
         (*self)()
     }
 }
 
-type Thunk<'a> = Box<FnBox + Send + 'a>;
+struct SemaphoreGuard<'pool>(&'pool Semaphore);
 
-impl Drop for Pool {
+impl <'pool> Drop for SemaphoreGuard<'pool> {
     fn drop(&mut self) {
-        self.job_sender = None;
+        self.0.release()
     }
 }
 
-/// A threadpool that acts as a handle to a number
-/// of threads spawned at construction.
-pub struct Pool {
-    threads: Vec<ThreadData>,
-    job_sender: Option<Sender<Message>>
+type Thunk<'a> = Box<FnBox<(), ()> + Send + 'a>;
+
+enum Message {
+    Job(Thunk<'static>),
+    ScopedJob {
+        job: Thunk<'static>,
+        semaphore: &'static Semaphore,
+    },
+    Remove,
+    Dropping,
 }
 
-struct ThreadData {
-    _thread_join_handle: JoinHandle<()>,
-    pool_sync_rx: Receiver<()>,
-    thread_sync_tx: SyncSender<()>,
+// .d8888. d88888b d8b db d888888b d888888b d8b db d88888b db
+// 88' YP 88' 888o 88 `~~88~~' `88' 888o 88 88' 88
+// `8bo. 88ooooo 88V8o 88 88 88 88V8o 88 88ooooo 88
+// `Y8b. 88~~~~~ 88 V8o88 88 88 88 V8o88 88~~~~~ 88
+// db 8D 88. 88 V888 88 .88. 88 V888 88. 88booo.
+// `8888Y' Y88888P VP V8P YP Y888888P VP V8P Y88888P Y88888P
+
+struct Sentinel {
+    pool: Arc<PoolShared>,
+    respawn: bool,
+}
+
+impl Sentinel {
+    fn new(pool: Arc<PoolShared>) {
+        let builder = thread::Builder::new();
+        let builder = if let Some(stack_size) = pool.stack_size {
+            builder.stack_size(stack_size)
+        } else { builder };
+
+        builder.spawn(move || {
+            // Will spawn a new thread on panic unless it is cancelled.
+            let sentinel = Sentinel { pool: pool, respawn: true };
+
+            loop {
+                match sentinel.pool.jobs.pop() {
+                    // Execute the job
+                    Message::Job(job) => job.call_box(()),
+                    // Execute the scoped job and notify its scope via the semaphore
+                    Message::ScopedJob{job, semaphore} => {
+                        // If the scope has panicked, don't execute jobs from it
+                        if !semaphore.panicked() {
+                            let _guard = SemaphoreGuard(semaphore);
+                            job.call_box(());
+                        }
+                    },
+                    // A message to kill a single thread
+                    Message::Remove => {
+                        sentinel.die();
+                        return
+                    },
+                    // A message that the pool is being dropped.
+                    Message::Dropping => {
+                        sentinel.pool.threads.release();
+                        sentinel.die();
+                        return
+                    },
+                }
+            }
+        }).unwrap();
+    }
+
+    // Destroy this sentinel and let the thread die.
+    fn die(mut self) {
+        self.respawn = false;
+    }
+}
+
+impl Drop for Sentinel {
+    fn drop(&mut self) {
+        if self.respawn { Sentinel::new(self.pool.clone()) }
+    }
+}
+
+// d8888b. db db d888888b db d8888b. d88888b d8888b.
+// 88 `8D 88 88 `88' 88 88 `8D 88' 88 `8D
+// 88oooY' 88 88 88 88 88 88 88ooooo 88oobY'
+// 88~~~b. 88 88 88 88 88 88 88~~~~~ 88`8b
+// 88 8D 88b d88 .88. 88booo. 88 .8D 88. 88 `88.
+// Y8888P' ~Y8888P' Y888888P Y88888P Y8888D' Y88888P 88 YD
+
+/// Pool configuration. Provides detailed control over the properties and behavior of new threads.
+pub struct Builder {
+    // The size of the stack for the spawned thread
+    stack_size: Option<usize>,
+}
+
+impl Builder {
+    /// Generates the base configuration for constructing a threadpool, from which configuration
+    /// methods can be chained.
+    pub fn new() -> Builder {
+        Builder {
+            stack_size: None,
+        }
+    }
+
+    /// Sets the size of the stack for the threadpool threads.
+    pub fn stack_size(mut self, size: usize) -> Self {
+        self.stack_size = Some(size);
+        self
+    }
+
+    /// Construct a threadpool with the given number of threads.
+    pub fn build(self, threads: usize) -> Pool {
+        let Builder { stack_size } = self;
+
+        let shared = Arc::new(PoolShared{
+            threads: Semaphore::new(0),
+            stack_size: stack_size,
+            jobs: MsQueue::new(),
+        });
+
+        // Spawn n threads, put them in waiting mode
+        for _ in 0..threads { Sentinel::new(shared.clone()) }
+
+        Pool{
+            shared: shared,
+            count: AtomicUsize::new(threads),
+        }
+    }
+}
+
+// d8888b. .d88b. .d88b. db
+// 88 `8D .8P Y8. .8P Y8. 88
+// 88oodD' 88 88 88 88 88
+// 88~~~ 88 88 88 88 88
+// 88 `8b d8' `8b d8' 88booo.
+// 88 `Y88P' `Y88P' Y88888P
+
+struct PoolShared {
+    threads: Semaphore,
+    stack_size: Option<usize>,
+    jobs: MsQueue<Message>,
+}
+
+/// A threadpool that acts as a handle to a number of threads spawned at construction.
+// `Drop` will not block.
+pub struct Pool {
+    shared: Arc<PoolShared>,
+    count: AtomicUsize,
 }
 
 impl Pool {
     /// Construct a threadpool with the given number of threads.
-    /// Minimum value is `1`.
-    pub fn new(n: u32) -> Pool {
-        assert!(n >= 1);
+    pub fn new(threads: usize) -> Self { Builder::new().build(threads) }
+
+    /// Adds threads to the pool, returning the previous number of threads. The number of threads
+    /// will not overflow.
+    pub fn add_threads(&self, val: usize) -> usize {
+        let mut current = self.count.load(Relaxed);
+        if val == 0 { return current; }
+
+        loop {
+            let old = self.count.compare_and_swap(current, current.saturating_add(val), Relaxed);
+            if current == old { break }
+            current = old;
+        }
 
-        let (job_sender, job_receiver) = channel();
-        let job_receiver = Arc::new(Mutex::new(job_receiver));
+        // Spawn n threads, put them in waiting mode
+        for _ in 0..(current.saturating_add(val) - current) {
+            Sentinel::new(self.shared.clone())
+        }
 
-        let mut threads = Vec::with_capacity(n as usize);
+        current
+    }
 
-        // spawn n threads, put them in waiting mode
-        for _ in 0..n {
-            let job_receiver = job_receiver.clone();
-
-            let (pool_sync_tx, pool_sync_rx) =
-                sync_channel::<()>(0);
-            let (thread_sync_tx, thread_sync_rx) =
-                sync_channel::<()>(0);
-
-            let thread = thread::spawn(move || {
-                loop {
-                    let message = {
-                        // Only lock jobs for the time it takes
-                        // to get a job, not run it.
-                        let lock = job_receiver.lock().unwrap();
-                        lock.recv()
-                    };
-
-                    match message {
-                        Ok(Message::NewJob(job)) => {
-                            job.call_box();
-                        }
-                        Ok(Message::Join) => {
-                            // Syncronize/Join with pool.
-                            // This has to be a two step
-                            // process to ensure that all threads
-                            // finished their work before the pool
-                            // can continue
-
-                            // Wait until the pool started syncing with threads
-                            if pool_sync_tx.send(()).is_err() {
-                                // The pool was dropped.
-                                break;
-                            }
-
-                            // Wait until the pool finished syncing with threads
-                            if thread_sync_rx.recv().is_err() {
-                                // The pool was dropped.
-                                break;
-                            }
-                        }
-                        Err(..) => {
-                            // The pool was dropped.
-                            break
-                        }
-                    }
-                }
-            });
+    /// Removes threads from the pool, returning the previous number of threads. The number of
+    /// threads will not underflow.
+    pub fn sub_threads(&self, val: usize) -> usize {
+        let mut current = self.count.load(Relaxed);
+        if val == 0 { return current; }
 
-            threads.push(ThreadData {
-                _thread_join_handle: thread,
-                pool_sync_rx: pool_sync_rx,
-                thread_sync_tx: thread_sync_tx,
-            });
+        loop {
+            let old = self.count.compare_and_swap(current, current.saturating_sub(val), Relaxed);
+            if current == old { break }
+            current = old;
         }
 
-        Pool {
-            threads: threads,
-            job_sender: Some(job_sender),
+        // Send kill messages to threads
+        for _ in 0..(current - current.saturating_sub(val)) {
+            self.shared.jobs.push(Message::Remove);
         }
+
+        current
     }
 
-    /// Borrows the pool and allows executing jobs on other
-    /// threads during that scope via the argument of the closure.
-    ///
-    /// This method will block until the closure and all its jobs have
-    /// run to completion.
-    pub fn scoped<'pool, 'scope, F, R>(&'pool mut self, f: F) -> R
-        where F: FnOnce(&Scope<'pool, 'scope>) -> R
-    {
-        let scope = Scope {
-            pool: self,
-            _marker: PhantomData,
-        };
-        f(&scope)
-    }
-
-    /// Returns the number of threads inside this pool.
-    pub fn thread_count(&self) -> u32 {
-        self.threads.len() as u32
+    /// Sets the number of threads in the pool to the value requested, adding and removing threads
+    /// as needed. Returns the old number of threads.
+    pub fn set_threads(&self, new: usize) -> usize {
+        let old = self.count.swap(new, Relaxed);
+
+        if new > old {
+            // Spawn n threads, put them in waiting mode
+            for _ in 0..(new-old) {
+                Sentinel::new(self.shared.clone())
+            }
+        } else {
+            // Send kill messages to threads
+            for _ in 0..(old-new) {
+                self.shared.jobs.push(Message::Remove);
+            }
+        }
+
+        old
     }
-}
 
-/////////////////////////////////////////////////////////////////////////////
+    /// Sets the number of threads if the current value is the same as the `current` value.
+    ///
+    /// The return value is always the previous value. If it is equal to current, then the number
+    /// of threads was updated.
+    pub fn compare_and_swap_threads(&self, current: usize, new: usize) -> usize {
+        let old = self.count.compare_and_swap(current, new, Relaxed);
+
+        // If current is equal, perform changes
+        if current == old {
+            if new > old {
+                // Spawn n threads, put them in waiting mode
+                for _ in 0..(new-old) {
+                    Sentinel::new(self.shared.clone())
+                }
+            } else {
+                // Send kill messages to threads
+                for _ in 0..(old-new) {
+                    self.shared.jobs.push(Message::Remove);
+                }
+            }
+        }
+
+        old
+    }
 
-/// Handle to the scope during which the threadpool is borrowed.
-pub struct Scope<'pool, 'scope> {
-    pool: &'pool mut Pool,
-    // The 'scope needs to be invariant... it seems?
-    _marker: PhantomData<::std::cell::Cell<&'scope mut ()>>,
-}
+    /// Gets the number of threads. The number of threads may change immediately after this call
+    /// is made.
+    pub fn threads(&self) -> usize {
+        self.count.load(Relaxed)
+    }
 
-impl<'pool, 'scope> Scope<'pool, 'scope> {
-    /// Execute a job on the threadpool.
+    /// Submits a static job for execution on the threadpool.
     ///
-    /// The body of the closure will be send to one of the
-    /// internal threads, and this method itself will not wait
-    /// for its completion.
-    #[cfg(not(compiler_has_scoped_bugfix))]
-    pub unsafe fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'scope {
-        self.execute_(f)
+    /// The body of the closure will be sent to one of the internal threads, and this method itself
+    /// will not wait for its completion.
+    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
+        self.shared.jobs.push(Message::Job(Box::new(f)));
     }
 
-    /// Execute a job on the threadpool.
+    /// Borrows the pool and allows executing jobs on other threads during that scope via the
+    /// argument of the closure.
     ///
-    /// The body of the closure will be send to one of the
-    /// internal threads, and this method itself will not wait
-    /// for its completion.
-    #[cfg(compiler_has_scoped_bugfix)]
-    pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'scope {
-        self.execute_(f)
+    /// This method will block until the closure and all its jobs have run to completion.
+    pub fn scoped<'pool, R, F: FnOnce(&mut Scope<'pool>) -> R>(&'pool self, f: F) -> R {
+        let mut scope = Scope::new(&self.shared.jobs);
+        f(&mut scope)
     }
+}
 
-    fn execute_<F>(&self, f: F) where F: FnOnce() + Send + 'scope {
-        let b = unsafe {
-            mem::transmute::<Thunk<'scope>, Thunk<'static>>(Box::new(f))
-        };
-        self.pool.job_sender.as_ref().unwrap().send(Message::NewJob(b)).unwrap();
+impl Drop for Pool {
+    fn drop(&mut self) {
+        // Kill all the threads, but don't bother blocking
+        self.set_threads(0);
     }
+}
+
+// .d8b. d8b db .o88b. db db .d88b. d8888b. d88888b d8888b.
+// d8' `8b 888o 88 d8P Y8 88 88 .8P Y8. 88 `8D 88' 88 `8D
+// 88ooo88 88V8o 88 8P 88ooo88 88 88 88oobY' 88ooooo 88 88
+// 88~~~88 88 V8o88 8b 88~~~88 88 88 88`8b 88~~~~~ 88 88
+// 88 88 88 V888 Y8b d8 88 88 `8b d8' 88 `88. 88. 88 .8D
+// YP YP VP V8P `Y88P' YP YP `Y88P' 88 YD Y88888P Y8888D'
+
+/// An anchored thread pool that will block on `Drop` until all threads are cleaned up.
+pub struct Anchored {
+    pool: Pool,
+    _marker: PhantomData<* const ()>, // TODO: Convert to !Send. Blocked by rust-lang/rust#13231
+}
+
+// TODO: Convert to !Send. Blocked by rust-lang/rust#13231
+unsafe impl Sync for Anchored {}
+
+impl Anchored {
+    /// Construct a threadpool with the given number of threads.
+    pub fn new(n: usize) -> Self { Anchored{ pool: Pool::new(n), _marker: PhantomData } }
+}
 
-    /// Blocks until all currently queued jobs have run to completion.
-    pub fn join_all(&self) {
-        for _ in 0..self.pool.threads.len() {
-            self.pool.job_sender.as_ref().unwrap().send(Message::Join).unwrap();
+impl Deref for Anchored {
+    type Target = Pool;
+    fn deref(&self) -> &Self::Target { &self.pool }
+}
+
+impl DerefMut for Anchored {
+    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.pool }
+}
+
+impl AsRef<Pool> for Anchored {
+    fn as_ref(&self) -> &Pool { &self.pool }
+}
+
+impl AsMut<Pool> for Anchored {
+    fn as_mut(&mut self) -> &mut Pool { &mut self.pool }
+}
+
+impl Borrow<Pool> for Anchored {
+    fn borrow(&self) -> &Pool { &self.pool }
+}
+
+impl BorrowMut<Pool> for Anchored {
+    fn borrow_mut(&mut self) -> &mut Pool { &mut self.pool }
+}
+
+impl Drop for Anchored {
+    fn drop(&mut self) {
+        // See how many threads we have
+        let n = self.pool.count.swap(0, Relaxed);
+
+        // Send drop messages to threads
+        for _ in 0..n {
+            self.pool.shared.jobs.push(Message::Dropping);
+        }
+
+        if !thread::panicking() {
+            self.pool.shared.threads.acquire(n);
         }
+    }
+}
+
+// .d8888. .o88b. .d88b. d8888b. d88888b
+// 88' YP d8P Y8 .8P Y8. 88 `8D 88'
+// `8bo. 8P 88 88 88oodD' 88ooooo
+// `Y8b. 8b 88 88 88~~~ 88~~~~~
+// db 8D Y8b d8 `8b d8' 88 88.
+// `8888Y' `Y88P' `Y88P' 88 Y88888P
+
+/// Handle to the scope.
+pub struct Scope<'pool> {
+    jobs: &'pool MsQueue<Message>,
+    job_count: usize,
+    semaphore: Semaphore,
+}
 
-        // Syncronize/Join with threads
-        // This has to be a two step process
-        // to make sure _all_ threads received _one_ Join message each.
+impl <'pool> Scope<'pool> {
+    fn new(jobs: &'pool MsQueue<Message>) -> Self {
+        Scope{
+            jobs: jobs,
+            job_count: 0,
+            semaphore: Semaphore::new(0),
+        }
+    }
 
-        // This loop will block on every thread until it
-        // received and reacted to its Join message.
-        for thread_data in &self.pool.threads {
-            thread_data.pool_sync_rx.recv().unwrap();
+    /// Submit a job for execution on the threadpool.
+    ///
+    /// The body of the closure will be sent to one of the internal threads, and this method
+    /// itself will not wait for its completion.
+    pub fn execute<'scope, F: 'scope+Send+FnOnce()>(&'scope mut self, f: F) {
+        // Check the current state and count any completed jobs
+        let (finished, panicked) = self.semaphore.acquire_all();
+        self.job_count -= finished;
+
+        // Block job submission if a panic occurs. Also panic if needed.
+        if panicked {
+            if thread::panicking() {
+                return;
+            } else {
+                panic!("Scoped thread panicked");
+            }
         }
 
-        // Once all threads joined the jobs, send them a continue message
-        for thread_data in &self.pool.threads {
-            thread_data.thread_sync_tx.send(()).unwrap();
+        // Block job overflow
+        if self.job_count == usize::MAX {
+            panic!("Job overflow, max jobs is {}", usize::MAX)
+        }
+        self.job_count += 1;
+
+        let f = Box::new(f);
+        self.jobs.push(Message::ScopedJob{
+            job: unsafe {
+                mem::transmute::<Thunk<'scope>, Thunk<'static>>(f)
+            },
+            semaphore: unsafe {
+                mem::transmute::<&'scope Semaphore, &'static Semaphore>(&self.semaphore)
+            },
+        });
+    }
+
+    /// Blocks until all submitted jobs have run to completion.
+    pub fn wait_all(&mut self) {
+        // Wait for job completion.
+        let panicked = self.semaphore.acquire(mem::replace(&mut self.job_count, 0));
+
+        // Panic if needed
+        if panicked {
+            if thread::panicking() {
+                return;
+            } else {
+                panic!("Scoped thread panicked");
+            }
         }
     }
 }
 
-impl<'pool, 'scope> Drop for Scope<'pool, 'scope> {
+impl <'pool> Drop for Scope<'pool> {
     fn drop(&mut self) {
-        self.join_all();
+        self.wait_all();
    }
 }
 
-/////////////////////////////////////////////////////////////////////////////
+// d888888b d88888b .d8888. d888888b
+// `~~88~~' 88' 88' YP `~~88~~'
+// 88 88ooooo `8bo. 88
+// 88 88~~~~~ `Y8b. 88
+// 88 88. db 8D 88
+// YP Y88888P `8888Y' YP
 
 #[cfg(test)]
 mod tests {
-    #![cfg_attr(feature="nightly", allow(unused_unsafe))]
-
-    use super::Pool;
+    use super::{Pool, Anchored};
+    use std::borrow::Borrow;
     use std::thread;
-    use std::sync;
+    use std::time::Duration;
+    use std::sync::mpsc::{channel, sync_channel};
 
-    #[test]
-    fn smoketest() {
-        let mut pool = Pool::new(4);
+    #[test] fn smoketest_pool() { smoketest(Pool::new(4)) }
+    #[test] fn smoketest_anchored() { smoketest(Anchored::new(4)) }
+
+    fn smoketest<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
 
         for i in 1..7 {
             let mut vec = vec![0, 1, 2, 3, 4];
             pool.scoped(|s| {
                 for e in vec.iter_mut() {
-                    unsafe {
-                        s.execute(move || {
-                            *e += i;
-                        });
-                    }
+                    s.execute(move || {
+                        *e += i;
+                    });
                 }
             });
@@ -288,201 +569,154 @@ mod tests {
         }
     }
 
-    #[test]
-    #[should_panic]
-    fn thread_panic() {
-        let mut pool = Pool::new(4);
+    #[test] fn add_pool() { add(Pool::new(0)) }
+    #[test] fn add_anchored() { add(Anchored::new(0)) }
+
+    fn add<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        pool.add_threads(1);
         pool.scoped(|scoped| {
-            unsafe {
-                scoped.execute(move || {
-                    panic!()
-                });
-            }
+            let pool = &pool;
+            scoped.execute(move || {
+                pool.sub_threads(1);
+            });
         });
    }
 
-    #[test]
-    #[should_panic]
-    fn scope_panic() {
-        let mut pool = Pool::new(4);
-        pool.scoped(|_scoped| {
-            panic!()
+    #[test] fn remove_pool() { remove(Pool::new(2)) }
+    #[test] fn remove_anchored() { remove(Anchored::new(2)) }
+
+    fn remove<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        pool.sub_threads(1);
+        pool.scoped(|scoped| {
+            scoped.execute(move || {
+            });
         });
    }
 
-    #[test]
-    #[should_panic]
-    fn pool_panic() {
-        let _pool = Pool::new(4);
-        panic!()
+    #[test] fn remove_underflow_pool() { remove_underflow(Pool::new(0)) }
+    #[test] fn remove_underflow_anchored() { remove_underflow(Anchored::new(0)) }
+
+    fn remove_underflow<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        pool.sub_threads(1);
+        assert_eq!(pool.threads(), 0);
    }
 
-    #[test]
-    fn join_all() {
-        let mut pool = Pool::new(4);
-        let (tx_, rx) = sync::mpsc::channel();
+    #[test] fn double_borrow_pool() { double_borrow(Pool::new(2)) }
+    #[test] fn double_borrow_anchored() { double_borrow(Anchored::new(2)) }
 
-        pool.scoped(|scoped| {
-            let tx = tx_.clone();
-            unsafe {
-                scoped.execute(move || {
-                    thread::sleep_ms(1000);
-                    tx.send(2).unwrap();
-                });
-            }
-
-            let tx = tx_.clone();
-            unsafe {
-                scoped.execute(move || {
-                    tx.send(1).unwrap();
+    fn double_borrow<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        pool.scoped(|s| {
+            let pool = &pool;
+            s.execute(move || {
+                pool.scoped(|s2| {
+                    let (tx, rx) = sync_channel(0);
+                    s2.execute(move || {
+                        tx.send(1).unwrap();
+                    });
+                    rx.recv().unwrap();
                });
-            }
+            });
+        });
+    }
 
-            scoped.join_all();
+    #[test] fn thread_panic_pool() { thread_panic(Pool::new(2)) }
+    #[test] fn thread_panic_anchored() { thread_panic(Anchored::new(2)) }
 
-            let tx = tx_.clone();
-            unsafe {
-                scoped.execute(move || {
-                    tx.send(3).unwrap();
-                });
-            }
+    fn thread_panic<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        let (tx, rx) = channel();
+        pool.execute(move || {
+            let _tx = tx;
+            if false { let _ = _tx.send(()); }
+            panic!();
        });
-
-        assert_eq!(rx.iter().take(3).collect::<Vec<_>>(), vec![1, 2, 3]);
+        rx.recv().unwrap_err();
    }
 
-    #[test]
-    #[cfg(compiler_has_scoped_bugfix)]
-    fn safe_execute() {
-        let mut pool = Pool::new(4);
+    #[should_panic]
+    #[test] fn scoped_thread_panic_pool() { scoped_thread_panic(Pool::new(4)) }
+    #[should_panic]
+    #[test] fn scoped_thread_panic_anchored() { scoped_thread_panic(Anchored::new(4)) }
+
+    fn scoped_thread_panic<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
         pool.scoped(|scoped| {
             scoped.execute(move || {
+                panic!()
             });
         });
     }
-}
-
-#[cfg(all(test, feature="nightly"))]
-mod benches {
-    extern crate test;
-
-    use self::test::{Bencher, black_box};
-    use super::Pool;
-    use std::sync::Mutex;
-
-    // const MS_SLEEP_PER_OP: u32 = 1;
-
-    lazy_static! {
-        static ref POOL_1: Mutex<Pool> = Mutex::new(Pool::new(1));
-        static ref POOL_2: Mutex<Pool> = Mutex::new(Pool::new(2));
-        static ref POOL_3: Mutex<Pool> = Mutex::new(Pool::new(3));
-        static ref POOL_4: Mutex<Pool> = Mutex::new(Pool::new(4));
-        static ref POOL_5: Mutex<Pool> = Mutex::new(Pool::new(5));
-        static ref POOL_8: Mutex<Pool> = Mutex::new(Pool::new(8));
-    }
-
-    fn fib(n: u64) -> u64 {
-        let mut prev_prev: u64 = 1;
-        let mut prev = 1;
-        let mut current = 1;
-        for _ in 2..(n+1) {
-            current = prev_prev.wrapping_add(prev);
-            prev_prev = prev;
-            prev = current;
-        }
-        current
-    }
 
-    fn threads_interleaved_n(pool: &mut Pool) {
-        let size = 1024; // 1kiB
+    #[test]
+    #[should_panic]
+    fn scoped_panic_pool() { scope_panic(Pool::new(4)) }
+    #[test]
+    #[should_panic]
+    fn scoped_panic_anchored() { scope_panic(Anchored::new(4)) }
 
-        let mut data = vec![1u8; size];
-        pool.scoped(|s| {
-            for e in data.iter_mut() {
-                s.execute(move || {
-                    *e += fib(black_box(1000 * (*e as u64))) as u8;
-                    for i in 0..10000 { black_box(i); }
-                    //thread::sleep_ms(MS_SLEEP_PER_OP);
-                });
-            }
+    fn scope_panic<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        pool.scoped(|_scoped| {
+            panic!()
        });
    }
 
-    #[bench]
-    fn threads_interleaved_1(b: &mut Bencher) {
-        b.iter(|| threads_interleaved_n(&mut POOL_1.lock().unwrap()))
-    }
+    #[test]
+    #[should_panic]
+    fn panic_pool() { pool_panic(Pool::new(4)) }
+    #[test]
+    #[should_panic]
+    fn panic_anchored() { pool_panic(Anchored::new(4)) }
 
-    #[bench]
-    fn threads_interleaved_2(b: &mut Bencher) {
-        b.iter(|| threads_interleaved_n(&mut POOL_2.lock().unwrap()))
+    fn pool_panic<P: Borrow<Pool>>(pool: P) {
+        let _pool = pool.borrow();
+        panic!()
    }
 
-    #[bench]
-    fn threads_interleaved_4(b: &mut Bencher) {
-        b.iter(|| threads_interleaved_n(&mut POOL_4.lock().unwrap()))
-    }
+    #[test] fn wait_all_pool() { wait_all(Pool::new(4)) }
+    #[test] fn wait_all_anchored() { wait_all(Anchored::new(4)) }
 
-    #[bench]
-    fn threads_interleaved_8(b: &mut Bencher) {
-        b.iter(|| threads_interleaved_n(&mut POOL_8.lock().unwrap()))
-    }
+    fn wait_all<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
 
-    fn threads_chunked_n(pool: &mut Pool) {
-        // Set this to 1GB and 40 to get good but slooow results
-        let size = 1024 * 1024 * 10 / 4; // 10MiB
-        let bb_repeat = 50;
+        let (tx_, rx) = channel();
 
-        let n = pool.thread_count();
-        let mut data = vec![0u32; size];
-        pool.scoped(|s| {
-            let l = (data.len() - 1) / n as usize + 1;
-            for es in data.chunks_mut(l) {
-                s.execute(move || {
-                    if es.len() > 1 {
-                        es[0] = 1;
-                        es[1] = 1;
-                        for i in 2..es.len() {
-                            // Fibonnaci gets big fast,
-                            // so just wrap around all the time
-                            es[i] = black_box(es[i-1].wrapping_add(es[i-2]));
-                            for i in 0..bb_repeat { black_box(i); }
-                        }
-                    }
-                    //thread::sleep_ms(MS_SLEEP_PER_OP);
-                });
-            }
-        });
-    }
+        pool.scoped(|scoped| {
+            let tx = tx_.clone();
+            scoped.execute(move || {
+                thread::sleep(Duration::from_millis(1000));
+                tx.send(2).unwrap();
+            });
 
-    #[bench]
-    fn threads_chunked_1(b: &mut Bencher) {
-        b.iter(|| threads_chunked_n(&mut POOL_1.lock().unwrap()))
-    }
+            let tx = tx_.clone();
+            scoped.execute(move || {
+                tx.send(1).unwrap();
+            });
 
-    #[bench]
-    fn threads_chunked_2(b: &mut Bencher) {
-        b.iter(|| threads_chunked_n(&mut POOL_2.lock().unwrap()))
-    }
+            scoped.wait_all();
 
-    #[bench]
-    fn threads_chunked_3(b: &mut Bencher) {
-        b.iter(|| threads_chunked_n(&mut POOL_3.lock().unwrap()))
-    }
+            let tx = tx_.clone();
+            scoped.execute(move || {
+                tx.send(3).unwrap();
+            });
+        });
 
-    #[bench]
-    fn threads_chunked_4(b: &mut Bencher) {
-        b.iter(|| threads_chunked_n(&mut POOL_4.lock().unwrap()))
+        assert_eq!(rx.iter().take(3).collect::<Vec<_>>(), vec![1, 2, 3]);
    }
 
-    #[bench]
-    fn threads_chunked_5(b: &mut Bencher) {
-        b.iter(|| threads_chunked_n(&mut POOL_5.lock().unwrap()))
-    }
+    #[test] fn simple_pool() { simple(Pool::new(4)) }
+    #[test] fn simple_anchored() { simple(Anchored::new(4)) }
 
-    #[bench]
-    fn threads_chunked_8(b: &mut Bencher) {
-        b.iter(|| threads_chunked_n(&mut POOL_8.lock().unwrap()))
+    fn simple<P: Borrow<Pool>>(pool: P) {
+        let pool = pool.borrow();
+        pool.scoped(|scoped| {
+            scoped.execute(move || {
+            });
+        });
    }
 }
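For reviewers, here is a minimal usage sketch of the API this patch introduces (not part of the diff). It is based on the signatures added in src/lib.rs above; the closure bounds shown for `Pool::execute` in the reconstructed code are inferred, so treat the snippet as illustrative rather than normative.

```rust
extern crate scoped_threadpool;

use scoped_threadpool::{Anchored, Pool};

fn main() {
    // With this patch a plain `Pool` no longer needs to be mutable,
    // and `Scope::execute` is safe (no `unsafe` block required).
    let pool = Pool::new(4);

    let mut vec = vec![0, 1, 2, 3, 4, 5, 6, 7];

    pool.scoped(|scope| {
        for e in &mut vec {
            scope.execute(move || {
                *e += 1;
            });
        }
        // Block until every job submitted so far has finished.
        scope.wait_all();
    });

    assert_eq!(vec, vec![1, 2, 3, 4, 5, 6, 7, 8]);

    // `Anchored` wraps a `Pool` (via Deref/Borrow) and blocks on drop
    // until all of its worker threads have shut down.
    let anchored = Anchored::new(2);
    anchored.scoped(|scope| {
        scope.execute(|| println!("running on an anchored pool"));
    });
}
```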