Skip to content

Commit 3192b55

Browse files
authored
geyser: fix replay_stored_slots (#496)
1 parent 3912e7c commit 3192b55

File tree

7 files changed

+27
-17
lines changed

7 files changed

+27
-17
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi
1616

1717
### Breaking
1818

19+
## 2024-12-15
20+
21+
- yellowstone-grpc-geyser-4.2.1
22+
23+
### Fixes
24+
25+
- geyser: fix `replay_stored_slots` ([#496](https://github.com/rpcpool/yellowstone-grpc/pull/496))
26+
1927
## 2024-12-13
2028

2129
- yellowstone-grpc-client-simple-4.2.0

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ resolver = "2"
33
members = [
44
"examples/rust", # 4.2.0
55
"yellowstone-grpc-client", # 4.1.0
6-
"yellowstone-grpc-geyser", # 4.2.0
6+
"yellowstone-grpc-geyser", # 4.2.1
77
"yellowstone-grpc-proto", # 4.1.0
88
]
99
exclude = [

yellowstone-grpc-geyser/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "yellowstone-grpc-geyser"
3-
version = "4.2.0"
3+
version = "4.2.1"
44
authors = { workspace = true }
55
edition = { workspace = true }
66
description = "Yellowstone gRPC Geyser Plugin"

yellowstone-grpc-geyser/config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"libpath": "../target/debug/libyellowstone_grpc_geyser.so",
2+
"libpath": "../target/release/libyellowstone_grpc_geyser.so",
33
"log": {
44
"level": "info"
55
},

yellowstone-grpc-geyser/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ pub struct ConfigGrpc {
194194
pub replay_stored_slots: u64,
195195
#[serde(default)]
196196
pub server_http2_adaptive_window: Option<bool>,
197-
#[serde(with = "humantime_serde")]
197+
#[serde(default, with = "humantime_serde")]
198198
pub server_http2_keepalive_interval: Option<Duration>,
199-
#[serde(with = "humantime_serde")]
199+
#[serde(default, with = "humantime_serde")]
200200
pub server_http2_keepalive_timeout: Option<Duration>,
201201
#[serde(default)]
202202
pub server_initial_connection_window_size: Option<u32>,

yellowstone-grpc-geyser/src/grpc.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ pub struct GrpcService {
343343
subscribe_id: AtomicUsize,
344344
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Box<Message>>>>,
345345
broadcast_tx: broadcast::Sender<BroadcastedMessage>,
346-
replay_stored_slots_tx: mpsc::Sender<ReplayStoredSlotsRequest>,
346+
replay_stored_slots_tx: Option<mpsc::Sender<ReplayStoredSlotsRequest>>,
347347
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
348348
filter_names: Arc<Mutex<FilterNames>>,
349349
}
@@ -389,12 +389,12 @@ impl GrpcService {
389389
// Messages to clients combined by commitment
390390
let (broadcast_tx, _) = broadcast::channel(config.channel_capacity);
391391
// attempt to prevent spam of geyser loop with capacity eq 1
392-
let (replay_stored_slots_tx, replay_stored_slots_rx) =
393-
mpsc::channel(if config.replay_stored_slots == 0 {
394-
0
395-
} else {
396-
1
397-
});
392+
let (replay_stored_slots_tx, replay_stored_slots_rx) = if config.replay_stored_slots == 0 {
393+
(None, None)
394+
} else {
395+
let (tx, rx) = mpsc::channel(1);
396+
(Some(tx), Some(rx))
397+
};
398398

399399
// gRPC server builder with optional TLS
400400
let mut server_builder = Server::builder();
@@ -511,7 +511,7 @@ impl GrpcService {
511511
mut messages_rx: mpsc::UnboundedReceiver<Message>,
512512
blocks_meta_tx: Option<mpsc::UnboundedSender<Message>>,
513513
broadcast_tx: broadcast::Sender<BroadcastedMessage>,
514-
mut replay_stored_slots_rx: mpsc::Receiver<ReplayStoredSlotsRequest>,
514+
replay_stored_slots_rx: Option<mpsc::Receiver<ReplayStoredSlotsRequest>>,
515515
replay_stored_slots: u64,
516516
) {
517517
const PROCESSED_MESSAGES_MAX: usize = 31;
@@ -523,6 +523,8 @@ impl GrpcService {
523523
let mut processed_first_slot = None;
524524
let processed_sleep = sleep(PROCESSED_MESSAGES_SLEEP);
525525
tokio::pin!(processed_sleep);
526+
let (_tx, rx) = mpsc::channel(1);
527+
let mut replay_stored_slots_rx = replay_stored_slots_rx.unwrap_or(rx);
526528

527529
loop {
528530
tokio::select! {
@@ -843,7 +845,7 @@ impl GrpcService {
843845
mut client_rx: mpsc::UnboundedReceiver<Option<(Option<u64>, Filter)>>,
844846
mut snapshot_rx: Option<crossbeam_channel::Receiver<Box<Message>>>,
845847
mut messages_rx: broadcast::Receiver<BroadcastedMessage>,
846-
replay_stored_slots_tx: mpsc::Sender<ReplayStoredSlotsRequest>,
848+
replay_stored_slots_tx: Option<mpsc::Sender<ReplayStoredSlotsRequest>>,
847849
debug_client_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
848850
drop_client: impl FnOnce(),
849851
) {
@@ -905,13 +907,13 @@ impl GrpcService {
905907
info!("client #{id}: filter updated");
906908

907909
if let Some(from_slot) = from_slot {
908-
if replay_stored_slots_tx.max_capacity() == 0 {
910+
let Some(replay_stored_slots_tx) = &replay_stored_slots_tx else {
909911
info!("client #{id}: from_slot is not supported");
910912
tokio::spawn(async move {
911913
let _ = stream_tx.send(Err(Status::internal("from_slot is not supported"))).await;
912914
});
913915
break 'outer;
914-
}
916+
};
915917

916918
let (tx, rx) = oneshot::channel();
917919
let commitment = filter.get_commitment_level();

0 commit comments

Comments
 (0)