diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index eb7e01ba..39080f44 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -35,6 +35,8 @@ use std::{ /// Read timeout for inbound messages. const READ_TIMEOUT: Duration = Duration::from_secs(15); +/// Write timeout for outbound messages. +const WRITE_TIMEOUT: Duration = Duration::from_secs(15); /// Query result. #[derive(Debug)] @@ -91,16 +93,24 @@ impl QueryExecutor { /// Send message to remote peer. pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) { self.futures.push(Box::pin(async move { - match substream.send_framed(message).await { - Ok(_) => QueryContext { + match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { + // Timeout error. + Err(_) => + return QueryContext { + peer, + query_id: None, + result: QueryResult::Timeout, + }, + // Writing message to substream failed. + Ok(Err(_)) => QueryContext { peer, query_id: None, - result: QueryResult::SendSuccess { substream }, + result: QueryResult::SubstreamClosed, }, - Err(_) => QueryContext { + Ok(Ok(())) => QueryContext { peer, query_id: None, - result: QueryResult::SubstreamClosed, + result: QueryResult::SendSuccess { substream }, }, } })); @@ -143,14 +153,25 @@ impl QueryExecutor { mut substream: Substream, ) { self.futures.push(Box::pin(async move { - if let Err(_) = substream.send_framed(message).await { - let _ = substream.close().await; - return QueryContext { - peer, - query_id, - result: QueryResult::SubstreamClosed, - }; - } + match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { + // Timeout error. + Err(_) => + return QueryContext { + peer, + query_id, + result: QueryResult::Timeout, + }, + // Writing message to substream failed. + Ok(Err(_)) => { + let _ = substream.close().await; + return QueryContext { + peer, + query_id, + result: QueryResult::SubstreamClosed, + }; + } + Ok(Ok(())) => (), + }; match tokio::time::timeout(READ_TIMEOUT, substream.next()).await { Err(_) => QueryContext {