From dd41c07bc70a325931e59cefec93b081bb0e22b7 Mon Sep 17 00:00:00 2001
From: buffalu <85544055+buffalu@users.noreply.github.com>
Date: Tue, 17 May 2022 01:21:05 -0500
Subject: [PATCH] Speedup bigtable block upload by factor of 8-10x (#24534)

Add multiple blockstore read threads.
Run the bigtable block upload in a tokio::spawn context.
Run the bigtable tx and tx-by-addr uploads in tokio::spawn contexts.

(cherry picked from commit 6bcadc755ee1f8ef50362ac8b145e940c1a973ad)

# Conflicts:
#	Cargo.lock
#	programs/bpf/Cargo.lock
#	storage-bigtable/Cargo.toml
---
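The shape of the change: a pool of blocking reader threads drains a shared slot queue and feeds loaded blocks into a bounded channel, while the async side spawns one tokio task per block upload and joins them in batches. Below is a minimal, self-contained sketch of that pattern only; the names (upload, slot_rx, block_tx) are generic stand-ins rather than the APIs touched by this patch, and it assumes crossbeam-channel, tokio, and futures as dependencies.

    // Sketch of the read/upload pipeline shape introduced by this patch.
    use crossbeam_channel::{bounded, unbounded};

    // Stand-in for bigtable.upload_confirmed_block().
    async fn upload(block: u64) -> Result<(), String> {
        println!("uploading block {}", block);
        Ok(())
    }

    fn main() {
        let slots: Vec<u64> = (0..100).collect();
        let num_readers = 4;

        // Work queue of slots; readers exit once it drains.
        let (slot_tx, slot_rx) = unbounded();
        slots.into_iter().for_each(|s| slot_tx.send(s).unwrap());
        drop(slot_tx);

        // Bounded read-ahead: readers stall if the uploader falls behind,
        // capping how many loaded blocks sit in memory at once.
        let (block_tx, block_rx) = bounded(num_readers * 2);

        let readers: Vec<_> = (0..num_readers)
            .map(|_| {
                let slot_rx = slot_rx.clone();
                let block_tx = block_tx.clone();
                std::thread::spawn(move || {
                    while let Ok(slot) = slot_rx.recv() {
                        // Stand-in for blockstore.get_rooted_block(slot, true);
                        // here the "block" is just the slot number.
                        block_tx.send(slot).unwrap();
                    }
                })
            })
            .collect();
        drop(block_tx); // the receiver sees EOF once all readers finish

        let runtime = tokio::runtime::Runtime::new().unwrap();
        let mut uploads = vec![];
        while let Ok(block) = block_rx.recv() {
            // One spawned task per block, as in the patched upload loop.
            uploads.push(runtime.spawn(upload(block)));
        }
        for result in runtime.block_on(futures::future::join_all(uploads)) {
            result.expect("join failed").expect("upload failed");
        }
        readers.into_iter().for_each(|h| h.join().expect("reader panicked"));
    }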
 Cargo.lock                    |   6 ++
 ledger/src/bigtable_upload.rs | 135 ++++++++++++++++++++++++----------
 programs/bpf/Cargo.lock       |   6 ++
 storage-bigtable/Cargo.toml   |  15 ++++
 storage-bigtable/src/lib.rs   |  53 ++++++++++---
 5 files changed, 165 insertions(+), 50 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c15a1011df758b..ff38d71222ef7a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5950,6 +5950,7 @@ dependencies = [
  "bzip2",
  "enum-iterator",
  "flate2",
+ "futures 0.3.21",
  "goauth",
  "log",
  "openssl",
@@ -5963,7 +5964,12 @@ dependencies = [
  "solana-storage-proto",
  "solana-transaction-status",
  "thiserror",
+<<<<<<< HEAD
  "tonic 0.7.1",
+=======
+ "tokio",
+ "tonic 0.7.2",
+>>>>>>> 6bcadc755 (Speedup bigtable block upload by factor of 8-10x (#24534))
  "zstd",
 ]
 
diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs
index f2481351f07bbb..f1145a4eef8245 100644
--- a/ledger/src/bigtable_upload.rs
+++ b/ledger/src/bigtable_upload.rs
@@ -1,18 +1,18 @@
 use {
     crate::blockstore::Blockstore,
-    crossbeam_channel::bounded,
+    crossbeam_channel::{bounded, unbounded},
     log::*,
     solana_measure::measure::Measure,
     solana_sdk::clock::Slot,
     std::{
-        cmp::min,
+        cmp::{max, min},
         collections::HashSet,
         result::Result,
         sync::{
             atomic::{AtomicBool, Ordering},
             Arc,
         },
-        time::Duration,
+        time::{Duration, Instant},
     },
 };
@@ -26,16 +26,21 @@
 
 impl Default for ConfirmedBlockUploadConfig {
     fn default() -> Self {
-        const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+        let num_blocks_to_upload_in_parallel = num_cpus::get() / 2;
         ConfirmedBlockUploadConfig {
             force_reupload: false,
-            max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
-            num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
-            block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
+            max_num_slots_to_check: num_blocks_to_upload_in_parallel * 4,
+            num_blocks_to_upload_in_parallel,
+            block_read_ahead_depth: num_blocks_to_upload_in_parallel * 2,
         }
     }
 }
 
+struct BlockstoreLoadStats {
+    pub num_blocks_read: usize,
+    pub elapsed: Duration,
+}
+
 pub async fn upload_confirmed_blocks(
     blockstore: Arc<Blockstore>,
     bigtable: solana_storage_bigtable::LedgerStorage,
@@ -147,42 +152,56 @@ pub async fn upload_confirmed_blocks(
         last_slot
     );
 
-    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
-    let (_loader_thread, receiver) = {
+    // Distribute the blockstore reading across a few background threads to speed up the bigtable uploading
+    let (loader_threads, receiver): (Vec<_>, _) = {
        let exit = exit.clone();

        let (sender, receiver) = bounded(config.block_read_ahead_depth);
+
+        let (slot_sender, slot_receiver) = unbounded();
+        let _ = blocks_to_upload
+            .into_iter()
+            .for_each(|b| slot_sender.send(b).unwrap());
+        drop(slot_sender);
+
        (
-            std::thread::spawn(move || {
-                let mut measure = Measure::start("block loader thread");
-                for (i, slot) in blocks_to_upload.iter().enumerate() {
-                    if exit.load(Ordering::Relaxed) {
-                        break;
-                    }
+            (0..config.num_blocks_to_upload_in_parallel)
+                .map(|_| {
+                    let blockstore = blockstore.clone();
+                    let sender = sender.clone();
+                    let slot_receiver = slot_receiver.clone();
+                    let exit = exit.clone();

-                    let _ = match blockstore.get_rooted_block(*slot, true) {
-                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
-                        Err(err) => {
-                            warn!(
-                                "Failed to get load confirmed block from slot {}: {:?}",
-                                slot, err
-                            );
-                            sender.send((*slot, None))
+                    std::thread::spawn(move || {
+                        let start = Instant::now();
+                        let mut num_blocks_read = 0;
+
+                        while let Ok(slot) = slot_receiver.recv() {
+                            if exit.load(Ordering::Relaxed) {
+                                break;
+                            }
+
+                            let _ = match blockstore.get_rooted_block(slot, true) {
+                                Ok(confirmed_block) => {
+                                    num_blocks_read += 1;
+                                    sender.send((slot, Some(confirmed_block)))
+                                }
+                                Err(err) => {
+                                    warn!(
+                                        "Failed to load confirmed block from slot {}: {:?}",
+                                        slot, err
+                                    );
+                                    sender.send((slot, None))
+                                }
+                            };
                        }
-                    };
-
-                    if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
-                        info!(
-                            "{}% of blocks processed ({}/{})",
-                            i * 100 / blocks_to_upload.len(),
-                            i,
-                            blocks_to_upload.len()
-                        );
-                    }
-                }
-                measure.stop();
-                info!("{} to load {} blocks", measure, blocks_to_upload.len());
-            }),
+
+                        BlockstoreLoadStats {
+                            num_blocks_read,
+                            elapsed: start.elapsed(),
+                        }
+                    })
+                })
+                .collect(),
            receiver,
        )
    };
@@ -207,12 +226,20 @@
                num_blocks -= 1;
                None
            }
-            Some(confirmed_block) => Some(bigtable.upload_confirmed_block(slot, confirmed_block)),
+            Some(confirmed_block) => {
+                let bt = bigtable.clone();
+                Some(tokio::spawn(async move {
+                    bt.upload_confirmed_block(slot, confirmed_block).await
+                }))
+            }
        });

        for result in futures::future::join_all(uploads).await {
-            if result.is_err() {
-                error!("upload_confirmed_block() failed: {:?}", result.err());
+            if let Err(err) = result {
+                error!("upload_confirmed_block() join failed: {:?}", err);
+                failures += 1;
+            } else if let Err(err) = result.unwrap() {
+                error!("upload_confirmed_block() upload failed: {:?}", err);
                failures += 1;
            }
        }
@@ -223,6 +250,34 @@
    measure.stop();
    info!("{}", measure);

+    let blockstore_results = loader_threads.into_iter().map(|t| t.join());
+
+    let mut blockstore_num_blocks_read = 0;
+    let mut blockstore_load_wallclock = Duration::default();
+    let mut blockstore_errors = 0;
+
+    for r in blockstore_results {
+        match r {
+            Ok(stats) => {
+                blockstore_num_blocks_read += stats.num_blocks_read;
+                blockstore_load_wallclock = max(stats.elapsed, blockstore_load_wallclock);
+            }
+            Err(e) => {
+                error!("error joining blockstore thread: {:?}", e);
+                blockstore_errors += 1;
+            }
+        }
+    }
+
+    info!(
+        "blockstore upload took {:?} for {} blocks ({:.2} blocks/s) errors: {}",
+        blockstore_load_wallclock,
+        blockstore_num_blocks_read,
+        blockstore_num_blocks_read as f64 / blockstore_load_wallclock.as_secs_f64(),
+        blockstore_errors
+    );
+
    if failures > 0 {
        Err(format!("Incomplete upload, {} operations failed", failures).into())
    } else {
diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock
index ffc7be6d90c38d..aea2d4738f1b53 100644
--- a/programs/bpf/Cargo.lock
+++ b/programs/bpf/Cargo.lock
@@ -5227,6 +5227,7 @@ dependencies = [
  "bzip2",
  "enum-iterator",
  "flate2",
+ "futures 0.3.21",
  "goauth",
  "log",
  "openssl",
@@ -5240,7 +5241,12 @@ dependencies = [
  "solana-storage-proto",
  "solana-transaction-status",
  "thiserror",
+<<<<<<< HEAD
  "tonic 0.7.1",
+=======
+ "tokio",
+ "tonic 0.7.2",
+>>>>>>> 6bcadc755 (Speedup bigtable block upload by factor of 8-10x (#24534))
  "zstd",
 ]
 
diff --git a/storage-bigtable/Cargo.toml b/storage-bigtable/Cargo.toml
index 963e5a6cd845e5..4c8551eae69a37 100644
--- a/storage-bigtable/Cargo.toml
+++ b/storage-bigtable/Cargo.toml
@@ -13,11 +13,20 @@
 backoff = { version = "0.4.0", features = ["tokio"] }
 bincode = "1.3.3"
 bzip2 = "0.4.3"
+<<<<<<< HEAD
 enum-iterator = "0.7.0"
 flate2 = "1.0.22"
 goauth = "0.11.1"
 log = "0.4.14"
 prost = "0.10.0"
+=======
+enum-iterator = "0.8.1"
+flate2 = "1.0.23"
+futures = "0.3.21"
+goauth = "0.12.0"
+log = "0.4.17"
+prost = "0.10.3"
+>>>>>>> 6bcadc755 (Speedup bigtable block upload by factor of 8-10x (#24534))
 prost-types = "0.10.0"
 serde = "1.0.136"
 serde_derive = "1.0.103"
@@ -27,8 +36,14 @@ solana-sdk = { path = "../sdk", version = "=1.10.15" }
 solana-storage-proto = { path = "../storage-proto", version = "=1.10.15" }
 solana-transaction-status = { path = "../transaction-status", version = "=1.10.15" }
 thiserror = "1.0"
+<<<<<<< HEAD
 tonic = { version = "0.7.1", features = ["tls", "transport"] }
 zstd = "0.11.1"
+=======
+tokio = "~1.14.1"
+tonic = { version = "0.7.2", features = ["tls", "transport"] }
+zstd = "0.11.2"
+>>>>>>> 6bcadc755 (Speedup bigtable block upload by factor of 8-10x (#24534))
 
 # openssl is a dependency of the goauth and smpl_jwt crates, but explicitly
 # declare it here as well to activate the "vendored" feature that builds OpenSSL
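With tokio now a direct dependency of solana-storage-bigtable, everything under upload_confirmed_blocks calls tokio::spawn and therefore must execute inside a Tokio runtime. A minimal driver sketch follows; the parameter list past the blockstore and bigtable arguments is an assumption taken from the v1.10 sources rather than from the hunks above, and the runtime wiring is illustrative, not ledger-tool's actual setup.

    use std::sync::{atomic::AtomicBool, Arc};

    use solana_ledger::{
        bigtable_upload::{upload_confirmed_blocks, ConfirmedBlockUploadConfig},
        blockstore::Blockstore,
    };
    use solana_sdk::clock::Slot;

    // Assumed signature: (blockstore, bigtable, starting_slot, ending_slot,
    // config, exit); only the first two parameters are visible in this patch.
    fn run_upload(
        blockstore: Arc<Blockstore>,
        bigtable: solana_storage_bigtable::LedgerStorage,
        starting_slot: Slot,
        ending_slot: Option<Slot>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let exit = Arc::new(AtomicBool::new(false));
        // The default multi-thread runtime lets the spawned upload tasks run
        // in parallel; a current-thread runtime would confine them to one core.
        let runtime = tokio::runtime::Runtime::new()?;
        runtime.block_on(upload_confirmed_blocks(
            blockstore,
            bigtable,
            starting_slot,
            ending_slot,
            ConfirmedBlockUploadConfig::default(),
            exit,
        ))
    }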
diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs
index 68dca350e409a4..26a5bd55ca2a9e 100644
--- a/storage-bigtable/src/lib.rs
+++ b/storage-bigtable/src/lib.rs
@@ -1,4 +1,5 @@
 #![allow(clippy::integer_arithmetic)]
+
 use {
     crate::bigtable::RowKey,
     log::*,
@@ -25,6 +26,7 @@ use {
         convert::TryInto,
     },
     thiserror::Error,
+    tokio::task::JoinError,
 };
 
 #[macro_use]
@@ -54,6 +56,9 @@ pub enum Error {
 
     #[error("Signature not found")]
     SignatureNotFound,
+
+    #[error("tokio error")]
+    TokioJoinError(JoinError),
 }
 
 impl std::convert::From<bigtable::Error> for Error {
@@ -736,8 +741,6 @@
         slot: Slot,
         confirmed_block: VersionedConfirmedBlock,
     ) -> Result<()> {
-        let mut bytes_written = 0;
-
         let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
 
         let mut tx_cells = vec![];
@@ -789,21 +792,51 @@
             })
             .collect();
 
+        let mut tasks = vec![];
+
         if !tx_cells.is_empty() {
-            bytes_written += self
-                .connection
-                .put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
-                .await?;
+            let conn = self.connection.clone();
+            tasks.push(tokio::spawn(async move {
+                conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
+                    .await
+            }));
         }
 
         if !tx_by_addr_cells.is_empty() {
-            bytes_written += self
-                .connection
-                .put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
-                    "tx-by-addr",
-                    &tx_by_addr_cells,
-                )
-                .await?;
+            let conn = self.connection.clone();
+            tasks.push(tokio::spawn(async move {
+                conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
+                    "tx-by-addr",
+                    &tx_by_addr_cells,
+                )
+                .await
+            }));
+        }
+
+        let mut bytes_written = 0;
+        let mut maybe_first_err: Option<Error> = None;
+
+        let results = futures::future::join_all(tasks).await;
+        for result in results {
+            match result {
+                Err(err) => {
+                    if maybe_first_err.is_none() {
+                        maybe_first_err = Some(Error::TokioJoinError(err));
+                    }
+                }
+                Ok(Err(err)) => {
+                    if maybe_first_err.is_none() {
+                        maybe_first_err = Some(Error::BigTableError(err));
+                    }
+                }
+                Ok(Ok(bytes)) => {
+                    bytes_written += bytes;
+                }
+            }
+        }
+
+        if let Some(err) = maybe_first_err {
+            return Err(err);
         }
 
         let num_transactions = confirmed_block.transactions.len();
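Inside upload_confirmed_block, the first error from the joined tasks wins: a JoinError (a panicked or cancelled task) maps to the new TokioJoinError variant, a failed write maps to BigTableError, and byte counts from the writes that did succeed are still accumulated. A hypothetical call-site sketch showing how a caller can tell the two apart, using only the public types visible in this patch:

    // Hypothetical caller; only the Error variants and the method name come
    // from this patch, the surrounding wiring is illustrative.
    use log::{error, info};
    use solana_sdk::clock::Slot;
    use solana_storage_bigtable::{Error, LedgerStorage};
    use solana_transaction_status::VersionedConfirmedBlock;

    async fn upload_one(storage: &LedgerStorage, slot: Slot, block: VersionedConfirmedBlock) {
        match storage.upload_confirmed_block(slot, block).await {
            Ok(()) => info!("uploaded slot {}", slot),
            // A spawned cell-write task panicked or was cancelled before finishing.
            Err(Error::TokioJoinError(err)) => error!("upload task join failed: {}", err),
            // Any other failure, including the underlying BigTable write error.
            Err(err) => error!("upload failed for slot {}: {:?}", slot, err),
        }
    }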