Skip to content

Commit 1839c50

Browse files
committed
Performance: Avoid converting responses to serde_json::Value
Previously, all responses were converted to serde_json::Value before being serialized to string. Since jsonrpc does not inspect the result object in any way, this step could be skipped. Now, result objects are serialized to string much earlier and the Value step is skipped. This patch has large performance benefits for huge responses: In tests with 4.5 GB responses, the jsonrpc serialization overhead after returning from an rpc function was reduced by around 35%. Which means several seconds of speed-up in response times. To accomplish this while mostly retaining API compatibility, the traits RpcMethod, RpcMethodSync, RpcMethodSimple are now generic in their return type and are wrapped when added to an io handler. There's now a distinction between the parsed representation of jsonrpc responses (Output/Response) and result of rpc functions calls (WrapOutput/WrapResponse) to allow the latter to carry the pre-serialized strings. Review notes: - I'm not happy with the WrapOutput / WrapResponse names, glad for suggestions. - Similarly, rpc_wrap must be renamed and moved. - The http handler health request is now awkward, and must extract the result/error from the already-serialized response. Probably worth it. - The largest API breakage I could see is in the middleware, where the futures now return WrapOutput/WrapResult instead of Output/Result. - One could make WrapOutput just be String, but having a separate type is likely easier to read. See #212
1 parent dc6b8ed commit 1839c50

File tree

14 files changed

+261
-154
lines changed

14 files changed

+261
-154
lines changed

core-client/transports/src/transports/http.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,16 @@ mod tests {
139139
}
140140

141141
fn io() -> IoHandler {
142+
use jsonrpc_core::Result;
143+
142144
let mut io = IoHandler::default();
143145
io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() {
144146
Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
145147
_ => Ok(Value::String("world".into())),
146148
});
147-
io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34))));
149+
io.add_sync_method("fail", |_: Params| -> Result<i64> {
150+
Err(Error::new(ErrorCode::ServerError(-34)))
151+
});
148152
io.add_notification("notify", |params: Params| {
149153
let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param");
150154
assert_eq!(value, 12);

core/examples/middlewares.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ impl Middleware<Meta> for MyMiddleware {
1717
fn on_request<F, X>(&self, request: Request, meta: Meta, next: F) -> Either<Self::Future, X>
1818
where
1919
F: FnOnce(Request, Meta) -> X + Send,
20-
X: Future<Output = Option<Response>> + Send + 'static,
20+
X: Future<Output = Option<WrapResponse>> + Send + 'static,
2121
{
2222
let start = Instant::now();
2323
let request_number = self.0.fetch_add(1, atomic::Ordering::SeqCst);

core/src/calls.rs

+40-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
use crate::types::{Error, Params, Value};
1+
use crate::types::{Error, Id, Params, Value, Version, WrapOutput};
22
use crate::BoxFuture;
3+
use futures_util::{self, future, FutureExt};
4+
use serde::Serialize;
35
use std::fmt;
46
use std::future::Future;
57
use std::sync::Arc;
@@ -30,23 +32,23 @@ impl<T, E> WrapFuture<T, E> for BoxFuture<Result<T, E>> {
3032
}
3133

3234
/// A synchronous or asynchronous method.
33-
pub trait RpcMethodSync: Send + Sync + 'static {
35+
pub trait RpcMethodSync<R = Value>: Send + Sync + 'static {
3436
/// Call method
35-
fn call(&self, params: Params) -> BoxFuture<crate::Result<Value>>;
37+
fn call(&self, params: Params) -> BoxFuture<crate::Result<R>>;
3638
}
3739

3840
/// Asynchronous Method
39-
pub trait RpcMethodSimple: Send + Sync + 'static {
41+
pub trait RpcMethodSimple<R = Value>: Send + Sync + 'static {
4042
/// Output future
41-
type Out: Future<Output = Result<Value, Error>> + Send;
43+
type Out: Future<Output = Result<R, Error>> + Send;
4244
/// Call method
4345
fn call(&self, params: Params) -> Self::Out;
4446
}
4547

4648
/// Asynchronous Method with Metadata
47-
pub trait RpcMethod<T: Metadata>: Send + Sync + 'static {
49+
pub trait RpcMethod<T: Metadata, R = Value>: Send + Sync + 'static {
4850
/// Call method
49-
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<Value>>;
51+
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<R>>;
5052
}
5153

5254
/// Notification
@@ -61,11 +63,22 @@ pub trait RpcNotification<T: Metadata>: Send + Sync + 'static {
6163
fn execute(&self, params: Params, meta: T);
6264
}
6365

