Skip to content

Commit 9ad20ff

Browse files
authored
kad/executor: Add timeout for writting frames (#277)
This PR adds a `WRITE_TIMEOUT` for sending frames to remote peers. The `WRITE_TIMEOUT` is set to 15 seconds to mirror the `READ_TIMEOUT`. Closes: #231 Signed-off-by: Alexandru Vasile <[email protected]>
1 parent 0b4bf1d commit 9ad20ff

File tree

1 file changed

+34
-13
lines changed

1 file changed

+34
-13
lines changed

src/protocol/libp2p/kademlia/executor.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use std::{
3535

3636
/// Read timeout for inbound messages.
3737
const READ_TIMEOUT: Duration = Duration::from_secs(15);
38+
/// Write timeout for outbound messages.
39+
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);
3840

3941
/// Query result.
4042
#[derive(Debug)]
@@ -91,16 +93,24 @@ impl QueryExecutor {
9193
/// Send message to remote peer.
9294
pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) {
9395
self.futures.push(Box::pin(async move {
94-
match substream.send_framed(message).await {
95-
Ok(_) => QueryContext {
96+
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
97+
// Timeout error.
98+
Err(_) =>
99+
return QueryContext {
100+
peer,
101+
query_id: None,
102+
result: QueryResult::Timeout,
103+
},
104+
// Writing message to substream failed.
105+
Ok(Err(_)) => QueryContext {
96106
peer,
97107
query_id: None,
98-
result: QueryResult::SendSuccess { substream },
108+
result: QueryResult::SubstreamClosed,
99109
},
100-
Err(_) => QueryContext {
110+
Ok(Ok(())) => QueryContext {
101111
peer,
102112
query_id: None,
103-
result: QueryResult::SubstreamClosed,
113+
result: QueryResult::SendSuccess { substream },
104114
},
105115
}
106116
}));
@@ -143,14 +153,25 @@ impl QueryExecutor {
143153
mut substream: Substream,
144154
) {
145155
self.futures.push(Box::pin(async move {
146-
if let Err(_) = substream.send_framed(message).await {
147-
let _ = substream.close().await;
148-
return QueryContext {
149-
peer,
150-
query_id,
151-
result: QueryResult::SubstreamClosed,
152-
};
153-
}
156+
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
157+
// Timeout error.
158+
Err(_) =>
159+
return QueryContext {
160+
peer,
161+
query_id,
162+
result: QueryResult::Timeout,
163+
},
164+
// Writing message to substream failed.
165+
Ok(Err(_)) => {
166+
let _ = substream.close().await;
167+
return QueryContext {
168+
peer,
169+
query_id,
170+
result: QueryResult::SubstreamClosed,
171+
};
172+
}
173+
Ok(Ok(())) => (),
174+
};
154175

155176
match tokio::time::timeout(READ_TIMEOUT, substream.next()).await {
156177
Err(_) => QueryContext {

0 commit comments

Comments
 (0)