Skip to content

Commit

Permalink
Remove ActiveRequestsLimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
ackintosh committed Oct 21, 2024
1 parent 5c9e063 commit 3c058b3
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 163 deletions.
121 changes: 0 additions & 121 deletions beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs

This file was deleted.

17 changes: 12 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use libp2p::swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::swarm::{ConnectionId, Stream};
use libp2p::PeerId;
use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -89,6 +90,12 @@ pub struct RPCHandler<Id, E>
where
E: EthSpec,
{
/// This `ConnectionId`.
connection_id: ConnectionId,

/// The matching `PeerId` of this connection.
peer_id: PeerId,

/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,

Expand Down Expand Up @@ -140,9 +147,6 @@ where

/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,

/// This `ConnectionId`.
connection_id: ConnectionId,
}

enum HandlerState {
Expand Down Expand Up @@ -222,13 +226,16 @@ where
E: EthSpec,
{
pub fn new(
connection_id: ConnectionId,
peer_id: PeerId,
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
fork_context: Arc<ForkContext>,
log: &slog::Logger,
resp_timeout: Duration,
connection_id: ConnectionId,
) -> Self {
RPCHandler {
connection_id,
peer_id,
listen_protocol,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
Expand All @@ -246,7 +253,6 @@ where
waker: None,
log: log.clone(),
resp_timeout,
connection_id,
}
}

Expand Down Expand Up @@ -903,6 +909,7 @@ where
self.events_out
.push(HandlerEvent::Ok(RPCReceived::Request(Request {
id: RequestId::next(),
peer_id: self.peer_id,
connection_id: self.connection_id,
substream_id: self.current_inbound_substream_id,
r#type: req,
Expand Down
50 changes: 23 additions & 27 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub use protocol::RequestType;
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
use crate::rpc::active_requests_limiter::ActiveRequestsLimiter;
use crate::rpc::rate_limiter::RateLimiterItem;
use crate::rpc::response_limiter::ResponseLimiter;
pub use handler::SubstreamId;
Expand All @@ -40,7 +39,6 @@ pub use methods::{
};
pub use protocol::{max_rpc_size, Protocol, RPCError};

mod active_requests_limiter;
pub(crate) mod codec;
pub mod config;
mod handler;
Expand All @@ -53,6 +51,9 @@ mod self_limiter;

static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(1);

// Maximum number of concurrent requests per protocol ID that a client may issue.
const MAX_CONCURRENT_REQUESTS: usize = 2;

/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
impl<T> ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
Expand Down Expand Up @@ -118,6 +119,7 @@ impl RequestId {
#[derive(Debug, Clone)]
pub struct Request<E: EthSpec> {
pub id: RequestId,
pub peer_id: PeerId,
pub connection_id: ConnectionId,
pub substream_id: SubstreamId,
pub r#type: RequestType<E>,
Expand Down Expand Up @@ -159,9 +161,6 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
response_limiter: Option<ResponseLimiter<E>>,
/// Rate limiter for our own requests.
outbound_request_limiter: Option<SelfRateLimiter<Id, E>>,
/// Limiter for inbound requests, which restricts more than two requests from running
/// simultaneously on the same protocol per peer.
active_inbound_requests_limiter: ActiveRequestsLimiter,
/// Active inbound requests that are awaiting a response.
active_inbound_requests: HashMap<RequestId, Request<E>>,
/// Queue of events to be processed.
Expand Down Expand Up @@ -200,7 +199,6 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
RPC {
response_limiter,
outbound_request_limiter,
active_inbound_requests_limiter: ActiveRequestsLimiter::new(),
active_inbound_requests: HashMap::new(),
events: Vec::new(),
fork_context,
Expand Down Expand Up @@ -246,12 +244,6 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
}
}

self.active_inbound_requests_limiter.remove_request(
peer_id,
&request.connection_id,
&request.substream_id,
);

self.events.push(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(request.connection_id),
Expand Down Expand Up @@ -354,11 +346,12 @@ where
.log
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
let handler = RPCHandler::new(
connection_id,
peer_id,
protocol,
self.fork_context.clone(),
&log,
self.network_params.resp_timeout,
connection_id,
);

Ok(handler)
Expand Down Expand Up @@ -388,11 +381,12 @@ where
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));

let handler = RPCHandler::new(
connection_id,
peer_id,
protocol,
self.fork_context.clone(),
&log,
self.network_params.resp_timeout,
connection_id,
);

Ok(handler)
Expand Down Expand Up @@ -432,7 +426,8 @@ where
}
}

self.active_inbound_requests_limiter.remove_peer(&peer_id);
self.active_inbound_requests
.retain(|_request_id, request| request.peer_id != peer_id);

if let Some(limiter) = self.response_limiter.as_mut() {
limiter.peer_disconnected(peer_id);
Expand Down Expand Up @@ -470,27 +465,34 @@ where
match event {
HandlerEvent::Ok(RPCReceived::Request(Request {
id,
peer_id,
connection_id,
substream_id,
r#type,
})) => {
let request = Request {
id,
peer_id,
connection_id,
substream_id,
r#type,
};

let active_requests = self
.active_inbound_requests
.iter()
.filter(|(_request_id, active_request)| {
active_request.peer_id == peer_id
&& active_request.r#type.protocol() == request.r#type.protocol()
})
.count();

// We need to insert the request regardless of whether it is allowed by the limiter,
// since we send an error response (RateLimited) if it is not allowed.
self.active_inbound_requests.insert(id, request.clone());

if !self.active_inbound_requests_limiter.allows(
peer_id,
request.r#type.protocol(),
&conn_id,
&substream_id,
) {
// Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer.
if active_requests >= MAX_CONCURRENT_REQUESTS {
// There is already an active request with the same protocol. Send an error code to the peer.
debug!(self.log, "There is an active request with the same protocol"; "peer_id" => peer_id.to_string(), "request" => %request.r#type, "protocol" => %request.r#type.versioned_protocol().protocol());
self.send_response(
Expand Down Expand Up @@ -570,12 +572,6 @@ where
if let Some(response_limiter) = self.response_limiter.as_mut() {
if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
for response in responses {
self.active_inbound_requests_limiter.remove_request(
response.peer_id,
&response.connection_id,
&response.substream_id,
);

self.events.push(ToSwarm::NotifyHandler {
peer_id: response.peer_id,
handler: NotifyHandler::One(response.connection_id),
Expand Down
Loading

0 comments on commit 3c058b3

Please sign in to comment.