Remove generic E from RequestId #6462

Merged 2 commits on Oct 16, 2024
19 changes: 10 additions & 9 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
@@ -28,7 +28,7 @@ const CONTEXT_BYTES_LEN: usize = 4;

/* Inbound Codec */

pub struct SSZSnappyInboundCodec<E: EthSpec> {
pub struct SSZSnappyInboundCodec<E> {
protocol: ProtocolId,
inner: Uvi<usize>,
len: Option<usize>,
@@ -142,7 +142,7 @@ impl<E: EthSpec> Encoder<RpcResponse<E>> for SSZSnappyInboundCodec<E> {

// Decoder for inbound streams: Decodes RPC requests from peers
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
type Item = RequestType<E>;
type Item = RequestType;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@@ -194,7 +194,7 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
}

/* Outbound Codec: Codec for initiating RPC requests */
pub struct SSZSnappyOutboundCodec<E: EthSpec> {
pub struct SSZSnappyOutboundCodec<E> {
inner: Uvi<usize>,
len: Option<usize>,
protocol: ProtocolId,
@@ -321,10 +321,10 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
impl<E: EthSpec> Encoder<RequestType> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RequestType::Status(req) => req.as_ssz_bytes(),
RequestType::Goodbye(req) => req.as_ssz_bytes(),
@@ -543,11 +543,11 @@ fn handle_length(
/// Decodes an `InboundRequest` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
fn handle_rpc_request<E: EthSpec>(
fn handle_rpc_request(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
spec: &ChainSpec,
) -> Result<Option<RequestType<E>>, RPCError> {
) -> Result<Option<RequestType>, RPCError> {
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(
StatusMessage::from_ssz_bytes(decoded_buffer)?,
@@ -1009,6 +1009,7 @@ mod tests {
BlobsByRangeRequest {
start_slot: 0,
count: 10,
max_blobs_per_block: Spec::max_blobs_per_block(),
}
}

@@ -1154,7 +1155,7 @@ mod tests {
}

/// Verifies that requests we send are encoded in a way that we would correctly decode too.
fn encode_then_decode_request(req: RequestType<Spec>, fork_name: ForkName, spec: &ChainSpec) {
fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) {
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
@@ -1745,7 +1746,7 @@ mod tests {
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();

let requests: &[RequestType<Spec>] = &[
let requests: &[RequestType] = &[
RequestType::Ping(ping_message()),
RequestType::Status(status_message()),
RequestType::Goodbye(GoodbyeReason::Fault),
14 changes: 8 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -20,6 +20,7 @@ use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
collections::{hash_map::Entry, VecDeque},
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -96,7 +97,7 @@ where
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,

/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,
dial_queue: SmallVec<[(Id, RequestType); 4]>,

/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
@@ -206,7 +207,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<Stream, E>>,
/// Keeps track of the actual request sent.
request: RequestType<E>,
request: RequestType,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<Stream, E>>),
@@ -274,7 +275,7 @@ where
}

/// Opens an outbound substream with a request.
fn send_request(&mut self, id: Id, req: RequestType<E>) {
fn send_request(&mut self, id: Id, req: RequestType) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
@@ -330,7 +331,7 @@ where
type ToBehaviour = HandlerEvent<Id, E>;
type InboundProtocol = RPCProtocol<E>;
type OutboundProtocol = OutboundRequestContainer<E>;
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
@@ -788,6 +789,7 @@ where
req: req.clone(),
fork_context: self.fork_context.clone(),
max_rpc_size: self.listen_protocol().upgrade().max_rpc_size,
phantom: PhantomData,
},
(),
)
@@ -905,7 +907,7 @@ where
fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, E>,
(id, request): (Id, RequestType<E>),
(id, request): (Id, RequestType),
) {
self.dial_negotiated -= 1;
// Reset any io-retries counter.
@@ -961,7 +963,7 @@ where
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, RequestType<E>),
request_info: (Id, RequestType),
error: StreamUpgradeError<RPCError>,
) {
let (id, req) = request_info;
28 changes: 11 additions & 17 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -8,7 +8,6 @@ use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
@@ -93,27 +92,19 @@ pub struct Ping {
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
)]
#[derive(Clone, Debug, PartialEq)]
pub struct MetadataRequest<E: EthSpec> {
_phantom_data: PhantomData<E>,
}
pub struct MetadataRequest;

impl<E: EthSpec> MetadataRequest<E> {
impl MetadataRequest {
pub fn new_v1() -> Self {
Self::V1(MetadataRequestV1 {
_phantom_data: PhantomData,
})
Self::V1(MetadataRequestV1 {})
}

pub fn new_v2() -> Self {
Self::V2(MetadataRequestV2 {
_phantom_data: PhantomData,
})
Self::V2(MetadataRequestV2 {})
}

pub fn new_v3() -> Self {
Self::V3(MetadataRequestV3 {
_phantom_data: PhantomData,
})
Self::V3(MetadataRequestV3 {})
}
}

@@ -323,11 +314,14 @@ pub struct BlobsByRangeRequest {

/// The number of slots from the start slot.
pub count: u64,

/// maximum number of blobs in a single block.
pub max_blobs_per_block: usize,
Collaborator:

This value is the same on all requests; does it need to be appended to every request, or can it be part of the handler struct?

@jxs (Member, Author), Oct 8, 2024:

Hi Lion, good question. The thing is:

    self.network_send
        .send(NetworkMessage::SendRequest {
            peer_id,
            request: RequestType::BlobsByRange(BlobsByRangeRequest {
                start_slot: *request.start_slot(),
                count: *request.count(),
            }),
            request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
        })
        .map_err(|_| RpcRequestSendError::NetworkSendError)?;

The request is created here, for example, outside of the RPC NetworkBehaviour.
Also, BlobsByRangeRequest::max_blobs_requested(&self) requires it; how would you envision storing max_blobs_per_block?
My objective is for the RPC to be completely unaware of E: EthSpec, so that we can decouple it from the rest of Lighthouse and have all of its config values passed in dynamically.

Collaborator:

Actually, I take that back: if we increase the blob count, we will have different requests with different limits at runtime. So I'm okay with keeping it on the request.

Member:

I don't think we should have added this field to BlobsByRangeRequest. It breaks the SSZ representation.
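An aside to illustrate the concern in the last comment (a minimal, self-contained sketch using only the Rust standard library, not the actual ssz crate or Lighthouse types): SSZ encodes a container of fixed-size fields by concatenating their little-endian bytes, so an encoding that includes the new field would change the request's size on the wire and break compatibility with peers that expect the original two-field layout.

    // Hypothetical sketch: adding a third fixed-size field changes the encoded length.

    fn encode_v1(start_slot: u64, count: u64) -> Vec<u8> {
        // Original wire format: start_slot || count, each 8 bytes little-endian.
        let mut buf = Vec::with_capacity(16);
        buf.extend_from_slice(&start_slot.to_le_bytes());
        buf.extend_from_slice(&count.to_le_bytes());
        buf
    }

    fn encode_with_extra_field(start_slot: u64, count: u64, max_blobs_per_block: u64) -> Vec<u8> {
        // What an encoding that also serialises the new field would produce.
        let mut buf = encode_v1(start_slot, count);
        buf.extend_from_slice(&max_blobs_per_block.to_le_bytes());
        buf
    }

    fn main() {
        assert_eq!(encode_v1(0, 10).len(), 16);
        assert_eq!(encode_with_extra_field(0, 10, 6).len(), 24); // different length, incompatible on the wire
    }

One way to keep the wire format stable while still carrying the runtime limit (an assumption about possible approaches, not necessarily what this PR does) is to keep max_blobs_per_block out of the SSZ encoding, for example via a hand-written Encode/Decode that only serialises start_slot and count.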

}

impl BlobsByRangeRequest {
pub fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
self.count.saturating_mul(E::max_blobs_per_block() as u64)
pub fn max_blobs_requested(&self) -> u64 {
self.count.saturating_mul(self.max_blobs_per_block as u64)
}
}

@@ -343,7 +337,7 @@ pub struct DataColumnsByRangeRequest {
}

impl DataColumnsByRangeRequest {
pub fn max_requested<E: EthSpec>(&self) -> u64 {
pub fn max_requested(&self) -> u64 {
self.count.saturating_mul(self.columns.len() as u64)
}

10 changes: 5 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -61,7 +61,7 @@ pub enum RPCSend<Id, E: EthSpec> {
///
/// The `Id` is given by the application making the request. These
/// go over *outbound* connections.
Request(Id, RequestType<E>),
Request(Id, RequestType),
/// A response sent from Lighthouse.
///
/// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the
@@ -79,7 +79,7 @@ pub enum RPCReceived<Id, E: EthSpec> {
///
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
/// *inbound* substream over which it is managed.
Request(Request<E>),
Request(Request),
/// A response received from the outside.
///
/// The `Id` corresponds to the application given ID of the original request sent to the
@@ -113,10 +113,10 @@ impl RequestId {

/// An Rpc Request.
#[derive(Debug, Clone)]
pub struct Request<E: EthSpec> {
pub struct Request {
pub id: RequestId,
pub substream_id: SubstreamId,
pub r#type: RequestType<E>,
pub r#type: RequestType,
}

impl<E: EthSpec, Id: std::fmt::Debug> std::fmt::Display for RPCSend<Id, E> {
@@ -221,7 +221,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
/// Submits an RPC request.
///
/// The peer must be connected for this to succeed.
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) {
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
match self_limiter.allows(peer_id, request_id, req) {
Ok(event) => event,
8 changes: 5 additions & 3 deletions beacon_node/lighthouse_network/src/rpc/outbound.rs
@@ -7,6 +7,7 @@ use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
use futures::{FutureExt, SinkExt};
use libp2p::core::{OutboundUpgrade, UpgradeInfo};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::{
codec::Framed,
@@ -19,13 +20,14 @@ use types::{EthSpec, ForkContext};
// `OutboundUpgrade`

#[derive(Debug, Clone)]
pub struct OutboundRequestContainer<E: EthSpec> {
pub req: RequestType<E>,
pub struct OutboundRequestContainer<E> {
pub req: RequestType,
pub fork_context: Arc<ForkContext>,
pub max_rpc_size: usize,
pub phantom: PhantomData<E>,
}

impl<E: EthSpec> UpgradeInfo for OutboundRequestContainer<E> {
impl<E> UpgradeInfo for OutboundRequestContainer<E> {
type Info = ProtocolId;
type InfoIter = Vec<Self::Info>;

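An aside on the phantom: PhantomData additions above (a general Rust pattern, illustrated with hypothetical names rather than the project's types): once RequestType no longer carries E, OutboundRequestContainer<E> stores no value of type E, and Rust rejects otherwise-unused type parameters, so the zero-sized PhantomData<E> marker keeps E in the type signature.

    use std::marker::PhantomData;

    // Without the marker, `E` would be an unused type parameter and the struct
    // would fail to compile (error[E0392]).
    struct Container<E> {
        payload: Vec<u8>,
        // Zero-sized field: keeps `E` in the type so impls can still be
        // parameterised over the spec type without storing any `E` value.
        phantom: PhantomData<E>,
    }

    impl<E> Container<E> {
        fn new(payload: Vec<u8>) -> Self {
            Self { payload, phantom: PhantomData }
        }
    }

    struct MainnetSpec; // hypothetical stand-in for an EthSpec implementor

    fn main() {
        let c: Container<MainnetSpec> = Container::new(vec![1, 2, 3]);
        assert_eq!(c.payload.len(), 3);
    }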
14 changes: 7 additions & 7 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -643,7 +643,7 @@ pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready.

pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>);
pub type InboundOutput<TSocket, E> = (RequestType, InboundFramed<TSocket, E>);
pub type InboundFramed<TSocket, E> =
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>;

@@ -711,7 +711,7 @@ where
}

#[derive(Debug, Clone, PartialEq)]
pub enum RequestType<E: EthSpec> {
pub enum RequestType {
Status(StatusMessage),
Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest),
@@ -724,11 +724,11 @@ pub enum RequestType<E: EthSpec> {
LightClientOptimisticUpdate,
LightClientFinalityUpdate,
Ping(Ping),
MetaData(MetadataRequest<E>),
MetaData(MetadataRequest),
}

/// Implements the encoding per supported protocol for `RPCRequest`.
impl<E: EthSpec> RequestType<E> {
impl RequestType {
/* These functions are used in the handler for stream management */

/// Maximum number of responses expected for this request.
@@ -738,10 +738,10 @@ impl<E: EthSpec> RequestType<E> {
RequestType::Goodbye(_) => 0,
RequestType::BlocksByRange(req) => *req.count(),
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
RequestType::BlobsByRange(req) => req.max_blobs_requested::<E>(),
RequestType::BlobsByRange(req) => req.max_blobs_requested(),
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
RequestType::DataColumnsByRange(req) => req.max_requested(),
RequestType::Ping(_) => 1,
RequestType::MetaData(_) => 1,
RequestType::LightClientBootstrap(_) => 1,
@@ -973,7 +973,7 @@ impl std::error::Error for RPCError {
}
}

impl<E: EthSpec> std::fmt::Display for RequestType<E> {
impl std::fmt::Display for RequestType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RequestType::Status(status) => write!(f, "Status Message: {}", status),
3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -9,7 +9,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::time::Interval;
use types::EthSpec;

/// Nanoseconds since a given time.
// Maintained as u64 to reduce footprint
@@ -252,7 +251,7 @@ pub trait RateLimiterItem {
fn max_responses(&self) -> u64;
}

impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
impl RateLimiterItem for super::RequestType {
fn protocol(&self) -> Protocol {
self.versioned_protocol().protocol()
}
12 changes: 6 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -19,16 +19,16 @@ use super::{

/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId, E: EthSpec> {
req: RequestType<E>,
struct QueuedRequest<Id: ReqId> {
req: RequestType,
request_id: Id,
}

pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
/// Requests queued for sending per peer. This requests are stored when the self rate
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
/// are stored in the same way.
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id>>>,
/// The delay required to allow a peer's outbound request per protocol.
next_peer_request: DelayQueue<(PeerId, Protocol)>,
/// Rate limiter for our own requests.
@@ -70,7 +70,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
&mut self,
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
req: RequestType,
) -> Result<BehaviourAction<Id, E>, Error> {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
@@ -101,9 +101,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
limiter: &mut RateLimiter,
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
req: RequestType,
log: &Logger,
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id, E>, Duration)> {
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id>, Duration)> {
match limiter.allows(&peer_id, &req) {
Ok(()) => Ok(BehaviourAction::NotifyHandler {
peer_id,