Skip to content

Commit

Permalink
Speedup bigtable block upload by factor of 8-10x (#24534)
Browse files Browse the repository at this point in the history
Added multiple blockstore read threads.
Run the bigtable upload in tokio::spawn context.
Run bigtable tx and tx-by-addr uploads in tokio::spawn context.

(cherry picked from commit 6bcadc7)

# Conflicts:
#	Cargo.lock
#	programs/bpf/Cargo.lock
#	storage-bigtable/Cargo.toml
  • Loading branch information
buffalu authored and mergify[bot] committed May 17, 2022
1 parent 37f9d3c commit dd41c07
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 50 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 95 additions & 40 deletions ledger/src/bigtable_upload.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use {
crate::blockstore::Blockstore,
crossbeam_channel::bounded,
crossbeam_channel::{bounded, unbounded},
log::*,
solana_measure::measure::Measure,
solana_sdk::clock::Slot,
std::{
cmp::min,
cmp::{max, min},
collections::HashSet,
result::Result,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
time::{Duration, Instant},
},
};

Expand All @@ -26,16 +26,21 @@ pub struct ConfirmedBlockUploadConfig {

impl Default for ConfirmedBlockUploadConfig {
fn default() -> Self {
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
let num_blocks_to_upload_in_parallel = num_cpus::get() / 2;
ConfirmedBlockUploadConfig {
force_reupload: false,
max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
max_num_slots_to_check: num_blocks_to_upload_in_parallel * 4,
num_blocks_to_upload_in_parallel,
block_read_ahead_depth: num_blocks_to_upload_in_parallel * 2,
}
}
}

/// Per-thread read statistics produced by one blockstore loader thread and
/// handed back through `JoinHandle::join` so the caller can aggregate
/// throughput across all reader threads after the upload completes.
struct BlockstoreLoadStats {
// Number of blocks this thread successfully loaded via
// `blockstore.get_rooted_block` (failed reads are logged and not counted).
pub num_blocks_read: usize,
// Wall-clock time from the thread's start (`Instant::now()` at spawn) until
// it drained its slot queue; the caller takes the max across threads to
// report overall load wallclock.
pub elapsed: Duration,
}

pub async fn upload_confirmed_blocks(
blockstore: Arc<Blockstore>,
bigtable: solana_storage_bigtable::LedgerStorage,
Expand Down Expand Up @@ -147,42 +152,56 @@ pub async fn upload_confirmed_blocks(
last_slot
);

// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
let (_loader_thread, receiver) = {
// Distribute the blockstore reading across a few background threads to speed up the bigtable uploading
let (loader_threads, receiver): (Vec<_>, _) = {
let exit = exit.clone();

let (sender, receiver) = bounded(config.block_read_ahead_depth);

let (slot_sender, slot_receiver) = unbounded();
let _ = blocks_to_upload
.into_iter()
.for_each(|b| slot_sender.send(b).unwrap());
drop(slot_sender);

(
std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread");
for (i, slot) in blocks_to_upload.iter().enumerate() {
if exit.load(Ordering::Relaxed) {
break;
}
(0..config.num_blocks_to_upload_in_parallel)
.map(|_| {
let blockstore = blockstore.clone();
let sender = sender.clone();
let slot_receiver = slot_receiver.clone();
let exit = exit.clone();

let _ = match blockstore.get_rooted_block(*slot, true) {
Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
Err(err) => {
warn!(
"Failed to get load confirmed block from slot {}: {:?}",
slot, err
);
sender.send((*slot, None))
std::thread::spawn(move || {
let start = Instant::now();
let mut num_blocks_read = 0;

while let Ok(slot) = slot_receiver.recv() {
if exit.load(Ordering::Relaxed) {
break;
}

let _ = match blockstore.get_rooted_block(slot, true) {
Ok(confirmed_block) => {
num_blocks_read += 1;
sender.send((slot, Some(confirmed_block)))
}
Err(err) => {
warn!(
"Failed to get load confirmed block from slot {}: {:?}",
slot, err
);
sender.send((slot, None))
}
};
}
};

if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
info!(
"{}% of blocks processed ({}/{})",
i * 100 / blocks_to_upload.len(),
i,
blocks_to_upload.len()
);
}
}
measure.stop();
info!("{} to load {} blocks", measure, blocks_to_upload.len());
}),
BlockstoreLoadStats {
num_blocks_read,
elapsed: start.elapsed(),
}
})
})
.collect(),
receiver,
)
};
Expand All @@ -207,12 +226,20 @@ pub async fn upload_confirmed_blocks(
num_blocks -= 1;
None
}
Some(confirmed_block) => Some(bigtable.upload_confirmed_block(slot, confirmed_block)),
Some(confirmed_block) => {
let bt = bigtable.clone();
Some(tokio::spawn(async move {
bt.upload_confirmed_block(slot, confirmed_block).await
}))
}
});

