From 48bf2a586bb5fd7d07ae6a70a8c64b228fe490cf Mon Sep 17 00:00:00 2001 From: Ashley Michal Lewis Date: Fri, 19 Jun 2020 13:56:23 -0500 Subject: [PATCH] move batching into bulk module code --- src/commands/kv/bulk/delete.rs | 31 +++------- src/commands/kv/bulk/put.rs | 20 ++---- src/commands/publish.rs | 54 +++++++++++++--- src/kv/bulk.rs | 110 ++++++++++++++++++++++++++++----- src/main.rs | 6 +- src/preview/upload.rs | 11 ++-- src/sites/mod.rs | 2 - src/sites/upload.rs | 82 ------------------------ 8 files changed, 158 insertions(+), 158 deletions(-) delete mode 100644 src/sites/upload.rs diff --git a/src/commands/kv/bulk/delete.rs b/src/commands/kv/bulk/delete.rs index e1b864c4b..5bd427932 100644 --- a/src/commands/kv/bulk/delete.rs +++ b/src/commands/kv/bulk/delete.rs @@ -10,7 +10,7 @@ use indicatif::{ProgressBar, ProgressStyle}; use crate::commands::kv; use crate::kv::bulk::delete; -use crate::kv::bulk::MAX_PAIRS; +use crate::kv::bulk::BATCH_KEY_MAX; use crate::settings::global_user::GlobalUser; use crate::settings::toml::Target; use crate::terminal::interactive; @@ -36,12 +36,13 @@ pub fn run( Err(e) => failure::bail!(e), } - let pairs: Result, failure::Error> = match &metadata(filename) { + let keys: Vec = match &metadata(filename) { Ok(file_type) if file_type.is_file() => { let data = fs::read_to_string(filename)?; - let keys_vec = serde_json::from_str(&data); + let keys_vec: Result, serde_json::Error> = + serde_json::from_str(&data); match keys_vec { - Ok(keys_vec) => Ok(keys_vec), + Ok(keys_vec) => keys_vec.iter().map(|kv| { kv.key.to_owned() }).collect(), Err(_) => failure::bail!("Failed to decode JSON. Please make sure to follow the format, [{\"key\": \"test_key\", \"value\": \"test_value\"}, ...]") } } @@ -49,11 +50,11 @@ pub fn run( Err(e) => failure::bail!("{}", e), }; - let mut keys: Vec = pairs?.iter().map(|kv| kv.key.to_owned()).collect(); - let len = keys.len(); + message::working(&format!("deleting {} key value pairs", len)); - let progress_bar = if len > MAX_PAIRS { + + let progress_bar = if len > BATCH_KEY_MAX { let pb = ProgressBar::new(len as u64); pb.set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}")); Some(pb) @@ -61,21 +62,7 @@ pub fn run( None }; - while !keys.is_empty() { - let p: Vec = if keys.len() > MAX_PAIRS { - keys.drain(0..MAX_PAIRS).collect() - } else { - keys.drain(0..).collect() - }; - - if let Err(e) = delete(target, user, namespace_id, p) { - failure::bail!("{}", e); - } - - if let Some(pb) = &progress_bar { - pb.inc(keys.len() as u64); - } - } + delete(target, user, namespace_id, keys, &progress_bar)?; if let Some(pb) = &progress_bar { pb.finish_with_message(&format!("deleted {} key value pairs", len)); diff --git a/src/commands/kv/bulk/put.rs b/src/commands/kv/bulk/put.rs index 6da0bc1ba..2a6a57f49 100644 --- a/src/commands/kv/bulk/put.rs +++ b/src/commands/kv/bulk/put.rs @@ -10,7 +10,7 @@ use indicatif::{ProgressBar, ProgressStyle}; use crate::commands::kv::validate_target; use crate::kv::bulk::put; -use crate::kv::bulk::MAX_PAIRS; +use crate::kv::bulk::BATCH_KEY_MAX; use crate::settings::global_user::GlobalUser; use crate::settings::toml::Target; use crate::terminal::message; @@ -23,7 +23,7 @@ pub fn run( ) -> Result<(), failure::Error> { validate_target(target)?; - let mut pairs: Vec = match &metadata(filename) { + let pairs: Vec = match &metadata(filename) { Ok(file_type) if file_type.is_file() => { let data = fs::read_to_string(filename)?; let data_vec = serde_json::from_str(&data); @@ -42,7 +42,7 @@ pub fn run( let len = pairs.len(); message::working(&format!("uploading {} key value pairs", len)); - let progress_bar = if len > MAX_PAIRS { + let progress_bar = if len > BATCH_KEY_MAX { let pb = ProgressBar::new(len as u64); pb.set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}")); Some(pb) @@ -50,19 +50,7 @@ pub fn run( None }; - while !pairs.is_empty() { - let p: Vec = if pairs.len() > MAX_PAIRS { - pairs.drain(0..MAX_PAIRS).collect() - } else { - pairs.drain(0..).collect() - }; - - put(target, &user, namespace_id, &p)?; - - if let Some(pb) = &progress_bar { - pb.inc(p.len() as u64); - } - } + put(target, &user, namespace_id, pairs, &progress_bar)?; if let Some(pb) = &progress_bar { pb.finish_with_message(&format!("uploaded {} key value pairs", len)); diff --git a/src/commands/publish.rs b/src/commands/publish.rs index dd0624143..76993a68e 100644 --- a/src/commands/publish.rs +++ b/src/commands/publish.rs @@ -1,10 +1,12 @@ use std::env; use std::path::{Path, PathBuf}; +use indicatif::{ProgressBar, ProgressStyle}; + use crate::build; use crate::deploy; use crate::http::{self, Feature}; -use crate::kv::bulk::delete; +use crate::kv::bulk; use crate::settings::global_user::GlobalUser; use crate::settings::toml::{DeployConfig, Target}; use crate::sites; @@ -15,7 +17,6 @@ pub fn publish( user: &GlobalUser, target: &mut Target, deploy_config: DeployConfig, - verbose: bool, ) -> Result<(), failure::Error> { validate_target_required_fields_present(target)?; @@ -33,10 +34,27 @@ pub fn publish( sites::sync(target, user, &site_namespace.id, &path)?; // First, upload all existing files in bucket directory - if verbose { - message::info("Preparing to upload updated files..."); + message::working("Uploading site files"); + let upload_progress_bar = if to_upload.len() > bulk::BATCH_KEY_MAX { + let upload_progress_bar = ProgressBar::new(to_upload.len() as u64); + upload_progress_bar + .set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}")); + Some(upload_progress_bar) + } else { + None + }; + + bulk::put( + target, + user, + &site_namespace.id, + to_upload, + &upload_progress_bar, + )?; + + if let Some(pb) = upload_progress_bar { + pb.finish_with_message("Done Uploading"); } - sites::upload_files(target, user, &site_namespace.id, to_upload)?; let upload_client = http::featured_legacy_auth_client(user, Feature::Sites); @@ -47,11 +65,29 @@ pub fn publish( // Finally, remove any stale files if !to_delete.is_empty() { - if verbose { - message::info("Deleting stale files..."); + message::info("Deleting stale files..."); + + let delete_progress_bar = if to_delete.len() > bulk::BATCH_KEY_MAX { + let delete_progress_bar = ProgressBar::new(to_delete.len() as u64); + delete_progress_bar.set_style( + ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}"), + ); + Some(delete_progress_bar) + } else { + None + }; + + bulk::delete( + target, + user, + &site_namespace.id, + to_delete, + &delete_progress_bar, + )?; + + if let Some(pb) = delete_progress_bar { + pb.finish_with_message("Done deleting"); } - - delete(target, user, &site_namespace.id, to_delete)?; } } else { let upload_client = http::legacy_auth_client(user); diff --git a/src/kv/bulk.rs b/src/kv/bulk.rs index bca92a788..dc54c6c52 100644 --- a/src/kv/bulk.rs +++ b/src/kv/bulk.rs @@ -1,5 +1,7 @@ use std::time::Duration; +use indicatif::ProgressBar; + use cloudflare::endpoints::workerskv::delete_bulk::DeleteBulk; use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair; use cloudflare::endpoints::workerskv::write_bulk::WriteBulk; @@ -12,7 +14,11 @@ use crate::http::feature::headers; use crate::settings::global_user::GlobalUser; use crate::settings::toml::Target; -pub const MAX_PAIRS: usize = 10000; +const API_MAX_PAIRS: usize = 10000; +// The consts below are halved from the API's true capacity to help avoid +// hammering it with large requests. +pub const BATCH_KEY_MAX: usize = API_MAX_PAIRS / 2; +const UPLOAD_MAX_SIZE: usize = 50 * 1024 * 1024; // Create a special API client that has a longer timeout than usual, given that KV operations // can be lengthy if payloads are large. @@ -33,18 +39,27 @@ pub fn put( target: &Target, user: &GlobalUser, namespace_id: &str, - pairs: &[KeyValuePair], + pairs: Vec, + progress_bar: &Option, ) -> Result<(), failure::Error> { let client = bulk_api_client(user)?; - match client.request(&WriteBulk { - account_identifier: &target.account_id, - namespace_identifier: namespace_id, - bulk_key_value_pairs: pairs.to_owned(), - }) { - Ok(_) => Ok(()), - Err(e) => failure::bail!("{}", format_error(e)), + for b in batch_keys_values(pairs) { + match client.request(&WriteBulk { + account_identifier: &target.account_id, + namespace_identifier: namespace_id, + bulk_key_value_pairs: b.to_owned(), + }) { + Ok(_) => {} + Err(e) => failure::bail!("{}", format_error(e)), + } + + if let Some(pb) = &progress_bar { + pb.inc(b.len() as u64); + } } + + Ok(()) } pub fn delete( @@ -52,17 +67,78 @@ pub fn delete( user: &GlobalUser, namespace_id: &str, keys: Vec, + progress_bar: &Option, ) -> Result<(), failure::Error> { let client = bulk_api_client(user)?; - let response = client.request(&DeleteBulk { - account_identifier: &target.account_id, - namespace_identifier: namespace_id, - bulk_keys: keys, - }); + for b in batch_keys(keys) { + match client.request(&DeleteBulk { + account_identifier: &target.account_id, + namespace_identifier: namespace_id, + bulk_keys: b.to_owned(), + }) { + Ok(_) => {} + Err(e) => failure::bail!("{}", format_error(e)), + } + + if let Some(pb) = &progress_bar { + pb.inc(b.len() as u64); + } + } + + Ok(()) +} + +fn batch_keys_values(mut pairs: Vec) -> Vec> { + let mut batches: Vec> = Vec::new(); - match response { - Ok(_) => Ok(()), - Err(e) => failure::bail!("{}", format_error(e)), + if !pairs.is_empty() { + // Iterate over all key-value pairs and create batches of uploads, each of which are + // maximum 5K key-value pairs in size OR maximum ~50MB in size. Upload each batch + // as it is created. + let mut key_count = 0; + let mut key_pair_bytes = 0; + let mut key_value_batch: Vec = Vec::new(); + + while !(pairs.is_empty() && key_value_batch.is_empty()) { + if pairs.is_empty() { + // Last batch to upload + batches.push(key_value_batch.to_vec()); + key_value_batch.clear(); + } else { + let pair = pairs.pop().unwrap(); + if key_count + 1 > BATCH_KEY_MAX + // Keep upload size small to keep KV bulk API happy + || key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE + { + batches.push(key_value_batch.to_vec()); + key_count = 0; + key_pair_bytes = 0; + key_value_batch.clear(); + } + + // Add the popped key-value pair to the running batch of key-value pair uploads + key_count += 1; + key_pair_bytes = key_pair_bytes + pair.key.len() + pair.value.len(); + key_value_batch.push(pair); + } + } + } + + batches +} + +fn batch_keys(mut keys: Vec) -> Vec> { + let mut batches = Vec::new(); + while !keys.is_empty() { + let k: Vec = if keys.len() > BATCH_KEY_MAX { + keys.drain(0..BATCH_KEY_MAX).collect() + } else { + keys.drain(0..).collect() + }; + + batches.push(k); } + + batches } diff --git a/src/main.rs b/src/main.rs index 8dbad19f6..69bae4087 100644 --- a/src/main.rs +++ b/src/main.rs @@ -514,7 +514,7 @@ fn run() -> Result<(), failure::Error> { .long("env") .takes_value(true) ) - .arg(verbose_arg.clone()) + .arg(silent_verbose_arg.clone()) .arg( Arg::with_name("release") .hidden(true) @@ -757,9 +757,7 @@ fn run() -> Result<(), failure::Error> { let mut target = manifest.get_target(env, is_preview)?; let deploy_config = manifest.deploy_config(env)?; - let verbose = matches.is_present("verbose"); - - commands::publish(&user, &mut target, deploy_config, verbose)?; + commands::publish(&user, &mut target, deploy_config)?; } else if let Some(matches) = matches.subcommand_matches("subdomain") { log::info!("Getting project settings"); let manifest = settings::toml::Manifest::new(config_path)?; diff --git a/src/preview/upload.rs b/src/preview/upload.rs index acccaab93..c5f9f8ca9 100644 --- a/src/preview/upload.rs +++ b/src/preview/upload.rs @@ -4,11 +4,10 @@ use reqwest::blocking::Client; use serde::Deserialize; use crate::http; -use crate::kv::bulk::delete; +use crate::kv::bulk; use crate::settings::global_user::GlobalUser; use crate::settings::toml::Target; -use crate::sites; -use crate::sites::{sync, upload_files, AssetManifest}; +use crate::sites::{add_namespace, sync, AssetManifest}; use crate::terminal::{message, styles}; use crate::upload; @@ -59,7 +58,7 @@ pub fn upload( let client = http::legacy_auth_client(&user); if let Some(site_config) = target.site.clone() { - let site_namespace = sites::add_namespace(user, target, true)?; + let site_namespace = add_namespace(user, target, true)?; let path = Path::new(&site_config.bucket); let (to_upload, to_delete, asset_manifest) = @@ -70,7 +69,7 @@ pub fn upload( message::info("Uploading updated files..."); } - upload_files(target, user, &site_namespace.id, to_upload)?; + bulk::put(target, user, &site_namespace.id, to_upload, &None)?; let preview = authenticated_upload(&client, &target, Some(asset_manifest))?; if !to_delete.is_empty() { @@ -78,7 +77,7 @@ pub fn upload( message::info("Deleting stale files..."); } - delete(target, user, &site_namespace.id, to_delete)?; + bulk::delete(target, user, &site_namespace.id, to_delete, &None)?; } preview diff --git a/src/sites/mod.rs b/src/sites/mod.rs index 3e8944f3e..4be4a377c 100644 --- a/src/sites/mod.rs +++ b/src/sites/mod.rs @@ -2,11 +2,9 @@ extern crate base64; mod manifest; mod sync; -mod upload; pub use manifest::AssetManifest; pub use sync::sync; -pub use upload::upload_files; use std::ffi::OsString; use std::fs; diff --git a/src/sites/upload.rs b/src/sites/upload.rs deleted file mode 100644 index 66d72b5b6..000000000 --- a/src/sites/upload.rs +++ /dev/null @@ -1,82 +0,0 @@ -use indicatif::{ProgressBar, ProgressStyle}; - -use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair; - -use crate::kv::bulk::put; -use crate::settings::global_user::GlobalUser; -use crate::settings::toml::Target; -use crate::terminal::message; - -// The consts below are halved from the API's true capacity to help avoid -// hammering it with large requests. -const PAIRS_MAX_COUNT: usize = 5000; -const UPLOAD_MAX_SIZE: usize = 50 * 1024 * 1024; - -pub fn upload_files( - target: &Target, - user: &GlobalUser, - namespace_id: &str, - mut pairs: Vec, -) -> Result<(), failure::Error> { - if !pairs.is_empty() { - // Iterate over all key-value pairs and create batches of uploads, each of which are - // maximum 5K key-value pairs in size OR maximum ~50MB in size. Upload each batch - // as it is created. - let mut key_count = 0; - let mut key_pair_bytes = 0; - let mut key_value_batch: Vec = Vec::new(); - - message::working("Uploading site files"); - let pb = if pairs.len() > PAIRS_MAX_COUNT { - let pb = ProgressBar::new(pairs.len() as u64); - pb.set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}")); - Some(pb) - } else { - None - }; - while !(pairs.is_empty() && key_value_batch.is_empty()) { - if pairs.is_empty() { - // Last batch to upload - upload_batch(target, &user, namespace_id, &mut key_value_batch)?; - } else { - let pair = pairs.pop().unwrap(); - if key_count + 1 > PAIRS_MAX_COUNT - // Keep upload size small to keep KV bulk API happy - || key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE - { - upload_batch(target, &user, namespace_id, &mut key_value_batch)?; - if let Some(p) = &pb { - p.inc(key_value_batch.len() as u64); - } - - // If upload successful, reset counters - key_count = 0; - key_pair_bytes = 0; - } - - // Add the popped key-value pair to the running batch of key-value pair uploads - key_count += 1; - key_pair_bytes = key_pair_bytes + pair.key.len() + pair.value.len(); - key_value_batch.push(pair); - } - } - if let Some(p) = pb { - p.finish_with_message("Done Uploading"); - } - } - - Ok(()) -} - -fn upload_batch( - target: &Target, - user: &GlobalUser, - namespace_id: &str, - key_value_batch: &mut Vec, -) -> Result<(), failure::Error> { - // If partial upload fails (e.g. server error), return that error message - put(target, user, namespace_id, &key_value_batch)?; - // Can clear batch now that we've uploaded it - key_value_batch.clear(); - Ok(()) -}