Implement Upkeep::start_pinned API #93

Open
wants to merge 1 commit into
base: main
1 change: 1 addition & 0 deletions Cargo.toml
@@ -40,6 +40,7 @@ prost = ["prost-types"]
once_cell = "1.4"
prost-types = { version = "0.11", default-features = false, optional = true }
crossbeam-utils = "0.8.5"
core_affinity = "0.8"

[target.'cfg(target_arch = "x86")'.dependencies]
raw-cpuid = "11.0"
79 changes: 66 additions & 13 deletions src/upkeep.rs
@@ -3,7 +3,7 @@ use std::{
fmt, io,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
mpsc, Arc,
},
thread::{self, JoinHandle},
time::Duration,
@@ -22,9 +22,8 @@ static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false);
/// [`Clock::recent`], which is updated by a background upkeep thread. That thread is configured
/// and spawned via [`Upkeep`].
///
/// [`Upkeep`] can construct a new clock (or be passed an existing clock to use), and given an
/// update interval, and it will faithfully attempt to update the global recent time on the
/// specified interval. There is a trade-off to be struck in terms of how often the time is
/// Given an update interval, [`Upkeep`] will faithfully attempt to update the global recent time
/// on the specified interval. There is a trade-off to be struck in terms of how often the time is
/// updated versus the required accuracy. Checking the time and updating the global reference is
/// itself not zero-cost, and so care must be taken to analyze the number of readers in order to
/// ensure that, given a particular update interval, the upkeep thread is saving more CPU time than
@@ -44,7 +43,6 @@ static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false);
#[derive(Debug)]
pub struct Upkeep {
interval: Duration,
clock: Clock,
}

/// Handle to a running upkeep thread.
@@ -58,12 +56,15 @@ pub struct Handle {
}

/// Errors thrown during the creation/spawning of the upkeep thread.
#[non_exhaustive]
Review comment (Contributor Author):

Note: adding `#[non_exhaustive]` is a breaking change (as is adding a new error variant), but it allows more error variants to be added in the future without further breaking changes. The MSRV for `#[non_exhaustive]` is 1.40.
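
To illustrate what this means for callers, here is a minimal sketch. The `Error` enum below is a local stand-in that mirrors the variants in this diff rather than the crate's own type, and `describe` is a hypothetical caller-side helper. Within a single crate the attribute has no effect, so the wildcard arm is shown only to indicate what a downstream crate would be required to write.

```rust
use std::io;

// Local stand-in mirroring the `Error` enum in this diff (not imported from the crate).
#[non_exhaustive]
#[derive(Debug)]
pub enum Error {
    UpkeepRunning,
    FailedToSpawnUpkeepThread(io::Error),
    FailedToPinUpkeepThread,
}

// Hypothetical caller-side helper: maps an error to a short description.
fn describe(err: &Error) -> &'static str {
    match err {
        Error::UpkeepRunning => "already running",
        Error::FailedToPinUpkeepThread => "pinning failed",
        // A downstream crate matching on a `#[non_exhaustive]` enum must keep a
        // wildcard arm, so variants added later do not break its build.
        _ => "other upkeep error",
    }
}

fn main() {
    let spawn_err = Error::FailedToSpawnUpkeepThread(io::Error::new(
        io::ErrorKind::Other,
        "simulated spawn failure",
    ));
    for err in [Error::UpkeepRunning, Error::FailedToPinUpkeepThread, spawn_err] {
        println!("{:?}: {}", err, describe(&err));
    }
}
```

Callers that previously matched every variant without a wildcard arm stop compiling once the attribute is added, which is why it counts as a breaking change.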

#[derive(Debug)]
pub enum Error {
/// An upkeep thread is already running in this process.
UpkeepRunning,
/// An error occurred when trying to spawn the upkeep thread.
FailedToSpawnUpkeepThread(io::Error),
/// The upkeep thread could not be successfully pinned.
FailedToPinUpkeepThread,
}

impl fmt::Display for Error {
@@ -73,6 +74,7 @@ impl fmt::Display for Error {
Error::FailedToSpawnUpkeepThread(e) => {
write!(f, "failed to spawn upkeep thread: {}", e)
}
Error::FailedToPinUpkeepThread => write!(f, "failed to pin upkeep thread"),
}
}
}
@@ -82,23 +84,22 @@ impl std::error::Error for Error {
match self {
Self::UpkeepRunning => None,
Self::FailedToSpawnUpkeepThread(e) => Some(e),
Self::FailedToPinUpkeepThread => None,
}
}
}

impl Upkeep {
/// Creates a new [`Upkeep`].
///
/// This creates a new internal clock for acquiring the current time. If you have an existing
/// [`Clock`] that is already calibrated, it is slightly faster to clone it and construct the
/// builder with [`new_with_clock`](Upkeep::new_with_clock) to avoid recalibrating.
pub fn new(interval: Duration) -> Upkeep {
Self::new_with_clock(interval, Clock::new())
Upkeep { interval }
}

/// Creates a new [`Upkeep`] with the specified [`Clock`] instance.
pub fn new_with_clock(interval: Duration, clock: Clock) -> Upkeep {
Upkeep { interval, clock }
#[doc(hidden)]
#[deprecated = "`Upkeep::new_with_clock` is not faster than `Upkeep::new`. Use `Upkeep::new` instead."]
pub fn new_with_clock(interval: Duration, _: Clock) -> Upkeep {
Upkeep { interval }
}

/// Start the upkeep thread, periodically updating the global coarse time.
@@ -112,20 +113,60 @@ impl Upkeep {
/// If either an existing upkeep thread is running, or there was an issue when attempting to
/// spawn the upkeep thread, an error variant will be returned describing the error.
pub fn start(self) -> Result<Handle, Error> {
self.inner_start(None)
}

/// Start the upkeep thread pinned to `core_id`, periodically updating the global coarse time.
/// [`Upkeep`] will construct a [`Clock`] and run calibration against the core it is
/// pinned to. Since all [`Clock`] instances share a global lazily initialized calibration,
/// users intending to use this API should avoid calling [`Clock::new`] before starting a
/// pinned [`Upkeep`] thread.
///
/// [`Handle`] represents a drop guard for the upkeep thread if it is successfully spawned.
/// Dropping the handle will also instruct the upkeep thread to stop and exit, so the handle
/// must be held while the upkeep thread should continue to run.
///
/// # Errors
///
/// If either an existing upkeep thread is running, or there was an issue when attempting to
/// spawn the upkeep thread, or the upkeep thread was not successfully pinned to `core_id`,
/// an error variant will be returned describing the error.
pub fn start_pinned(self, core_id: core_affinity::CoreId) -> Result<Handle, Error> {
self.inner_start(Some(core_id))
}

fn inner_start(self, core_id: Option<core_affinity::CoreId>) -> Result<Handle, Error> {
// If another upkeep thread is running, inform the caller.
let _ = GLOBAL_UPKEEP_RUNNING
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.map_err(|_| Error::UpkeepRunning)?;

let interval = self.interval;
let clock = self.clock;

let done = Arc::new(AtomicBool::new(false));
let their_done = done.clone();

let (pin_success_sender, pin_success_receiver) = mpsc::sync_channel(1);
let result = thread::Builder::new()
.name("quanta-upkeep".to_string())
.spawn(move || {
if let Some(core_id) = core_id {
let success = core_affinity::set_for_current(core_id);

// Panic safety: `send` returns an error (and this `unwrap` panics) only if the receiver
// has been dropped. That can happen only if the parent thread panicked before it reached
// `recv`, so in practice this never panics.
pin_success_sender.send(success).unwrap();

// Do not keep this thread running if pinning was requested, but we failed to
// pin.
if !success {
GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
return;
}
}

let clock = Clock::new();
while !their_done.load(Ordering::Acquire) {
set_recent(clock.now());

@@ -141,6 +182,18 @@ impl Upkeep {
GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
}

// Propagate a spawn failure first: if the thread never started, the sender was dropped
// along with the closure, and `recv` below would return an error.
let handle = result?;

// When thread pinning is requested, verify `quanta-upkeep` has been successfully pinned.
if core_id.is_some() {
// Panic safety: `recv` returns an error only if the sender was dropped without sending.
// Since the spawned thread always sends a message before the sender can be dropped, this
// `unwrap` never panics.
let success = pin_success_receiver.recv().unwrap();
if !success {
return Err(Error::FailedToPinUpkeepThread);
}
}

Ok(Handle {
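
The pin-confirmation handshake in `inner_start` can be sketched in isolation using only the standard library. This is an illustrative sketch, not the PR's code: `start_with_pin_check`, `StartError`, and the `try_pin` closure are hypothetical names, and `try_pin` stands in for `core_affinity::set_for_current(core_id)`. The sketch checks the spawn result before blocking on `recv`, so a failed spawn surfaces as an error rather than as a disconnected channel.

```rust
use std::{io, sync::mpsc, thread};

// Hypothetical error type for this sketch.
#[derive(Debug)]
enum StartError {
    Spawn(io::Error),
    Pin,
}

// `try_pin` is a stand-in (an assumption) for `core_affinity::set_for_current(core_id)`.
fn start_with_pin_check<P, W>(try_pin: P, work: W) -> Result<thread::JoinHandle<()>, StartError>
where
    P: FnOnce() -> bool + Send + 'static,
    W: FnOnce() + Send + 'static,
{
    // Capacity 1 lets the worker send its single status message without blocking.
    let (tx, rx) = mpsc::sync_channel(1);

    let handle = thread::Builder::new()
        .name("pinned-worker".to_string())
        .spawn(move || {
            let pinned = try_pin();
            // A send error only means the parent stopped listening; ignore it.
            let _ = tx.send(pinned);
            // Do not run the workload if pinning was requested but failed.
            if pinned {
                work();
            }
        })
        // Handle spawn failure before touching the channel: if the thread never
        // started, the sender was dropped with the closure.
        .map_err(StartError::Spawn)?;

    match rx.recv() {
        Ok(true) => Ok(handle),
        // Pinning failed, or the worker died before reporting.
        Ok(false) | Err(_) => {
            let _ = handle.join();
            Err(StartError::Pin)
        }
    }
}

fn main() {
    // Pinning "succeeds": the caller gets a handle to the running worker.
    let ok = start_with_pin_check(|| true, || {});
    assert!(ok.is_ok());
    ok.unwrap().join().unwrap();

    // Pinning "fails": the worker exits early and the caller sees an error.
    match start_with_pin_check(|| false, || {}) {
        Err(StartError::Pin) => println!("pinning failed, worker exited"),
        Err(StartError::Spawn(e)) => println!("spawn failed: {e}"),
        Ok(_) => println!("unexpectedly pinned"),
    }
}
```

A bounded channel of capacity 1 is enough here because exactly one status message is ever sent, and the parent blocks only until that message arrives.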