diff --git a/Cargo.lock b/Cargo.lock
index fae9eacdab9c..410b45d00183 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -16729,6 +16729,7 @@ dependencies = [
  "sc-transaction-pool",
  "sc-transaction-pool-api",
  "sc-utils",
+ "schnellru",
  "serde",
  "serde_json",
  "sp-api",
diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs
index 46232c74539c..2de09840e4df 100644
--- a/substrate/client/api/src/client.rs
+++ b/substrate/client/api/src/client.rs
@@ -278,7 +278,7 @@ impl fmt::Display for UsageInfo {
 pub struct UnpinHandleInner<Block: BlockT> {
 	/// Hash of the block pinned by this handle
 	hash: Block::Hash,
-	unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 }
 
 impl<Block: BlockT> Debug for UnpinHandleInner<Block> {
@@ -291,7 +291,7 @@ impl<Block: BlockT> UnpinHandleInner<Block> {
 	/// Create a new [`UnpinHandleInner`]
 	pub fn new(
 		hash: Block::Hash,
-		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+		unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 	) -> Self {
 		Self { hash, unpin_worker_sender }
 	}
@@ -299,12 +299,25 @@ impl<Block: BlockT> UnpinHandleInner<Block> {
 
 impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
 	fn drop(&mut self) {
-		if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
+		if let Err(err) =
+			self.unpin_worker_sender.unbounded_send(UnpinWorkerMessage::Unpin(self.hash))
+		{
 			log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
 		};
 	}
 }
 
+/// Message that signals notification-based pinning actions to the pinning-worker.
+///
+/// When the notification is dropped, an `Unpin` message should be sent to the worker.
+#[derive(Debug)]
+pub enum UnpinWorkerMessage<Block: BlockT> {
+	/// Should be sent when an import or finality notification is created.
+	AnnouncePin(Block::Hash),
+	/// Should be sent when an import or finality notification is dropped.
+	Unpin(Block::Hash),
+}
+
 /// Keeps a specific block pinned while the handle is alive.
 /// Once the last handle instance for a given block is dropped, the
 /// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block).
@@ -315,7 +328,7 @@ impl<Block: BlockT> UnpinHandle<Block> {
 	/// Create a new [`UnpinHandle`]
 	pub fn new(
 		hash: Block::Hash,
-		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+		unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 	) -> UnpinHandle<Block> {
 		UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender)))
 	}
@@ -353,7 +366,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
 		header: Block::Header,
 		is_new_best: bool,
 		tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
-		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+		unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 	) -> Self {
 		Self {
 			hash,
@@ -412,7 +425,7 @@ impl<Block: BlockT> FinalityNotification<Block> {
 	/// Create finality notification from finality summary.
 	pub fn from_summary(
 		mut summary: FinalizeSummary<Block>,
-		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+		unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 	) -> FinalityNotification<Block> {
 		let hash = summary.finalized.pop().unwrap_or_default();
 		FinalityNotification {
@@ -436,7 +449,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
 	/// Create finality notification from finality summary.
 	pub fn from_summary(
 		summary: ImportSummary<Block>,
-		unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+		unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 	) -> BlockImportNotification<Block> {
 		let hash = summary.hash;
 		BlockImportNotification {
diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs
index 870b4150b9df..0faa90dfc4f9 100644
--- a/substrate/client/db/src/lib.rs
+++ b/substrate/client/db/src/lib.rs
@@ -2531,7 +2531,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 			self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
 				|_| {
 					sp_blockchain::Error::UnknownBlock(format!(
-						"State already discarded for `{:?}`",
+						"Unable to pin: state already discarded for `{:?}`",
 						hash
 					))
 				},
diff --git a/substrate/client/db/src/pinned_blocks_cache.rs b/substrate/client/db/src/pinned_blocks_cache.rs
index 46c9287fb19a..ac4aad07765c 100644
--- a/substrate/client/db/src/pinned_blocks_cache.rs
+++ b/substrate/client/db/src/pinned_blocks_cache.rs
@@ -20,7 +20,7 @@ use schnellru::{Limiter, LruMap};
 use sp_runtime::{traits::Block as BlockT, Justifications};
 
 const LOG_TARGET: &str = "db::pin";
-const PINNING_CACHE_SIZE: usize = 1024;
+const PINNING_CACHE_SIZE: usize = 2048;
 
 /// Entry for pinned blocks cache.
 struct PinnedBlockCacheEntry<Block: BlockT> {
diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml
index 73edceb2ef36..bbf67d1fbd0a 100644
--- a/substrate/client/service/Cargo.toml
+++ b/substrate/client/service/Cargo.toml
@@ -84,6 +84,7 @@ tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "tim
 tempfile = "3.1.0"
 directories = "5.0.1"
 static_init = "1.0.3"
+schnellru = "0.2.1"
 
 [dev-dependencies]
 substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs
index 108c258595a7..35e8b53a09cf 100644
--- a/substrate/client/service/src/client/client.rs
+++ b/substrate/client/service/src/client/client.rs
@@ -19,8 +19,8 @@
 //! Substrate Client
 
 use super::block_rules::{BlockRules, LookupResult as BlockLookupResult};
-use futures::{FutureExt, StreamExt};
-use log::{error, info, trace, warn};
+use crate::client::notification_pinning::NotificationPinningWorker;
+use log::{debug, info, trace, warn};
 use parking_lot::{Mutex, RwLock};
 use prometheus_endpoint::Registry;
 use rand::Rng;
@@ -38,7 +38,7 @@ use sc_client_api::{
 	execution_extensions::ExecutionExtensions,
 	notifications::{StorageEventStream, StorageNotifications},
 	CallExecutor, ExecutorProvider, KeysIter, OnFinalityAction, OnImportAction, PairsIter,
-	ProofProvider, UsageProvider,
+	ProofProvider, UnpinWorkerMessage, UsageProvider,
 };
 use sc_consensus::{
 	BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction,
@@ -114,7 +114,7 @@ where
 	block_rules: BlockRules<Block>,
 	config: ClientConfig<Block>,
 	telemetry: Option<TelemetryHandle>,
-	unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
+	unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
 	_phantom: PhantomData<RA>,
 }
 
@@ -326,19 +326,35 @@ where
 		// dropped, the block will be unpinned automatically.
 		if let Some(ref notification) = finality_notification {
 			if let Err(err) = self.backend.pin_block(notification.hash) {
-				error!(
+				debug!(
 					"Unable to pin block for finality notification. hash: {}, Error: {}",
hash: {}, Error: {}", notification.hash, err ); - }; + } else { + let _ = self + .unpin_worker_sender + .unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash)) + .map_err(|e| { + log::error!( + "Unable to send AnnouncePin worker message for finality: {e}" + ) + }); + } } if let Some(ref notification) = import_notification { if let Err(err) = self.backend.pin_block(notification.hash) { - error!( + debug!( "Unable to pin block for import notification. hash: {}, Error: {}", notification.hash, err ); + } else { + let _ = self + .unpin_worker_sender + .unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash)) + .map_err(|e| { + log::error!("Unable to send AnnouncePin worker message for import: {e}") + }); }; } @@ -416,25 +432,12 @@ where backend.commit_operation(op)?; } - let (unpin_worker_sender, mut rx) = - tracing_unbounded::("unpin-worker-channel", 10_000); - let task_backend = Arc::downgrade(&backend); - spawn_handle.spawn( - "unpin-worker", - None, - async move { - while let Some(message) = rx.next().await { - if let Some(backend) = task_backend.upgrade() { - backend.unpin_block(message); - } else { - log::debug!("Terminating unpin-worker, backend reference was dropped."); - return - } - } - log::debug!("Terminating unpin-worker, stream terminated.") - } - .boxed(), + let (unpin_worker_sender, rx) = tracing_unbounded::>( + "notification-pinning-worker-channel", + 10_000, ); + let unpin_worker = NotificationPinningWorker::new(rx, backend.clone()); + spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.run())); Ok(Client { backend, diff --git a/substrate/client/service/src/client/mod.rs b/substrate/client/service/src/client/mod.rs index a13fd4317e15..0703cc2b47d1 100644 --- a/substrate/client/service/src/client/mod.rs +++ b/substrate/client/service/src/client/mod.rs @@ -47,6 +47,7 @@ mod block_rules; mod call_executor; mod client; +mod notification_pinning; mod wasm_override; mod wasm_substitutes; diff --git a/substrate/client/service/src/client/notification_pinning.rs b/substrate/client/service/src/client/notification_pinning.rs new file mode 100644 index 000000000000..80de91c02f1a --- /dev/null +++ b/substrate/client/service/src/client/notification_pinning.rs @@ -0,0 +1,353 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Notification pinning related logic. +//! +//! This file contains a worker that should be started when a new client instance is created. +//! The goal is to avoid pruning of blocks that have active notifications in the node. Every +//! recipient of notifications should receive the chance to act upon them. In addition, notification +//! listeners can hold onto a [`sc_client_api::UnpinHandle`] to keep a block pinned. Once the handle +//! 
+use std::{
+	marker::PhantomData,
+	sync::{Arc, Weak},
+};
+
+use futures::StreamExt;
+use sc_client_api::{Backend, UnpinWorkerMessage};
+
+use sc_utils::mpsc::TracingUnboundedReceiver;
+use schnellru::Limiter;
+use sp_runtime::traits::Block as BlockT;
+
+const LOG_TARGET: &str = "db::notification_pinning";
+const NOTIFICATION_PINNING_LIMIT: usize = 1024;
+
+/// A limiter which automatically unpins blocks that leave the data structure.
+#[derive(Clone, Debug)]
+struct UnpinningByLengthLimiter<Block: BlockT, B: Backend<Block>> {
+	max_length: usize,
+	backend: Weak<B>,
+	_phantom: PhantomData<Block>,
+}
+
+impl<Block: BlockT, B: Backend<Block>> UnpinningByLengthLimiter<Block, B> {
+	/// Creates a new length limiter with a given `max_length`.
+	pub fn new(max_length: usize, backend: Weak<B>) -> UnpinningByLengthLimiter<Block, B> {
+		UnpinningByLengthLimiter { max_length, backend, _phantom: PhantomData::<Block>::default() }
+	}
+}
+
+impl<Block: BlockT, B: Backend<Block>> Limiter<Block::Hash, u32>
+	for UnpinningByLengthLimiter<Block, B>
+{
+	type KeyToInsert<'a> = Block::Hash;
+	type LinkType = usize;
+
+	fn is_over_the_limit(&self, length: usize) -> bool {
+		length > self.max_length
+	}
+
+	fn on_insert(
+		&mut self,
+		_length: usize,
+		key: Self::KeyToInsert<'_>,
+		value: u32,
+	) -> Option<(Block::Hash, u32)> {
+		log::debug!(target: LOG_TARGET, "Pinning block based on notification. hash = {key}");
+		if self.max_length > 0 {
+			Some((key, value))
+		} else {
+			None
+		}
+	}
+
+	fn on_replace(
+		&mut self,
+		_length: usize,
+		_old_key: &mut Block::Hash,
+		_new_key: Block::Hash,
+		_old_value: &mut u32,
+		_new_value: &mut u32,
+	) -> bool {
+		true
+	}
+
+	fn on_removed(&mut self, key: &mut Block::Hash, references: &mut u32) {
+		// If the reference count was larger than 0 on removal,
+		// the item was removed due to capacity limitations.
+		// Since the cache should be large enough for pinned items,
+		// we want to know about these evictions.
+		if *references > 0 {
+			log::warn!(
+				target: LOG_TARGET,
+				"Notification block pinning limit reached. Unpinning block with hash = {key:?}"
+			);
+			if let Some(backend) = self.backend.upgrade() {
+				(0..*references).for_each(|_| backend.unpin_block(*key));
+			}
+		} else {
+			log::trace!(
+				target: LOG_TARGET,
+				"Unpinned block. hash = {key:?}",
+			)
+		}
+	}
+
+	fn on_cleared(&mut self) {}
+
+	fn on_grow(&mut self, _new_memory_usage: usize) -> bool {
+		true
+	}
+}
+
+/// Worker for the handling of notification pinning.
+///
+/// It receives messages from a receiver and pins/unpins based on the incoming messages.
+/// All notification related unpinning should go through this worker. If the maximum number of
+/// notification pins is reached, the block from the oldest notification is unpinned.
+pub struct NotificationPinningWorker<Block: BlockT, B: Backend<Block>> {
+	unpin_message_rx: TracingUnboundedReceiver<UnpinWorkerMessage<Block>>,
+	task_backend: Weak<B>,
+	pinned_blocks: schnellru::LruMap<Block::Hash, u32, UnpinningByLengthLimiter<Block, B>>,
+}
+
+impl<Block: BlockT, B: Backend<Block>> NotificationPinningWorker<Block, B> {
+	/// Creates a new `NotificationPinningWorker`.
+	pub fn new(
+		unpin_message_rx: TracingUnboundedReceiver<UnpinWorkerMessage<Block>>,
+		task_backend: Arc<B>,
+	) -> Self {
+		let pinned_blocks =
+			schnellru::LruMap::<Block::Hash, u32, UnpinningByLengthLimiter<Block, B>>::new(
+				UnpinningByLengthLimiter::new(
+					NOTIFICATION_PINNING_LIMIT,
+					Arc::downgrade(&task_backend),
+				),
+			);
+		Self { unpin_message_rx, task_backend: Arc::downgrade(&task_backend), pinned_blocks }
+	}
+
+	fn handle_announce_message(&mut self, hash: Block::Hash) {
+		if let Some(entry) = self.pinned_blocks.get_or_insert(hash, Default::default) {
+			*entry = *entry + 1;
+		}
+	}
+
+	fn handle_unpin_message(&mut self, hash: Block::Hash) -> Result<(), ()> {
+		if let Some(refcount) = self.pinned_blocks.peek_mut(&hash) {
+			*refcount = *refcount - 1;
+			if *refcount == 0 {
+				self.pinned_blocks.remove(&hash);
+			}
+			if let Some(backend) = self.task_backend.upgrade() {
+				log::debug!(target: LOG_TARGET, "Reducing pinning refcount for block hash = {hash:?}");
+				backend.unpin_block(hash);
+			} else {
+				log::debug!(target: LOG_TARGET, "Terminating unpin-worker, backend reference was dropped.");
+				return Err(())
+			}
+		} else {
+			log::debug!(target: LOG_TARGET, "Received unpin message for already unpinned block. hash = {hash:?}");
+		}
+		Ok(())
+	}
+
+	/// Start working on the received messages.
+	///
+	/// The worker maintains a map which keeps track of the pinned blocks and their reference count.
+	/// Depending upon the received message, it acts to pin/unpin the block.
+	pub async fn run(mut self) {
+		while let Some(message) = self.unpin_message_rx.next().await {
+			match message {
+				UnpinWorkerMessage::AnnouncePin(hash) => self.handle_announce_message(hash),
+				UnpinWorkerMessage::Unpin(hash) =>
+					if self.handle_unpin_message(hash).is_err() {
+						return
+					},
+			}
+		}
+		log::debug!(target: LOG_TARGET, "Terminating unpin-worker, stream terminated.")
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use std::sync::Arc;
+
+	use sc_client_api::{Backend, UnpinWorkerMessage};
+	use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
+	use sp_core::H256;
+	use sp_runtime::traits::Block as BlockT;
+
+	type Block = substrate_test_runtime_client::runtime::Block;
+
+	use super::{NotificationPinningWorker, UnpinningByLengthLimiter};
+
+	impl<Block: BlockT, B: Backend<Block>> NotificationPinningWorker<Block, B> {
+		fn new_with_limit(
+			unpin_message_rx: TracingUnboundedReceiver<UnpinWorkerMessage<Block>>,
+			task_backend: Arc<B>,
+			limit: usize,
+		) -> Self {
+			let pinned_blocks =
+				schnellru::LruMap::<Block::Hash, u32, UnpinningByLengthLimiter<Block, B>>::new(
+					UnpinningByLengthLimiter::new(limit, Arc::downgrade(&task_backend)),
+				);
+			Self { unpin_message_rx, task_backend: Arc::downgrade(&task_backend), pinned_blocks }
+		}
+
+		fn lru(
+			&self,
+		) -> &schnellru::LruMap<Block::Hash, u32, UnpinningByLengthLimiter<Block, B>> {
+			&self.pinned_blocks
+		}
+	}
+
+	#[test]
+	fn pinning_worker_handles_base_case() {
+		let (_tx, rx) = tracing_unbounded("testing", 1000);
+
+		let backend = Arc::new(sc_client_api::in_mem::Backend::<Block>::new());
+
+		let hash = H256::random();
+
+		let mut worker = NotificationPinningWorker::new(rx, backend.clone());
+
+		// Block got pinned and unpin message should unpin in the backend.
+		let _ = backend.pin_block(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(1));
+
+		worker.handle_announce_message(hash);
+		assert_eq!(worker.lru().len(), 1);
+
+		let _ = worker.handle_unpin_message(hash);
+
+		assert_eq!(backend.pin_refs(&hash), Some(0));
+		assert!(worker.lru().is_empty());
+	}
+
+	#[test]
+	fn pinning_worker_handles_multiple_pins() {
+		let (_tx, rx) = tracing_unbounded("testing", 1000);
+
+		let backend = Arc::new(sc_client_api::in_mem::Backend::<Block>::new());
+
+		let hash = H256::random();
+
+		let mut worker = NotificationPinningWorker::new(rx, backend.clone());
+		// Block got pinned multiple times.
+		let _ = backend.pin_block(hash);
+		let _ = backend.pin_block(hash);
+		let _ = backend.pin_block(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(3));
+
+		worker.handle_announce_message(hash);
+		worker.handle_announce_message(hash);
+		worker.handle_announce_message(hash);
+		assert_eq!(worker.lru().len(), 1);
+
+		let _ = worker.handle_unpin_message(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(2));
+		let _ = worker.handle_unpin_message(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(1));
+		let _ = worker.handle_unpin_message(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(0));
+		assert!(worker.lru().is_empty());
+
+		let _ = worker.handle_unpin_message(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(0));
+	}
+
+	#[test]
+	fn pinning_worker_handles_too_many_unpins() {
+		let (_tx, rx) = tracing_unbounded("testing", 1000);
+
+		let backend = Arc::new(sc_client_api::in_mem::Backend::<Block>::new());
+
+		let hash = H256::random();
+		let hash2 = H256::random();
+
+		let mut worker = NotificationPinningWorker::new(rx, backend.clone());
+		// Block was announced once but unpinned multiple times. The worker should ignore the
+		// additional unpins.
+		let _ = backend.pin_block(hash);
+		let _ = backend.pin_block(hash);
+		let _ = backend.pin_block(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(3));
+
+		worker.handle_announce_message(hash);
+		assert_eq!(worker.lru().len(), 1);
+
+		let _ = worker.handle_unpin_message(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(2));
+		let _ = worker.handle_unpin_message(hash);
+		assert_eq!(backend.pin_refs(&hash), Some(2));
+		assert!(worker.lru().is_empty());
+
+		let _ = worker.handle_unpin_message(hash2);
+		assert!(worker.lru().is_empty());
+		assert_eq!(backend.pin_refs(&hash2), None);
+	}
+
+	#[test]
+	fn pinning_worker_should_evict_when_limit_reached() {
+		let (_tx, rx) = tracing_unbounded("testing", 1000);
+
+		let backend = Arc::new(sc_client_api::in_mem::Backend::<Block>::new());
+
+		let hash1 = H256::random();
+		let hash2 = H256::random();
+		let hash3 = H256::random();
+		let hash4 = H256::random();
+
+		// Only two items fit into the cache.
+		let mut worker = NotificationPinningWorker::new_with_limit(rx, backend.clone(), 2);
+
+		// Multiple blocks are announced but the cache size is too small. We expect that blocks
+		// are evicted by the cache and unpinned in the backend.
+		let _ = backend.pin_block(hash1);
+		let _ = backend.pin_block(hash2);
+		let _ = backend.pin_block(hash3);
+		assert_eq!(backend.pin_refs(&hash1), Some(1));
+		assert_eq!(backend.pin_refs(&hash2), Some(1));
+		assert_eq!(backend.pin_refs(&hash3), Some(1));
+
+		worker.handle_announce_message(hash1);
+		assert!(worker.lru().peek(&hash1).is_some());
+		worker.handle_announce_message(hash2);
+		assert!(worker.lru().peek(&hash2).is_some());
+		worker.handle_announce_message(hash3);
+		assert!(worker.lru().peek(&hash3).is_some());
+		assert!(worker.lru().peek(&hash2).is_some());
+		assert_eq!(worker.lru().len(), 2);
+
+		// Hash 1 should have gotten unpinned, since it is the oldest.
+		assert_eq!(backend.pin_refs(&hash1), Some(0));
+		assert_eq!(backend.pin_refs(&hash2), Some(1));
+		assert_eq!(backend.pin_refs(&hash3), Some(1));
+
+		// Hash 2 is getting bumped.
+		worker.handle_announce_message(hash2);
+		assert_eq!(worker.lru().peek(&hash2), Some(&2));
+
+		// Since hash 2 was accessed, evict hash 3.
+		worker.handle_announce_message(hash4);
+		assert_eq!(worker.lru().peek(&hash3), None);
+	}
+}
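For reviewers who want the gist of the patch without reading the full worker, here is a minimal, standalone sketch (not part of the diff) of the reference-counting contract it implements: every `AnnouncePin` increments a per-hash counter, every `Unpin` decrements it and releases exactly one backend pin, and unpins for unknown hashes are ignored. `MockBackend`, `Worker`, and the `u64` hash type are hypothetical stand-ins for illustration only, not the `sc_client_api` types used above, and the sketch omits the LRU eviction limiter.

```rust
// Standalone sketch of the notification-pinning refcount logic.
// `MockBackend` and `Worker` are illustrative stand-ins, not Substrate APIs.
use std::collections::HashMap;

type Hash = u64;

#[derive(Default)]
struct MockBackend {
    // Pin count per block hash, mimicking backend.pin_block/unpin_block.
    pins: HashMap<Hash, u32>,
}

impl MockBackend {
    fn pin_block(&mut self, hash: Hash) {
        *self.pins.entry(hash).or_default() += 1;
    }
    fn unpin_block(&mut self, hash: Hash) {
        if let Some(refs) = self.pins.get_mut(&hash) {
            *refs = refs.saturating_sub(1);
        }
    }
}

#[derive(Default)]
struct Worker {
    // Notification-induced reference counts, analogous to the worker's LRU map.
    pinned: HashMap<Hash, u32>,
}

impl Worker {
    // AnnouncePin: a notification referencing `hash` was created.
    fn announce(&mut self, hash: Hash) {
        *self.pinned.entry(hash).or_default() += 1;
    }

    // Unpin: a notification for `hash` was dropped. Each unpin releases exactly
    // one backend pin; hashes that were never announced are ignored.
    fn unpin(&mut self, hash: Hash, backend: &mut MockBackend) {
        if let Some(refs) = self.pinned.get_mut(&hash) {
            *refs -= 1;
            if *refs == 0 {
                self.pinned.remove(&hash);
            }
            backend.unpin_block(hash);
        }
    }
}

fn main() {
    let mut backend = MockBackend::default();
    let mut worker = Worker::default();
    let hash = 42;

    // The client pins the block once per notification it emits...
    backend.pin_block(hash);
    backend.pin_block(hash);
    worker.announce(hash);
    worker.announce(hash);

    // ...and each dropped notification sends an Unpin that releases one pin.
    worker.unpin(hash, &mut backend);
    worker.unpin(hash, &mut backend);
    assert_eq!(backend.pins.get(&hash), Some(&0));
    assert!(worker.pinned.is_empty());
    println!("refcounting sketch ok");
}
```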