@@ -7,6 +7,7 @@ use bitcoin::{
77 Decodable ,
88 } ,
99 hashes:: Hash ,
10+ io,
1011 p2p:: {
1112 self , address,
1213 message:: { self , CommandString , NetworkMessage } ,
@@ -19,9 +20,8 @@ use bitcoin::{
1920use bitcoin_slices:: { bsl, Parse } ;
2021use crossbeam_channel:: { bounded, select, Receiver , Sender } ;
2122
22- use std:: io:: { self , ErrorKind , Write } ;
23+ use std:: io:: Write ;
2324use std:: net:: { IpAddr , Ipv4Addr , SocketAddr , TcpStream } ;
24- use std:: sync:: Arc ;
2525use std:: time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ;
2626
2727use crate :: types:: SerBlock ;
@@ -141,10 +141,11 @@ impl Connection {
141141 metrics : & Metrics ,
142142 magic : Magic ,
143143 ) -> Result < Self > {
144- let conn = Arc :: new (
145- TcpStream :: connect ( address)
146- . with_context ( || format ! ( "{} p2p failed to connect: {:?}" , network, address) ) ?,
147- ) ;
144+ let recv_conn = TcpStream :: connect ( address)
145+ . with_context ( || format ! ( "{} p2p failed to connect: {:?}" , network, address) ) ?;
146+ let mut send_conn = recv_conn
147+ . try_clone ( )
148+ . context ( "failed to clone connection" ) ?;
148149
149150 let ( tx_send, tx_recv) = bounded :: < NetworkMessage > ( 1 ) ;
150151 let ( rx_send, rx_recv) = bounded :: < RawNetworkMessage > ( 1 ) ;
@@ -180,7 +181,6 @@ impl Connection {
180181 default_duration_buckets ( ) ,
181182 ) ;
182183
183- let stream = Arc :: clone ( & conn) ;
184184 let mut buffer = vec ! [ ] ;
185185 crate :: thread:: spawn ( "p2p_send" , move || loop {
186186 use std:: net:: Shutdown ;
@@ -190,7 +190,7 @@ impl Connection {
190190 // p2p_loop is closed, so tx_send is disconnected
191191 debug ! ( "closing p2p_send thread: no more messages to send" ) ;
192192 // close the stream reader (p2p_recv thread may block on it)
193- if let Err ( e) = stream . shutdown ( Shutdown :: Read ) {
193+ if let Err ( e) = send_conn . shutdown ( Shutdown :: Read ) {
194194 warn ! ( "failed to shutdown p2p connection: {}" , e)
195195 }
196196 return Ok ( ( ) ) ;
@@ -203,16 +203,16 @@ impl Connection {
203203 raw_msg
204204 . consensus_encode ( & mut buffer)
205205 . expect ( "in-memory writers don't error" ) ;
206- ( & * stream )
206+ send_conn
207207 . write_all ( buffer. as_slice ( ) )
208208 . context ( "p2p failed to send" )
209209 } ) ?;
210210 } ) ;
211211
212- let stream = Arc :: clone ( & conn ) ;
212+ let mut stream_reader = std :: io :: BufReader :: new ( recv_conn ) ;
213213 crate :: thread:: spawn ( "p2p_recv" , move || loop {
214214 let start = Instant :: now ( ) ;
215- let raw_msg = RawNetworkMessage :: consensus_decode ( & mut & * stream ) ;
215+ let raw_msg = RawNetworkMessage :: consensus_decode ( & mut stream_reader ) ;
216216 {
217217 let duration = duration_to_seconds ( start. elapsed ( ) ) ;
218218 let label = format ! (
@@ -232,7 +232,7 @@ impl Connection {
232232 }
233233 raw_msg
234234 }
235- Err ( encode:: Error :: Io ( e) ) if e. kind ( ) == ErrorKind :: UnexpectedEof => {
235+ Err ( encode:: Error :: Io ( e) ) if e. kind ( ) == io :: ErrorKind :: UnexpectedEof => {
236236 debug ! ( "closing p2p_recv thread: connection closed" ) ;
237237 return Ok ( ( ) ) ;
238238 }
@@ -390,7 +390,9 @@ enum ParsedNetworkMessage {
390390}
391391
392392impl Decodable for RawNetworkMessage {
393- fn consensus_decode < D : io:: Read + ?Sized > ( d : & mut D ) -> Result < Self , encode:: Error > {
393+ fn consensus_decode < D : bitcoin:: io:: BufRead + ?Sized > (
394+ d : & mut D ,
395+ ) -> Result < Self , encode:: Error > {
394396 let magic = Decodable :: consensus_decode ( d) ?;
395397 let cmd = Decodable :: consensus_decode ( d) ?;
396398
0 commit comments