From 3c058b3bacdc378721d1c6faf2a4d65c5b77725e Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 22 Oct 2024 08:04:12 +0900 Subject: [PATCH] Remove ActiveRequestsLimiter https://github.com/sigp/lighthouse/pull/5923#discussion_r1799577038 --- .../src/rpc/active_requests_limiter.rs | 121 ------------------ .../lighthouse_network/src/rpc/handler.rs | 17 ++- beacon_node/lighthouse_network/src/rpc/mod.rs | 50 ++++---- .../lighthouse_network/tests/rpc_tests.rs | 53 ++++++-- 4 files changed, 78 insertions(+), 163 deletions(-) delete mode 100644 beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs diff --git a/beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs b/beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs deleted file mode 100644 index 19acb1ffe3..0000000000 --- a/beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::rpc::{Protocol, SubstreamId}; -use libp2p::swarm::ConnectionId; -use libp2p::PeerId; -use std::collections::hash_map::Entry; -use std::collections::HashMap; - -/// Restricts more than two inbound requests from running simultaneously on the same protocol per peer. -pub(super) struct ActiveRequestsLimiter { - requests: HashMap>, -} - -impl ActiveRequestsLimiter { - pub(super) fn new() -> Self { - Self { - requests: HashMap::new(), - } - } - - /// Allows if there is not a request on the same protocol. - pub(super) fn allows( - &mut self, - peer_id: PeerId, - protocol: Protocol, - connection_id: &ConnectionId, - substream_id: &SubstreamId, - ) -> bool { - match self.requests.entry(peer_id) { - Entry::Occupied(mut entry) => { - for (p, _cid, _sid) in entry.get_mut().iter_mut() { - // Check if there is a request on the same protocol. - if p == &protocol { - return false; - } - } - - // Request on the same protocol was not found. - entry - .get_mut() - .push((protocol, *connection_id, *substream_id)); - true - } - Entry::Vacant(entry) => { - // No active requests for the peer. - entry.insert(vec![(protocol, *connection_id, *substream_id)]); - true - } - } - } - - /// Removes the request with the given SubstreamId and ConnectionId. - pub(super) fn remove_request( - &mut self, - peer_id: PeerId, - connection_id: &ConnectionId, - substream_id: &SubstreamId, - ) { - if let Some(requests) = self.requests.get_mut(&peer_id) { - requests.retain(|(_protocol, cid, sid)| cid != connection_id && sid != substream_id); - } - } - - /// Removes the requests with the given PeerId. - pub(super) fn remove_peer(&mut self, peer_id: &PeerId) { - self.requests.remove(peer_id); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_limiter() { - let mut limiter = ActiveRequestsLimiter::new(); - let peer_id = PeerId::random(); - let connection_id = ConnectionId::new_unchecked(1); - let substream_id = SubstreamId::new(1); - - assert!(limiter.allows(peer_id, Protocol::Status, &connection_id, &substream_id)); - // Not allowed since a request for the same protocol is in progress. - assert!(!limiter.allows(peer_id, Protocol::Status, &connection_id, &substream_id)); - // Allowed since there is no BlocksByRange request in the active requests. - assert!(limiter.allows( - peer_id, - Protocol::BlocksByRange, - &connection_id, - &SubstreamId::new(2) - )); - // Allowed since there is no request from the peer in the active requests. - assert!(limiter.allows( - PeerId::random(), - Protocol::Status, - &connection_id, - &substream_id - )); - - // Remove the Status request. - limiter.remove_request(peer_id, &connection_id, &substream_id); - assert!(limiter.allows( - peer_id, - Protocol::Status, - &connection_id, - &SubstreamId::new(3) - )); - - // Remove the peer. - limiter.remove_peer(&peer_id); - assert!(limiter.allows( - peer_id, - Protocol::Status, - &connection_id, - &SubstreamId::new(4) - )); - assert!(limiter.allows( - peer_id, - Protocol::BlocksByRange, - &connection_id, - &SubstreamId::new(5) - )); - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index be169c543c..6547a61075 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -16,6 +16,7 @@ use libp2p::swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p::swarm::{ConnectionId, Stream}; +use libp2p::PeerId; use slog::{crit, debug, trace}; use smallvec::SmallVec; use std::{ @@ -89,6 +90,12 @@ pub struct RPCHandler where E: EthSpec, { + /// This `ConnectionId`. + connection_id: ConnectionId, + + /// The matching `PeerId` of this connection. + peer_id: PeerId, + /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, ()>, @@ -140,9 +147,6 @@ where /// Timeout that will be used for inbound and outbound responses. resp_timeout: Duration, - - /// This `ConnectionId`. - connection_id: ConnectionId, } enum HandlerState { @@ -222,13 +226,16 @@ where E: EthSpec, { pub fn new( + connection_id: ConnectionId, + peer_id: PeerId, listen_protocol: SubstreamProtocol, ()>, fork_context: Arc, log: &slog::Logger, resp_timeout: Duration, - connection_id: ConnectionId, ) -> Self { RPCHandler { + connection_id, + peer_id, listen_protocol, events_out: SmallVec::new(), dial_queue: SmallVec::new(), @@ -246,7 +253,6 @@ where waker: None, log: log.clone(), resp_timeout, - connection_id, } } @@ -903,6 +909,7 @@ where self.events_out .push(HandlerEvent::Ok(RPCReceived::Request(Request { id: RequestId::next(), + peer_id: self.peer_id, connection_id: self.connection_id, substream_id: self.current_inbound_substream_id, r#type: req, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 96a5428ec7..7340d9b9b3 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -30,7 +30,6 @@ pub use protocol::RequestType; use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; use self::protocol::RPCProtocol; use self::self_limiter::SelfRateLimiter; -use crate::rpc::active_requests_limiter::ActiveRequestsLimiter; use crate::rpc::rate_limiter::RateLimiterItem; use crate::rpc::response_limiter::ResponseLimiter; pub use handler::SubstreamId; @@ -40,7 +39,6 @@ pub use methods::{ }; pub use protocol::{max_rpc_size, Protocol, RPCError}; -mod active_requests_limiter; pub(crate) mod codec; pub mod config; mod handler; @@ -53,6 +51,9 @@ mod self_limiter; static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(1); +// Maximum number of concurrent requests per protocol ID that a client may issue. +const MAX_CONCURRENT_REQUESTS: usize = 2; + /// Composite trait for a request id. pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} impl ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {} @@ -118,6 +119,7 @@ impl RequestId { #[derive(Debug, Clone)] pub struct Request { pub id: RequestId, + pub peer_id: PeerId, pub connection_id: ConnectionId, pub substream_id: SubstreamId, pub r#type: RequestType, @@ -159,9 +161,6 @@ pub struct RPC { response_limiter: Option>, /// Rate limiter for our own requests. outbound_request_limiter: Option>, - /// Limiter for inbound requests, which restricts more than two requests from running - /// simultaneously on the same protocol per peer. - active_inbound_requests_limiter: ActiveRequestsLimiter, /// Active inbound requests that are awaiting a response. active_inbound_requests: HashMap>, /// Queue of events to be processed. @@ -200,7 +199,6 @@ impl RPC { RPC { response_limiter, outbound_request_limiter, - active_inbound_requests_limiter: ActiveRequestsLimiter::new(), active_inbound_requests: HashMap::new(), events: Vec::new(), fork_context, @@ -246,12 +244,6 @@ impl RPC { } } - self.active_inbound_requests_limiter.remove_request( - peer_id, - &request.connection_id, - &request.substream_id, - ); - self.events.push(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(request.connection_id), @@ -354,11 +346,12 @@ where .log .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( + connection_id, + peer_id, protocol, self.fork_context.clone(), &log, self.network_params.resp_timeout, - connection_id, ); Ok(handler) @@ -388,11 +381,12 @@ where .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( + connection_id, + peer_id, protocol, self.fork_context.clone(), &log, self.network_params.resp_timeout, - connection_id, ); Ok(handler) @@ -432,7 +426,8 @@ where } } - self.active_inbound_requests_limiter.remove_peer(&peer_id); + self.active_inbound_requests + .retain(|_request_id, request| request.peer_id != peer_id); if let Some(limiter) = self.response_limiter.as_mut() { limiter.peer_disconnected(peer_id); @@ -470,27 +465,34 @@ where match event { HandlerEvent::Ok(RPCReceived::Request(Request { id, + peer_id, connection_id, substream_id, r#type, })) => { let request = Request { id, + peer_id, connection_id, substream_id, r#type, }; + let active_requests = self + .active_inbound_requests + .iter() + .filter(|(_request_id, active_request)| { + active_request.peer_id == peer_id + && active_request.r#type.protocol() == request.r#type.protocol() + }) + .count(); + // We need to insert the request regardless of whether it is allowed by the limiter, // since we send an error response (RateLimited) if it is not allowed. self.active_inbound_requests.insert(id, request.clone()); - if !self.active_inbound_requests_limiter.allows( - peer_id, - request.r#type.protocol(), - &conn_id, - &substream_id, - ) { + // Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer. + if active_requests >= MAX_CONCURRENT_REQUESTS { // There is already an active request with the same protocol. Send an error code to the peer. debug!(self.log, "There is an active request with the same protocol"; "peer_id" => peer_id.to_string(), "request" => %request.r#type, "protocol" => %request.r#type.versioned_protocol().protocol()); self.send_response( @@ -570,12 +572,6 @@ where if let Some(response_limiter) = self.response_limiter.as_mut() { if let Poll::Ready(responses) = response_limiter.poll_ready(cx) { for response in responses { - self.active_inbound_requests_limiter.remove_request( - response.peer_id, - &response.connection_id, - &response.substream_id, - ); - self.events.push(ToSwarm::NotifyHandler { peer_id: response.peer_id, handler: NotifyHandler::One(response.connection_id), diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 4cbde5fc35..20840d4470 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -1410,7 +1410,7 @@ fn test_active_requests() { ) .await; - // Dummy STATUS RPC message. + // Dummy STATUS RPC request. let rpc_request = RequestType::Status(StatusMessage { fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), @@ -1419,8 +1419,19 @@ fn test_active_requests() { head_slot: Slot::new(1), }); + // Dummy STATUS RPC response + let rpc_response = Response::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::zero(), + finalized_epoch: Epoch::new(1), + head_root: Hash256::zero(), + head_slot: Slot::new(1), + }); + // Build the sender future. let sender_future = async { + let mut response_received = 0; + let mut rate_limited = 0; loop { match sender.next_event().await { NetworkEvent::PeerConnectedOutgoing(peer_id) => { @@ -1432,9 +1443,15 @@ fn test_active_requests() { sender .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) .unwrap(); + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); } - NetworkEvent::ResponseReceived { .. } => { - unreachable!(); + NetworkEvent::ResponseReceived { response, .. } => { + debug!(log, "Sender received response"; "response" => ?response); + if matches!(response, Response::Status(_)) { + response_received += 1; + } } NetworkEvent::RPCFailed { id: _, @@ -1442,26 +1459,42 @@ fn test_active_requests() { error, } => { debug!(log, "RPC Failed"; "error" => ?error); - // Verify that the sender received a rate-limited error. assert!(matches!( error, RPCError::ErrorResponse(RpcErrorResponse::RateLimited, ..) )); - // End the test. - return; + rate_limited += 1; } _ => {} } + + // The sender sent 3 requests, and 1 rate-limited error is expected due to the MAX_CONCURRENT_REQUESTS limit. + if response_received + rate_limited == 3 { + assert_eq!(1, rate_limited); + return; + } } }; // Build the receiver future. let receiver_future = async { + let mut received_requests = vec![]; loop { - if let NetworkEvent::RequestReceived { id, .. } = receiver.next_event().await { - debug!(log, "Receiver received request"; "request_id" => ?id); - // Do not send a response to intentionally trigger the RPC error. - continue; + tokio::select! { + event = receiver.next_event() => { + if let NetworkEvent::RequestReceived { peer_id, id, request } = event { + debug!(log, "Receiver received request"; "request" => ?request); + if matches!(request.r#type, RequestType::Status(_)) { + received_requests.push((peer_id, id, request.id)); + } + } + } + // Introduce a delay in sending responses to trigger a rate-limited error. + _ = sleep(Duration::from_secs(5)) => { + for (peer_id, id, request_id) in received_requests.drain(..) { + receiver.send_response(peer_id, id, request_id, rpc_response.clone()); + } + } } } };