66+
pub trait RpcMethodWrapped<T: Metadata>: Send + Sync + 'static {
67+
fn call(&self, params: Params, meta: T, jsonrpc: Option<Version>, id: Id) -> BoxFuture<Option<WrapOutput>>;
68+
}
69+
70+
pub fn rpc_wrap<T: Metadata, R: Serialize + Send + 'static, F: RpcMethod<T, R>>(f: F) -> Arc<dyn RpcMethodWrapped<T>> {
71+
Arc::new(move |params: Params, meta: T, jsonrpc: Option<Version>, id: Id| {
72+
let result = f.call(params, meta);
73+
result.then(move |r| future::ready(Some(WrapOutput::from(r, id, jsonrpc))))
74+
})
75+
}
76+
6477
/// Possible Remote Procedures with Metadata
6578
#[derive(Clone)]
6679
pub enum RemoteProcedure<T: Metadata> {
6780
/// A method call
68-
Method(Arc<dyn RpcMethod<T>>),
81+
Method(Arc<dyn RpcMethodWrapped<T>>),
6982
/// A notification
7083
Notification(Arc<dyn RpcNotification<T>>),
7184
/// An alias to other method,
@@ -83,23 +96,23 @@ impl<T: Metadata> fmt::Debug for RemoteProcedure<T> {
8396
}
8497
}
8598

86-
impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSimple for F
99+
impl<F: Send + Sync + 'static, X: Send + 'static, R> RpcMethodSimple<R> for F
87100
where
88101
F: Fn(Params) -> X,
89-
X: Future<Output = Result<Value, Error>>,
102+
X: Future<Output = Result<R, Error>>,
90103
{
91104
type Out = X;
92105
fn call(&self, params: Params) -> Self::Out {
93106
self(params)
94107
}
95108
}
96109

97-
impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSync for F
110+
impl<F: Send + Sync + 'static, X: Send + 'static, R> RpcMethodSync<R> for F
98111
where
99112
F: Fn(Params) -> X,
100-
X: WrapFuture<Value, Error>,
113+
X: WrapFuture<R, Error>,
101114
{
102-
fn call(&self, params: Params) -> BoxFuture<crate::Result<Value>> {
115+
fn call(&self, params: Params) -> BoxFuture<crate::Result<R>> {
103116
self(params).into_future()
104117
}
105118
}
@@ -113,13 +126,13 @@ where
113126
}
114127
}
115128

116-
impl<F: Send + Sync + 'static, X: Send + 'static, T> RpcMethod<T> for F
129+
impl<F: Send + Sync + 'static, X: Send + 'static, T, R> RpcMethod<T, R> for F
117130
where
118131
T: Metadata,
119132
F: Fn(Params, T) -> X,
120-
X: Future<Output = Result<Value, Error>>,
133+
X: Future<Output = Result<R, Error>>,
121134
{
122-
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<Value>> {
135+
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<R>> {
123136
Box::pin(self(params, meta))
124137
}
125138
}
@@ -133,3 +146,14 @@ where
133146
self(params, meta)
134147
}
135148
}
149+
150+
impl<F: Send + Sync + 'static, X: Send + 'static, T> RpcMethodWrapped<T> for F
151+
where
152+
T: Metadata,
153+
F: Fn(Params, T, Option<Version>, Id) -> X,
154+
X: Future<Output = Option<WrapOutput>>,
155+
{
156+
fn call(&self, params: Params, meta: T, jsonrpc: Option<Version>, id: Id) -> BoxFuture<Option<WrapOutput>> {
157+
Box::pin(self(params, meta, jsonrpc, id))
158+
}
159+
}

core/src/delegates.rs

+17-14
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,28 @@
11
//! Delegate rpc calls
22
3+
use serde::Serialize;
34
use std::collections::HashMap;
45
use std::future::Future;
56
use std::sync::Arc;
67

