Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime] Bound XCMP queue #2302

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use codec::{Decode, Encode, MaxEncodedLen};
use cumulus_primitives_core::{
relay_chain, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, MessageSendError,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
XcmpMessageHandler, XcmpMessageSource,
};
Expand Down Expand Up @@ -1054,6 +1054,13 @@ impl<T: Config> FeeTracker for Pallet<T> {
}
}

impl<T: Config> ListChannelInfos for Pallet<T> {
	/// Returns the `ParaId` of every egress channel found in the relevant messaging state.
	///
	/// Yields an empty list when no relevant messaging state is available.
	fn outgoing_channels() -> Vec<ParaId> {
		match Self::relevant_messaging_state() {
			Some(messaging_state) => messaging_state
				.egress_channels
				.into_iter()
				.map(|(recipient, _)| recipient)
				.collect(),
			None => Vec::new(),
		}
	}
}

impl<T: Config> GetChannelInfo for Pallet<T> {
fn get_channel_status(id: ParaId) -> ChannelStatus {
// Note, that we are using `relevant_messaging_state` which may be from the previous
Expand Down
104 changes: 79 additions & 25 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub mod weights;
pub use weights::WeightInfo;

use bounded_collections::BoundedBTreeSet;
use codec::{Decode, DecodeLimit, Encode};
use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
Expand Down Expand Up @@ -105,7 +105,6 @@ pub mod pallet {

#[pallet::pallet]
#[pallet::storage_version(migration::STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);

#[pallet::config]
Expand All @@ -132,6 +131,24 @@ pub mod pallet {
#[pallet::constant]
type MaxInboundSuspended: Get<u32>;

/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
///
/// If this is reached, then no further messages can be sent to channels that do not yet
/// have a message queued. This should be set to the expected maximum of outbound channels
/// which is determined by [`Self::ChannelInfo`]. It is important to set this correctly,
/// since otherwise the congestion control protocol will not work as intended and messages
/// may be dropped.
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
#[pallet::constant]
type MaxActiveOutboundChannels: Get<u32>;

/// The maximal page size for HRMP message pages.
///
/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
/// benchmarking. The limit for the size of a message is slightly below this, since some
/// overhead is incurred for encoding the format.
#[pallet::constant]
type MaxPageSize: Get<u32>;

/// The origin that is allowed to resume or suspend the XCMP queue.
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

Expand Down Expand Up @@ -276,6 +293,10 @@ pub mod pallet {
AlreadySuspended,
/// The execution is already resumed.
AlreadyResumed,
/// There are too many active outbound channels.
TooManyOutboundChannels,
/// The message is too big.
TooBig,
}

/// The suspended inbound XCMP channels. All others are not suspended.
Expand All @@ -297,19 +318,28 @@ pub mod pallet {
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
_,
BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
ValueQuery,
>;

// The new way of doing it:
/// The messages outbound in a given XCMP channel.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
ParaId,
Twox64Concat,
u16,
BoundedVec<u8, T::MaxPageSize>,
ValueQuery,
>;

/// Any signal messages waiting to be sent.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
StorageMap<_, Blake2_128Concat, ParaId, BoundedVec<u8, T::MaxPageSize>, ValueQuery>;

/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
Expand All @@ -331,15 +361,14 @@ pub mod pallet {
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
Ok,
Suspended,
}

/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "std", derive(Debug))]
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
recipient: ParaId,
Expand Down Expand Up @@ -375,7 +404,7 @@ impl OutboundChannelDetails {
}
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
/// The number of pages which must be in the queue for the other side to be told to suspend
/// their sending.
Expand Down Expand Up @@ -488,7 +517,9 @@ impl<T: Config> Pallet<T> {
{
details
} else {
all_channels.push(OutboundChannelDetails::new(recipient));
all_channels
.try_push(OutboundChannelDetails::new(recipient))
.map_err(|_| MessageSendError::TooManyChannels)?;
all_channels
.last_mut()
.expect("can't be empty; a new element was just pushed; qed")
Expand All @@ -513,7 +544,7 @@ impl<T: Config> Pallet<T> {
if page.len() + encoded_fragment.len() > max_message_size {
return None
}
page.extend_from_slice(&encoded_fragment[..]);
page.try_extend(encoded_fragment.iter().cloned()).ok()?;
Some(page.len())
},
)
Expand All @@ -531,7 +562,9 @@ impl<T: Config> Pallet<T> {
new_page.extend_from_slice(&encoded_fragment[..]);
let last_page_size = new_page.len();
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
let bounded_page =
BoundedVec::try_from(new_page).map_err(|_| MessageSendError::TooBig)?;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
<OutboundXcmpStatus<T>>::put(all_channels);
(number_of_pages, last_page_size)
};
Expand All @@ -553,17 +586,21 @@ impl<T: Config> Pallet<T> {

/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) {
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
details.signals_exist = true;
} else {
s.push(OutboundChannelDetails::new(dest).with_signals());
s.try_push(OutboundChannelDetails::new(dest).with_signals())
.map_err(|_| Error::<T>::TooManyOutboundChannels)?;
}
<SignalMessages<T>>::mutate(dest, |page| {
*page = (XcmpMessageFormat::Signals, signal).encode();
});

let page = BoundedVec::try_from((XcmpMessageFormat::Signals, signal).encode())
.map_err(|_| Error::<T>::TooBig)?;
ggwpez marked this conversation as resolved.
Show resolved Hide resolved

<SignalMessages<T>>::insert(dest, page);
<OutboundXcmpStatus<T>>::put(s);
Ok(())
}

fn suspend_channel(target: ParaId) {
Expand All @@ -573,7 +610,13 @@ impl<T: Config> Pallet<T> {
defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
details.state = OutboundState::Suspended;
} else {
s.push(OutboundChannelDetails::new(target).with_suspended_state());
if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
// Nothing that we can do here. The outbound channel does not exist either, so
// there should be no message going out as well. The next time that the channel
// can be created it will again get the suspension from the remote side. It can
// therefore result in a few lost messages, but should eventually self-repair.
defensive!("Cannot pause channel; too many outbound channels");
}
0xmovses marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
Expand Down Expand Up @@ -674,13 +717,17 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let suspended = suspended_channels.contains(&para);

if suspended && fp.pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);
// If the resuming fails then it is not critical. We will retry in the future.
let _ = Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
} else if !suspended && fp.pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);
if let Err(_) = Self::send_signal(para, ChannelSignal::Suspend) {
// It will retry if `drop_threshold` is not reached, but it could be too late.
defensive!("Could not send suspension signal; future messages may be dropped.");
}

if let Err(err) = suspended_channels.try_insert(para) {
log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
Expand Down Expand Up @@ -852,7 +899,7 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
// since it's so unlikely then for now we just drop it.
defensive!("WARNING: oversize message in queue - dropping");
} else {
result.push((para_id, page));
result.push((para_id, page.into_inner()));
}

let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
Expand Down Expand Up @@ -900,7 +947,14 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));

