Skip to content

Commit 60a04f8

Browse files
committed
cleaner api
1 parent a4e0afb commit 60a04f8

8 files changed

+177
-62
lines changed

thread-manager/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,24 @@ and context switches that would occur if Rayon was entirely unaware it was runni
88
tokio, and each was to spawn as many threads as there are cores.
99

1010
# Supported threading models
11+
## Affinity
12+
All threading models allow setting core affinity, but only on Linux.
13+
14+
For core affinity you can set e.g.
15+
```toml
16+
core_allocation.DedicatedCoreSet = { min = 16, max = 64 }
17+
```
18+
to pin the pool to cores 16-64.
19+
20+
## Scheduling policy and priority
21+
If you want, you can set the thread scheduling policy and priority. Keep in mind that this will likely require
22+
```bash
23+
sudo setcap cap_sys_nice+ep <path-to-your-binary>
24+
```
25+
or root privileges to run the resulting process.
26+
To see which policies are supported, check [the sources](./src/policy.rs).
27+
If you use realtime policies, priority values from 1 (lowest) to 99 (highest) are possible.
28+
1129
## Tokio
1230
Multiple tokio runtimes can be created, and each may be assigned its own pool of CPU cores to run on.
1331
Number of worker and blocking threads is configurable, as are thread priorities for the pool.

thread-manager/examples/core_contention_basics.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,9 @@ fn main() -> anyhow::Result<()> {
5757
let cfg: ThreadManagerConfig = toml::from_str(&buf)?;
5858

5959
let manager = ThreadManager::new(cfg).unwrap();
60-
let tokio1 = manager
61-
.get_tokio("axum1")
62-
.expect("Expecting runtime named axum1");
60+
let tokio1 = manager.get_tokio("axum1");
6361
tokio1.start_metrics_sampling(Duration::from_secs(1));
64-
let tokio2 = manager
65-
.get_tokio("axum2")
66-
.expect("Expecting runtime named axum2");
62+
let tokio2 = manager.get_tokio("axum2");
6763
tokio2.start_metrics_sampling(Duration::from_secs(1));
6864

6965
let wrk_cores: Vec<_> = (32..64).collect();

thread-manager/examples/core_contention_sweep.rs

+3-24
Original file line numberDiff line numberDiff line change
@@ -109,36 +109,15 @@ fn main() -> anyhow::Result<()> {
109109
let (tokio1, tokio2) = match regime {
110110
Regime::Shared => {
111111
manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
112-
(
113-
manager
114-
.get_tokio("axum1")
115-
.expect("Expecting runtime named axum1"),
116-
manager
117-
.get_tokio("axum2")
118-
.expect("Expecting runtime named axum2"),
119-
)
112+
(manager.get_tokio("axum1"), manager.get_tokio("axum2"))
120113
}
121114
Regime::Dedicated => {
122115
manager = ThreadManager::new(make_config_dedicated(core_count)).unwrap();
123-
(
124-
manager
125-
.get_tokio("axum1")
126-
.expect("Expecting runtime named axum1"),
127-
manager
128-
.get_tokio("axum2")
129-
.expect("Expecting runtime named axum2"),
130-
)
116+
(manager.get_tokio("axum1"), manager.get_tokio("axum2"))
131117
}
132118
Regime::Single => {
133119
manager = ThreadManager::new(make_config_shared(core_count)).unwrap();
134-
(
135-
manager
136-
.get_tokio("axum1")
137-
.expect("Expecting runtime named axum1"),
138-
manager
139-
.get_tokio("axum2")
140-
.expect("Expecting runtime named axum2"),
141-
)
120+
(manager.get_tokio("axum1"), manager.get_tokio("axum2"))
142121
}
143122
};
144123

thread-manager/src/lib.rs

+102-7
Original file line numberDiff line numberDiff line change
@@ -120,22 +120,44 @@ impl ThreadManager {
120120
}
121121
}
122122

123-
pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
123+
pub fn try_get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
124124
self.lookup(
125125
name,
126126
&self.native_runtime_mapping,
127127
&self.native_thread_runtimes,
128128
)
129129
}
130+
pub fn get_native(&self, name: &str) -> &NativeThreadRuntime {
131+
if let Some(runtime) = self.try_get_native(name) {
132+
runtime
133+
} else {
134+
panic!("Native thread pool {name} not configured!");
135+
}
136+
}
130137

