Skip to content

Commit 2ef6731

Browse files
feat(rpc-provider): add MeteredBatchRequests(Future) (#19779) (#282)
Co-authored-by: Léa Narzis <78718413+lean-apple@users.noreply.github.com>
1 parent 47813d7 commit 2ef6731

2 files changed

Lines changed: 58 additions & 3 deletions

File tree

crates/rpc/rpc-builder/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub use eth::EthHandlers;
9999
// Rpc server metrics
100100
mod metrics;
101101
use crate::middleware::RethRpcMiddleware;
102-
pub use metrics::{MeteredRequestFuture, RpcRequestMetricsService};
102+
pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
103103
use reth_chain_state::CanonStateSubscriptions;
104104
use reth_rpc::eth::sim_bundle::EthSimBundle;
105105

crates/rpc/rpc-builder/src/metrics.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl RpcRequestMetrics {
6262
Self::new(module, RpcTransport::WebSocket)
6363
}
6464

65-
/// Creates a new instance of the metrics layer for Ws.
65+
/// Creates a new instance of the metrics layer for Ipc.
6666
pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
6767
Self::new(module, RpcTransport::Ipc)
6868
}
@@ -127,7 +127,20 @@ where
127127
}
128128

129129
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
130-
self.inner.batch(req)
130+
self.metrics.inner.connection_metrics.batches_started_total.increment(1);
131+
132+
for batch_entry in req.iter().flatten() {
133+
let method_name = batch_entry.method_name();
134+
if let Some(call_metrics) = self.metrics.inner.call_metrics.get(method_name) {
135+
call_metrics.started_total.increment(1);
136+
}
137+
}
138+
139+
MeteredBatchRequestsFuture {
140+
fut: self.inner.batch(req),
141+
started_at: Instant::now(),
142+
metrics: self.metrics.clone(),
143+
}
131144
}
132145

133146
fn notification<'a>(
@@ -194,6 +207,42 @@ impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
194207
}
195208
}
196209

210+
/// Response future to update the metrics for a batch of request/response pairs.
211+
#[pin_project::pin_project]
212+
pub struct MeteredBatchRequestsFuture<F> {
213+
#[pin]
214+
fut: F,
215+
/// time when the batch request started
216+
started_at: Instant,
217+
/// metrics for the batch
218+
metrics: RpcRequestMetrics,
219+
}
220+
221+
impl<F> std::fmt::Debug for MeteredBatchRequestsFuture<F> {
222+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223+
f.write_str("MeteredBatchRequestsFuture")
224+
}
225+
}
226+
227+
impl<F> Future for MeteredBatchRequestsFuture<F>
228+
where
229+
F: Future,
230+
{
231+
type Output = F::Output;
232+
233+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234+
let this = self.project();
235+
let res = this.fut.poll(cx);
236+
237+
if res.is_ready() {
238+
let elapsed = this.started_at.elapsed().as_secs_f64();
239+
this.metrics.inner.connection_metrics.batches_finished_total.increment(1);
240+
this.metrics.inner.connection_metrics.batch_response_time_seconds.record(elapsed);
241+
}
242+
res
243+
}
244+
}
245+
197246
/// The transport protocol used for the RPC connection.
198247
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
199248
pub(crate) enum RpcTransport {
@@ -232,6 +281,12 @@ struct RpcServerConnectionMetrics {
232281
requests_finished_total: Counter,
233282
/// Response for a single request/response pair
234283
request_time_seconds: Histogram,
284+
/// The number of batch requests started
285+
batches_started_total: Counter,
286+
/// The number of batch requests finished
287+
batches_finished_total: Counter,
288+
/// Response time for a batch request
289+
batch_response_time_seconds: Histogram,
235290
}
236291

237292
/// Metrics for the RPC calls

0 commit comments

Comments
 (0)