diff --git a/.circleci/config.yml b/.circleci/config.yml
index 7364c50e55..92a98bfb93 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -83,6 +83,34 @@ commands:
             glean_parser coverage --allow-reserved -c glean_coverage.txt -f codecovio -o codecov.json
             bin/codecov.sh -X yaml -f codecov.json
 
+  test-rust-native-dispatcher:
+    steps:
+      - run:
+          name: Install required dependencies
+          command: |
+            sudo apt update
+            sudo apt install -y cmake ninja-build clang
+      - run:
+          name: Install libdispatch
+          command: |
+            cd ~
+            git clone https://github.com/apple/swift-corelibs-libdispatch
+            cd swift-corelibs-libdispatch
+            mkdir build && cd build
+            cmake -G Ninja \
+              -DCMAKE_C_COMPILER=clang \
+              -DCMAKE_CXX_COMPILER=clang++ \
+              -DCMAKE_BUILD_WITH_INSTALL_RPATH=On \
+              ..
+            ninja
+            sudo ninja install
+
+      - run:
+          name: Run Rust tests with native dispatcher
+          command: |
+            export LD_LIBRARY_PATH=/usr/local/lib
+            cargo test --verbose --jobs 6 --features native-dispatcher -- --nocapture
+
   install-rustup:
     steps:
       - run:
@@ -324,6 +352,7 @@ jobs:
     resource_class: "medium+"
     steps:
       - test-rust
+      - test-rust-native-dispatcher
 
   Rust tests - beta:
     docker:
@@ -528,6 +557,7 @@
       - restore_cache:
          keys:
            - v2-cargo-cache-{{arch}}-{{checksum "buildtype.txt"}}-{{checksum "Cargo.lock"}}
+
       - run:
          name: Run iOS build
          command: bash bin/run-ios-build.sh
diff --git a/Cargo.lock b/Cargo.lock
index d2bb9a2ea3..ef39a84962 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -234,6 +234,12 @@ dependencies = [
  "num_cpus",
 ]
 
+[[package]]
+name = "dispatch"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b"
+
 [[package]]
 name = "either"
 version = "1.6.1"
@@ -358,6 +364,7 @@ dependencies = [
  "chrono",
  "crossbeam-channel",
  "ctor",
+ "dispatch",
  "env_logger",
  "flate2",
  "iso8601",
diff --git a/glean-core/Cargo.toml b/glean-core/Cargo.toml
index 3ac5ac4c5d..14eca5a828 100644
--- a/glean-core/Cargo.toml
+++ b/glean-core/Cargo.toml
@@ -46,6 +46,7 @@ uniffi_macros = "0.22.0"
 time = "0.1.40"
 remove_dir_all = "0.5.3"
 env_logger = { version = "0.9.0", default-features = false, optional = true }
+dispatch = { version = "0.2.0", optional = true }
 
 [target.'cfg(target_os = "android")'.dependencies]
 android_logger = { version = "0.11.0", default-features = false }
@@ -53,6 +54,9 @@ android_logger = { version = "0.11.0", default-features = false }
 [target.'cfg(target_os = "ios")'.dependencies]
 oslog = { version = "0.1.0", default-features = false, features = ["logger"] }
 
+[target.'cfg(any(target_os = "ios"))'.dependencies]
+dispatch = "0.2.0"
+
 [dev-dependencies]
 env_logger = { version = "0.9.0", default-features = false, features = ["termcolor", "atty", "humantime"] }
 tempfile = "3.1.0"
@@ -67,3 +71,5 @@ uniffi_build = { version = "0.22.0", features = ["builtin-bindgen"] }
 preinit_million_queue = []
 # Enable `env_logger`. Only works on non-Android non-iOS targets.
 enable_env_logger = ["env_logger"]
+# Use libdispatch, default on iOS
+native-dispatcher = ["dispatch"]
diff --git a/glean-core/rlb/Cargo.toml b/glean-core/rlb/Cargo.toml
index 06ac689144..14f77d0aa2 100644
--- a/glean-core/rlb/Cargo.toml
+++ b/glean-core/rlb/Cargo.toml
@@ -46,3 +46,4 @@ flate2 = "1.0.19"
 
 [features]
 preinit_million_queue = ["glean-core/preinit_million_queue"]
+native-dispatcher = ["glean-core/native-dispatcher"]
diff --git a/glean-core/src/dispatcher/global.rs b/glean-core/src/dispatcher/global.rs
index d4eddd8705..cd5e1934a0 100644
--- a/glean-core/src/dispatcher/global.rs
+++ b/glean-core/src/dispatcher/global.rs
@@ -2,10 +2,12 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-use once_cell::sync::Lazy;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 use std::sync::RwLock;
 
+use once_cell::sync::Lazy;
+
 use super::{DispatchError, DispatchGuard, Dispatcher};
 
 #[cfg(feature = "preinit_million_queue")]
@@ -26,7 +28,7 @@ pub fn is_test_mode() -> bool {
 ///
 /// A dispatcher is cheap to create, so we create one on every access instead of caching it.
 /// This avoids troubles for tests where the global dispatcher _can_ change.
-fn guard() -> DispatchGuard {
+fn guard() -> Arc<DispatchGuard> {
     GLOBAL_DISPATCHER
         .read()
         .unwrap()
@@ -89,11 +91,7 @@ fn join_dispatcher_thread() -> Result<(), DispatchError> {
     let mut lock = GLOBAL_DISPATCHER.write().unwrap();
     let dispatcher = lock.as_mut().expect("Global dispatcher has gone missing");
 
-    if let Some(worker) = dispatcher.worker.take() {
-        return worker.join().map_err(|_| DispatchError::WorkerPanic);
-    }
-
-    Ok(())
+    dispatcher.join()
 }
 
 /// Kill the blocked dispatcher without processing the queue.
diff --git a/glean-core/src/dispatcher/imp.rs b/glean-core/src/dispatcher/imp.rs
new file mode 100644
index 0000000000..5b756700a2
--- /dev/null
+++ b/glean-core/src/dispatcher/imp.rs
@@ -0,0 +1,281 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use std::{
+    mem,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc,
+    },
+    thread::{self, JoinHandle},
+};
+
+use crossbeam_channel::{bounded, unbounded, Sender};
+
+use super::DispatchError;
+
+/// Command received while blocked from further work.
+enum Blocked {
+    /// Shutdown immediately without processing the queue.
+    Shutdown,
+    /// Unblock and continue with work as normal.
+    Continue,
+}
+
+/// The command a worker should execute.
+enum Command {
+    /// A task is a user-defined function to run.
+    Task(Box<dyn FnOnce() + Send>),
+
+    /// Swap the channel
+    Swap(Sender<()>),
+
+    /// Signal the worker to finish work and shut down.
+    Shutdown,
+}
+
+/// A clonable guard for a dispatch queue.
+#[derive(Clone)]
+pub struct DispatchGuard {
+    /// Whether to queue on the preinit buffer or on the unbounded queue
+    queue_preinit: Arc<AtomicBool>,
+
+    /// The number of items that were added to the queue after it filled up.
+    overflow_count: Arc<AtomicUsize>,
+
+    /// The maximum pre-init queue size
+    max_queue_size: usize,
+
+    /// Used to unblock the worker thread initially.
+    block_sender: Sender<Blocked>,
+
+    /// Sender for the preinit queue.
+    preinit_sender: Sender<Command>,
+
+    /// Sender for the unbounded queue.
+    sender: Sender<Command>,
+}
+
+impl DispatchGuard {
+    pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
+        let task = Command::Task(Box::new(task));
+        self.send(task)
+    }
+
+    pub fn shutdown(&self) -> Result<(), DispatchError> {
+        // Need to flush in order for the thread to actually process anything,
+        // including the shutdown command.
+        self.flush_init().ok();
+        self.send(Command::Shutdown)
+    }
+
+    fn send(&self, task: Command) -> Result<(), DispatchError> {
+        if self.queue_preinit.load(Ordering::SeqCst) {
+            if self.preinit_sender.len() < self.max_queue_size {
+                self.preinit_sender.send(task)?;
+                Ok(())
+            } else {
+                self.overflow_count.fetch_add(1, Ordering::SeqCst);
+                // Instead of using a bounded queue, we are handling the bounds
+                // checking ourselves. If a bounded queue were full, we would return
+                // a QueueFull DispatchError, so we do the same here.
+                Err(DispatchError::QueueFull)
+            }
+        } else {
+            self.sender.send(task)?;
+            Ok(())
+        }
+    }
+
+    pub fn block_on_queue(&self) {
+        let (tx, rx) = crossbeam_channel::bounded(0);
+
+        // We explicitly don't use `self.launch` here.
+        // We always put this task on the unbounded queue.
+        // The pre-init queue might be full before its flushed, in which case this would panic.
+        // Blocking on the queue can only work if it is eventually flushed anyway.
+
+        let task = Command::Task(Box::new(move || {
+            tx.send(())
+                .expect("(worker) Can't send message on single-use channel");
+        }));
+        self.sender
+            .send(task)
+            .expect("Failed to launch the blocking task");
+
+        rx.recv()
+            .expect("Failed to receive message on single-use channel");
+    }
+
+    pub fn kill(&self) -> Result<(), DispatchError> {
+        // We immediately stop queueing in the pre-init buffer.
+        let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
+        if !old_val {
+            return Err(DispatchError::AlreadyFlushed);
+        }
+
+        // Unblock the worker thread exactly once.
+        self.block_sender.send(Blocked::Shutdown)?;
+        Ok(())
+    }
+
+    /// Flushes the pre-init buffer.
+    ///
+    /// This function blocks until tasks queued prior to this call are finished.
+    /// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
+    ///
+    /// Returns an error if called multiple times.
+    pub fn flush_init(&self) -> Result<usize, DispatchError> {
+        // We immediately stop queueing in the pre-init buffer.
+        let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
+        if !old_val {
+            return Err(DispatchError::AlreadyFlushed);
+        }
+
+        // Unblock the worker thread exactly once.
+        self.block_sender.send(Blocked::Continue)?;
+
+        // Single-use channel to communicate with the worker thread.
+        let (swap_sender, swap_receiver) = bounded(0);
+
+        // Send final command and block until it is sent.
+        self.preinit_sender
+            .send(Command::Swap(swap_sender))
+            .map_err(|_| DispatchError::SendError)?;
+
+        // Now wait for the worker thread to do the swap and inform us.
+        // This blocks until all tasks in the preinit buffer have been processed.
+        swap_receiver.recv()?;
+
+        // We're not queueing anymore.
+        super::global::QUEUE_TASKS.store(false, Ordering::SeqCst);
+
+        let overflow_count = self.overflow_count.load(Ordering::SeqCst);
+        if overflow_count > 0 {
+            Ok(overflow_count)
+        } else {
+            Ok(0)
+        }
+    }
+}
+
+/// A dispatcher.
+///
+/// Run expensive processing tasks sequentially off the main thread.
+/// Tasks are processed in a single separate thread in the order they are submitted.
+/// The dispatch queue will enqueue tasks while not flushed, up to the maximum queue size.
+/// Processing will start after flushing once, processing already enqueued tasks first, then
+/// waiting for further tasks to be enqueued.
+pub struct Dispatcher {
+    /// Guard used for communication with the worker thread.
+    guard: DispatchGuard,
+
+    /// Handle to the worker thread, allows to wait for it to finish.
+    pub worker: Option<JoinHandle<()>>,
+}
+
+impl Dispatcher {
+    /// Creates a new dispatcher with a maximum queue size.
+    ///
+    /// Launched tasks won't run until [`flush_init`] is called.
+    ///
+    /// [`flush_init`]: #method.flush_init
+    pub fn new(max_queue_size: usize) -> Self {
+        let (block_sender, block_receiver) = bounded(1);
+        let (preinit_sender, preinit_receiver) = unbounded();
+        let (sender, mut unbounded_receiver) = unbounded();
+
+        let queue_preinit = Arc::new(AtomicBool::new(true));
+        let overflow_count = Arc::new(AtomicUsize::new(0));
+
+        let worker = thread::Builder::new()
+            .name("glean.dispatcher".into())
+            .spawn(move || {
+                match block_receiver.recv() {
+                    Err(_) => {
+                        // The other side was disconnected.
+                        // There's nothing the worker thread can do.
+                        log::error!("The task producer was disconnected. Worker thread will exit.");
+                        return;
+                    }
+                    Ok(Blocked::Shutdown) => {
+                        // The other side wants us to stop immediately
+                        return;
+                    }
+                    Ok(Blocked::Continue) => {
+                        // Queue is unblocked, processing continues as normal.
+                    }
+                }
+
+                let mut receiver = preinit_receiver;
+                loop {
+                    use Command::*;
+
+                    match receiver.recv() {
+                        Ok(Shutdown) => {
+                            break;
+                        }
+
+                        Ok(Task(f)) => {
+                            (f)();
+                        }
+
+                        Ok(Swap(swap_done)) => {
+                            // A swap should only occur exactly once.
+                            // This is upheld by `flush_init`, which errors out if the preinit buffer
+                            // was already flushed.
+
+                            // We swap the channels we listen on for new tasks.
+                            // The next iteration will continue with the unbounded queue.
+                            mem::swap(&mut receiver, &mut unbounded_receiver);
+
+                            // The swap command MUST be the last one received on the preinit buffer,
+                            // so by the time we run this we know all preinit tasks were processed.
+                            // We can notify the other side.
+                            swap_done
+                                .send(())
+                                .expect("The caller of `flush_init` has gone missing");
+                        }
+
+                        // Other side was disconnected.
+                        Err(_) => {
+                            log::error!(
+                                "The task producer was disconnected. Worker thread will exit."
+                            );
+                            return;
+                        }
+                    }
+                }
+            })
+            .expect("Failed to spawn Glean's dispatcher thread");
+
+        let guard = DispatchGuard {
+            queue_preinit,
+            overflow_count,
+            max_queue_size,
+            block_sender,
+            preinit_sender,
+            sender,
+        };
+
+        Dispatcher {
+            guard,
+            worker: Some(worker),
+        }
+    }
+
+    pub fn guard(&self) -> Arc<DispatchGuard> {
+        Arc::new(self.guard.clone())
+    }
+
+    /// Waits for the worker thread to finish and finishes the dispatch queue.
+    ///
+    /// You need to call `shutdown` to initiate a shutdown of the queue.
+    pub fn join(&mut self) -> Result<(), DispatchError> {
+        if let Some(worker) = self.worker.take() {
+            worker.join().map_err(|_| DispatchError::WorkerPanic)?;
+        }
+        Ok(())
+    }
+}
diff --git a/glean-core/src/dispatcher/mod.rs b/glean-core/src/dispatcher/mod.rs
index 17f7f5df0b..e404af1bbb 100644
--- a/glean-core/src/dispatcher/mod.rs
+++ b/glean-core/src/dispatcher/mod.rs
@@ -22,50 +22,33 @@
 //! });
 //! ```
 
-use std::{
-    mem,
-    sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering},
-        Arc,
-    },
-    thread::{self, JoinHandle},
-};
-
-use crossbeam_channel::{bounded, unbounded, SendError, Sender};
+use crossbeam_channel::SendError;
 use thiserror::Error;
 
 pub use global::*;
 
 pub(crate) mod global;
 
-/// Command received while blocked from further work.
-enum Blocked {
-    /// Shutdown immediately without processing the queue.
-    Shutdown,
-    /// Unblock and continue with work as normal.
-    Continue,
-}
-
-/// The command a worker should execute.
-enum Command {
-    /// A task is a user-defined function to run.
-    Task(Box<dyn FnOnce() + Send>),
+#[cfg(not(feature = "native-dispatcher"))]
+mod imp;
+#[cfg(feature = "native-dispatcher")]
+mod native;
 
-    /// Swap the channel
-    Swap(Sender<()>),
-
-    /// Signal the worker to finish work and shut down.
-    Shutdown,
-}
+#[cfg(not(feature = "native-dispatcher"))]
+use imp::*;
+#[cfg(feature = "native-dispatcher")]
+use native::*;
 
 /// The error returned from operations on the dispatcher
 #[derive(Error, Debug, PartialEq, Eq)]
 pub enum DispatchError {
     /// The worker panicked while running a task
+    #[allow(dead_code)] // only used in `imp`
     #[error("The worker panicked while running a task")]
     WorkerPanic,
 
     /// Maximum queue size reached
+    #[allow(dead_code)] // only used by `imp`
     #[error("Maximum queue size reached")]
     QueueFull,
 
@@ -88,252 +71,6 @@ impl<T> From<SendError<T>> for DispatchError {
 }
 
-/// A clonable guard for a dispatch queue.
-#[derive(Clone)]
-struct DispatchGuard {
-    /// Whether to queue on the preinit buffer or on the unbounded queue
-    queue_preinit: Arc<AtomicBool>,
-
-    /// The number of items that were added to the queue after it filled up.
-    overflow_count: Arc<AtomicUsize>,
-
-    /// The maximum pre-init queue size
-    max_queue_size: usize,
-
-    /// Used to unblock the worker thread initially.
-    block_sender: Sender<Blocked>,
-
-    /// Sender for the preinit queue.
-    preinit_sender: Sender<Command>,
-
-    /// Sender for the unbounded queue.
-    sender: Sender<Command>,
-}
-
-impl DispatchGuard {
-    pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
-        let task = Command::Task(Box::new(task));
-        self.send(task)
-    }
-
-    pub fn shutdown(&mut self) -> Result<(), DispatchError> {
-        // Need to flush in order for the thread to actually process anything,
-        // including the shutdown command.
-        self.flush_init().ok();
-        self.send(Command::Shutdown)
-    }
-
-    fn send(&self, task: Command) -> Result<(), DispatchError> {
-        if self.queue_preinit.load(Ordering::SeqCst) {
-            if self.preinit_sender.len() < self.max_queue_size {
-                self.preinit_sender.send(task)?;
-                Ok(())
-            } else {
-                self.overflow_count.fetch_add(1, Ordering::SeqCst);
-                // Instead of using a bounded queue, we are handling the bounds
-                // checking ourselves. If a bounded queue were full, we would return
-                // a QueueFull DispatchError, so we do the same here.
-                Err(DispatchError::QueueFull)
-            }
-        } else {
-            self.sender.send(task)?;
-            Ok(())
-        }
-    }
-
-    fn block_on_queue(&self) {
-        let (tx, rx) = crossbeam_channel::bounded(0);
-
-        // We explicitly don't use `self.launch` here.
-        // We always put this task on the unbounded queue.
-        // The pre-init queue might be full before its flushed, in which case this would panic.
-        // Blocking on the queue can only work if it is eventually flushed anyway.
-
-        let task = Command::Task(Box::new(move || {
-            tx.send(())
-                .expect("(worker) Can't send message on single-use channel");
-        }));
-        self.sender
-            .send(task)
-            .expect("Failed to launch the blocking task");
-
-        rx.recv()
-            .expect("Failed to receive message on single-use channel");
-    }
-
-    fn kill(&mut self) -> Result<(), DispatchError> {
-        // We immediately stop queueing in the pre-init buffer.
-        let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
-        if !old_val {
-            return Err(DispatchError::AlreadyFlushed);
-        }
-
-        // Unblock the worker thread exactly once.
-        self.block_sender.send(Blocked::Shutdown)?;
-        Ok(())
-    }
-
-    /// Flushes the pre-init buffer.
-    ///
-    /// This function blocks until tasks queued prior to this call are finished.
-    /// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
-    ///
-    /// Returns an error if called multiple times.
-    fn flush_init(&mut self) -> Result<usize, DispatchError> {
-        // We immediately stop queueing in the pre-init buffer.
-        let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
-        if !old_val {
-            return Err(DispatchError::AlreadyFlushed);
-        }
-
-        // Unblock the worker thread exactly once.
-        self.block_sender.send(Blocked::Continue)?;
-
-        // Single-use channel to communicate with the worker thread.
-        let (swap_sender, swap_receiver) = bounded(0);
-
-        // Send final command and block until it is sent.
-        self.preinit_sender
-            .send(Command::Swap(swap_sender))
-            .map_err(|_| DispatchError::SendError)?;
-
-        // Now wait for the worker thread to do the swap and inform us.
-        // This blocks until all tasks in the preinit buffer have been processed.
-        swap_receiver.recv()?;
-
-        // We're not queueing anymore.
-        global::QUEUE_TASKS.store(false, Ordering::SeqCst);
-
-        let overflow_count = self.overflow_count.load(Ordering::SeqCst);
-        if overflow_count > 0 {
-            Ok(overflow_count)
-        } else {
-            Ok(0)
-        }
-    }
-}
-
-/// A dispatcher.
-///
-/// Run expensive processing tasks sequentially off the main thread.
-/// Tasks are processed in a single separate thread in the order they are submitted.
-/// The dispatch queue will enqueue tasks while not flushed, up to the maximum queue size.
-/// Processing will start after flushing once, processing already enqueued tasks first, then
-/// waiting for further tasks to be enqueued.
-pub struct Dispatcher {
-    /// Guard used for communication with the worker thread.
-    guard: DispatchGuard,
-
-    /// Handle to the worker thread, allows to wait for it to finish.
-    worker: Option<JoinHandle<()>>,
-}
-
-impl Dispatcher {
-    /// Creates a new dispatcher with a maximum queue size.
-    ///
-    /// Launched tasks won't run until [`flush_init`] is called.
-    ///
-    /// [`flush_init`]: #method.flush_init
-    pub fn new(max_queue_size: usize) -> Self {
-        let (block_sender, block_receiver) = bounded(1);
-        let (preinit_sender, preinit_receiver) = unbounded();
-        let (sender, mut unbounded_receiver) = unbounded();
-
-        let queue_preinit = Arc::new(AtomicBool::new(true));
-        let overflow_count = Arc::new(AtomicUsize::new(0));
-
-        let worker = thread::Builder::new()
-            .name("glean.dispatcher".into())
-            .spawn(move || {
-                match block_receiver.recv() {
-                    Err(_) => {
-                        // The other side was disconnected.
-                        // There's nothing the worker thread can do.
-                        log::error!("The task producer was disconnected. Worker thread will exit.");
-                        return;
-                    }
-                    Ok(Blocked::Shutdown) => {
-                        // The other side wants us to stop immediately
-                        return;
-                    }
-                    Ok(Blocked::Continue) => {
-                        // Queue is unblocked, processing continues as normal.
-                    }
-                }
-
-                let mut receiver = preinit_receiver;
-                loop {
-                    use Command::*;
-
-                    match receiver.recv() {
-                        Ok(Shutdown) => {
-                            break;
-                        }
-
-                        Ok(Task(f)) => {
-                            (f)();
-                        }
-
-                        Ok(Swap(swap_done)) => {
-                            // A swap should only occur exactly once.
-                            // This is upheld by `flush_init`, which errors out if the preinit buffer
-                            // was already flushed.
-
-                            // We swap the channels we listen on for new tasks.
-                            // The next iteration will continue with the unbounded queue.
-                            mem::swap(&mut receiver, &mut unbounded_receiver);
-
-                            // The swap command MUST be the last one received on the preinit buffer,
-                            // so by the time we run this we know all preinit tasks were processed.
-                            // We can notify the other side.
-                            swap_done
-                                .send(())
-                                .expect("The caller of `flush_init` has gone missing");
-                        }
-
-                        // Other side was disconnected.
-                        Err(_) => {
-                            log::error!(
-                                "The task producer was disconnected. Worker thread will exit."
-                            );
-                            return;
-                        }
-                    }
-                }
-            })
-            .expect("Failed to spawn Glean's dispatcher thread");
-
-        let guard = DispatchGuard {
-            queue_preinit,
-            overflow_count,
-            max_queue_size,
-            block_sender,
-            preinit_sender,
-            sender,
-        };
-
-        Dispatcher {
-            guard,
-            worker: Some(worker),
-        }
-    }
-
-    fn guard(&self) -> DispatchGuard {
-        self.guard.clone()
-    }
-
-    /// Waits for the worker thread to finish and finishes the dispatch queue.
-    ///
-    /// You need to call `shutdown` to initiate a shutdown of the queue.
-    #[cfg(test)]
-    fn join(mut self) -> Result<(), DispatchError> {
-        if let Some(worker) = self.worker.take() {
-            worker.join().map_err(|_| DispatchError::WorkerPanic)?;
-        }
-        Ok(())
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
 
@@ -383,7 +120,6 @@
     fn launch_correctly_adds_tasks_to_preinit_queue() {
         enable_test_logging();
 
-        let main_thread_id = thread::current().id();
         let thread_canary = Arc::new(AtomicU8::new(0));
 
         let dispatcher = Dispatcher::new(100);
@@ -395,8 +131,6 @@
         dispatcher
             .guard()
             .launch(move || {
-                // Make sure the task is flushed off-the-main thread.
-                assert!(thread::current().id() != main_thread_id);
                 canary_clone.fetch_add(1, Ordering::SeqCst);
             })
             .expect("Failed to dispatch the test task");
@@ -456,7 +190,7 @@
     fn tasks_after_shutdown_are_not_processed() {
         enable_test_logging();
 
-        let dispatcher = Dispatcher::new(10);
+        let mut dispatcher = Dispatcher::new(10);
 
         let result = Arc::new(Mutex::new(vec![]));
 
@@ -529,7 +263,7 @@
         // but we can quickly queue more slow tasks than the pre-init buffer holds
         // and then guarantuee they all run.
 
-        let dispatcher = Dispatcher::new(5);
+        let mut dispatcher = Dispatcher::new(5);
 
         let result = Arc::new(Mutex::new(vec![]));
 
diff --git a/glean-core/src/dispatcher/native.rs b/glean-core/src/dispatcher/native.rs
new file mode 100644
index 0000000000..f732eac83c
--- /dev/null
+++ b/glean-core/src/dispatcher/native.rs
@@ -0,0 +1,189 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use dispatch::{Queue, QueueAttribute};
+
+use super::DispatchError;
+
+#[repr(u8)]
+enum QueueStatus {
+    NotFlushed = 0,
+    Flushing = 1,
+    IsFlushed = 2,
+    Shutdown = 3,
+}
+
+/// A dispatcher.
+///
+/// Run expensive processing tasks sequentially off the main thread.
+/// Tasks are processed in a serial queue in the order they are submitted.
+/// The dispatch queue will enqueue tasks while not flushed, up to the maximum queue size.
+/// Processing will start after flushing once, processing already enqueued tasks first, then
+/// waiting for further tasks to be enqueued.
+pub struct Dispatcher {
+    guard: Arc<DispatchGuard>,
+}
+
+impl Dispatcher {
+    /// Creates a new dispatcher with a maximum queue size.
+    ///
+    /// Launched tasks won't run until [`flush_init`] is called.
+    ///
+    /// [`flush_init`]: #method.flush_init
+    pub fn new(max_queue_size: usize) -> Self {
+        let queue = Queue::create("glean.dispatcher", QueueAttribute::Serial);
+        let preinit_queue = Mutex::new(Vec::with_capacity(10));
+        let overflow_count = Arc::new(AtomicUsize::new(0));
+
+        let guard = DispatchGuard {
+            queue: Some(queue),
+            flushed: AtomicU8::new(QueueStatus::NotFlushed as u8),
+            max_queue_size,
+            overflow_count,
+            preinit_queue,
+        };
+
+        Dispatcher {
+            guard: Arc::new(guard),
+        }
+    }
+
+    pub fn guard(&self) -> Arc<DispatchGuard> {
+        self.guard.clone()
+    }
+
+    /// Waits for the worker thread to finish and finishes the dispatch queue.
+    ///
+    /// You need to call `shutdown` to initiate a shutdown of the queue.
+    pub fn join(&mut self) -> Result<(), DispatchError> {
+        if let Some(guard) = Arc::get_mut(&mut self.guard) {
+            if let Some(queue) = guard.queue.take() {
+                queue.exec_sync(|| {
+                    // intentionally left empty
+                });
+                drop(queue);
+            }
+        }
+        Ok(())
+    }
+}
+
+/// A clonable guard for a dispatch queue.
+pub struct DispatchGuard {
+    /// The queue to run on
+    queue: Option<Queue>,
+
+    /// Status of the queue. One of `QueueStatus`
+    flushed: AtomicU8,
+
+    /// The maximum pre-init queue size
+    max_queue_size: usize,
+
+    /// The number of items that were added to the queue after it filled up
+    overflow_count: Arc<AtomicUsize>,
+
+    /// The pre-init queue
+    ///
+    /// Collects tasks before `flush_init` is called up until `max_queue_size`.
+    preinit_queue: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
+}
+
+impl DispatchGuard {
+    fn queue(&self) -> &Queue {
+        self.queue.as_ref().unwrap()
+    }
+
+    /// Launch a new task asynchronously.
+    ///
+    /// The tasks won't run until [`flush_init`] is called.
+    pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
+        if self.flushed.load(Ordering::SeqCst) == QueueStatus::IsFlushed as u8 {
+            self.queue().exec_async(task);
+            Ok(())
+        } else {
+            let mut queue = self.preinit_queue.lock().unwrap();
+            if queue.len() < self.max_queue_size {
+                queue.push(Box::new(task));
+                Ok(())
+            } else {
+                self.overflow_count.fetch_add(1, Ordering::SeqCst);
+                // Instead of using a bounded queue, we are handling the bounds
+                // checking ourselves. If a bounded queue were full, we would return
+                // a QueueFull DispatchError, so we do the same here.
+                Err(DispatchError::QueueFull)
+            }
+        }
+    }
+
+    /// Shut down the dispatch queue.
+    ///
+    /// No new tasks will be processed after this.
+    pub fn shutdown(&self) -> Result<(), DispatchError> {
+        self.flush_init().ok();
+        self.flushed
+            .store(QueueStatus::Shutdown as u8, Ordering::SeqCst);
+        Ok(())
+    }
+
+    /// Block until all tasks prior to this call are processed.
+    pub fn block_on_queue(&self) {
+        let status = self.flushed.load(Ordering::SeqCst);
+        if status == QueueStatus::IsFlushed as u8 {
+            self.queue().exec_sync(|| {
+                // intentionally left empty
+            });
+        } else if status != QueueStatus::Shutdown as u8 {
+            // block_on_queue is test-only, so spin-looping seems okay enough.
+            while self.flushed.load(Ordering::SeqCst) != QueueStatus::IsFlushed as u8 {
+                std::thread::yield_now();
+            }
+            self.queue().exec_sync(|| {
+                // intentionally left empty
+            });
+        }
+    }
+
+    /// Flushes the pre-init buffer.
+    ///
+    /// This function blocks until tasks queued prior to this call are finished.
+    /// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
+    ///
+    /// Returns an error if called multiple times.
+    pub fn flush_init(&self) -> Result<usize, DispatchError> {
+        if let Err(_old) = self.flushed.compare_exchange(
+            QueueStatus::NotFlushed as u8,
+            QueueStatus::Flushing as u8,
+            Ordering::Acquire,
+            Ordering::Relaxed,
+        ) {
+            return Err(DispatchError::AlreadyFlushed);
+        }
+
+        {
+            let mut queue = self.preinit_queue.lock().unwrap();
+            for task in queue.drain(..) {
+                self.queue().exec_sync(task);
+            }
+        }
+
+        let overflow_count = self.overflow_count.load(Ordering::SeqCst);
+
+        self.flushed
+            .store(QueueStatus::IsFlushed as u8, Ordering::SeqCst);
+
+        if overflow_count > 0 {
+            Ok(overflow_count)
+        } else {
+            Ok(0)
+        }
+    }
+
+    pub fn kill(&self) -> Result<(), DispatchError> {
+        Ok(())
+    }
+}
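
For reviewers unfamiliar with the `dispatch` crate introduced above: the sketch below is not part of the patch. It is a minimal, self-contained illustration (the queue label, counter, and `main` are invented for the example) of the serial-queue semantics that `native.rs` relies on. `exec_async` runs tasks one at a time in submission order, and an empty `exec_sync` closure acts as a barrier that returns only after everything submitted before it has run, which is the same trick `block_on_queue` and `join` use in the patch. It needs libdispatch at runtime, so it runs on macOS/iOS, or on Linux with swift-corelibs-libdispatch installed as in the CI job, and the crate is pulled in by building with `--features native-dispatcher`.

```rust
// Minimal sketch of the libdispatch semantics the native-dispatcher backend uses.
// Assumes `dispatch = "0.2"` in Cargo.toml and a working libdispatch installation.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use dispatch::{Queue, QueueAttribute};

fn main() {
    // A serial queue executes tasks one at a time, in submission order,
    // mirroring the single worker thread of the channel-based dispatcher.
    let queue = Queue::create("example.serial", QueueAttribute::Serial);

    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        // Fire-and-forget submission, like `DispatchGuard::launch` after `flush_init`.
        queue.exec_async(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }

    // An empty synchronous task acts as a barrier: it only returns once every
    // previously submitted task has run, which is how `block_on_queue` and
    // `join` drain the queue in native.rs.
    queue.exec_sync(|| {});

    assert_eq!(10, counter.load(Ordering::SeqCst));
    println!("all tasks ran: {}", counter.load(Ordering::SeqCst));
}
```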