131-
pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
138+
pub fn try_get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
132139
self.lookup(name, &self.rayon_runtime_mapping, &self.rayon_runtimes)
133140
}
134141

135-
pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
142+
pub fn get_rayon(&self, name: &str) -> &RayonRuntime {
143+
if let Some(runtime) = self.try_get_rayon(name) {
144+
runtime
145+
} else {
146+
panic!("Rayon thread pool {name} not configured!");
147+
}
148+
}
149+
150+
pub fn try_get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
136151
self.lookup(name, &self.tokio_runtime_mapping, &self.tokio_runtimes)
137152
}
138153

154+
pub fn get_tokio(&self, name: &str) -> &TokioRuntime {
155+
if let Some(runtime) = self.try_get_tokio(name) {
156+
runtime
157+
} else {
158+
panic!("Tokio runtime {name} not configured!");
159+
}
160+
}
139161
pub fn set_process_affinity(config: &ThreadManagerConfig) -> anyhow::Result<Vec<usize>> {
140162
let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
141163

@@ -214,6 +236,81 @@ mod tests {
214236
#[cfg(not(target_os = "linux"))]
215237
fn validate_affinity(_expect_cores: &[usize], _error_msg: &str) {}
216238

239+
/* #[test]
240+
fn thread_priority() {
241+
let priority_high = 10;
242+
let priority_default = crate::policy::DEFAULT_PRIORITY;
243+
let priority_low = 1;
244+
let conf = ThreadManagerConfig {
245+
native_configs: HashMap::from([
246+
(
247+
"high".to_owned(),
248+
NativeConfig {
249+
priority: priority_high,
250+
..Default::default()
251+
},
252+
),
253+
(
254+
"default".to_owned(),
255+
NativeConfig {
256+
..Default::default()
257+
},
258+
),
259+
(
260+
"low".to_owned(),
261+
NativeConfig {
262+
priority: priority_low,
263+
..Default::default()
264+
},
265+
),
266+
]),
267+
..Default::default()
268+
};
269+
270+
let manager = ThreadManager::new(conf).unwrap();
271+
let high = manager.get_native("high");
272+
let low = manager.get_native("low");
273+
let default = manager.get_native("default");
274+
275+
high.spawn(move || {
276+
let prio =
277+
thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
278+
assert_eq!(
279+
prio,
280+
thread_priority::ThreadPriority::Crossplatform((priority_high).try_into().unwrap())
281+
);
282+
})
283+
.unwrap()
284+
.join()
285+
.unwrap();
286+
low.spawn(move || {
287+
let prio =
288+
thread_priority::get_thread_priority(thread_priority::thread_native_id()).unwrap();
289+
assert_eq!(
290+
prio,
291+
thread_priority::ThreadPriority::Crossplatform((priority_low).try_into().unwrap())
292+
);
293+
})
294+
.unwrap()
295+
.join()
296+
.unwrap();
297+
default
298+
.spawn(move || {
299+
let prio =
300+
thread_priority::get_thread_priority(thread_priority::thread_native_id())
301+
.unwrap();
302+
assert_eq!(
303+
prio,
304+
thread_priority::ThreadPriority::Crossplatform(
305+
(priority_default).try_into().unwrap()
306+
)
307+
);
308+
})
309+
.unwrap()
310+
.join()
311+
.unwrap();
312+
}*/
313+
217314
#[test]
218315
fn process_affinity() {
219316
let conf = ThreadManagerConfig {
@@ -222,7 +319,6 @@ mod tests {
222319
NativeConfig {
223320
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
224321
max_threads: 5,
225-
priority: 0,
226322
..Default::default()
227323
},
228324
)]),
@@ -232,7 +328,7 @@ mod tests {
232328
};
233329

234330
let manager = ThreadManager::new(conf).unwrap();
235-
let runtime = manager.get_native("test").unwrap();
331+
let runtime = manager.get_native("test");
236332

237333
let thread1 = runtime
238334
.spawn(|| {
@@ -263,7 +359,6 @@ mod tests {
263359
RayonConfig {
264360
core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
265361
worker_threads: 3,
266-
priority: 0,
267362
..Default::default()
268363
},
269364
)]),
@@ -273,7 +368,7 @@ mod tests {
273368
};
274369

275370
let manager = ThreadManager::new(conf).unwrap();
276-
let rayon_runtime = manager.get_rayon("test").unwrap();
371+
let rayon_runtime = manager.get_rayon("test");
277372

278373
let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
279374
println!("Rayon thread {} reporting", ctx.index());

thread-manager/src/native_thread_runtime.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
crate::policy::{apply_policy, CoreAllocation},
2+
crate::policy::{apply_policy, parse_policy, CoreAllocation},
33
anyhow::bail,
44
log::error,
55
serde::{Deserialize, Serialize},
@@ -18,7 +18,9 @@ use {
1818
pub struct NativeConfig {
1919
pub core_allocation: CoreAllocation,
2020
pub max_threads: usize,
21+
/// Priority in range 1..99
2122
pub priority: u8,
23+
pub policy: String,
2224
pub stack_size_bytes: usize,
2325
}
2426

@@ -27,7 +29,8 @@ impl Default for NativeConfig {
2729
Self {
2830
core_allocation: CoreAllocation::OsDefault,
2931
max_threads: 16,
30-
priority: 0,
32+
priority: crate::policy::DEFAULT_PRIORITY,
33+
policy: "OTHER".to_owned(),
3134
stack_size_bytes: 2 * 1024 * 1024,
3235
}
3336
}
@@ -131,12 +134,13 @@ impl NativeThreadRuntime {
131134

132135
let core_alloc = self.config.core_allocation.clone();
133136
let priority = self.config.priority;
137+
let policy = parse_policy(&self.config.policy);
134138
let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
135139
let jh = std::thread::Builder::new()
136140
.name(name)
137141
.stack_size(self.config.stack_size_bytes)
138142
.spawn(move || {
139-
apply_policy(&core_alloc, priority, &chosen_cores_mask);
143+
apply_policy(&core_alloc, policy, priority, &chosen_cores_mask);
140144
f()
141145
})?;
142146
let rc = self.running_count.fetch_add(1, Ordering::Relaxed);

thread-manager/src/policy.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use {
22
serde::{Deserialize, Serialize},
33
std::sync::OnceLock,
4-
thread_priority::ThreadExt,
4+
thread_priority::{NormalThreadSchedulePolicy, ThreadExt, ThreadSchedulePolicy},
55
};
66

77
static CORE_COUNT: OnceLock<usize> = OnceLock::new();
88

9+
pub const DEFAULT_PRIORITY: u8 = 0;
10+
911
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
1012
pub enum CoreAllocation {
1113
///Use OS default allocation (i.e. do not alter core affinity)
@@ -50,17 +52,28 @@ pub fn set_thread_affinity(cores: &[usize]) {
5052
#[cfg(not(target_os = "linux"))]
5153
pub fn set_thread_affinity(_cores: &[usize]) {}
5254

55+
pub fn parse_policy(policy: &str) -> ThreadSchedulePolicy {
56+
match policy.to_uppercase().as_ref() {
57+
"BATCH" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Batch),
58+
"OTHER" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Other),
59+
"IDLE" => ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Idle),
60+
_ => panic!("Could not parse the policy"),
61+
}
62+
}
63+
5364
///Applies policy to the calling thread
5465
pub fn apply_policy(
5566
alloc: &CoreAllocation,
67+
policy: ThreadSchedulePolicy,
5668
priority: u8,
5769
chosen_cores_mask: &std::sync::Mutex<Vec<usize>>,
5870
) {
59-
std::thread::current()
60-
.set_priority(thread_priority::ThreadPriority::Crossplatform(
61-
(priority).try_into().unwrap(),
62-
))
63-
.expect("Can not set thread priority!");
71+
if let Err(e) = std::thread::current().set_priority_and_policy(
72+
policy,
73+
thread_priority::ThreadPriority::Crossplatform((priority).try_into().unwrap()),
74+
) {
75+
panic!("Can not set thread priority, OS error {:?}", e);
76+
}
6477

6578
match alloc {
6679
CoreAllocation::PinnedCores { min: _, max: _ } => {

0 commit comments

Comments
 (0)