From 70d0ed4177e167f2c13d380b51b2b142a1c89f13 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 8 Jan 2024 17:44:51 +0100 Subject: [PATCH 1/6] Keep track of pinned notifications in pinning worker. --- Cargo.lock | 1 + substrate/client/api/src/client.rs | 28 +- substrate/client/db/src/lib.rs | 4 +- .../client/db/src/pinned_blocks_cache.rs | 3 +- substrate/client/service/Cargo.toml | 1 + substrate/client/service/src/client/client.rs | 54 +-- substrate/client/service/src/client/mod.rs | 1 + .../src/client/notification_pinning.rs | 341 ++++++++++++++++++ 8 files changed, 400 insertions(+), 33 deletions(-) create mode 100644 substrate/client/service/src/client/notification_pinning.rs diff --git a/Cargo.lock b/Cargo.lock index b20adebf963a..4e13890b1015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16484,6 +16484,7 @@ dependencies = [ "sc-transaction-pool", "sc-transaction-pool-api", "sc-utils", + "schnellru", "serde", "serde_json", "sp-api", diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs index 46232c74539c..97d7a19e26c3 100644 --- a/substrate/client/api/src/client.rs +++ b/substrate/client/api/src/client.rs @@ -26,6 +26,7 @@ use sp_runtime::{ Justifications, }; use std::{ + backtrace::Backtrace, collections::HashSet, fmt::{self, Debug}, sync::Arc, @@ -278,7 +279,7 @@ impl fmt::Display for UsageInfo { pub struct UnpinHandleInner { /// Hash of the block pinned by this handle hash: Block::Hash, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, } impl Debug for UnpinHandleInner { @@ -291,7 +292,7 @@ impl UnpinHandleInner { /// Create a new [`UnpinHandleInner`] pub fn new( hash: Block::Hash, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, ) -> Self { Self { hash, unpin_worker_sender } } @@ -299,12 +300,25 @@ impl UnpinHandleInner { impl Drop for UnpinHandleInner { fn drop(&mut self) { - if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) { + if let Err(err) = + self.unpin_worker_sender.unbounded_send(UnpinWorkerMessage::Unpin(self.hash)) + { log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err); }; } } +/// Message that signals notification-based pinning actions to the pinning-worker. +/// +/// When the notification is dropped, an `Unpin` message should be sent to the worker. +#[derive(Debug)] +pub enum UnpinWorkerMessage { + /// Should be sent when a import or finality notification is created. + AnnouncePin(Block::Hash), + /// Should be sent when a import or finality notification is dropped. + Unpin(Block::Hash), +} + /// Keeps a specific block pinned while the handle is alive. /// Once the last handle instance for a given block is dropped, the /// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block). @@ -315,7 +329,7 @@ impl UnpinHandle { /// Create a new [`UnpinHandle`] pub fn new( hash: Block::Hash, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, ) -> UnpinHandle { UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender))) } @@ -353,7 +367,7 @@ impl BlockImportNotification { header: Block::Header, is_new_best: bool, tree_route: Option>>, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, ) -> Self { Self { hash, @@ -412,7 +426,7 @@ impl FinalityNotification { /// Create finality notification from finality summary. pub fn from_summary( mut summary: FinalizeSummary, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, ) -> FinalityNotification { let hash = summary.finalized.pop().unwrap_or_default(); FinalityNotification { @@ -436,7 +450,7 @@ impl BlockImportNotification { /// Create finality notification from finality summary. pub fn from_summary( summary: ImportSummary, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, ) -> BlockImportNotification { let hash = summary.hash; BlockImportNotification { diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 194bec8a88eb..93e94d56529e 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -2514,7 +2514,7 @@ impl sc_client_api::backend::Backend for Backend { self.storage.state_db.pin(&hash, number.saturated_into::(), hint).map_err( |_| { sp_blockchain::Error::UnknownBlock(format!( - "State already discarded for `{:?}`", + "Unable to pin: state already discarded for `{:?}`", hash )) }, @@ -4180,6 +4180,7 @@ pub(crate) mod tests { #[test] fn test_pinned_blocks_on_finalize() { + sp_tracing::try_init_simple(); let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); let mut blocks = Vec::new(); let mut prev_hash = Default::default(); @@ -4341,6 +4342,7 @@ pub(crate) mod tests { #[test] fn test_pinned_blocks_on_finalize_with_fork() { + sp_tracing::try_init_simple(); let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); let mut blocks = Vec::new(); let mut prev_hash = Default::default(); diff --git a/substrate/client/db/src/pinned_blocks_cache.rs b/substrate/client/db/src/pinned_blocks_cache.rs index 46c9287fb19a..f4718010c66b 100644 --- a/substrate/client/db/src/pinned_blocks_cache.rs +++ b/substrate/client/db/src/pinned_blocks_cache.rs @@ -20,7 +20,7 @@ use schnellru::{Limiter, LruMap}; use sp_runtime::{traits::Block as BlockT, Justifications}; const LOG_TARGET: &str = "db::pin"; -const PINNING_CACHE_SIZE: usize = 1024; +const PINNING_CACHE_SIZE: usize = 4096; /// Entry for pinned blocks cache. struct PinnedBlockCacheEntry { @@ -81,6 +81,7 @@ impl Limiter> for Loggi key: Self::KeyToInsert<'_>, value: PinnedBlockCacheEntry, ) -> Option<(Block::Hash, PinnedBlockCacheEntry)> { + log::info!(target: LOG_TARGET, "Inserting value into pinned block cache. hash = {}", key); if self.max_length > 0 { Some((key, value)) } else { diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml index 576a8aac8e49..ae6400962fd5 100644 --- a/substrate/client/service/Cargo.toml +++ b/substrate/client/service/Cargo.toml @@ -84,6 +84,7 @@ tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "tim tempfile = "3.1.0" directories = "5.0.1" static_init = "1.0.3" +schnellru = "0.2.1" [dev-dependencies] substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index aa9c1b80a29a..4a99de43137a 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -19,8 +19,7 @@ //! Substrate Client use super::block_rules::{BlockRules, LookupResult as BlockLookupResult}; -use futures::{FutureExt, StreamExt}; -use log::{error, info, trace, warn}; +use log::{debug, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use prometheus_endpoint::Registry; use rand::Rng; @@ -38,7 +37,7 @@ use sc_client_api::{ execution_extensions::ExecutionExtensions, notifications::{StorageEventStream, StorageNotifications}, CallExecutor, ExecutorProvider, KeysIter, OnFinalityAction, OnImportAction, PairsIter, - ProofProvider, UsageProvider, + ProofProvider, UnpinWorkerMessage, UsageProvider, }; use sc_consensus::{ BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction, @@ -114,7 +113,7 @@ where block_rules: BlockRules, config: ClientConfig, telemetry: Option, - unpin_worker_sender: TracingUnboundedSender, + unpin_worker_sender: TracingUnboundedSender>, _phantom: PhantomData, } @@ -326,19 +325,35 @@ where // dropped, the block will be unpinned automatically. if let Some(ref notification) = finality_notification { if let Err(err) = self.backend.pin_block(notification.hash) { - error!( + debug!( "Unable to pin block for finality notification. hash: {}, Error: {}", notification.hash, err ); - }; + } else { + let _ = self + .unpin_worker_sender + .unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash)) + .map_err(|e| { + log::debug!( + "Unable to send AnnouncePin worker message for finality: {e}" + ) + }); + } } if let Some(ref notification) = import_notification { if let Err(err) = self.backend.pin_block(notification.hash) { - error!( + debug!( "Unable to pin block for import notification. hash: {}, Error: {}", notification.hash, err ); + } else { + let _ = self + .unpin_worker_sender + .unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash)) + .map_err(|e| { + log::debug!("Unable to send AnnouncePin worker message for import: {e}") + }); }; } @@ -416,25 +431,16 @@ where backend.commit_operation(op)?; } - let (unpin_worker_sender, mut rx) = - tracing_unbounded::("unpin-worker-channel", 10_000); + let (unpin_worker_sender, rx) = tracing_unbounded::>( + "notification-pinning-worker-channel", + 10_000, + ); let task_backend = Arc::downgrade(&backend); - spawn_handle.spawn( - "unpin-worker", - None, - async move { - while let Some(message) = rx.next().await { - if let Some(backend) = task_backend.upgrade() { - backend.unpin_block(message); - } else { - log::debug!("Terminating unpin-worker, backend reference was dropped."); - return - } - } - log::debug!("Terminating unpin-worker, stream terminated.") - } - .boxed(), + let unpin_worker = crate::client::notification_pinning::NotificationPinningWorker::new( + rx, + task_backend.clone(), ); + spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.work())); Ok(Client { backend, diff --git a/substrate/client/service/src/client/mod.rs b/substrate/client/service/src/client/mod.rs index a13fd4317e15..0703cc2b47d1 100644 --- a/substrate/client/service/src/client/mod.rs +++ b/substrate/client/service/src/client/mod.rs @@ -47,6 +47,7 @@ mod block_rules; mod call_executor; mod client; +mod notification_pinning; mod wasm_override; mod wasm_substitutes; diff --git a/substrate/client/service/src/client/notification_pinning.rs b/substrate/client/service/src/client/notification_pinning.rs new file mode 100644 index 000000000000..aa8b587e6987 --- /dev/null +++ b/substrate/client/service/src/client/notification_pinning.rs @@ -0,0 +1,341 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Notification pinning related logic. +//! +//! This file contains a worker that should be started when a new client instance is created. +//! The goal is to avoid pruning of blocks that have active notifications in the node. Every +//! recipient of notifications should receive the chance to act upon them. In addition, notification +//! listeners can hold onto a [`sc_client_api::UnpinHandle`] to keep a block pinned. Once the handle +//! is dropped, a message is sent and the worker unpins the respective block. +use std::{marker::PhantomData, sync::Weak}; + +use futures::StreamExt; +use sc_client_api::{Backend, UnpinWorkerMessage}; + +use sc_utils::mpsc::TracingUnboundedReceiver; +use schnellru::Limiter; +use sp_runtime::traits::Block as BlockT; + +const LOG_TARGET: &str = "db::notification_pinning"; +const NOTIFICATION_PINNING_LIMIT: usize = 2048; + +/// A limiter which automatically unpins blockst hat leave the data structure. +#[derive(Clone, Debug)] +struct UnpinningByLengthLimiter> { + max_length: usize, + backend: Weak, + _phantom: PhantomData, +} + +impl> UnpinningByLengthLimiter { + /// Creates a new length limiter with a given `max_length`. + pub fn new(max_length: usize, backend: Weak) -> UnpinningByLengthLimiter { + UnpinningByLengthLimiter { max_length, backend, _phantom: PhantomData::::default() } + } +} + +impl> Limiter + for UnpinningByLengthLimiter +{ + type KeyToInsert<'a> = Block::Hash; + type LinkType = usize; + + fn is_over_the_limit(&self, length: usize) -> bool { + length > self.max_length + } + + fn on_insert( + &mut self, + _length: usize, + key: Self::KeyToInsert<'_>, + value: u32, + ) -> Option<(Block::Hash, u32)> { + log::debug!(target: LOG_TARGET, "Pinning block based on notification. hash = {key}"); + if self.max_length > 0 { + Some((key, value)) + } else { + None + } + } + + fn on_replace( + &mut self, + _length: usize, + _old_key: &mut Block::Hash, + _new_key: Block::Hash, + _old_value: &mut u32, + _new_value: &mut u32, + ) -> bool { + true + } + + fn on_removed(&mut self, key: &mut Block::Hash, references: &mut u32) { + // If reference count was larger than 0 on removal, + // the item was removed due to capacity limitations. + // Since the cache should be large enough for pinned items, + // we want to know about these evictions. + if *references > 0 { + log::warn!( + target: LOG_TARGET, + "Notification block pinning limit reached. Unpinning block with hash = {key:?}" + ); + if let Some(backend) = self.backend.upgrade() { + (0..*references).for_each(|_| backend.unpin_block(*key)); + } + } else { + log::trace!( + target: LOG_TARGET, + "Unpinned block. hash = {key:?}", + ) + } + } + + fn on_cleared(&mut self) {} + + fn on_grow(&mut self, _new_memory_usage: usize) -> bool { + true + } +} + +/// Worker for the handling of notification pinning. +/// +/// It receives messages from a receiver and pins/unping based on the incoming messages. +/// All notification related unpinning should go through this worker. If the maximum number of +/// notification pins is reached, the block from the oldest notification is unpinned. +pub struct NotificationPinningWorker> { + unpin_message_rx: TracingUnboundedReceiver>, + task_backend: Weak, + pinned_blocks: schnellru::LruMap>, +} + +impl> NotificationPinningWorker { + /// Creates a new `NotificationPinningWorker`. + pub fn new( + unpin_message_rx: TracingUnboundedReceiver>, + task_backend: Weak, + ) -> Self { + let pinned_blocks = + schnellru::LruMap::>::new( + UnpinningByLengthLimiter::new(NOTIFICATION_PINNING_LIMIT, task_backend.clone()), + ); + Self { unpin_message_rx, task_backend, pinned_blocks } + } + + pub fn handle_announce_message(&mut self, hash: Block::Hash) { + if let Some(entry) = self.pinned_blocks.get_or_insert(hash, Default::default) { + *entry = *entry + 1; + } + } + + pub fn handle_unpin_message(&mut self, hash: Block::Hash) { + if let Some(refcount) = self.pinned_blocks.peek_mut(&hash) { + *refcount = *refcount - 1; + if *refcount == 0 { + self.pinned_blocks.remove(&hash); + } + if let Some(backend) = self.task_backend.upgrade() { + log::debug!(target: LOG_TARGET, "Reducing pinning refcount for block hash = {hash:?}"); + backend.unpin_block(hash); + } else { + log::debug!(target: LOG_TARGET, "Terminating unpin-worker, backend reference was dropped."); + return + } + } else { + log::debug!(target: LOG_TARGET, "Received unpin message for already unpinned block. hash = {hash:?}"); + }; + } + + /// Start working on the received messages. + /// + /// The worker maintains a map which keeps track of the pinned blocks and their reference count. + /// Depending upon the received message, it acts to pin/unpin the block. + pub async fn work(mut self) { + while let Some(message) = self.unpin_message_rx.next().await { + match message { + UnpinWorkerMessage::AnnouncePin(hash) => self.handle_announce_message(hash), + UnpinWorkerMessage::Unpin(hash) => self.handle_unpin_message(hash), + } + } + log::debug!(target: LOG_TARGET, "Terminating unpin-worker, stream terminated.") + } +} + +#[cfg(test)] +impl> NotificationPinningWorker { + fn new_with_limit( + unpin_message_rx: TracingUnboundedReceiver>, + task_backend: Weak, + limit: usize, + ) -> Self { + let pinned_blocks = + schnellru::LruMap::>::new( + UnpinningByLengthLimiter::new(limit, task_backend.clone()), + ); + Self { unpin_message_rx, task_backend, pinned_blocks } + } + + fn lru(&self) -> &schnellru::LruMap> { + &self.pinned_blocks + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use sc_client_api::Backend; + use sc_utils::mpsc::tracing_unbounded; + use sp_core::H256; + + type Block = substrate_test_runtime_client::runtime::Block; + + use super::NotificationPinningWorker; + + #[test] + fn pinning_worker_handles_base_case() { + let (_tx, rx) = tracing_unbounded("testing", 1000); + + let backend = Arc::new(sc_client_api::in_mem::Backend::::new()); + + let hash = H256::random(); + + let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); + + // 1. Block got pinned and unpin message should unpin in the backend. + let _ = backend.pin_block(hash); + assert_eq!(backend.pin_refs(&hash), Some(1)); + + worker.handle_announce_message(hash); + assert_eq!(worker.lru().len(), 1); + + worker.handle_unpin_message(hash); + + assert_eq!(backend.pin_refs(&hash), Some(0)); + assert!(worker.lru().is_empty()); + } + + #[test] + fn pinning_worker_handles_multiple_pins() { + let (_tx, rx) = tracing_unbounded("testing", 1000); + + let backend = Arc::new(sc_client_api::in_mem::Backend::::new()); + + let hash = H256::random(); + + let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); + // 2. Block got pinned multiple times. + let _ = backend.pin_block(hash); + let _ = backend.pin_block(hash); + let _ = backend.pin_block(hash); + assert_eq!(backend.pin_refs(&hash), Some(3)); + + worker.handle_announce_message(hash); + worker.handle_announce_message(hash); + worker.handle_announce_message(hash); + assert_eq!(worker.lru().len(), 1); + + worker.handle_unpin_message(hash); + assert_eq!(backend.pin_refs(&hash), Some(2)); + worker.handle_unpin_message(hash); + assert_eq!(backend.pin_refs(&hash), Some(1)); + worker.handle_unpin_message(hash); + assert_eq!(backend.pin_refs(&hash), Some(0)); + assert!(worker.lru().is_empty()); + + worker.handle_unpin_message(hash); + assert_eq!(backend.pin_refs(&hash), Some(0)); + } + + #[test] + fn pinning_worker_handles_too_many_unpins() { + let (_tx, rx) = tracing_unbounded("testing", 1000); + + let backend = Arc::new(sc_client_api::in_mem::Backend::::new()); + + let hash = H256::random(); + let hash2 = H256::random(); + + let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); + // 3. Block was announced once but unpinned multiple times. The worker should ignore the + // additional unpins. + let _ = backend.pin_block(hash); + let _ = backend.pin_block(hash); + let _ = backend.pin_block(hash); + assert_eq!(backend.pin_refs(&hash), Some(3)); + + worker.handle_announce_message(hash); + assert_eq!(worker.lru().len(), 1); + + worker.handle_unpin_message(hash); + assert_eq!(backend.pin_refs(&hash), Some(2)); + worker.handle_unpin_message(hash); + assert_eq!(backend.pin_refs(&hash), Some(2)); + assert!(worker.lru().is_empty()); + + worker.handle_unpin_message(hash2); + assert!(worker.lru().is_empty()); + assert_eq!(backend.pin_refs(&hash2), None); + } + + #[test] + fn pinning_worker_should_evict_when_limit_reached() { + let (_tx, rx) = tracing_unbounded("testing", 1000); + + let backend = Arc::new(sc_client_api::in_mem::Backend::::new()); + + let hash1 = H256::random(); + let hash2 = H256::random(); + let hash3 = H256::random(); + let hash4 = H256::random(); + + // Only two items fit into the cache. + let mut worker = NotificationPinningWorker::new_with_limit(rx, Arc::downgrade(&backend), 2); + + // 4. Multiple blocks are announced but the cache size is too small. We expect that blocks + // are evicted by the cache and unpinned in the backend. + let _ = backend.pin_block(hash1); + let _ = backend.pin_block(hash2); + let _ = backend.pin_block(hash3); + assert_eq!(backend.pin_refs(&hash1), Some(1)); + assert_eq!(backend.pin_refs(&hash2), Some(1)); + assert_eq!(backend.pin_refs(&hash3), Some(1)); + + worker.handle_announce_message(hash1); + assert!(worker.lru().peek(&hash1).is_some()); + worker.handle_announce_message(hash2); + assert!(worker.lru().peek(&hash2).is_some()); + worker.handle_announce_message(hash3); + assert!(worker.lru().peek(&hash3).is_some()); + assert!(worker.lru().peek(&hash2).is_some()); + assert_eq!(worker.lru().len(), 2); + + // Hash 1 should have gotten unpinned, since its oldest. + assert_eq!(backend.pin_refs(&hash1), Some(0)); + assert_eq!(backend.pin_refs(&hash2), Some(1)); + assert_eq!(backend.pin_refs(&hash3), Some(1)); + + // Hash 2 is getting bumped. + worker.handle_announce_message(hash2); + assert_eq!(worker.lru().peek(&hash2), Some(&2)); + + // Since hash 2 was accessed, evict hash 3. + worker.handle_announce_message(hash4); + assert_eq!(worker.lru().peek(&hash3), None); + } +} From 958ded0975c7623e11594c28461168ad86428304 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 15 Jan 2024 18:07:29 +0100 Subject: [PATCH 2/6] Fix small issues --- substrate/client/db/src/lib.rs | 2 -- substrate/client/db/src/pinned_blocks_cache.rs | 3 +-- .../service/src/client/notification_pinning.rs | 14 +++++++------- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 93e94d56529e..f14b5f2d301f 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -4180,7 +4180,6 @@ pub(crate) mod tests { #[test] fn test_pinned_blocks_on_finalize() { - sp_tracing::try_init_simple(); let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); let mut blocks = Vec::new(); let mut prev_hash = Default::default(); @@ -4342,7 +4341,6 @@ pub(crate) mod tests { #[test] fn test_pinned_blocks_on_finalize_with_fork() { - sp_tracing::try_init_simple(); let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); let mut blocks = Vec::new(); let mut prev_hash = Default::default(); diff --git a/substrate/client/db/src/pinned_blocks_cache.rs b/substrate/client/db/src/pinned_blocks_cache.rs index f4718010c66b..ac4aad07765c 100644 --- a/substrate/client/db/src/pinned_blocks_cache.rs +++ b/substrate/client/db/src/pinned_blocks_cache.rs @@ -20,7 +20,7 @@ use schnellru::{Limiter, LruMap}; use sp_runtime::{traits::Block as BlockT, Justifications}; const LOG_TARGET: &str = "db::pin"; -const PINNING_CACHE_SIZE: usize = 4096; +const PINNING_CACHE_SIZE: usize = 2048; /// Entry for pinned blocks cache. struct PinnedBlockCacheEntry { @@ -81,7 +81,6 @@ impl Limiter> for Loggi key: Self::KeyToInsert<'_>, value: PinnedBlockCacheEntry, ) -> Option<(Block::Hash, PinnedBlockCacheEntry)> { - log::info!(target: LOG_TARGET, "Inserting value into pinned block cache. hash = {}", key); if self.max_length > 0 { Some((key, value)) } else { diff --git a/substrate/client/service/src/client/notification_pinning.rs b/substrate/client/service/src/client/notification_pinning.rs index aa8b587e6987..37b6a877ea72 100644 --- a/substrate/client/service/src/client/notification_pinning.rs +++ b/substrate/client/service/src/client/notification_pinning.rs @@ -33,7 +33,7 @@ use schnellru::Limiter; use sp_runtime::traits::Block as BlockT; const LOG_TARGET: &str = "db::notification_pinning"; -const NOTIFICATION_PINNING_LIMIT: usize = 2048; +const NOTIFICATION_PINNING_LIMIT: usize = 1024; /// A limiter which automatically unpins blockst hat leave the data structure. #[derive(Clone, Debug)] @@ -217,7 +217,7 @@ mod tests { let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); - // 1. Block got pinned and unpin message should unpin in the backend. + // Block got pinned and unpin message should unpin in the backend. let _ = backend.pin_block(hash); assert_eq!(backend.pin_refs(&hash), Some(1)); @@ -239,7 +239,7 @@ mod tests { let hash = H256::random(); let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); - // 2. Block got pinned multiple times. + // Block got pinned multiple times. let _ = backend.pin_block(hash); let _ = backend.pin_block(hash); let _ = backend.pin_block(hash); @@ -272,8 +272,8 @@ mod tests { let hash2 = H256::random(); let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); - // 3. Block was announced once but unpinned multiple times. The worker should ignore the - // additional unpins. + // Block was announced once but unpinned multiple times. The worker should ignore the + // additional unpins. let _ = backend.pin_block(hash); let _ = backend.pin_block(hash); let _ = backend.pin_block(hash); @@ -307,8 +307,8 @@ mod tests { // Only two items fit into the cache. let mut worker = NotificationPinningWorker::new_with_limit(rx, Arc::downgrade(&backend), 2); - // 4. Multiple blocks are announced but the cache size is too small. We expect that blocks - // are evicted by the cache and unpinned in the backend. + // Multiple blocks are announced but the cache size is too small. We expect that blocks + // are evicted by the cache and unpinned in the backend. let _ = backend.pin_block(hash1); let _ = backend.pin_block(hash2); let _ = backend.pin_block(hash3); From a56a82d692ff9a094460142d00ba21edb7be10b5 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 16 Jan 2024 09:10:25 +0100 Subject: [PATCH 3/6] Update substrate/client/service/src/client/notification_pinning.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- substrate/client/service/src/client/notification_pinning.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/service/src/client/notification_pinning.rs b/substrate/client/service/src/client/notification_pinning.rs index 37b6a877ea72..3c50bfd36ff3 100644 --- a/substrate/client/service/src/client/notification_pinning.rs +++ b/substrate/client/service/src/client/notification_pinning.rs @@ -137,13 +137,13 @@ impl> NotificationPinningWorker Self { unpin_message_rx, task_backend, pinned_blocks } } - pub fn handle_announce_message(&mut self, hash: Block::Hash) { + fn handle_announce_message(&mut self, hash: Block::Hash) { if let Some(entry) = self.pinned_blocks.get_or_insert(hash, Default::default) { *entry = *entry + 1; } } - pub fn handle_unpin_message(&mut self, hash: Block::Hash) { + fn handle_unpin_message(&mut self, hash: Block::Hash) { if let Some(refcount) = self.pinned_blocks.peek_mut(&hash) { *refcount = *refcount - 1; if *refcount == 0 { From 2f78e351e1701eee99f474efc1d9abab10aae854 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 16 Jan 2024 09:10:56 +0100 Subject: [PATCH 4/6] Unused import --- substrate/client/api/src/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs index 97d7a19e26c3..2de09840e4df 100644 --- a/substrate/client/api/src/client.rs +++ b/substrate/client/api/src/client.rs @@ -26,7 +26,6 @@ use sp_runtime::{ Justifications, }; use std::{ - backtrace::Backtrace, collections::HashSet, fmt::{self, Debug}, sync::Arc, From c3bfd4e51b1de89b0565979c66d63487c7789a4f Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 30 Jan 2024 14:36:48 +0100 Subject: [PATCH 5/6] Reviewer comments --- substrate/client/service/src/client/client.rs | 13 +-- .../src/client/notification_pinning.rs | 100 ++++++++++-------- 2 files changed, 61 insertions(+), 52 deletions(-) diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 4a99de43137a..4b9887318bd3 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -19,6 +19,7 @@ //! Substrate Client use super::block_rules::{BlockRules, LookupResult as BlockLookupResult}; +use crate::client::notification_pinning::NotificationPinningWorker; use log::{debug, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use prometheus_endpoint::Registry; @@ -334,7 +335,7 @@ where .unpin_worker_sender .unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash)) .map_err(|e| { - log::debug!( + log::error!( "Unable to send AnnouncePin worker message for finality: {e}" ) }); @@ -352,7 +353,7 @@ where .unpin_worker_sender .unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash)) .map_err(|e| { - log::debug!("Unable to send AnnouncePin worker message for import: {e}") + log::error!("Unable to send AnnouncePin worker message for import: {e}") }); }; } @@ -435,12 +436,8 @@ where "notification-pinning-worker-channel", 10_000, ); - let task_backend = Arc::downgrade(&backend); - let unpin_worker = crate::client::notification_pinning::NotificationPinningWorker::new( - rx, - task_backend.clone(), - ); - spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.work())); + let unpin_worker = NotificationPinningWorker::new(rx, backend.clone()); + spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.run())); Ok(Client { backend, diff --git a/substrate/client/service/src/client/notification_pinning.rs b/substrate/client/service/src/client/notification_pinning.rs index 3c50bfd36ff3..b7d3ad383624 100644 --- a/substrate/client/service/src/client/notification_pinning.rs +++ b/substrate/client/service/src/client/notification_pinning.rs @@ -23,7 +23,10 @@ //! recipient of notifications should receive the chance to act upon them. In addition, notification //! listeners can hold onto a [`sc_client_api::UnpinHandle`] to keep a block pinned. Once the handle //! is dropped, a message is sent and the worker unpins the respective block. -use std::{marker::PhantomData, sync::Weak}; +use std::{ + marker::PhantomData, + sync::{Arc, Weak}, +}; use futures::StreamExt; use sc_client_api::{Backend, UnpinWorkerMessage}; @@ -115,7 +118,7 @@ impl> Limiter /// Worker for the handling of notification pinning. /// -/// It receives messages from a receiver and pins/unping based on the incoming messages. +/// It receives messages from a receiver and pins/unpins based on the incoming messages. /// All notification related unpinning should go through this worker. If the maximum number of /// notification pins is reached, the block from the oldest notification is unpinned. pub struct NotificationPinningWorker> { @@ -128,13 +131,16 @@ impl> NotificationPinningWorker /// Creates a new `NotificationPinningWorker`. pub fn new( unpin_message_rx: TracingUnboundedReceiver>, - task_backend: Weak, + task_backend: Arc, ) -> Self { let pinned_blocks = schnellru::LruMap::>::new( - UnpinningByLengthLimiter::new(NOTIFICATION_PINNING_LIMIT, task_backend.clone()), + UnpinningByLengthLimiter::new( + NOTIFICATION_PINNING_LIMIT, + Arc::downgrade(&task_backend), + ), ); - Self { unpin_message_rx, task_backend, pinned_blocks } + Self { unpin_message_rx, task_backend: Arc::downgrade(&task_backend), pinned_blocks } } fn handle_announce_message(&mut self, hash: Block::Hash) { @@ -143,7 +149,7 @@ impl> NotificationPinningWorker } } - fn handle_unpin_message(&mut self, hash: Block::Hash) { + fn handle_unpin_message(&mut self, hash: Block::Hash) -> Result<(), ()> { if let Some(refcount) = self.pinned_blocks.peek_mut(&hash) { *refcount = *refcount - 1; if *refcount == 0 { @@ -154,58 +160,64 @@ impl> NotificationPinningWorker backend.unpin_block(hash); } else { log::debug!(target: LOG_TARGET, "Terminating unpin-worker, backend reference was dropped."); - return + return Err(()) } } else { log::debug!(target: LOG_TARGET, "Received unpin message for already unpinned block. hash = {hash:?}"); - }; + } + Ok(()) } /// Start working on the received messages. /// /// The worker maintains a map which keeps track of the pinned blocks and their reference count. /// Depending upon the received message, it acts to pin/unpin the block. - pub async fn work(mut self) { + pub async fn run(mut self) { while let Some(message) = self.unpin_message_rx.next().await { match message { UnpinWorkerMessage::AnnouncePin(hash) => self.handle_announce_message(hash), - UnpinWorkerMessage::Unpin(hash) => self.handle_unpin_message(hash), + UnpinWorkerMessage::Unpin(hash) => + if self.handle_unpin_message(hash).is_err() { + return + }, } } log::debug!(target: LOG_TARGET, "Terminating unpin-worker, stream terminated.") } } -#[cfg(test)] -impl> NotificationPinningWorker { - fn new_with_limit( - unpin_message_rx: TracingUnboundedReceiver>, - task_backend: Weak, - limit: usize, - ) -> Self { - let pinned_blocks = - schnellru::LruMap::>::new( - UnpinningByLengthLimiter::new(limit, task_backend.clone()), - ); - Self { unpin_message_rx, task_backend, pinned_blocks } - } - - fn lru(&self) -> &schnellru::LruMap> { - &self.pinned_blocks - } -} - #[cfg(test)] mod tests { use std::sync::Arc; - use sc_client_api::Backend; - use sc_utils::mpsc::tracing_unbounded; + use sc_client_api::{Backend, UnpinWorkerMessage}; + use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; use sp_core::H256; + use sp_runtime::traits::Block as BlockT; type Block = substrate_test_runtime_client::runtime::Block; - use super::NotificationPinningWorker; + use super::{NotificationPinningWorker, UnpinningByLengthLimiter}; + + impl> NotificationPinningWorker { + fn new_with_limit( + unpin_message_rx: TracingUnboundedReceiver>, + task_backend: Arc, + limit: usize, + ) -> Self { + let pinned_blocks = + schnellru::LruMap::>::new( + UnpinningByLengthLimiter::new(limit, Arc::downgrade(&task_backend)), + ); + Self { unpin_message_rx, task_backend: Arc::downgrade(&task_backend), pinned_blocks } + } + + fn lru( + &self, + ) -> &schnellru::LruMap> { + &self.pinned_blocks + } + } #[test] fn pinning_worker_handles_base_case() { @@ -215,7 +227,7 @@ mod tests { let hash = H256::random(); - let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); + let mut worker = NotificationPinningWorker::new(rx, backend.clone()); // Block got pinned and unpin message should unpin in the backend. let _ = backend.pin_block(hash); @@ -224,7 +236,7 @@ mod tests { worker.handle_announce_message(hash); assert_eq!(worker.lru().len(), 1); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(0)); assert!(worker.lru().is_empty()); @@ -238,7 +250,7 @@ mod tests { let hash = H256::random(); - let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); + let mut worker = NotificationPinningWorker::new(rx, backend.clone()); // Block got pinned multiple times. let _ = backend.pin_block(hash); let _ = backend.pin_block(hash); @@ -250,15 +262,15 @@ mod tests { worker.handle_announce_message(hash); assert_eq!(worker.lru().len(), 1); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(2)); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(1)); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(0)); assert!(worker.lru().is_empty()); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(0)); } @@ -271,7 +283,7 @@ mod tests { let hash = H256::random(); let hash2 = H256::random(); - let mut worker = NotificationPinningWorker::new(rx, Arc::downgrade(&backend)); + let mut worker = NotificationPinningWorker::new(rx, backend.clone()); // Block was announced once but unpinned multiple times. The worker should ignore the // additional unpins. let _ = backend.pin_block(hash); @@ -282,13 +294,13 @@ mod tests { worker.handle_announce_message(hash); assert_eq!(worker.lru().len(), 1); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(2)); - worker.handle_unpin_message(hash); + let _ = worker.handle_unpin_message(hash); assert_eq!(backend.pin_refs(&hash), Some(2)); assert!(worker.lru().is_empty()); - worker.handle_unpin_message(hash2); + let _ = worker.handle_unpin_message(hash2); assert!(worker.lru().is_empty()); assert_eq!(backend.pin_refs(&hash2), None); } @@ -305,7 +317,7 @@ mod tests { let hash4 = H256::random(); // Only two items fit into the cache. - let mut worker = NotificationPinningWorker::new_with_limit(rx, Arc::downgrade(&backend), 2); + let mut worker = NotificationPinningWorker::new_with_limit(rx, backend.clone(), 2); // Multiple blocks are announced but the cache size is too small. We expect that blocks // are evicted by the cache and unpinned in the backend. From 795d21099dc7c383dbda911a8d4fef0dc4cb6a84 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 26 Feb 2024 10:08:11 +0100 Subject: [PATCH 6/6] Update substrate/client/service/src/client/notification_pinning.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> --- substrate/client/service/src/client/notification_pinning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/service/src/client/notification_pinning.rs b/substrate/client/service/src/client/notification_pinning.rs index b7d3ad383624..80de91c02f1a 100644 --- a/substrate/client/service/src/client/notification_pinning.rs +++ b/substrate/client/service/src/client/notification_pinning.rs @@ -38,7 +38,7 @@ use sp_runtime::traits::Block as BlockT; const LOG_TARGET: &str = "db::notification_pinning"; const NOTIFICATION_PINNING_LIMIT: usize = 1024; -/// A limiter which automatically unpins blockst hat leave the data structure. +/// A limiter which automatically unpins blocks that leave the data structure. #[derive(Clone, Debug)] struct UnpinningByLengthLimiter> { max_length: usize,