From 8166eb2942205fe3ee19d83d92cf0197068ef117 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 9 Nov 2021 10:42:02 -0600 Subject: [PATCH 1/9] Fix max packet sizes --- beacon_node/lighthouse_network/src/config.rs | 2 +- .../src/rpc/codec/ssz_snappy.rs | 4 ++-- .../lighthouse_network/src/rpc/protocol.rs | 19 +++++++++---------- consensus/types/src/eth_spec.rs | 5 +++++ consensus/types/src/execution_payload.rs | 11 +++++++++++ 5 files changed, 28 insertions(+), 13 deletions(-) diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 4ea3fa4b643..6bb64f83f4d 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -17,7 +17,7 @@ use std::time::Duration; use types::{ForkContext, ForkName}; /// The maximum transmit size of gossip messages in bytes. -pub const GOSSIP_MAX_SIZE: usize = 1_048_576; +pub const GOSSIP_MAX_SIZE: usize = 10 * 1_048_576; // 10M /// This is a constant to be used in discovery. The lower bound of the gossipsub mesh. pub const MESH_N_LOW: usize = 6; diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 6517ec0ad8b..1afc8fcf50a 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -145,7 +145,7 @@ impl Decoder for SSZSnappyInboundCodec { // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `self.protocol`. 
let ssz_limits = self.protocol.rpc_request_limits(); - if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) { + if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { return Err(RPCError::InvalidData); } // Calculate worst case compression length for given uncompressed length @@ -280,7 +280,7 @@ impl Decoder for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `self.protocol`. let ssz_limits = self.protocol.rpc_response_limits::(); - if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) { + if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { return Err(RPCError::InvalidData); } // Calculate worst case compression length for given uncompressed length diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 88c1efcd82b..baf262ecd9b 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -60,12 +60,10 @@ lazy_static! { ) .as_ssz_bytes() .len(); - pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = SignedBeaconBlock::::from_block( - BeaconBlock::Merge(BeaconBlockMerge::full(&MainnetEthSpec::default_spec())), - Signature::empty(), - ) - .as_ssz_bytes() - .len(); + + /// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. + /// We calculate the value from its fields instead of constructing the block and checking the length. + pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = types::ExecutionPayload::::max_execution_payload_size(); pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) @@ -95,7 +93,7 @@ lazy_static! { } /// The maximum bytes that can be sent across the RPC. 
-const MAX_RPC_SIZE: usize = 1_048_576; // 1M +const MAX_RPC_SIZE: usize = 10 * 1_048_576; // 10M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; /// Time allowed for the first byte of a request to arrive before we time out (Time To First Byte). @@ -208,9 +206,10 @@ impl RpcLimits { Self { min, max } } - /// Returns true if the given length is out of bounds, false otherwise. - pub fn is_out_of_bounds(&self, length: usize) -> bool { - length > self.max || length < self.min + /// Returns true if the given length is greater than `MAX_RPC_SIZE` or out of + bounds for the given ssz type, returns false otherwise. + pub fn is_out_of_bounds(&self, length: usize, max_rpc_size: usize) -> bool { + length > std::cmp::min(self.max, max_rpc_size) || length < self.min } } diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 98b3c4db776..ae0cafe1fff 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -210,6 +210,11 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq + Self::MaxTransactionsPerPayload::to_usize() } + /// Returns the `MAX_EXTRA_DATA_BYTES` constant for this specification. + fn max_extra_data_bytes() -> usize { + Self::MaxExtraDataBytes::to_usize() + } + /// Returns the `BYTES_PER_LOGS_BLOOM` constant for this specification. 
fn bytes_per_logs_bloom() -> usize { Self::BytesPerLogsBloom::to_usize() diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 7b635755125..eb9283ec449 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -1,5 +1,6 @@ use crate::{test_utils::TestRandom, *}; use serde_derive::{Deserialize, Serialize}; +use ssz::Encode; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -57,4 +58,14 @@ impl ExecutionPayload { transactions: VariableList::empty(), } } + + /// Returns the maximum size of an execution payload. + pub fn max_execution_payload_size() -> usize { + // Fixed part + Self::empty().as_ssz_bytes().len() + // length of List * size_of(uint8) + + (T::max_extra_data_bytes() * 1) + // length of List * offset size * max size of transaction + + (T::max_transactions_per_payload() *4 * T::max_bytes_per_transaction()) + } } From 8e7eecb4c3adf8ea5baf2de9e51c5c3bd55d42b9 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 9 Nov 2021 11:58:35 -0600 Subject: [PATCH 2/9] Fix max_payload_size function --- consensus/types/src/execution_payload.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index eb9283ec449..7898d78d930 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -59,13 +59,14 @@ impl ExecutionPayload { } } + #[allow(clippy::integer_arithmetic)] /// Returns the maximum size of an execution payload. 
pub fn max_execution_payload_size() -> usize { // Fixed part Self::empty().as_ssz_bytes().len() // length of List * size_of(uint8) - + (T::max_extra_data_bytes() * 1) + + (T::max_extra_data_bytes() * ::ssz_fixed_len()) // length of List * offset size * max size of transaction - + (T::max_transactions_per_payload() *4 * T::max_bytes_per_transaction()) + + (T::max_transactions_per_payload() *ssz::BYTES_PER_LENGTH_OFFSET * T::max_bytes_per_transaction()) } } From 45306c25401b0af2e5cf1f681a223f87e62c3443 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 9 Nov 2021 11:58:43 -0600 Subject: [PATCH 3/9] Add merge block test --- .../lighthouse_network/tests/common/mod.rs | 1 + .../lighthouse_network/tests/rpc_tests.rs | 56 ++++++++++++++++--- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index 9055da05bfb..12436b624d5 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -22,6 +22,7 @@ fn fork_context() -> ForkContext { // Set fork_epoch to `Some` to ensure that the `ForkContext` object // includes altair in the list of forks chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); + chain_spec.merge_fork_epoch = Some(types::Epoch::new(84)); ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 565304a79b0..6e63edfce45 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -8,14 +8,34 @@ use std::time::Duration; use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, EthSpec, Hash256, MinimalEthSpec, - Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, 
EthSpec, Hash256, + MinimalEthSpec, Signature, SignedBeaconBlock, Slot, }; mod common; type E = MinimalEthSpec; +/// Merge block with length < MAX_RPC_SIZE. +fn merge_block_small() -> BeaconBlock { + let mut block = BeaconBlockMerge::empty(&E::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); + + block.body.execution_payload.transactions = txs; + BeaconBlock::Merge(block) +} + +/// Merge block with length > MAX_RPC_SIZE. +fn merge_block_large() -> BeaconBlock { + let mut block = BeaconBlockMerge::empty(&E::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); + + block.body.execution_payload.transactions = txs; + BeaconBlock::Merge(block) +} + // Tests the STATUS RPC message #[test] #[allow(clippy::single_match)] @@ -118,10 +138,10 @@ fn test_status_rpc() { #[allow(clippy::single_match)] fn test_blocks_by_range_chunked_rpc() { // set up the logging. 
The level and enabled logging or not - let log_level = Level::Trace; + let log_level = Level::Debug; let enable_logging = false; - let messages_to_send = 10; + let messages_to_send = 20; let log = common::build_log(log_level, enable_logging); @@ -149,6 +169,14 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); + let full_block = merge_block_small(); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block))); + + let full_block = merge_block_large(); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); + // keep count of the number of messages received let mut messages_received = 0; // build the sender future @@ -160,13 +188,13 @@ fn test_blocks_by_range_chunked_rpc() { debug!(log, "Sending RPC"); sender.swarm.behaviour_mut().send_request( peer_id, - RequestId::Sync(10), + RequestId::Sync(20), rpc_request.clone(), ); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(10), + id: RequestId::Sync(20), response, }) => { warn!(log, "Sender received a response"); @@ -174,8 +202,12 @@ fn test_blocks_by_range_chunked_rpc() { Response::BlocksByRange(Some(_)) => { if messages_received < 5 { assert_eq!(response, rpc_response_base.clone()); - } else { + } else if messages_received < 10 { assert_eq!(response, rpc_response_altair.clone()); + } else if messages_received < 15 { + assert_eq!(response, rpc_response_merge_small.clone()); + } else { + assert_eq!(response, rpc_response_merge_large.clone()); } messages_received += 1; warn!(log, "Chunk received"); @@ -211,8 +243,12 @@ fn test_blocks_by_range_chunked_rpc() { // second half 
as altair blocks. let rpc_response = if i < 5 { rpc_response_base.clone() - } else { + } else if i < 10 { rpc_response_altair.clone() + } else if i < 15 { + rpc_response_merge_small.clone() + } else { + rpc_response_merge_large.clone() }; receiver.swarm.behaviour_mut().send_successful_response( peer_id, @@ -237,7 +273,9 @@ fn test_blocks_by_range_chunked_rpc() { _ = sender_future => {} _ = receiver_future => {} _ = sleep(Duration::from_secs(10)) => { - panic!("Future timed out"); + if messages_received < 15 { + panic!("Future timed out"); + } } } }) From 5f007ca957492f2959cd933e6f01d5385faffa12 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 9 Nov 2021 13:19:41 -0600 Subject: [PATCH 4/9] Fix max size calculation; fix up test --- .../lighthouse_network/tests/rpc_tests.rs | 53 ++++++++++--------- consensus/types/src/execution_payload.rs | 2 +- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 6e63edfce45..d9418cb10d6 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -27,6 +27,8 @@ fn merge_block_small() -> BeaconBlock { } /// Merge block with length > MAX_RPC_SIZE. +/// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. +/// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. 
fn merge_block_large() -> BeaconBlock { let mut block = BeaconBlockMerge::empty(&E::default_spec()); let tx = VariableList::from(vec![0; 1024]); @@ -141,7 +143,7 @@ fn test_blocks_by_range_chunked_rpc() { let log_level = Level::Debug; let enable_logging = false; - let messages_to_send = 20; + let messages_to_send = 8; let log = common::build_log(log_level, enable_logging); @@ -188,23 +190,23 @@ fn test_blocks_by_range_chunked_rpc() { debug!(log, "Sending RPC"); sender.swarm.behaviour_mut().send_request( peer_id, - RequestId::Sync(20), + RequestId::Sync(8), rpc_request.clone(), ); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(20), + id: RequestId::Sync(8), response, }) => { warn!(log, "Sender received a response"); match response { Response::BlocksByRange(Some(_)) => { - if messages_received < 5 { + if messages_received < 2 { assert_eq!(response, rpc_response_base.clone()); - } else if messages_received < 10 { + } else if messages_received < 4 { assert_eq!(response, rpc_response_altair.clone()); - } else if messages_received < 15 { + } else if messages_received < 6 { assert_eq!(response, rpc_response_merge_small.clone()); } else { assert_eq!(response, rpc_response_merge_large.clone()); @@ -241,11 +243,11 @@ fn test_blocks_by_range_chunked_rpc() { for i in 0..messages_to_send { // Send first half of responses as base blocks and // second half as altair blocks. 
- let rpc_response = if i < 5 { + let rpc_response = if i < 2 { rpc_response_base.clone() - } else if i < 10 { + } else if i < 4 { rpc_response_altair.clone() - } else if i < 15 { + } else if i < 6 { rpc_response_merge_small.clone() } else { rpc_response_merge_large.clone() @@ -273,7 +275,7 @@ fn test_blocks_by_range_chunked_rpc() { _ = sender_future => {} _ = receiver_future => {} _ = sleep(Duration::from_secs(10)) => { - if messages_received < 15 { + if messages_received < 6 { panic!("Future timed out"); } } @@ -535,7 +537,7 @@ fn test_blocks_by_root_chunked_rpc() { let log_level = Level::Debug; let enable_logging = false; - let messages_to_send = 10; + let messages_to_send = 6; let log = common::build_log(log_level, enable_logging); let spec = E::default_spec(); @@ -554,10 +556,6 @@ fn test_blocks_by_root_chunked_rpc() { Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), - Hash256::from_low_u64_be(0), - Hash256::from_low_u64_be(0), - Hash256::from_low_u64_be(0), - Hash256::from_low_u64_be(0), ]), }); @@ -570,6 +568,10 @@ fn test_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block))); + let full_block = merge_block_small(); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_merge_small = Response::BlocksByRoot(Some(Box::new(signed_full_block))); + // keep count of the number of messages received let mut messages_received = 0; // build the sender future @@ -581,20 +583,22 @@ fn test_blocks_by_root_chunked_rpc() { debug!(log, "Sending RPC"); sender.swarm.behaviour_mut().send_request( peer_id, - RequestId::Sync(10), + RequestId::Sync(6), rpc_request.clone(), ); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(10), + id: RequestId::Sync(6), response, }) => match response { 
Response::BlocksByRoot(Some(_)) => { - if messages_received < 5 { + if messages_received < 2 { assert_eq!(response, rpc_response_base.clone()); - } else { + } else if messages_received < 4 { assert_eq!(response, rpc_response_altair.clone()); + } else { + assert_eq!(response, rpc_response_merge_small.clone()); } messages_received += 1; debug!(log, "Chunk received"); @@ -626,12 +630,13 @@ fn test_blocks_by_root_chunked_rpc() { debug!(log, "Receiver got request"); for i in 0..messages_to_send { - // Send first half of responses as base blocks and - // second half as altair blocks. - let rpc_response = if i < 5 { + // Send equal base, altair and merge blocks + let rpc_response = if i < 2 { rpc_response_base.clone() - } else { + } else if i < 4 { rpc_response_altair.clone() + } else { + rpc_response_merge_small.clone() }; receiver.swarm.behaviour_mut().send_successful_response( peer_id, @@ -657,7 +662,7 @@ fn test_blocks_by_root_chunked_rpc() { _ = sender_future => {} _ = receiver_future => {} _ = sleep(Duration::from_secs(30)) => { - panic!("Future timed out"); + panic!("Future timed out"); } } }) diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 7898d78d930..f9a2e4ef61c 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -67,6 +67,6 @@ impl ExecutionPayload { // length of List * size_of(uint8) + (T::max_extra_data_bytes() * ::ssz_fixed_len()) // length of List * offset size * max size of transaction - + (T::max_transactions_per_payload() *ssz::BYTES_PER_LENGTH_OFFSET * T::max_bytes_per_transaction()) + + (T::max_transactions_per_payload() *(ssz::BYTES_PER_LENGTH_OFFSET + T::max_bytes_per_transaction())) } } From 550db4bc388cae479f3b414b75d01fb14729624f Mon Sep 17 00:00:00 2001 From: pawan Date: Thu, 11 Nov 2021 13:10:11 -0600 Subject: [PATCH 5/9] Clear comments --- consensus/types/src/execution_payload.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) 
diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index f9a2e4ef61c..cd8a97dbd6a 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -64,9 +64,9 @@ impl ExecutionPayload { pub fn max_execution_payload_size() -> usize { // Fixed part Self::empty().as_ssz_bytes().len() - // length of List * size_of(uint8) + // Max size of variable length `extra_data` field + (T::max_extra_data_bytes() * ::ssz_fixed_len()) - // length of List * offset size * max size of transaction - + (T::max_transactions_per_payload() *(ssz::BYTES_PER_LENGTH_OFFSET + T::max_bytes_per_transaction())) + // Max size of variable length `transactions` field + + (T::max_transactions_per_payload() * (ssz::BYTES_PER_LENGTH_OFFSET + T::max_bytes_per_transaction())) } } From 91fc3aa7461e861028bacd1c5bb0196ef532d781 Mon Sep 17 00:00:00 2001 From: pawan Date: Thu, 11 Nov 2021 17:33:45 -0600 Subject: [PATCH 6/9] Add a payload_size_function --- consensus/types/src/execution_payload.rs | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index cd8a97dbd6a..13d6699990f 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -59,6 +59,16 @@ impl ExecutionPayload { } } + pub fn payload_size(&self) -> usize { + let mut tx_size = ssz::BYTES_PER_LENGTH_OFFSET * self.transactions.len(); + for tx in self.transactions.iter() { + tx_size += tx.len(); + } + Self::empty().as_ssz_bytes().len() + + ::ssz_fixed_len() * self.extra_data.len() + + tx_size + } + #[allow(clippy::integer_arithmetic)] /// Returns the maximum size of an execution payload. 
pub fn max_execution_payload_size() -> usize { @@ -70,3 +80,20 @@ impl ExecutionPayload { + (T::max_transactions_per_payload() * (ssz::BYTES_PER_LENGTH_OFFSET + T::max_bytes_per_transaction())) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_payload_size() { + let mut payload = ExecutionPayload::empty(); + + assert_eq!(payload.as_ssz_bytes().len(), payload.payload_size()); + + payload.extra_data = VariableList::from(vec![42; 16]); + payload.transactions = VariableList::from(vec![VariableList::from(vec![42; 42])]); + + assert_eq!(payload.as_ssz_bytes().len(), payload.payload_size()); + } +} From 6680e4e53475ad5a41e18da71ae111f61dde180e Mon Sep 17 00:00:00 2001 From: pawan Date: Fri, 12 Nov 2021 12:02:36 -0600 Subject: [PATCH 7/9] Use safe arith for payload calculation --- consensus/types/src/execution_payload.rs | 28 ++++++++++++++++-------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 13d6699990f..4136663869d 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -1,4 +1,5 @@ use crate::{test_utils::TestRandom, *}; +use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; @@ -59,14 +60,17 @@ impl ExecutionPayload { } } - pub fn payload_size(&self) -> usize { - let mut tx_size = ssz::BYTES_PER_LENGTH_OFFSET * self.transactions.len(); + /// Returns the ssz size of `self`. + pub fn payload_size(&self) -> Result { + let mut tx_size = ssz::BYTES_PER_LENGTH_OFFSET.safe_mul(self.transactions.len())?; for tx in self.transactions.iter() { - tx_size += tx.len(); + tx_size.safe_add_assign(tx.len())?; } - Self::empty().as_ssz_bytes().len() - + ::ssz_fixed_len() * self.extra_data.len() - + tx_size + Self::empty() + .as_ssz_bytes() + .len() + .safe_add(::ssz_fixed_len().safe_mul(self.extra_data.len())?)? 
+ .safe_add(tx_size) } #[allow(clippy::integer_arithmetic)] @@ -87,13 +91,19 @@ mod tests { #[test] fn test_payload_size() { - let mut payload = ExecutionPayload::empty(); + let mut payload = ExecutionPayload::::empty(); - assert_eq!(payload.as_ssz_bytes().len(), payload.payload_size()); + assert_eq!( + payload.as_ssz_bytes().len(), + payload.payload_size().unwrap() + ); payload.extra_data = VariableList::from(vec![42; 16]); payload.transactions = VariableList::from(vec![VariableList::from(vec![42; 42])]); - assert_eq!(payload.as_ssz_bytes().len(), payload.payload_size()); + assert_eq!( + payload.as_ssz_bytes().len(), + payload.payload_size().unwrap() + ); } } From 9f36f81e82b01bcd37deb304ea8f2347e41a506d Mon Sep 17 00:00:00 2001 From: pawan Date: Fri, 12 Nov 2021 12:10:23 -0600 Subject: [PATCH 8/9] Return an error if block too big in block production --- beacon_node/beacon_chain/src/beacon_chain.rs | 14 ++++++++++++++ beacon_node/beacon_chain/src/chain_config.rs | 3 +++ beacon_node/beacon_chain/src/errors.rs | 1 + beacon_node/beacon_chain/src/metrics.rs | 5 +++++ beacon_node/lighthouse_network/src/lib.rs | 2 ++ beacon_node/src/config.rs | 2 ++ 6 files changed, 27 insertions(+) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 06eafc05655..e2bed665528 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -61,6 +61,7 @@ use safe_arith::SafeArith; use slasher::Slasher; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; +use ssz::Encode; use state_processing::{ common::get_indexed_attestation, per_block_processing, @@ -3006,6 +3007,19 @@ impl BeaconChain { Signature::empty(), ); + let block_size = block.ssz_bytes_len(); + debug!( + self.log, + "Produced block on state"; + "block_size" => block_size, + ); + + metrics::observe(&metrics::BLOCK_SIZE, block_size as f64); + + if block_size > self.config.max_network_size { 
+ return Err(BlockProductionError::BlockTooLarge(block_size)); + } + let process_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_PROCESS_TIMES); per_block_processing( &mut state, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 9fe09c9822a..4aee06d468c 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -16,6 +16,8 @@ pub struct ChainConfig { pub reconstruct_historic_states: bool, /// Whether timeouts on `TimeoutRwLock`s are enabled or not. pub enable_lock_timeouts: bool, + /// The max size of a message that can be sent over the network. + pub max_network_size: usize, } impl Default for ChainConfig { @@ -25,6 +27,7 @@ impl Default for ChainConfig { weak_subjectivity_checkpoint: None, reconstruct_historic_states: false, enable_lock_timeouts: true, + max_network_size: 10 * 1_048_576, // 10M } } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 3d5aad3aa9c..cec72a5818a 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -185,6 +185,7 @@ pub enum BlockProductionError { GetPayloadFailed(execution_layer::Error), FailedToReadFinalizedBlock(store::Error), MissingFinalizedBlock(Hash256), + BlockTooLarge(usize), } easy_from_to!(BlockProcessingError, BlockProductionError); diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 2967d40a18b..44b267647c5 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -107,6 +107,11 @@ lazy_static! 
{ "Number of attestations in a block" ); + pub static ref BLOCK_SIZE: Result = try_create_histogram( + "beacon_block_total_size", + "Size of a signed beacon block" + ); + /* * Unaggregated Attestation Verification */ diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index 733dc72ab5b..b37b69dcfa6 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -16,6 +16,8 @@ pub mod rpc; mod service; pub mod types; +pub use config::GOSSIP_MAX_SIZE; + use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::str::FromStr; diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 1ae72736ba4..2ac16c35df8 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -452,6 +452,8 @@ pub fn get_config( }; } + client_config.chain.max_network_size = lighthouse_network::GOSSIP_MAX_SIZE; + if cli_args.is_present("slasher") { let slasher_dir = if let Some(slasher_dir) = cli_args.value_of("slasher-dir") { PathBuf::from(slasher_dir) From a9739e7dca79df98f628627289d8e3b270bd7b7d Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 16 Nov 2021 11:31:53 -0600 Subject: [PATCH 9/9] Separate test to check if block is over limit --- beacon_node/lighthouse_network/src/rpc/mod.rs | 2 +- .../lighthouse_network/src/rpc/protocol.rs | 2 +- .../lighthouse_network/tests/rpc_tests.rs | 144 +++++++++++++++--- 3 files changed, 123 insertions(+), 25 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 3d386148d02..c7bfd405d5e 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -30,7 +30,7 @@ pub use methods::{ RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; pub(crate) use outbound::OutboundRequest; -pub use protocol::{Protocol, RPCError}; +pub use protocol::{Protocol, RPCError, 
MAX_RPC_SIZE}; pub(crate) mod codec; mod handler; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index baf262ecd9b..9d48887eaaa 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -93,7 +93,7 @@ lazy_static! { } /// The maximum bytes that can be sent across the RPC. -const MAX_RPC_SIZE: usize = 10 * 1_048_576; // 10M +pub const MAX_RPC_SIZE: usize = 10 * 1_048_576; // 10M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; /// Time allowed for the first byte of a request to arrive before we time out (Time To First Byte). diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index d9418cb10d6..77d014e6a3a 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -1,7 +1,10 @@ #![cfg(test)] use lighthouse_network::rpc::methods::*; -use lighthouse_network::{BehaviourEvent, Libp2pEvent, ReportSource, Request, Response}; +use lighthouse_network::{ + rpc::MAX_RPC_SIZE, BehaviourEvent, Libp2pEvent, ReportSource, Request, Response, +}; use slog::{debug, warn, Level}; +use ssz::Encode; use ssz_types::VariableList; use std::sync::Arc; use std::time::Duration; @@ -23,7 +26,10 @@ fn merge_block_small() -> BeaconBlock { let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); block.body.execution_payload.transactions = txs; - BeaconBlock::Merge(block) + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() <= MAX_RPC_SIZE); + block } /// Merge block with length > MAX_RPC_SIZE. 
@@ -35,7 +41,10 @@ fn merge_block_large() -> BeaconBlock { let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); block.body.execution_payload.transactions = txs; - BeaconBlock::Merge(block) + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() > MAX_RPC_SIZE); + block } // Tests the STATUS RPC message @@ -143,7 +152,7 @@ fn test_blocks_by_range_chunked_rpc() { let log_level = Level::Debug; let enable_logging = false; - let messages_to_send = 8; + let messages_to_send = 6; let log = common::build_log(log_level, enable_logging); @@ -175,12 +184,9 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block))); - let full_block = merge_block_large(); - let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); - let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); - // keep count of the number of messages received let mut messages_received = 0; + let request_id = RequestId::Sync(messages_to_send as usize); // build the sender future let sender_future = async { loop { @@ -190,13 +196,13 @@ fn test_blocks_by_range_chunked_rpc() { debug!(log, "Sending RPC"); sender.swarm.behaviour_mut().send_request( peer_id, - RequestId::Sync(8), + request_id, rpc_request.clone(), ); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(8), + id: _, response, }) => { warn!(log, "Sender received a response"); @@ -206,16 +212,14 @@ fn test_blocks_by_range_chunked_rpc() { assert_eq!(response, rpc_response_base.clone()); } else if messages_received < 4 { assert_eq!(response, rpc_response_altair.clone()); - } else if messages_received < 6 { - assert_eq!(response, rpc_response_merge_small.clone()); } else { - assert_eq!(response, rpc_response_merge_large.clone()); + assert_eq!(response, 
rpc_response_merge_small.clone()); } messages_received += 1; warn!(log, "Chunk received"); } Response::BlocksByRange(None) => { - // should be exactly 10 messages before terminating + // should be exactly `messages_to_send` messages before terminating assert_eq!(messages_received, messages_to_send); // end the test return; @@ -241,16 +245,14 @@ fn test_blocks_by_range_chunked_rpc() { // send the response warn!(log, "Receiver got request"); for i in 0..messages_to_send { - // Send first half of responses as base blocks and - // second half as altair blocks. + // Send first third of responses as base blocks, + // second as altair and third as merge. let rpc_response = if i < 2 { rpc_response_base.clone() } else if i < 4 { rpc_response_altair.clone() - } else if i < 6 { - rpc_response_merge_small.clone() } else { - rpc_response_merge_large.clone() + rpc_response_merge_small.clone() }; receiver.swarm.behaviour_mut().send_successful_response( peer_id, @@ -274,11 +276,106 @@ fn test_blocks_by_range_chunked_rpc() { tokio::select! { _ = sender_future => {} _ = receiver_future => {} - _ = sleep(Duration::from_secs(10)) => { - if messages_received < 6 { + _ = sleep(Duration::from_secs(30)) => { panic!("Future timed out"); + } + } + }) +} + +// Tests rejection of blocks over `MAX_RPC_SIZE`. +#[test] +#[allow(clippy::single_match)] +fn test_blocks_by_range_over_limit() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Debug; + let enable_logging = false; + + let messages_to_send = 5; + + let log = common::build_log(log_level, enable_logging); + + let rt = Arc::new(Runtime::new().unwrap()); + + rt.block_on(async { + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + + // BlocksByRange Request + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { + start_slot: 0, + count: messages_to_send, + step: 0, + }); + + // BlocksByRange Response + let full_block = merge_block_large(); + let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); + let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); + + let request_id = RequestId::Sync(messages_to_send as usize); + // build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + Libp2pEvent::Behaviour(BehaviourEvent::PeerConnectedOutgoing(peer_id)) => { + // Send a STATUS message + debug!(log, "Sending RPC"); + sender.swarm.behaviour_mut().send_request( + peer_id, + request_id, + rpc_request.clone(), + ); + } + // The request will fail because the sender will refuse to send anything > MAX_RPC_SIZE + Libp2pEvent::Behaviour(BehaviourEvent::RPCFailed { id, .. 
}) => { + assert_eq!(id, request_id); + return; + } + _ => {} // Ignore other behaviour events + } + } + }; + + // build the receiver future + let receiver_future = async { + loop { + match receiver.next_event().await { + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }) => { + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); + for _ in 0..messages_to_send { + let rpc_response = rpc_response_merge_large.clone(); + receiver.swarm.behaviour_mut().send_successful_response( + peer_id, + id, + rpc_response.clone(), + ); + } + // send the stream termination + receiver.swarm.behaviour_mut().send_successful_response( + peer_id, + id, + Response::BlocksByRange(None), + ); + } + } + _ => {} // Ignore other events } } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } } }) } @@ -316,6 +413,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { // keep count of the number of messages received let mut messages_received: u64 = 0; + let request_id = RequestId::Sync(messages_to_send as usize); // build the sender future let sender_future = async { loop { @@ -325,13 +423,13 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { debug!(log, "Sending RPC"); sender.swarm.behaviour_mut().send_request( peer_id, - RequestId::Sync(10), + request_id, rpc_request.clone(), ); } Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { peer_id: _, - id: RequestId::Sync(10), + id: _, response, }) => // Should receive the RPC response