Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad/executor: Add timeout for writting frames #277

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use std::{

/// Read timeout for inbound messages.
const READ_TIMEOUT: Duration = Duration::from_secs(15);
/// Write timeout for outbound messages.
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);

/// Query result.
#[derive(Debug)]
Expand Down Expand Up @@ -91,16 +93,24 @@ impl QueryExecutor {
/// Send message to remote peer.
pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) {
self.futures.push(Box::pin(async move {
match substream.send_framed(message).await {
Ok(_) => QueryContext {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you already considered this, but a timeout is typically not needed for write operations, because send_framed(message)).await will return an error if ACK is not received for sent data on the stream level.

On the other hand, the timeout can complicate sending bigger payloads on slow connections, so I would be extra careful when adding it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it again, it should be safe to add a 15-sec timeout for writing if we already have a 15-sec timeout for reading 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I missed that ACK on the stream level implementation 🤔 Yep, I would keep the timeout more as a safety net in case we do anything wrong on the lower level implementations 🙏

// Timeout error.
Err(_) =>
return QueryContext {
peer,
query_id: None,
result: QueryResult::Timeout,
},
// Writing message to substream failed.
Ok(Err(_)) => QueryContext {
peer,
query_id: None,
result: QueryResult::SendSuccess { substream },
result: QueryResult::SubstreamClosed,
},
Err(_) => QueryContext {
Ok(Ok(())) => QueryContext {
peer,
query_id: None,
result: QueryResult::SubstreamClosed,
result: QueryResult::SendSuccess { substream },
},
}
}));
Expand Down Expand Up @@ -143,14 +153,25 @@ impl QueryExecutor {
mut substream: Substream,
) {
self.futures.push(Box::pin(async move {
if let Err(_) = substream.send_framed(message).await {
let _ = substream.close().await;
return QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
};
}
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) =>
return QueryContext {
peer,
query_id,
result: QueryResult::Timeout,
},
// Writing message to substream failed.
Ok(Err(_)) => {
let _ = substream.close().await;
return QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
};
}
Ok(Ok(())) => (),
};

match tokio::time::timeout(READ_TIMEOUT, substream.next()).await {
Err(_) => QueryContext {
Expand Down