Skip to content

Commit

Permalink
address more comments by reviewers
Browse files Browse the repository at this point in the history
  • Loading branch information
alexpyattaev committed Jan 10, 2025
1 parent 8fd7713 commit bf9196e
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 91 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ publish = false

[dependencies]
anyhow = { workspace = true }
cfg-if = "1.0.0"
log = { workspace = true }
num_cpus = { workspace = true }
rayon = { workspace = true }
Expand All @@ -26,8 +27,12 @@ tower = "0.5.2"
affinity = "0.1.2"

[dev-dependencies]
agave-thread-manager = { path = ".", features = ["dev-context-only-utils"] }
axum = "0.7.9"
env_logger = { workspace = true }
hyper = { workspace = true, features = ["http1", "client", "stream", "tcp"] }
serde_json = { workspace = true }
toml = { workspace = true }

[features]
dev-context-only-utils = []
2 changes: 1 addition & 1 deletion thread-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ one may want to spawn many rayon pools.


# Examples
All examples need `wrk` HTTP behnchmarking tool for load generation. Please install it before running.
All examples need `wrk` HTTP benchmarking tool for load generation. Please install it before running.

* core_contention_basics will demonstrate why core contention is bad, and how thread configs can help
* core_contention_sweep will sweep across a range of core counts to show how benefits scale with core counts
2 changes: 1 addition & 1 deletion thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn main() -> anyhow::Result<()> {

let join_handle =
scope.spawn(|| workload_runtime.block_on(workload_main(&[8888, 8889], 1000)));
join_handle.join().expect("WRK crashed!")
join_handle.join().expect("Load generator crashed!")
});
//print out the results of the bench run
println!("Results are: {:?}", results);
Expand Down
66 changes: 29 additions & 37 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,48 @@ pub use {
rayon_runtime::{RayonConfig, RayonRuntime},
tokio_runtime::{TokioConfig, TokioRuntime},
};
pub type ConstString = Box<str>;

pub const MAX_THREAD_NAME_CHARS: usize = 12;

#[derive(Default, Debug)]
pub struct ThreadManagerInner {
pub tokio_runtimes: HashMap<ConstString, TokioRuntime>,
pub tokio_runtime_mapping: HashMap<ConstString, ConstString>,
pub tokio_runtimes: HashMap<String, TokioRuntime>,
pub tokio_runtime_mapping: HashMap<String, String>,

pub native_thread_runtimes: HashMap<ConstString, NativeThreadRuntime>,
pub native_runtime_mapping: HashMap<ConstString, ConstString>,
pub native_thread_runtimes: HashMap<String, NativeThreadRuntime>,
pub native_runtime_mapping: HashMap<String, String>,

pub rayon_runtimes: HashMap<ConstString, RayonRuntime>,
pub rayon_runtime_mapping: HashMap<ConstString, ConstString>,
pub rayon_runtimes: HashMap<String, RayonRuntime>,
pub rayon_runtime_mapping: HashMap<String, String>,
}

