address more comments by reviewers
alexpyattaev committed Jan 10, 2025
1 parent 8fd7713 commit 3b56edf
Showing 10 changed files with 111 additions and 95 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Generated lockfile; diff not rendered.

5 changes: 5 additions & 0 deletions thread-manager/Cargo.toml
@@ -13,6 +13,7 @@ publish = false

[dependencies]
anyhow = { workspace = true }
cfg-if = "1.0.0"
log = { workspace = true }
num_cpus = { workspace = true }
rayon = { workspace = true }
@@ -26,8 +27,12 @@ tower = "0.5.2"
affinity = "0.1.2"

[dev-dependencies]
agave-thread-manager = { path = ".", features = ["dev-context-only-utils"] }
axum = "0.7.9"
env_logger = { workspace = true }
hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] }
serde_json = { workspace = true }
toml = { workspace = true }

[features]
dev-context-only-utils = []
18 changes: 13 additions & 5 deletions thread-manager/README.md
@@ -9,6 +9,18 @@
This will minimize contention for CPU caches and context switches that
would occur if Rayon was entirely unaware it was running side-by-side with
tokio, and each was to spawn as many threads as there are cores.

## Thread pool mapping
The thread manager will, by default, look for a pool with a particular name, e.g. "solGossip".
Matching is done independently for each type of runtime.
If no pool with the requested name is found, the lookup falls back to the "default" thread
pool of the same type, provided one is specified in the config. If no default pool is
specified either, the thread pool lookup will fail.

Multiple names can point to the same pool. For example, "solGossipConsume" and
"solSigverify" can both run on the same rayon pool named "rayonSigverify", as sketched
below. This, in principle, allows some degree of runtime sharing between different crates
in the codebase without having to manually plumb the handles through.
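
For illustration, a minimal sketch of such an aliasing table (the mapping field names are
taken from this commit's `lib.rs`; the construction shown here is illustrative, not the
crate's config-loading API):

```rust
use std::collections::HashMap;

fn main() {
    // Both logical names resolve to the same underlying rayon pool.
    let mut rayon_runtime_mapping: HashMap<String, String> = HashMap::new();
    rayon_runtime_mapping.insert("solGossipConsume".to_string(), "rayonSigverify".to_string());
    rayon_runtime_mapping.insert("solSigverify".to_string(), "rayonSigverify".to_string());
    assert_eq!(
        rayon_runtime_mapping["solGossipConsume"],
        rayon_runtime_mapping["solSigverify"]
    );
}
```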

# Supported threading models
## Affinity
All threading models allow setting core affinity, but only on Linux.
@@ -51,14 +63,10 @@ one may want to spawn many rayon pools.

# TODO:

* support tracing
* better metrics integration
* proper error handling everywhere
* even more tests
* better thread priority support


# Examples
All examples need the `wrk` HTTP benchmarking tool for load generation. Please install it before running.

* core_contention_basics demonstrates why core contention is bad, and how thread configs can help
* core_contention_sweep sweeps across a range of core counts to show how the benefits scale with core count
2 changes: 1 addition & 1 deletion thread-manager/examples/core_contention_basics.rs
@@ -55,7 +55,7 @@ fn main() -> anyhow::Result<()> {

let join_handle =
scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000)));
join_handle.join().expect("WRK crashed!")
join_handle.join().expect("Load generator crashed!")
});
//print out the results of the bench run
println!("Results are: {:?}", results);
66 changes: 29 additions & 37 deletions thread-manager/src/lib.rs
@@ -15,51 +15,48 @@ pub use {
rayon_runtime::{RayonConfig, RayonRuntime},
tokio_runtime::{TokioConfig, TokioRuntime},
};
pub type ConstString = Box<str>;

pub const MAX_THREAD_NAME_CHARS: usize = 12;

#[derive(Default, Debug)]
pub struct ThreadManagerInner {
pub tokio_runtimes: HashMap<ConstString, TokioRuntime>,
pub tokio_runtime_mapping: HashMap<ConstString, ConstString>,
pub tokio_runtimes: HashMap<String, TokioRuntime>,
pub tokio_runtime_mapping: HashMap<String, String>,

pub native_thread_runtimes: HashMap<ConstString, NativeThreadRuntime>,
pub native_runtime_mapping: HashMap<ConstString, ConstString>,
pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
pub native_runtime_mapping: HashMap<String, String>,

pub rayon_runtimes: HashMap<ConstString, RayonRuntime>,
pub rayon_runtime_mapping: HashMap<ConstString, ConstString>,
pub rayon_runtimes: HashMap<String, RayonRuntime>,
pub rayon_runtime_mapping: HashMap<String, String>,
}

impl ThreadManagerInner {
/// Populates mappings with copies of config names, overrides as appropriate
fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
//TODO: this should probably be cleaned up with a macro at some point...

for name in config.native_configs.keys() {
self.native_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
.insert(name.clone(), name.clone());
}
for (k, v) in config.native_runtime_mapping.iter() {
self.native_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
self.native_runtime_mapping.insert(k.clone(), v.clone());
}

for name in config.tokio_configs.keys() {
self.tokio_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
.insert(name.clone(), name.clone());
}
for (k, v) in config.tokio_runtime_mapping.iter() {
self.tokio_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
self.tokio_runtime_mapping.insert(k.clone(), v.clone());
}

for name in config.rayon_configs.keys() {
self.rayon_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
.insert(name.clone(), name.clone());
}
for (k, v) in config.rayon_runtime_mapping.iter() {
self.rayon_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
self.rayon_runtime_mapping.insert(k.clone(), v.clone());
}
}
}
@@ -68,6 +65,7 @@ impl ThreadManagerInner {
pub struct ThreadManager {
inner: Arc<ThreadManagerInner>,
}

impl Deref for ThreadManager {
type Target = ThreadManagerInner;

@@ -110,13 +108,16 @@ impl ThreadManager {
fn lookup<'a, T>(
&'a self,
name: &str,
mapping: &HashMap<ConstString, ConstString>,
runtimes: &'a HashMap<ConstString, T>,
mapping: &HashMap<String, String>,
runtimes: &'a HashMap<String, T>,
) -> Option<&'a T> {
match mapping.get(name) {
Some(n) => runtimes.get(n),
None => match mapping.get("default") {
Some(n) => runtimes.get(n),
Some(n) => {
log::warn!("Falling back to default runtime for {name}");
runtimes.get(n)
}
None => None,
},
}
@@ -133,7 +134,7 @@ impl ThreadManager {
if let Some(runtime) = self.try_get_native(name) {
runtime
} else {
panic!("Native thread pool {name} not configured!");
panic!("Native thread pool for {name} can not be found!");
}
}

@@ -145,7 +146,7 @@ impl ThreadManager {
if let Some(runtime) = self.try_get_rayon(name) {
runtime
} else {
panic!("Rayon thread pool {name} not configured!");
panic!("Rayon thread pool for {name} can not be found!");
}
}

@@ -157,44 +158,35 @@ impl ThreadManager {
if let Some(runtime) = self.try_get_tokio(name) {
runtime
} else {
panic!("Tokio runtime {name} not configured!");
panic!("Tokio thread pool for {name} can not be found!");
}
}

pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();

crate::policy::set_thread_affinity(&chosen_cores_mask);
Ok(chosen_cores_mask)
}

pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
let mut core_allocations = HashMap::<ConstString, Vec<usize>>::new();
let mut core_allocations = HashMap::<String, Vec<usize>>::new();
Self::set_process_affinity(&config)?;
let mut manager = ThreadManagerInner::default();
manager.populate_mappings(&config);
for (name, cfg) in config.native_configs.iter() {
let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
manager
.native_thread_runtimes
.insert(name.clone().into_boxed_str(), nrt);
manager.native_thread_runtimes.insert(name.clone(), nrt);
}
for (name, cfg) in config.rayon_configs.iter() {
let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
manager
.rayon_runtimes
.insert(name.clone().into_boxed_str(), rrt);
manager.rayon_runtimes.insert(name.clone(), rrt);
}

for (name, cfg) in config.tokio_configs.iter() {
let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;

core_allocations.insert(
name.clone().into_boxed_str(),
cfg.core_allocation.as_core_mask_vector(),
);
manager
.tokio_runtimes
.insert(name.clone().into_boxed_str(), tokiort);
core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
manager.tokio_runtimes.insert(name.clone(), tokiort);
}
Ok(Self {
inner: Arc::new(manager),
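
To make the lookup semantics above concrete, here is a self-contained sketch of the same
resolution rule (an explicit mapping entry wins, otherwise the "default" entry is tried;
names and payload types are illustrative, not the crate's API):

```rust
use std::collections::HashMap;

/// Resolve `name` through `mapping`, falling back to the "default" entry.
fn resolve<'a, T>(
    name: &str,
    mapping: &HashMap<String, String>,
    runtimes: &'a HashMap<String, T>,
) -> Option<&'a T> {
    let key = mapping.get(name).or_else(|| mapping.get("default"))?;
    runtimes.get(key)
}

fn main() {
    let mapping = HashMap::from([
        ("solSigverify".to_string(), "rayonSigverify".to_string()),
        ("default".to_string(), "rayonDefault".to_string()),
    ]);
    let runtimes = HashMap::from([
        ("rayonSigverify".to_string(), "pool A"),
        ("rayonDefault".to_string(), "pool B"),
    ]);
    // An explicitly mapped name hits its own pool...
    assert_eq!(resolve("solSigverify", &mapping, &runtimes), Some(&"pool A"));
    // ...while an unknown name falls back to the default pool.
    assert_eq!(resolve("solGossip", &mapping, &runtimes), Some(&"pool B"));
}
```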
7 changes: 6 additions & 1 deletion thread-manager/src/native_thread_runtime.rs
@@ -95,7 +95,7 @@ impl<T> JoinHandle<T> {
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if self.std_handle.is_some() {
warn!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your managed threads!");
warn!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your threads!");
self.join_inner().expect("Child thread panicked");
}
}
@@ -155,4 +155,9 @@ impl NativeThreadRuntime {
running_count: self.running_count.clone(),
})
}

#[cfg(feature = "dev-context-only-utils")]
pub fn new_for_tests(name: &str) -> Self {
Self::new(name.to_owned(), NativeConfig::default())
}
}
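
The `new_for_tests` helper added here is only compiled when the `dev-context-only-utils`
feature is on, which the Cargo.toml change above enables for the crate's own tests via the
self-referential dev-dependency. A sketch of a test that would use it (the pool name, and
the assumption that `NativeThreadRuntime` is re-exported at the crate root, are illustrative):

```rust
#[cfg(test)]
mod tests {
    use agave_thread_manager::NativeThreadRuntime;

    #[test]
    fn constructs_default_native_runtime() {
        // Hypothetical smoke test: building a runtime with the default
        // config should not panic.
        let _runtime = NativeThreadRuntime::new_for_tests("solTest");
    }
}
```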
98 changes: 49 additions & 49 deletions thread-manager/src/policy.rs
@@ -1,14 +1,17 @@
#[cfg(target_os = "linux")]
use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")]{
use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
}
else{
#[derive(Clone, Copy)]
pub(crate) struct ThreadSchedulePolicy {}
}
}
use {
serde::{Deserialize, Serialize},
std::sync::OnceLock,
};

#[cfg(not(target_os = "linux"))]
#[derive(Clone, Copy)]
pub(crate) struct ThreadSchedulePolicy {}

static CORE_COUNT: OnceLock<usize> = OnceLock::new();

pub const DEFAULT_PRIORITY: u8 = 0;
@@ -34,53 +37,50 @@ impl CoreAllocation {
}
}
}
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")]{

#[cfg(target_os = "linux")]
pub fn set_thread_affinity(cores: &[usize]) {
assert!(
!cores.is_empty(),
"Can not call setaffinity with empty cores mask"
);
if let Err(e) = affinity::set_thread_affinity(cores) {
let thread = std::thread::current();
panic!(
"Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
cores,
thread.id(),
thread.name(),
e
);
}
}

#[cfg(not(target_os = "linux"))]
pub fn set_thread_affinity(_cores: &[usize]) {}

#[cfg(target_os = "linux")]
pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
match policy.to_uppercase().as_ref() {
"BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
"OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
"IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
_ => panic!("Could not parse the policy"),
pub fn set_thread_affinity(cores: &[usize]) {
assert!(
!cores.is_empty(),
"Can not call setaffinity with empty cores mask"
);
if let Err(e) = affinity::set_thread_affinity(cores) {
let thread = std::thread::current();
panic!(
"Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
cores,
thread.id(),
thread.name(),
e
);
}
}
fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
if let Err(e) = std::thread::current().set_priority_and_policy(
policy,
thread_priority::ThreadPriority::Crossplatform((priority).try_into().expect("Priority value outside of OS-supported range")),
) {
panic!("Can not set thread priority, OS error {:?}", e);
}
}
pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
match policy.to_uppercase().as_ref() {
"BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
"OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
"IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
_ => panic!("Could not parse the policy"),
}
}
}
}
else{

#[cfg(not(target_os = "linux"))]
pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
ThreadSchedulePolicy {}
}

#[cfg(not(target_os = "linux"))]
fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
pub fn set_thread_affinity(_cores: &[usize]) {}

#[cfg(target_os = "linux")]
fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
if let Err(e) = std::thread::current().set_priority_and_policy(
policy,
thread_priority::ThreadPriority::Crossplatform((priority).try_into().unwrap()),
) {
panic!("Can not set thread priority, OS error {:?}", e);
pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
ThreadSchedulePolicy {}
}
fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
}
}

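The restructuring above swaps per-item `#[cfg(target_os = "linux")]` attributes for
`cfg_if` blocks (via the `cfg-if` dependency added in Cargo.toml), so each platform's
items live in one branch. A self-contained sketch of the pattern (the function name is
illustrative, not part of this crate):

```rust
cfg_if::cfg_if! {
    if #[cfg(target_os = "linux")] {
        // Real scheduling support is only wired up on Linux.
        pub fn scheduler_backend() -> &'static str {
            "thread_priority"
        }
    } else {
        // Everywhere else the same items exist as no-op stubs.
        pub fn scheduler_backend() -> &'static str {
            "noop"
        }
    }
}

fn main() {
    println!("scheduler backend: {}", scheduler_backend());
}
```
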
4 changes: 3 additions & 1 deletion thread-manager/src/rayon_runtime.rs
@@ -80,7 +80,9 @@ impl RayonRuntime {
})
}

#[cfg(feature = "dev-context-only-utils")]
pub fn new_for_tests(name: &str) -> Self {
Self::new(name.to_owned(), RayonConfig::default()).unwrap()
Self::new(name.to_owned(), RayonConfig::default())
.expect("Failed to create rayon runtime for tests")
}
}