for result in futures::future::join_all(uploads).await {
if result.is_err() {
error!("upload_confirmed_block() failed: {:?}", result.err());
if let Err(err) = result {
error!("upload_confirmed_block() join failed: {:?}", err);
failures += 1;
} else if let Err(err) = result.unwrap() {
error!("upload_confirmed_block() upload failed: {:?}", err);
failures += 1;
}
}
Expand All @@ -223,6 +250,34 @@ pub async fn upload_confirmed_blocks(

measure.stop();
info!("{}", measure);

let blockstore_results = loader_threads.into_iter().map(|t| t.join());

let mut blockstore_num_blocks_read = 0;
let mut blockstore_load_wallclock = Duration::default();
let mut blockstore_errors = 0;

for r in blockstore_results {
match r {
Ok(stats) => {
blockstore_num_blocks_read += stats.num_blocks_read;
blockstore_load_wallclock = max(stats.elapsed, blockstore_load_wallclock);
}
Err(e) => {
error!("error joining blockstore thread: {:?}", e);
blockstore_errors += 1;
}
}
}

info!(
"blockstore upload took {:?} for {} blocks ({:.2} blocks/s) errors: {}",
blockstore_load_wallclock,
blockstore_num_blocks_read,
blockstore_num_blocks_read as f64 / blockstore_load_wallclock.as_secs_f64(),
blockstore_errors
);

if failures > 0 {
Err(format!("Incomplete upload, {} operations failed", failures).into())
} else {
Expand Down
6 changes: 6 additions & 0 deletions programs/bpf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions storage-bigtable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ edition = "2021"
backoff = { version = "0.4.0", features = ["tokio"] }
bincode = "1.3.3"
bzip2 = "0.4.3"
<<<<<<< HEAD
enum-iterator = "0.7.0"
flate2 = "1.0.22"
goauth = "0.11.1"
log = "0.4.14"
prost = "0.10.0"
=======
enum-iterator = "0.8.1"
flate2 = "1.0.23"
futures = "0.3.21"
goauth = "0.12.0"
log = "0.4.17"
prost = "0.10.3"
>>>>>>> 6bcadc755 (Speedup bigtable block upload by factor of 8-10x (#24534))
prost-types = "0.10.0"
serde = "1.0.136"
serde_derive = "1.0.103"
Expand All @@ -27,8 +36,14 @@ solana-sdk = { path = "../sdk", version = "=1.10.15" }
solana-storage-proto = { path = "../storage-proto", version = "=1.10.15" }
solana-transaction-status = { path = "../transaction-status", version = "=1.10.15" }
thiserror = "1.0"
<<<<<<< HEAD
tonic = { version = "0.7.1", features = ["tls", "transport"] }
zstd = "0.11.1"
=======
tokio = "~1.14.1"
tonic = { version = "0.7.2", features = ["tls", "transport"] }
zstd = "0.11.2"
>>>>>>> 6bcadc755 (Speedup bigtable block upload by factor of 8-10x (#24534))

# openssl is a dependency of the goauth and smpl_jwt crates, but explicitly
# declare it here as well to activate the "vendored" feature that builds OpenSSL
Expand Down
53 changes: 43 additions & 10 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(clippy::integer_arithmetic)]

use {
crate::bigtable::RowKey,
log::*,
Expand All @@ -25,6 +26,7 @@ use {
convert::TryInto,
},
thiserror::Error,
tokio::task::JoinError,
};

#[macro_use]
Expand Down Expand Up @@ -54,6 +56,9 @@ pub enum Error {

#[error("Signature not found")]
SignatureNotFound,

#[error("tokio error")]
TokioJoinError(JoinError),
}

impl std::convert::From<bigtable::Error> for Error {
Expand Down Expand Up @@ -736,8 +741,6 @@ impl LedgerStorage {
slot: Slot,
confirmed_block: VersionedConfirmedBlock,
) -> Result<()> {
let mut bytes_written = 0;

let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();

let mut tx_cells = vec![];
Expand Down Expand Up @@ -789,21 +792,51 @@ impl LedgerStorage {
})
.collect();

let mut tasks = vec![];

if !tx_cells.is_empty() {
bytes_written += self
.connection
.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
.await?;
let conn = self.connection.clone();
tasks.push(tokio::spawn(async move {
conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
.await
}));
}

if !tx_by_addr_cells.is_empty() {
bytes_written += self
.connection
.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
let conn = self.connection.clone();
tasks.push(tokio::spawn(async move {
conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
"tx-by-addr",
&tx_by_addr_cells,
)
.await?;
.await
}));
}

let mut bytes_written = 0;
let mut maybe_first_err: Option<Error> = None;

let results = futures::future::join_all(tasks).await;
for result in results {
match result {
Err(err) => {
if maybe_first_err.is_none() {
maybe_first_err = Some(Error::TokioJoinError(err));
}
}
Ok(Err(err)) => {
if maybe_first_err.is_none() {
maybe_first_err = Some(Error::BigTableError(err));
}
}
Ok(Ok(bytes)) => {
bytes_written += bytes;
}
}
}

if let Some(err) = maybe_first_err {
return Err(err);
}

let num_transactions = confirmed_block.transactions.len();
Expand Down

0 comments on commit dd41c07

Please sign in to comment.