Skip to content

Commit f4afe88

Browse files
committed
Collect RPC events into batches
Related to #464
1 parent 0898208 commit f4afe88

File tree

3 files changed

+88
-57
lines changed

3 files changed

+88
-57
lines changed

src/electrum.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
config::{Config, ELECTRS_VERSION},
1818
daemon::{self, extract_bitcoind_error, Daemon},
1919
merkle::Proof,
20-
metrics::{self, Histogram},
20+
metrics::{self, Histogram, Metrics},
2121
signals::Signal,
2222
status::ScriptHashStatus,
2323
tracker::Tracker,
@@ -122,15 +122,15 @@ pub struct Rpc {
122122

123123
impl Rpc {
124124
/// Perform initial index sync (may take a while on first run).
125-
pub fn new(config: &Config) -> Result<Self> {
126-
let tracker = Tracker::new(config)?;
127-
let rpc_duration = tracker.metrics().histogram_vec(
125+
pub fn new(config: &Config, metrics: Metrics) -> Result<Self> {
126+
let rpc_duration = metrics.histogram_vec(
128127
"rpc_duration",
129128
"RPC duration (in seconds)",
130129
"method",
131130
metrics::default_duration_buckets(),
132131
);
133132

133+
let tracker = Tracker::new(config, metrics)?;
134134
let signal = Signal::new();
135135
let daemon = Daemon::connect(config, signal.exit_flag(), tracker.metrics())?;
136136
let cache = Cache::new(tracker.metrics());
@@ -395,7 +395,14 @@ impl Rpc {
395395
}))
396396
}
397397

398-
pub fn handle_request(&self, client: &mut Client, line: &str) -> String {
398+
pub fn handle_requests(&self, client: &mut Client, lines: &[String]) -> Vec<String> {
399+
lines
400+
.iter()
401+
.map(|line| self.handle_request(client, &line))
402+
.collect()
403+
}
404+
405+
fn handle_request(&self, client: &mut Client, line: &str) -> String {
399406
let error_msg_no_id = |err| error_msg(Value::Null, RpcError::Standard(err));
400407
let response: Value = match serde_json::from_str(line) {
401408
// parse JSON from str

src/server.rs

+75-50
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ use rayon::prelude::*;
55
use std::{
66
collections::hash_map::HashMap,
77
io::{BufRead, BufReader, Write},
8+
iter::once,
89
net::{Shutdown, TcpListener, TcpStream},
910
};
1011

1112
use crate::{
1213
config::Config,
1314
electrum::{Client, Rpc},
15+
metrics::{self, Metrics},
1416
signals::ExitError,
1517
thread::spawn,
1618
};
@@ -60,7 +62,16 @@ pub fn run() -> Result<()> {
6062

6163
fn serve() -> Result<()> {
6264
let config = Config::from_args();
63-
let mut rpc = Rpc::new(&config)?;
65+
let metrics = Metrics::new(config.monitoring_addr)?;
66+
67+
let server_batch_size = metrics.histogram_vec(
68+
"server_batch_size",
69+
"# of server events handled in a single batch",
70+
"type",
71+
metrics::default_size_buckets(),
72+
);
73+
74+
let mut rpc = Rpc::new(&config, metrics)?;
6475

6576
let (server_tx, server_rx) = unbounded();
6677
if !config.disable_electrum_rpc {
@@ -77,32 +88,29 @@ fn serve() -> Result<()> {
7788
return Ok(());
7889
}
7990
peers = notify_peers(&rpc, peers); // peers are disconnected on error.
80-
loop {
81-
select! {
82-
// Handle signals for graceful shutdown
83-
recv(rpc.signal().receiver()) -> result => {
84-
result.context("signal channel disconnected")?;
85-
rpc.signal().exit_flag().poll().context("RPC server interrupted")?;
86-
},
87-
// Handle new blocks' notifications
88-
recv(new_block_rx) -> result => match result {
89-
Ok(_) => break, // sync and update
90-
Err(_) => {
91-
info!("disconnected from bitcoind");
92-
return Ok(());
93-
}
94-
},
95-
// Handle Electrum RPC requests
96-
recv(server_rx) -> event => {
97-
let event = event.context("server disconnected")?;
98-
handle_event(&rpc, &mut peers, event);
99-
},
100-
default(config.wait_duration) => break, // sync and update
101-
};
102-
// continue RPC processing (if more requests are pending)
103-
if server_rx.is_empty() {
104-
break;
105-
}
91+
select! {
92+
// Handle signals for graceful shutdown
93+
recv(rpc.signal().receiver()) -> result => {
94+
result.context("signal channel disconnected")?;
95+
rpc.signal().exit_flag().poll().context("RPC server interrupted")?;
96+
},
97+
// Handle new blocks' notifications
98+
recv(new_block_rx) -> result => match result {
99+
Ok(_) => (), // sync and update
100+
Err(_) => {
101+
info!("disconnected from bitcoind");
102+
return Ok(());
103+
}
104+
},
105+
// Handle Electrum RPC requests
106+
recv(server_rx) -> event => {
107+
let first = once(event.context("server disconnected")?);
108+
let rest = server_rx.iter().take(server_rx.len());
109+
let events: Vec<Event> = first.chain(rest).collect();
110+
server_batch_size.observe("recv", events.len());
111+
handle_events(&rpc, &mut peers, events);
112+
},
113+
default(config.wait_duration) => (), // sync and update
106114
}
107115
}
108116
}
@@ -140,35 +148,52 @@ enum Message {
140148
Done,
141149
}
142150

143-
fn handle_event(rpc: &Rpc, peers: &mut HashMap<usize, Peer>, event: Event) {
144-
let Event { msg, peer_id } = event;
145-
match msg {
146-
Message::New(stream) => {
147-
debug!("{}: connected", peer_id);
148-
peers.insert(peer_id, Peer::new(peer_id, stream));
149-
}
150-
Message::Request(line) => {
151-
let result = match peers.get_mut(&peer_id) {
152-
Some(peer) => handle_request(rpc, peer, &line),
153-
None => return, // unknown peer
154-
};
155-
if let Err(e) = result {
156-
error!("{}: disconnecting due to {}", peer_id, e);
157-
peers.remove(&peer_id).unwrap().disconnect();
151+
fn handle_events(rpc: &Rpc, peers: &mut HashMap<usize, Peer>, events: Vec<Event>) {
152+
let mut events_by_peer = HashMap::<usize, Vec<Message>>::new();
153+
events
154+
.into_iter()
155+
.for_each(|e| events_by_peer.entry(e.peer_id).or_default().push(e.msg));
156+
for (peer_id, messages) in events_by_peer {
157+
handle_peer_events(rpc, peers, peer_id, messages);
158+
}
159+
}
160+
161+
fn handle_peer_events(
162+
rpc: &Rpc,
163+
peers: &mut HashMap<usize, Peer>,
164+
peer_id: usize,
165+
messages: Vec<Message>,
166+
) {
167+
let mut lines = vec![];
168+
let mut done = false;
169+
for msg in messages {
170+
match msg {
171+
Message::New(stream) => {
172+
debug!("{}: connected", peer_id);
173+
peers.insert(peer_id, Peer::new(peer_id, stream));
174+
}
175+
Message::Request(line) => lines.push(line),
176+
Message::Done => {
177+
done = true;
178+
break;
158179
}
159180
}
160-
Message::Done => {
161-
// already disconnected, just remove from peers' map
162-
peers.remove(&peer_id);
181+
}
182+
let result = match peers.get_mut(&peer_id) {
183+
Some(peer) => {
184+
let responses = rpc.handle_requests(&mut peer.client, &lines);
185+
peer.send(responses)
163186
}
187+
None => return, // unknown peer
188+
};
189+
if let Err(e) = result {
190+
error!("{}: disconnecting due to {}", peer_id, e);
191+
peers.remove(&peer_id).unwrap().disconnect();
192+
} else if done {
193+
peers.remove(&peer_id); // already disconnected, just remove from peers' map
164194
}
165195
}
166196

167-
fn handle_request(rpc: &Rpc, peer: &mut Peer, line: &str) -> Result<()> {
168-
let response = rpc.handle_request(&mut peer.client, line);
169-
peer.send(vec![response])
170-
}
171-
172197
fn accept_loop(listener: TcpListener, server_tx: Sender<Event>) -> Result<()> {
173198
for (peer_id, conn) in listener.incoming().enumerate() {
174199
let stream = conn.context("failed to accept")?;

src/tracker.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ pub struct Tracker {
2323
}
2424

2525
impl Tracker {
26-
pub fn new(config: &Config) -> Result<Self> {
27-
let metrics = Metrics::new(config.monitoring_addr)?;
26+
pub fn new(config: &Config, metrics: Metrics) -> Result<Self> {
2827
let store = DBStore::open(&config.db_path, config.auto_reindex)?;
2928
let chain = Chain::new(config.network);
3029
Ok(Self {

0 commit comments

Comments
 (0)