7-
use crate::calls::{Metadata, RemoteProcedure, RpcMethod, RpcNotification};
8-
use crate::types::{Error, Params, Value};
8+
use crate::calls::{rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcNotification};
9+
use crate::types::{Error, Params};
910
use crate::BoxFuture;
1011

1112
struct DelegateAsyncMethod<T, F> {
1213
delegate: Arc<T>,
1314
closure: F,
1415
}
1516

16-
impl<T, M, F, I> RpcMethod<M> for DelegateAsyncMethod<T, F>
17+
impl<T, M, F, I, R> RpcMethod<M, R> for DelegateAsyncMethod<T, F>
1718
where
1819
M: Metadata,
1920
F: Fn(&T, Params) -> I,
20-
I: Future<Output = Result<Value, Error>> + Send + 'static,
21+
I: Future<Output = Result<R, Error>> + Send + 'static,
2122
T: Send + Sync + 'static,
2223
F: Send + Sync + 'static,
2324
{
24-
fn call(&self, params: Params, _meta: M) -> BoxFuture<crate::Result<Value>> {
25+
fn call(&self, params: Params, _meta: M) -> BoxFuture<crate::Result<R>> {
2526
let closure = &self.closure;
2627
Box::pin(closure(&self.delegate, params))
2728
}
@@ -32,15 +33,15 @@ struct DelegateMethodWithMeta<T, F> {
3233
closure: F,
3334
}
3435

35-
impl<T, M, F, I> RpcMethod<M> for DelegateMethodWithMeta<T, F>
36+
impl<T, M, F, I, R> RpcMethod<M, R> for DelegateMethodWithMeta<T, F>
3637
where
3738
M: Metadata,
3839
F: Fn(&T, Params, M) -> I,
39-
I: Future<Output = Result<Value, Error>> + Send + 'static,
40+
I: Future<Output = Result<R, Error>> + Send + 'static,
4041
T: Send + Sync + 'static,
4142
F: Send + Sync + 'static,
4243
{
43-
fn call(&self, params: Params, meta: M) -> BoxFuture<crate::Result<Value>> {
44+
fn call(&self, params: Params, meta: M) -> BoxFuture<crate::Result<R>> {
4445
let closure = &self.closure;
4546
Box::pin(closure(&self.delegate, params, meta))
4647
}
@@ -112,31 +113,33 @@ where
112113
}
113114

114115
/// Adds async method to the delegate.
115-
pub fn add_method<F, I>(&mut self, name: &str, method: F)
116+
pub fn add_method<F, I, R>(&mut self, name: &str, method: F)
116117
where
117118
F: Fn(&T, Params) -> I,
118-
I: Future<Output = Result<Value, Error>> + Send + 'static,
119+
I: Future<Output = Result<R, Error>> + Send + 'static,
119120
F: Send + Sync + 'static,
121+
R: Serialize + Send + 'static,
120122
{
121123
self.methods.insert(
122124
name.into(),
123-
RemoteProcedure::Method(Arc::new(DelegateAsyncMethod {
125+
RemoteProcedure::Method(rpc_wrap(DelegateAsyncMethod {
124126
delegate: self.delegate.clone(),
125127
closure: method,
126128
})),
127129
);
128130
}
129131

130132
/// Adds async method with metadata to the delegate.
131-
pub fn add_method_with_meta<F, I>(&mut self, name: &str, method: F)
133+
pub fn add_method_with_meta<F, I, R>(&mut self, name: &str, method: F)
132134
where
133135
F: Fn(&T, Params, M) -> I,
134-
I: Future<Output = Result<Value, Error>> + Send + 'static,
136+
I: Future<Output = Result<R, Error>> + Send + 'static,
135137
F: Send + Sync + 'static,
138+
R: Serialize + Send + 'static,
136139
{
137140
self.methods.insert(
138141
name.into(),
139-
RemoteProcedure::Method(Arc::new(DelegateMethodWithMeta {
142+
RemoteProcedure::Method(rpc_wrap(DelegateMethodWithMeta {
140143
delegate: self.delegate.clone(),
141144
closure: method,
142145
})),

0 commit comments

Comments
 (0)