impl ThreadManagerInner {
/// Populates mappings with copies of config names, overrides as appropriate
fn populate_mappings(&mut self, config: &ThreadManagerConfig) {
//TODO: this should probably be cleaned up with a macro at some point...

for name in config.native_configs.keys() {
self.native_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
.insert(name.clone(), name.clone());
}
for (k, v) in config.native_runtime_mapping.iter() {
self.native_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
self.native_runtime_mapping.insert(k.clone(), v.clone());
}

for name in config.tokio_configs.keys() {
self.tokio_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
.insert(name.clone(), name.clone());
}
for (k, v) in config.tokio_runtime_mapping.iter() {
self.tokio_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
self.tokio_runtime_mapping.insert(k.clone(), v.clone());
}

for name in config.rayon_configs.keys() {
self.rayon_runtime_mapping
.insert(name.clone().into_boxed_str(), name.clone().into_boxed_str());
.insert(name.clone(), name.clone());
}
for (k, v) in config.rayon_runtime_mapping.iter() {
self.rayon_runtime_mapping
.insert(k.clone().into_boxed_str(), v.clone().into_boxed_str());
self.rayon_runtime_mapping.insert(k.clone(), v.clone());
}
}
}
Expand All @@ -68,6 +65,7 @@ impl ThreadManagerInner {
pub struct ThreadManager {
inner: Arc<ThreadManagerInner>,
}

impl Deref for ThreadManager {
type Target = ThreadManagerInner;

Expand Down Expand Up @@ -110,13 +108,16 @@ impl ThreadManager {
fn lookup<'a, T>(
&'a self,
name: &str,
mapping: &HashMap<ConstString, ConstString>,
runtimes: &'a HashMap<ConstString, T>,
mapping: &HashMap<String, String>,
runtimes: &'a HashMap<String, T>,
) -> Option<&'a T> {
match mapping.get(name) {
Some(n) => runtimes.get(n),
None => match mapping.get("default") {
Some(n) => runtimes.get(n),
Some(n) => {
log::warn!("Falling back to default runtime for {name}");
runtimes.get(n)
}
None => None,
},
}
Expand All @@ -133,7 +134,7 @@ impl ThreadManager {
if let Some(runtime) = self.try_get_native(name) {
runtime
} else {
panic!("Native thread pool {name} not configured!");
panic!("Native thread pool for {name} can not be found!");
}
}

Expand All @@ -145,7 +146,7 @@ impl ThreadManager {
if let Some(runtime) = self.try_get_rayon(name) {
runtime
} else {
panic!("Rayon thread pool {name} not configured!");
panic!("Rayon thread pool for {name} can not be found!");
}
}

Expand All @@ -157,44 +158,35 @@ impl ThreadManager {
if let Some(runtime) = self.try_get_tokio(name) {
runtime
} else {
panic!("Tokio runtime {name} not configured!");
panic!("Tokio thread pool for {name} can not be found!");
}
}

pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();

crate::policy::set_thread_affinity(&chosen_cores_mask);
Ok(chosen_cores_mask)
}

pub fn new(config: ThreadManagerConfig) -> anyhow::Result<Self> {
let mut core_allocations = HashMap::<ConstString, Vec<usize>>::new();
let mut core_allocations = HashMap::<String, Vec<usize>>::new();
Self::set_process_affinity(&config)?;
let mut manager = ThreadManagerInner::default();
manager.populate_mappings(&config);
for (name, cfg) in config.native_configs.iter() {
let nrt = NativeThreadRuntime::new(name.clone(), cfg.clone());
manager
.native_thread_runtimes
.insert(name.clone().into_boxed_str(), nrt);
manager.native_thread_runtimes.insert(name.clone(), nrt);
}
for (name, cfg) in config.rayon_configs.iter() {
let rrt = RayonRuntime::new(name.clone(), cfg.clone())?;
manager
.rayon_runtimes
.insert(name.clone().into_boxed_str(), rrt);
manager.rayon_runtimes.insert(name.clone(), rrt);
}

for (name, cfg) in config.tokio_configs.iter() {
let tokiort = TokioRuntime::new(name.clone(), cfg.clone())?;

core_allocations.insert(
name.clone().into_boxed_str(),
cfg.core_allocation.as_core_mask_vector(),
);
manager
.tokio_runtimes
.insert(name.clone().into_boxed_str(), tokiort);
core_allocations.insert(name.clone(), cfg.core_allocation.as_core_mask_vector());
manager.tokio_runtimes.insert(name.clone(), tokiort);
}
Ok(Self {
inner: Arc::new(manager),
Expand Down
7 changes: 6 additions & 1 deletion thread-manager/src/native_thread_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<T> JoinHandle<T> {
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if self.std_handle.is_some() {
warn!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your managed threads!");
warn!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your threads!");
self.join_inner().expect("Child thread panicked");
}
}
Expand Down Expand Up @@ -155,4 +155,9 @@ impl NativeThreadRuntime {
running_count: self.running_count.clone(),
})
}

#[cfg(feature = "dev-context-only-utils")]
pub fn new_for_tests(name: &str) -> Self {
Self::new(name.to_owned(), NativeConfig::default())
}
}
98 changes: 49 additions & 49 deletions thread-manager/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#[cfg(target_os = "linux")]
use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")]{
use thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy};
}
else{
#[derive(Clone, Copy)]
pub(crate) struct ThreadSchedulePolicy {}
}
}
use {
serde::{Deserialize, Serialize},
std::sync::OnceLock,
};