// TODO <https://github.com/paritytech/parity-common/pull/800>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be done now

Copy link
Member Author

@ggwpez ggwpez Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
let mut statuses_inner = statuses.into_inner();
statuses_inner.rotate_left(result.len().saturating_sub(pruned));
statuses = BoundedVec::try_from(statuses_inner)
.expect("Rotating does not change the length; it still fits; qed");
}

<OutboundXcmpStatus<T>>::put(statuses);

Expand Down
4 changes: 3 additions & 1 deletion cumulus/pallets/xcmp-queue/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

//! A module that is responsible for migration of storage.

pub mod v4;

use crate::{Config, OverweightIndex, Pallet, ParaId, QueueConfig, DEFAULT_POV_SIZE};
use cumulus_primitives_core::XcmpMessageFormat;
use frame_support::{
Expand All @@ -25,7 +27,7 @@ use frame_support::{
};

/// The current storage version.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(4);

pub const LOG: &str = "runtime::xcmp-queue-migration";

Expand Down
112 changes: 112 additions & 0 deletions cumulus/pallets/xcmp-queue/src/migration/v4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Migrates the storage to version 4.

use crate::*;
use cumulus_primitives_core::ListChannelInfos;
use frame_support::{pallet_prelude::*, traits::OnRuntimeUpgrade};

/// Configs needed to run the V4 migration.
///
/// Extends the pallet [`Config`] with the one extra piece of information that the migration
/// checks need: a way to enumerate the outbound channels.
pub trait V4Config: Config {
/// Lists the target `ParaId` of every outbound channel.
///
/// The list itself only carries the `ParaId`s; the migration checks look up the maximum
/// message size of each listed channel through `Config::ChannelInfo`.
type ChannelList: ListChannelInfos;
}

/// Ensures that the storage migrates cleanly to V4.
///
/// The migration itself is a no-op and writes nothing. Its purpose is the `try-runtime`
/// post-upgrade hook of [`UncheckedMigrateV3ToV4`], which checks that every value currently in
/// storage still fits into the new `BoundedVec` bounds once the upgrade is applied.
///
/// This alias wraps [`UncheckedMigrateV3ToV4`] in a `VersionedMigration` from storage version
/// `3` to `4`.
pub type MigrateV3ToV4<T> = frame_support::migrations::VersionedMigration<
3,
4,
UncheckedMigrateV3ToV4<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;

// V3 storage aliases.
//
// These mirror the pallet storage items using the unbounded `Vec` value types, so that the
// migration checks can read the stored values and inspect their lengths.
mod v3 {
use super::*;

// Details of the outbound channels, as an unbounded `Vec`.
#[frame_support::storage_alias]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<Pallet<T>, Vec<OutboundChannelDetails>, ValueQuery>;

// Outbound message pages keyed by recipient `ParaId` and a `u16` page index, as raw bytes.
#[frame_support::storage_alias]
pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
Pallet<T>,
Blake2_128Concat,
ParaId,
Twox64Concat,
u16,
Vec<u8>,
ValueQuery,
>;

// Pending signal message per recipient `ParaId`, as raw bytes.
#[frame_support::storage_alias]
pub(super) type SignalMessages<T: Config> =
StorageMap<Pallet<T>, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
}

/// The raw V3 to V4 migration logic for a runtime implementing [`V4Config`].
///
/// Please use [`MigrateV3ToV4`] instead, which wraps this type in a `VersionedMigration`.
pub struct UncheckedMigrateV3ToV4<T: V4Config>(core::marker::PhantomData<T>);
ggwpez marked this conversation as resolved.
Show resolved Hide resolved

impl<T: V4Config> OnRuntimeUpgrade for UncheckedMigrateV3ToV4<T> {
	/// The migration does not touch storage, so it simply reports the default weight.
	fn on_runtime_upgrade() -> frame_support::weights::Weight {
		Default::default()
	}

	/// Checks that everything currently in storage fits the bounded V4 storage types.
	///
	/// Only compiled with the `try-runtime` feature. Returns an error if:
	/// - there are more outbound channels in storage than `MaxActiveOutboundChannels`,
	/// - a listed outbound channel provides no channel info,
	/// - a listed channel allows messages larger than `MaxPageSize` minus the encoded
	///   `XcmpMessageFormat` prefix,
	/// - any stored message page or signal is longer than `MaxPageSize`.
	#[cfg(feature = "try-runtime")]
	fn post_upgrade(_: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
		let max_page_size = T::MaxPageSize::get();
		let max_page_len = max_page_size as usize;

		// We don't need any front-run protection for this since channels are opened by governance.
		// The bound is widened to `usize` rather than narrowing the length to `u32`.
		ensure!(
			v3::OutboundXcmpStatus::<T>::get().len() <=
				T::MaxActiveOutboundChannels::get() as usize,
			"Too many outbound channels."
		);

		// Check if any channel has a too large maximum message size. The subtraction saturates so
		// that a `MaxPageSize` smaller than the format prefix cannot underflow and thereby
		// silently disable this check.
		let max_msg_len =
			max_page_size.saturating_sub(XcmpMessageFormat::max_encoded_len() as u32);
		for channel in T::ChannelList::outgoing_channels() {
			// Report missing channel info as an error instead of panicking.
			let info = T::ChannelInfo::get_channel_info(channel)
				.ok_or("All listed channels must provide info")?;

			ensure!(
				info.max_message_size <= max_msg_len,
				"Max message size for channel is too large. This means that the V4 migration can \
				be front-run and an attacker could place a large message just right before the \
				migration to make other messages un-decodable. Please either increase \
				`MaxPageSize` or decrease the `max_message_size` for this channel.",
			);
		}

		// Now check that all pages still fit into the new `BoundedVec`s. The bound is inclusive:
		// a page of exactly `MaxPageSize` bytes still fits.
		for page in v3::OutboundXcmpMessages::<T>::iter_values() {
			ensure!(
				page.len() <= max_page_len,
				"Too long message in storage. Manual intervention required."
			);
		}
		for page in v3::SignalMessages::<T>::iter_values() {
			ensure!(
				page.len() <= max_page_len,
				"Too long signal in storage. Manual intervention required."
			);
		}

		Ok(())
	}
}
4 changes: 3 additions & 1 deletion cumulus/pallets/xcmp-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ impl Config for Test {
type ChannelInfo = MockedChannelInfo;
type VersionWrapper = ();
type XcmpQueue = EnqueueToLocalStorage<Pallet<Test>>;
type MaxInboundSuspended = sp_core::ConstU32<1_000>;
type MaxInboundSuspended = ConstU32<1_000>;
type MaxActiveOutboundChannels = ConstU32<128>;
type MaxPageSize = ConstU32<{ 1 << 16 }>;
type ControllerOrigin = EnsureRoot<AccountId>;
type ControllerOriginConverter = SystemParachainAsSuperuser<RuntimeOrigin>;
type WeightInfo = ();
Expand Down
Loading