Skip to content

Commit 0c62550

Browse files
committed
geyser: add worker_threads and affinity (#481)
1 parent 75df985 commit 0c62550

File tree

9 files changed

+141
-14
lines changed

9 files changed

+141
-14
lines changed

CHANGELOG.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,21 @@ The minor version will be incremented upon a breaking change and the patch versi
1515
### Features
1616

1717
- proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474))
18-
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
19-
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))
20-
- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471))
2118

2219
### Breaking
2320

21+
## 2024-12-01
22+
23+
- yellowstone-grpc-client-simple-3.1.0
24+
- yellowstone-grpc-geyser-3.1.0
25+
26+
### Features
27+
28+
- nodejs: add parse tx function ([#471](https://github.com/rpcpool/yellowstone-grpc/pull/471))
29+
- geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
30+
- example: add connection options to Rust client ([#478](https://github.com/rpcpool/yellowstone-grpc/pull/478))
31+
- geyser: add worker_threads and affinity ([#481](https://github.com/rpcpool/yellowstone-grpc/pull/481))
32+
2433
## 2024-11-28
2534

2635
- yellowstone-grpc-geyser-3.0.1

Cargo.lock

Lines changed: 25 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[workspace]
22
resolver = "2"
33
members = [
4-
"examples/rust", # 3.0.0
4+
"examples/rust", # 3.1.0
55
"yellowstone-grpc-client", # 3.0.0
6-
"yellowstone-grpc-geyser", # 3.0.1
6+
"yellowstone-grpc-geyser", # 3.1.0
77
"yellowstone-grpc-proto", # 3.0.0
88
]
99
exclude = [
@@ -20,6 +20,7 @@ keywords = ["solana"]
2020
publish = false
2121

2222
[workspace.dependencies]
23+
affinity = "0.1.2"
2324
agave-geyser-plugin-interface = "~2.0.15"
2425
anyhow = "1.0.62"
2526
backoff = "0.4.0"

examples/rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "yellowstone-grpc-client-simple"
3-
version = "3.0.0"
3+
version = "3.1.0"
44
authors = { workspace = true }
55
edition = { workspace = true }
66
homepage = { workspace = true }

yellowstone-grpc-geyser/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "yellowstone-grpc-geyser"
3-
version = "3.0.1"
3+
version = "3.1.0"
44
authors = { workspace = true }
55
edition = { workspace = true }
66
description = "Yellowstone gRPC Geyser Plugin"
@@ -17,6 +17,7 @@ crate-type = ["cdylib", "rlib"]
1717
name = "config-check"
1818

1919
[dependencies]
20+
affinity = { workspace = true }
2021
agave-geyser-plugin-interface = { workspace = true }
2122
anyhow = { workspace = true }
2223
base64 = { workspace = true }

yellowstone-grpc-geyser/config.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"log": {
44
"level": "info"
55
},
6+
"tokio": {
7+
"worker_threads": 8,
8+
"affinity": "0-1,12-13"
9+
},
610
"grpc": {
711
"address": "0.0.0.0:10000",
812
"tls_config": {

yellowstone-grpc-geyser/src/config.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use {
33
GeyserPluginError, Result as PluginResult,
44
},
55
serde::{de, Deserialize, Deserializer},
6-
std::{fs::read_to_string, net::SocketAddr, path::Path, time::Duration},
6+
std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path, time::Duration},
77
tokio::sync::Semaphore,
88
tonic::codec::CompressionEncoding,
99
yellowstone_grpc_proto::plugin::filter::limits::FilterLimits,
@@ -15,6 +15,8 @@ pub struct Config {
1515
pub libpath: String,
1616
#[serde(default)]
1717
pub log: ConfigLog,
18+
#[serde(default)]
19+
pub tokio: ConfigTokio,
1820
pub grpc: ConfigGrpc,
1921
#[serde(default)]
2022
pub prometheus: Option<ConfigPrometheus>,
@@ -58,6 +60,74 @@ impl ConfigLog {
5860
}
5961
}
6062

63+
#[derive(Debug, Clone, Default, Deserialize)]
64+
#[serde(deny_unknown_fields)]
65+
pub struct ConfigTokio {
66+
/// Number of worker threads in Tokio runtime
67+
pub worker_threads: Option<usize>,
68+
/// Threads affinity
69+
#[serde(deserialize_with = "ConfigTokio::deserialize_affinity")]
70+
pub affinity: Option<Vec<usize>>,
71+
}
72+
73+
impl ConfigTokio {
74+
fn deserialize_affinity<'de, D>(deserializer: D) -> Result<Option<Vec<usize>>, D::Error>
75+
where
76+
D: Deserializer<'de>,
77+
{
78+
match Option::<&str>::deserialize(deserializer)? {
79+
Some(taskset) => parse_taskset(taskset).map(Some).map_err(de::Error::custom),
80+
None => Ok(None),
81+
}
82+
}
83+
}
84+
85+
fn parse_taskset(taskset: &str) -> Result<Vec<usize>, String> {
86+
let mut set = HashSet::new();
87+
for taskset2 in taskset.split(',') {
88+
match taskset2.split_once('-') {
89+
Some((start, end)) => {
90+
let start: usize = start
91+
.parse()
92+
.map_err(|_error| format!("failed to parse {start:?} from {taskset:?}"))?;
93+
let end: usize = end
94+
.parse()
95+
.map_err(|_error| format!("failed to parse {end:?} from {taskset:?}"))?;
96+
if start > end {
97+
return Err(format!("invalid interval {taskset2:?} in {taskset:?}"));
98+
}
99+
for idx in start..=end {
100+
set.insert(idx);
101+
}
102+
}
103+
None => {
104+
set.insert(
105+
taskset2.parse().map_err(|_error| {
106+
format!("failed to parse {taskset2:?} from {taskset:?}")
107+
})?,
108+
);
109+
}
110+
}
111+
}
112+
113+
let mut vec = set.into_iter().collect::<Vec<usize>>();
114+
vec.sort();
115+
116+
if let Some(set_max_index) = vec.last().copied() {
117+
let max_index = affinity::get_thread_affinity()
118+
.map_err(|_err| "failed to get affinity".to_owned())?
119+
.into_iter()
120+
.max()
121+
.unwrap_or(0);
122+
123+
if set_max_index > max_index {
124+
return Err(format!("core index must be in the range [0, {max_index}]"));
125+
}
126+
}
127+
128+
Ok(vec)
129+
}
130+
61131
#[derive(Debug, Clone, Deserialize)]
62132
#[serde(deny_unknown_fields)]
63133
pub struct ConfigGrpc {

yellowstone-grpc-geyser/src/grpc.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
crate::{
3-
config::ConfigGrpc,
3+
config::{ConfigGrpc, ConfigTokio},
44
metrics::{self, DebugClientMessage},
55
version::GrpcVersionInfo,
66
},
@@ -309,6 +309,7 @@ pub struct GrpcService {
309309
impl GrpcService {
310310
#[allow(clippy::type_complexity)]
311311
pub async fn create(
312+
config_tokio: ConfigTokio,
312313
config: ConfigGrpc,
313314
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
314315
is_reload: bool,
@@ -389,9 +390,17 @@ impl GrpcService {
389390
// Run geyser message loop
390391
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
391392
spawn_blocking(move || {
392-
Builder::new_multi_thread()
393+
let mut builder = Builder::new_multi_thread();
394+
if let Some(worker_threads) = config_tokio.worker_threads {
395+
builder.worker_threads(worker_threads);
396+
}
397+
if let Some(tokio_cpus) = config_tokio.affinity.clone() {
398+
builder.on_thread_start(move || {
399+
affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity")
400+
});
401+
}
402+
builder
393403
.thread_name_fn(crate::get_thread_name)
394-
.worker_threads(4)
395404
.enable_all()
396405
.build()
397406
.expect("Failed to create a new runtime for geyser loop")

yellowstone-grpc-geyser/src/plugin.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,16 @@ impl GeyserPlugin for Plugin {
7171
solana_logger::setup_with_default(&config.log.level);
7272

7373
// Create inner
74-
let runtime = Builder::new_multi_thread()
74+
let mut builder = Builder::new_multi_thread();
75+
if let Some(worker_threads) = config.tokio.worker_threads {
76+
builder.worker_threads(worker_threads);
77+
}
78+
if let Some(tokio_cpus) = config.tokio.affinity.clone() {
79+
builder.on_thread_start(move || {
80+
affinity::set_thread_affinity(&tokio_cpus).expect("failed to set affinity")
81+
});
82+
}
83+
let runtime = builder
7584
.thread_name_fn(crate::get_thread_name)
7685
.enable_all()
7786
.build()
@@ -81,6 +90,7 @@ impl GeyserPlugin for Plugin {
8190
runtime.block_on(async move {
8291
let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel();
8392
let (snapshot_channel, grpc_channel, grpc_shutdown) = GrpcService::create(
93+
config.tokio,
8494
config.grpc,
8595
config.debug_clients_http.then_some(debug_client_tx),
8696
is_reload,

0 commit comments

Comments
 (0)