Skip to content

Commit

Permalink
Enable parallel key scraping (paritytech#1985)
Browse files Browse the repository at this point in the history
closes paritytech#174

---------

Co-authored-by: Liam Aharon <[email protected]>
Co-authored-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
3 people committed Nov 29, 2023
1 parent 2566569 commit b027d01
Showing 1 changed file with 156 additions and 37 deletions.
193 changes: 156 additions & 37 deletions substrate/utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use std::{
fs,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
Expand Down Expand Up @@ -298,6 +299,7 @@ impl Default for SnapshotConfig {
}

/// Builder for remote-externalities.
#[derive(Clone)]
pub struct Builder<B: BlockT> {
/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
/// must be given.
Expand Down Expand Up @@ -400,41 +402,134 @@ where
})
}

/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
async fn rpc_get_keys_paged(
/// Get keys with `prefix` at `block` in a parallel manner.
async fn rpc_get_keys_parallel(
&self,
prefix: StorageKey,
at: B::Hash,
prefix: &StorageKey,
block: B::Hash,
parallel: usize,
) -> Result<Vec<StorageKey>, &'static str> {
/// Divide the workload and return the start key of each chunks. Guaranteed to return a
/// non-empty list.
fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
let mut prefix = prefix.as_ref().to_vec();
let scale = 32usize.saturating_sub(prefix.len());

// no need to divide workload
if scale < 9 {
prefix.extend(vec![0; scale]);
return vec![StorageKey(prefix)]
}

let chunks = 16;
let step = 0x10000 / chunks;
let ext = scale - 2;

(0..chunks)
.map(|i| {
let mut key = prefix.clone();
let start = i * step;
key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
key.extend(vec![0; ext]);
StorageKey(key)
})
.collect()
}

let start_keys = gen_start_keys(&prefix);
let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
end_keys.push(None);

// use a semaphore to limit max scraping tasks
let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
let builder = Arc::new(self.clone());
let mut handles = vec![];

for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
let permit = parallel
.clone()
.acquire_owned()
.await
.expect("semaphore is not closed until the end of loop");

let builder = builder.clone();
let prefix = prefix.clone();
let start_key = start_key.cloned();
let end_key = end_key.cloned();

let handle = tokio::spawn(async move {
let res = builder
.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
.await;
drop(permit);
res
});

handles.push(handle);
}

parallel.close();

let keys = futures::future::join_all(handles)
.await
.into_iter()
.filter_map(|res| match res {
Ok(Ok(keys)) => Some(keys),
_ => None,
})
.flatten()
.collect::<Vec<StorageKey>>();

Ok(keys)
}

/// Get all keys with `prefix` within the given range at `block`.
/// Both `start_key` and `end_key` are optional if you want an open-ended range.
async fn rpc_get_keys_in_range(
&self,
prefix: &StorageKey,
block: B::Hash,
start_key: Option<&StorageKey>,
end_key: Option<&StorageKey>,
) -> Result<Vec<StorageKey>, &'static str> {
let mut last_key: Option<StorageKey> = None;
let mut all_keys: Vec<StorageKey> = vec![];
let keys = loop {
let mut last_key: Option<&StorageKey> = start_key;
let mut keys: Vec<StorageKey> = vec![];

loop {
// This loop can hit the node with very rapid requests, occasionally causing it to
// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
let retry_strategy =
FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
let get_page_closure =
|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
let page = Retry::spawn(retry_strategy, get_page_closure).await?;
let page_len = page.len();
|| self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;

all_keys.extend(page);
// avoid duplicated keys across workloads
if let (Some(last), Some(end)) = (page.last(), end_key) {
if last >= end {
page.retain(|key| key < end);
}
}

let page_len = page.len();
keys.extend(page);
last_key = keys.last();

// scraping out of range or no more matches,
// we are done either way
if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
break all_keys
} else {
let new_last_key =
all_keys.last().expect("all_keys is populated; has .last(); qed");
log::debug!(
target: LOG_TARGET,
"new total = {}, full page received: {}",
all_keys.len(),
HexDisplay::from(new_last_key)
);
last_key = Some(new_last_key.clone());
};
};
break
}

log::debug!(
target: LOG_TARGET,
"new total = {}, full page received: {}",
keys.len(),
HexDisplay::from(last_key.expect("full page received, cannot be None"))
);
}

Ok(keys)
}
Expand Down Expand Up @@ -529,7 +624,7 @@ where
"Batch request failed ({}/{} retries). Error: {}",
retries,
Self::MAX_RETRIES,
e.to_string()
e
);
// after 2 subsequent failures something very wrong is happening. log a warning
// and reset the batch size down to 1.
Expand Down Expand Up @@ -590,16 +685,18 @@ where
/// map them to values one by one.
///
/// This can work with public nodes. But, expect it to be darn slow.
pub(crate) async fn rpc_get_pairs_paged(
pub(crate) async fn rpc_get_pairs(
&self,
prefix: StorageKey,
at: B::Hash,
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<Vec<KeyValue>, &'static str> {
let start = Instant::now();
let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
// TODO We could start downloading when having collected the first batch of keys
// https://github.com/paritytech/polkadot-sdk/issues/2494
let keys = self
.rpc_get_keys_paged(prefix.clone(), at)
.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
.await?
.into_iter()
.collect::<Vec<_>>();
Expand Down Expand Up @@ -628,9 +725,9 @@ where
.unwrap()
.progress_chars("=>-"),
);
let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1));
let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
let requests = payloads_chunked.map(|payload_chunk| {
Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar)
Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
});
// Execute the requests and move the Result outside.
let storage_data_result: Result<Vec<_>, _> =
Expand All @@ -644,7 +741,7 @@ where
},
};
bar.finish_with_message("✅ Downloaded key values");
print!("\n");
println!();

// Check if we got responses for all submitted requests.
assert_eq!(keys.len(), storage_data.len());
Expand Down Expand Up @@ -778,8 +875,9 @@ where
pending_ext: &mut TestExternalities<HashingFor<B>>,
) -> Result<ChildKeyValues, &'static str> {
let child_roots = top_kv
.into_iter()
.filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone()))
.iter()
.filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
.map(|(k, _)| k.clone())
.collect::<Vec<_>>();

if child_roots.is_empty() {
Expand All @@ -799,11 +897,10 @@ where
let mut child_kv = vec![];
for prefixed_top_key in child_roots {
let child_keys =
Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at)
.await?;
Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;

let child_kv_inner =
Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at)
Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
.await?;

let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
Expand Down Expand Up @@ -846,7 +943,7 @@ where
for prefix in &config.hashed_prefixes {
let now = std::time::Instant::now();
let additional_key_values =
self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?;
self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
let elapsed = now.elapsed();
log::info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1110,7 +1207,7 @@ mod test_prelude {
pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;

pub(crate) fn init_logger() {
let _ = sp_tracing::try_init_simple();
sp_tracing::try_init_simple();
}
}

Expand Down Expand Up @@ -1440,4 +1537,26 @@ mod remote_tests {
.unwrap()
.execute_with(|| {});
}

#[tokio::test]
async fn can_fetch_in_parallel() {
init_logger();

let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
let mut builder = Builder::<Block>::new()
.mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }));
builder.init_remote_client().await.unwrap();

let at = builder.as_online().at.unwrap();

let prefix = StorageKey(vec![13]);
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
assert_eq!(paged, para);

let prefix = StorageKey(vec![]);
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
assert_eq!(paged, para);
}
}

0 comments on commit b027d01

Please sign in to comment.