Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Notification block pinning limit #2935

Merged
merged 10 commits into from
Feb 26, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions substrate/client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl fmt::Display for UsageInfo {
pub struct UnpinHandleInner<Block: BlockT> {
/// Hash of the block pinned by this handle
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
}

impl<Block: BlockT> Debug for UnpinHandleInner<Block> {
Expand All @@ -291,20 +291,33 @@ impl<Block: BlockT> UnpinHandleInner<Block> {
/// Create a new [`UnpinHandleInner`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> Self {
Self { hash, unpin_worker_sender }
}
}

impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
fn drop(&mut self) {
if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
if let Err(err) =
self.unpin_worker_sender.unbounded_send(UnpinWorkerMessage::Unpin(self.hash))
{
log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
};
}
}

/// Message that signals notification-based pinning actions to the pinning-worker.
///
/// When the notification is dropped, an `Unpin` message should be sent to the worker.
#[derive(Debug)]
pub enum UnpinWorkerMessage<Block: BlockT> {
/// Should be sent when a import or finality notification is created.
AnnouncePin(Block::Hash),
/// Should be sent when a import or finality notification is dropped.
Unpin(Block::Hash),
}

/// Keeps a specific block pinned while the handle is alive.
/// Once the last handle instance for a given block is dropped, the
/// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block).
Expand All @@ -315,7 +328,7 @@ impl<Block: BlockT> UnpinHandle<Block> {
/// Create a new [`UnpinHandle`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> UnpinHandle<Block> {
UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender)))
}
Expand Down Expand Up @@ -353,7 +366,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
header: Block::Header,
is_new_best: bool,
tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> Self {
Self {
hash,
Expand Down Expand Up @@ -412,7 +425,7 @@ impl<Block: BlockT> FinalityNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
mut summary: FinalizeSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> FinalityNotification<Block> {
let hash = summary.finalized.pop().unwrap_or_default();
FinalityNotification {
Expand All @@ -436,7 +449,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
summary: ImportSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> BlockImportNotification<Block> {
let hash = summary.hash;
BlockImportNotification {
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
|_| {
sp_blockchain::Error::UnknownBlock(format!(
"State already discarded for `{:?}`",
"Unable to pin: state already discarded for `{:?}`",
hash
))
},
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/pinned_blocks_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use schnellru::{Limiter, LruMap};
use sp_runtime::{traits::Block as BlockT, Justifications};

const LOG_TARGET: &str = "db::pin";
const PINNING_CACHE_SIZE: usize = 1024;
const PINNING_CACHE_SIZE: usize = 2048;
skunert marked this conversation as resolved.
Show resolved Hide resolved

/// Entry for pinned blocks cache.
struct PinnedBlockCacheEntry<Block: BlockT> {
Expand Down
1 change: 1 addition & 0 deletions substrate/client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "tim
tempfile = "3.1.0"
directories = "5.0.1"
static_init = "1.0.3"
schnellru = "0.2.1"

[dev-dependencies]
substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
Expand Down
53 changes: 28 additions & 25 deletions substrate/client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! Substrate Client

use super::block_rules::{BlockRules, LookupResult as BlockLookupResult};
use futures::{FutureExt, StreamExt};
use log::{error, info, trace, warn};
use crate::client::notification_pinning::NotificationPinningWorker;
use log::{debug, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use prometheus_endpoint::Registry;
use rand::Rng;
Expand All @@ -38,7 +38,7 @@ use sc_client_api::{
execution_extensions::ExecutionExtensions,
notifications::{StorageEventStream, StorageNotifications},
CallExecutor, ExecutorProvider, KeysIter, OnFinalityAction, OnImportAction, PairsIter,
ProofProvider, UsageProvider,
ProofProvider, UnpinWorkerMessage, UsageProvider,
};
use sc_consensus::{
BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction,
Expand Down Expand Up @@ -114,7 +114,7 @@ where
block_rules: BlockRules<Block>,
config: ClientConfig<Block>,
telemetry: Option<TelemetryHandle>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
_phantom: PhantomData<RA>,
}

Expand Down Expand Up @@ -326,19 +326,35 @@ where
// dropped, the block will be unpinned automatically.
if let Some(ref notification) = finality_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
debug!(
"Unable to pin block for finality notification. hash: {}, Error: {}",
notification.hash, err
);
};
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::error!(
"Unable to send AnnouncePin worker message for finality: {e}"
)
});
}
}

if let Some(ref notification) = import_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
debug!(
"Unable to pin block for import notification. hash: {}, Error: {}",
notification.hash, err
);
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::error!("Unable to send AnnouncePin worker message for import: {e}")
});
};
}

Expand Down Expand Up @@ -416,25 +432,12 @@ where
backend.commit_operation(op)?;
}

let (unpin_worker_sender, mut rx) =
tracing_unbounded::<Block::Hash>("unpin-worker-channel", 10_000);
let task_backend = Arc::downgrade(&backend);
spawn_handle.spawn(
"unpin-worker",
None,
async move {
while let Some(message) = rx.next().await {
if let Some(backend) = task_backend.upgrade() {
backend.unpin_block(message);
} else {
log::debug!("Terminating unpin-worker, backend reference was dropped.");
return
}
}
log::debug!("Terminating unpin-worker, stream terminated.")
}
.boxed(),
let (unpin_worker_sender, rx) = tracing_unbounded::<UnpinWorkerMessage<Block>>(
"notification-pinning-worker-channel",
10_000,
);
let unpin_worker = NotificationPinningWorker::new(rx, backend.clone());
spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.run()));

Ok(Client {
backend,
Expand Down
1 change: 1 addition & 0 deletions substrate/client/service/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
mod block_rules;
mod call_executor;
mod client;
mod notification_pinning;
mod wasm_override;
mod wasm_substitutes;

Expand Down
Loading
Loading