#[cfg(not(target_os = "linux"))]
#[derive(Clone, Copy)]
pub(crate) struct ThreadSchedulePolicy {}

static CORE_COUNT: OnceLock<usize> = OnceLock::new();

pub const DEFAULT_PRIORITY: u8 = 0;
Expand All @@ -34,53 +37,50 @@ impl CoreAllocation {
}
}
}
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")]{

#[cfg(target_os = "linux")]
pub fn set_thread_affinity(cores: &[usize]) {
assert!(
!cores.is_empty(),
"Can not call setaffinity with empty cores mask"
);
if let Err(e) = affinity::set_thread_affinity(cores) {
let thread = std::thread::current();
panic!(
"Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
cores,
thread.id(),
thread.name(),
e
);
}
}

#[cfg(not(target_os = "linux"))]
pub fn set_thread_affinity(_cores: &[usize]) {}

#[cfg(target_os = "linux")]
pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
match policy.to_uppercase().as_ref() {
"BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
"OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
"IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
_ => panic!("Could not parse the policy"),
pub fn set_thread_affinity(cores: &[usize]) {
assert!(
!cores.is_empty(),
"Can not call setaffinity with empty cores mask"
);
if let Err(e) = affinity::set_thread_affinity(cores) {
let thread = std::thread::current();
panic!(
"Can not set core affinity {:?} for thread {:?} named {:?}, error {}",
cores,
thread.id(),
thread.name(),
e
);
}
}
fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
if let Err(e) = std::thread::current().set_priority_and_policy(
policy,
thread_priority::ThreadPriority::Crossplatform((priority).try_into().expect("Priority value outside of OS-supported range")),
) {
panic!("Can not set thread priority, OS error {:?}", e);
}
}
pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
match policy.to_uppercase().as_ref() {
"BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
"OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
"IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
_ => panic!("Could not parse the policy"),
}
}
}
}
else{

#[cfg(not(target_os = "linux"))]
pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
ThreadSchedulePolicy {}
}

#[cfg(not(target_os = "linux"))]
fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
pub fn set_thread_affinity(_cores: &[usize]) {}

#[cfg(target_os = "linux")]
fn apply_thread_scheduler_policy(policy: ThreadSchedulePolicy, priority: u8) {
if let Err(e) = std::thread::current().set_priority_and_policy(
policy,
thread_priority::ThreadPriority::Crossplatform((priority).try_into().unwrap()),
) {
panic!("Can not set thread priority, OS error {:?}", e);
pub(crate) fn parse_policy(_policy: &str) -> ThreadSchedulePolicy {
ThreadSchedulePolicy {}
}
fn apply_thread_scheduler_policy(_policy: ThreadSchedulePolicy, _priority: u8) {}
}
}

Expand Down
4 changes: 3 additions & 1 deletion thread-manager/src/rayon_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl RayonRuntime {
})
}

#[cfg(feature = "dev-context-only-utils")]
pub fn new_for_tests(name: &str) -> Self {
Self::new(name.to_owned(), RayonConfig::default()).unwrap()
Self::new(name.to_owned(), RayonConfig::default())
.expect("Failed to create rayon runtime for tests")
}
}
Empty file.
4 changes: 3 additions & 1 deletion thread-manager/src/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,14 @@ impl TokioRuntime {
}

/// Makes test runtime with 2 threads, only for unittests
#[cfg(feature = "dev-context-only-utils")]
pub fn new_for_tests() -> Self {
let cfg = TokioConfig {
worker_threads: 2,
..Default::default()
};
TokioRuntime::new("solNetTest".to_owned(), cfg.clone()).unwrap()
TokioRuntime::new("solNetTest".to_owned(), cfg.clone())
.expect("Failed to create Tokio runtime for tests")
}
}

Expand Down

0 comments on commit bf9196e

Please sign in to comment.