@@ -8,7 +8,7 @@ use std::{
8
8
} ,
9
9
time:: Duration ,
10
10
} ;
11
-
11
+ use std :: error :: Error ;
12
12
use base64:: Engine ;
13
13
use bitcoin:: { Block , BlockHash , Txid } ;
14
14
use hex:: FromHexError ;
@@ -190,6 +190,11 @@ impl BitcoinRpc {
190
190
self . make_request ( "queueblocks" , params)
191
191
}
192
192
193
+ pub fn queue_filters ( & self ) -> BitcoinRpcRequest {
194
+ let params = serde_json:: json!( [ ] ) ;
195
+ self . make_request ( "queuefilters" , params)
196
+ }
197
+
193
198
pub fn get_mempool_entry ( & self , txid : & Txid ) -> BitcoinRpcRequest {
194
199
let params = serde_json:: json!( [ txid] ) ;
195
200
@@ -276,7 +281,7 @@ impl BitcoinRpc {
276
281
{
277
282
Ok ( res) => return Self :: clean_rpc_response ( res) . await ,
278
283
Err ( e) if e. is_temporary ( ) && attempt < max_retries - 1 => {
279
- error ! ( "Rpc: {} - retrying in {:?}..." , e, delay) ;
284
+ log_rpc_error ( & request . body , & e, delay) ;
280
285
last_error = Some ( e. into ( ) ) ;
281
286
tokio:: time:: sleep ( delay) . await ;
282
287
delay *= 2 ;
@@ -302,15 +307,14 @@ impl BitcoinRpc {
302
307
if let Some ( auth) = self . auth_token . as_ref ( ) {
303
308
builder = builder. header ( "Authorization" , format ! ( "Basic {}" , auth) ) ;
304
309
}
305
-
306
310
match builder
307
311
. json ( & request. body )
308
312
. send ( )
309
313
. map_err ( BitcoinRpcError :: from)
310
314
{
311
315
Ok ( res) => return Self :: clean_rpc_response_blocking ( res) ,
312
316
Err ( e) if e. is_temporary ( ) && attempt < max_retries - 1 => {
313
- error ! ( "Rpc: {} - retrying in {:?}..." , e, delay) ;
317
+ log_rpc_error ( & request . body , & e, delay) ;
314
318
last_error = Some ( e. into ( ) ) ;
315
319
std:: thread:: sleep ( delay) ;
316
320
delay *= 2 ;
@@ -811,7 +815,28 @@ impl std::error::Error for BitcoinRpcError {
811
815
812
816
impl ErrorForRpc for reqwest:: Response {
813
817
async fn error_for_rpc < T : DeserializeOwned > ( self ) -> Result < T , BitcoinRpcError > {
814
- let rpc_res: JsonRpcResponse < T > = self . json ( ) . await ?;
818
+ let text = self
819
+ . text ( )
820
+ . await
821
+ . map_err ( |e| BitcoinRpcError :: Other ( format ! ( "Could not read response body: {}" , e) ) ) ?;
822
+
823
+ // Try to deserialize as JsonRpcResponse<T>
824
+ let rpc_res: JsonRpcResponse < T > = match serde_json:: from_str ( & text) {
825
+ Ok ( rpc_res) => rpc_res,
826
+ Err ( e) => {
827
+ // Try to decode without result
828
+ let error_res: Option < JsonRpcResponse < Option < String > > > = serde_json:: from_str ( & text) . ok ( ) ;
829
+ if let Some ( error_res) = error_res {
830
+ if let Some ( error) = error_res. error {
831
+ return Err ( BitcoinRpcError :: Rpc ( error) ) ;
832
+ }
833
+ }
834
+ return Err ( BitcoinRpcError :: Other (
835
+ format ! ( "Expected a JSON response, got '{}': {}" , text, e) ,
836
+ ) ) ;
837
+ }
838
+ } ;
839
+
815
840
if let Some ( e) = rpc_res. error {
816
841
return Err ( BitcoinRpcError :: Rpc ( e) ) ;
817
842
}
@@ -822,7 +847,28 @@ impl ErrorForRpc for reqwest::Response {
822
847
823
848
impl ErrorForRpcBlocking for reqwest:: blocking:: Response {
824
849
fn error_for_rpc < T : DeserializeOwned > ( self ) -> Result < T , BitcoinRpcError > {
825
- let rpc_res: JsonRpcResponse < T > = self . json ( ) ?;
850
+ let text = self . text ( ) . map_err ( |e| BitcoinRpcError :: Other (
851
+ format ! ( "Could not read response body: {}" , e) ,
852
+ ) ) ?;
853
+
854
+ // Attempt to deserialize the text as JSON
855
+ let rpc_res: JsonRpcResponse < T > = match serde_json:: from_str ( & text)
856
+ {
857
+ Ok ( rpc_res) => rpc_res,
858
+ Err ( e) => {
859
+ // try to decode without result
860
+ let error_res : Option < JsonRpcResponse < Option < String > > > = serde_json:: from_str ( & text) . ok ( ) ;
861
+ if let Some ( error_res) = error_res {
862
+ if let Some ( error) = error_res. error {
863
+ return Err ( BitcoinRpcError :: Rpc ( error) ) ;
864
+ }
865
+ }
866
+ return Err ( BitcoinRpcError :: Other (
867
+ format ! ( "Expected a JSON response, got '{}': {}" , text, e) ,
868
+ ) )
869
+ }
870
+ } ;
871
+
826
872
if let Some ( e) = rpc_res. error {
827
873
return Err ( BitcoinRpcError :: Rpc ( e) ) ;
828
874
}
@@ -905,7 +951,6 @@ impl BlockSource for BitcoinBlockSource {
905
951
. send_json_blocking ( & self . client , & self . rpc . get_block_count ( ) ) ?)
906
952
}
907
953
908
-
909
954
fn get_best_chain ( & self , tip : Option < u32 > , expected_chain : Network ) -> Result < Option < ChainAnchor > , BitcoinRpcError > {
910
955
#[ derive( Deserialize ) ]
911
956
struct Info {
@@ -981,4 +1026,25 @@ impl BlockSource for BitcoinBlockSource {
981
1026
. send_json_blocking :: < ( ) > ( & self . client , & self . rpc . queue_blocks ( heights) ) ?;
982
1027
Ok ( ( ) )
983
1028
}
1029
+
1030
+ fn queue_filters ( & self ) -> anyhow:: Result < ( ) , BitcoinRpcError > {
1031
+ self
1032
+ . rpc
1033
+ . send_json_blocking :: < ( ) > ( & self . client , & self . rpc . queue_filters ( ) ) ?;
1034
+ Ok ( ( ) )
1035
+ }
1036
+ }
1037
+
1038
+ fn log_rpc_error ( request : & Value , e : & BitcoinRpcError , delay : Duration ) {
1039
+ let rpc_method = serde_json:: to_string ( & request. get ( "method" ) )
1040
+ . unwrap_or ( "" . to_string ( ) ) ;
1041
+ let rpc_params = serde_json:: to_string ( & request. get ( "params" ) )
1042
+ . unwrap_or ( "" . to_string ( ) ) ;
1043
+ let src = match e {
1044
+ BitcoinRpcError :: Transport ( e) =>
1045
+ e. source ( ) . map ( |s| format ! ( "({:?})" , s) ) ,
1046
+ _ => None
1047
+ } . unwrap_or ( "" . to_string ( ) ) ;
1048
+
1049
+ error ! ( "Rpc {}{}: {}{} - retrying in {:?}..." , rpc_method, rpc_params, e, src, delay) ;
984
1050
}
0 commit comments