Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

move batching into bulk module code #1400

Merged
merged 3 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 9 additions & 22 deletions src/commands/kv/bulk/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use indicatif::{ProgressBar, ProgressStyle};

use crate::commands::kv;
use crate::kv::bulk::delete;
use crate::kv::bulk::MAX_PAIRS;
use crate::kv::bulk::BATCH_KEY_MAX;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
use crate::terminal::interactive;
Expand All @@ -36,46 +36,33 @@ pub fn run(
Err(e) => failure::bail!(e),
}

let pairs: Result<Vec<KeyValuePair>, failure::Error> = match &metadata(filename) {
let keys: Vec<String> = match &metadata(filename) {
Ok(file_type) if file_type.is_file() => {
let data = fs::read_to_string(filename)?;
let keys_vec = serde_json::from_str(&data);
let keys_vec: Result<Vec<KeyValuePair>, serde_json::Error> =
serde_json::from_str(&data);
match keys_vec {
Ok(keys_vec) => Ok(keys_vec),
Ok(keys_vec) => keys_vec.iter().map(|kv| { kv.key.to_owned() }).collect(),
Err(_) => failure::bail!("Failed to decode JSON. Please make sure to follow the format, [{\"key\": \"test_key\", \"value\": \"test_value\"}, ...]")
}
}
Ok(_) => failure::bail!("{} should be a JSON file, but is not", filename.display()),
Err(e) => failure::bail!("{}", e),
};

let mut keys: Vec<String> = pairs?.iter().map(|kv| kv.key.to_owned()).collect();

let len = keys.len();

message::working(&format!("deleting {} key value pairs", len));
let progress_bar = if len > MAX_PAIRS {

let progress_bar = if len > BATCH_KEY_MAX {
let pb = ProgressBar::new(len as u64);
pb.set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}"));
Some(pb)
} else {
None
};

while !keys.is_empty() {
let p: Vec<String> = if keys.len() > MAX_PAIRS {
keys.drain(0..MAX_PAIRS).collect()
} else {
keys.drain(0..).collect()
};

if let Err(e) = delete(target, user, namespace_id, p) {
failure::bail!("{}", e);
}

if let Some(pb) = &progress_bar {
pb.inc(keys.len() as u64);
}
}
delete(target, user, namespace_id, keys, &progress_bar)?;

if let Some(pb) = &progress_bar {
pb.finish_with_message(&format!("deleted {} key value pairs", len));
Expand Down
20 changes: 4 additions & 16 deletions src/commands/kv/bulk/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use indicatif::{ProgressBar, ProgressStyle};

use crate::commands::kv::validate_target;
use crate::kv::bulk::put;
use crate::kv::bulk::MAX_PAIRS;
use crate::kv::bulk::BATCH_KEY_MAX;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
use crate::terminal::message;
Expand All @@ -23,7 +23,7 @@ pub fn run(
) -> Result<(), failure::Error> {
validate_target(target)?;

let mut pairs: Vec<KeyValuePair> = match &metadata(filename) {
let pairs: Vec<KeyValuePair> = match &metadata(filename) {
Ok(file_type) if file_type.is_file() => {
let data = fs::read_to_string(filename)?;
let data_vec = serde_json::from_str(&data);
Expand All @@ -42,27 +42,15 @@ pub fn run(
let len = pairs.len();

message::working(&format!("uploading {} key value pairs", len));
let progress_bar = if len > MAX_PAIRS {
let progress_bar = if len > BATCH_KEY_MAX {
let pb = ProgressBar::new(len as u64);
pb.set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}"));
Some(pb)
} else {
None
};

while !pairs.is_empty() {
let p: Vec<KeyValuePair> = if pairs.len() > MAX_PAIRS {
pairs.drain(0..MAX_PAIRS).collect()
} else {
pairs.drain(0..).collect()
};

put(target, &user, namespace_id, &p)?;

if let Some(pb) = &progress_bar {
pb.inc(p.len() as u64);
}
}
put(target, &user, namespace_id, pairs, &progress_bar)?;

if let Some(pb) = &progress_bar {
pb.finish_with_message(&format!("uploaded {} key value pairs", len));
Expand Down
54 changes: 45 additions & 9 deletions src/commands/publish.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::env;
use std::path::{Path, PathBuf};

use indicatif::{ProgressBar, ProgressStyle};

use crate::build;
use crate::deploy;
use crate::http::{self, Feature};
use crate::kv::bulk::delete;
use crate::kv::bulk;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::{DeployConfig, Target};
use crate::sites;
Expand All @@ -15,7 +17,6 @@ pub fn publish(
user: &GlobalUser,
target: &mut Target,
deploy_config: DeployConfig,
verbose: bool,
) -> Result<(), failure::Error> {
validate_target_required_fields_present(target)?;

Expand All @@ -33,10 +34,27 @@ pub fn publish(
sites::sync(target, user, &site_namespace.id, &path)?;

// First, upload all existing files in bucket directory
if verbose {
message::info("Preparing to upload updated files...");
message::working("Uploading site files");
let upload_progress_bar = if to_upload.len() > bulk::BATCH_KEY_MAX {
let upload_progress_bar = ProgressBar::new(to_upload.len() as u64);
upload_progress_bar
.set_style(ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}"));
Some(upload_progress_bar)
} else {
None
};

bulk::put(
target,
user,
&site_namespace.id,
to_upload,
&upload_progress_bar,
)?;

if let Some(pb) = upload_progress_bar {
pb.finish_with_message("Done Uploading");
}
sites::upload_files(target, user, &site_namespace.id, to_upload)?;

let upload_client = http::featured_legacy_auth_client(user, Feature::Sites);

Expand All @@ -47,11 +65,29 @@ pub fn publish(

// Finally, remove any stale files
if !to_delete.is_empty() {
if verbose {
message::info("Deleting stale files...");
message::info("Deleting stale files...");

let delete_progress_bar = if to_delete.len() > bulk::BATCH_KEY_MAX {
let delete_progress_bar = ProgressBar::new(to_delete.len() as u64);
delete_progress_bar.set_style(
ProgressStyle::default_bar().template("{wide_bar} {pos}/{len}\n{msg}"),
);
Some(delete_progress_bar)
} else {
None
};

bulk::delete(
target,
user,
&site_namespace.id,
to_delete,
&delete_progress_bar,
)?;

if let Some(pb) = delete_progress_bar {
pb.finish_with_message("Done deleting");
}

delete(target, user, &site_namespace.id, to_delete)?;
}
} else {
let upload_client = http::legacy_auth_client(user);
Expand Down
110 changes: 93 additions & 17 deletions src/kv/bulk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::time::Duration;

use indicatif::ProgressBar;

use cloudflare::endpoints::workerskv::delete_bulk::DeleteBulk;
use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair;
use cloudflare::endpoints::workerskv::write_bulk::WriteBulk;
Expand All @@ -12,7 +14,11 @@ use crate::http::feature::headers;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;

pub const MAX_PAIRS: usize = 10000;
// Maximum key-value pairs the KV bulk API accepts in a single request.
const API_MAX_PAIRS: usize = 10000;
// The consts below are halved from the API's true capacity to help avoid
// hammering it with large requests.
pub const BATCH_KEY_MAX: usize = API_MAX_PAIRS / 2;
// Cap (~50 MB) on the combined key + value bytes packed into one bulk upload.
const UPLOAD_MAX_SIZE: usize = 50 * 1024 * 1024;

// Create a special API client that has a longer timeout than usual, given that KV operations
// can be lengthy if payloads are large.
Expand All @@ -33,36 +39,106 @@ pub fn put(
target: &Target,
user: &GlobalUser,
namespace_id: &str,
pairs: &[KeyValuePair],
pairs: Vec<KeyValuePair>,
progress_bar: &Option<ProgressBar>,
) -> Result<(), failure::Error> {
let client = bulk_api_client(user)?;

match client.request(&WriteBulk {
account_identifier: &target.account_id,
namespace_identifier: namespace_id,
bulk_key_value_pairs: pairs.to_owned(),
}) {
Ok(_) => Ok(()),
Err(e) => failure::bail!("{}", format_error(e)),
for b in batch_keys_values(pairs) {
match client.request(&WriteBulk {
account_identifier: &target.account_id,
namespace_identifier: namespace_id,
bulk_key_value_pairs: b.to_owned(),
}) {
Ok(_) => {}
Err(e) => failure::bail!("{}", format_error(e)),
}

if let Some(pb) = &progress_bar {
pb.inc(b.len() as u64);
}
}

Ok(())
}

/// Delete `keys` from the given KV namespace in batches, advancing
/// `progress_bar` (when present) by the size of each completed batch.
///
/// Bails with a formatted API error on the first failed batch.
pub fn delete(
    target: &Target,
    user: &GlobalUser,
    namespace_id: &str,
    keys: Vec<String>,
    progress_bar: &Option<ProgressBar>,
) -> Result<(), failure::Error> {
    let client = bulk_api_client(user)?;

    for batch in batch_keys(keys) {
        // Capture the length before the request consumes the batch; avoids
        // the needless full clone (`b.to_owned()`) the previous version made.
        let batch_len = batch.len() as u64;

        if let Err(e) = client.request(&DeleteBulk {
            account_identifier: &target.account_id,
            namespace_identifier: namespace_id,
            bulk_keys: batch,
        }) {
            failure::bail!("{}", format_error(e));
        }

        if let Some(pb) = progress_bar {
            pb.inc(batch_len);
        }
    }

    Ok(())
}

fn batch_keys_values(mut pairs: Vec<KeyValuePair>) -> Vec<Vec<KeyValuePair>> {
ashleymichal marked this conversation as resolved.
Show resolved Hide resolved
let mut batches: Vec<Vec<KeyValuePair>> = Vec::new();

match response {
Ok(_) => Ok(()),
Err(e) => failure::bail!("{}", format_error(e)),
if !pairs.is_empty() {
ashleymichal marked this conversation as resolved.
Show resolved Hide resolved
// Iterate over all key-value pairs and create batches of uploads, each of which are
// maximum 5K key-value pairs in size OR maximum ~50MB in size. Upload each batch
// as it is created.
let mut key_count = 0;
let mut key_pair_bytes = 0;
let mut key_value_batch: Vec<KeyValuePair> = Vec::new();

while !(pairs.is_empty() && key_value_batch.is_empty()) {
if pairs.is_empty() {
// Last batch to upload
batches.push(key_value_batch.to_vec());
key_value_batch.clear();
} else {
let pair = pairs.pop().unwrap();
if key_count + 1 > BATCH_KEY_MAX
// Keep upload size small to keep KV bulk API happy
|| key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE
{
batches.push(key_value_batch.to_vec());
key_count = 0;
key_pair_bytes = 0;
key_value_batch.clear();
}

// Add the popped key-value pair to the running batch of key-value pair uploads
key_count += 1;
key_pair_bytes = key_pair_bytes + pair.key.len() + pair.value.len();
key_value_batch.push(pair);
}
}
}

batches
}

/// Partition `keys` into consecutive batches of at most `BATCH_KEY_MAX`
/// entries, preserving the original key order. An empty input yields no
/// batches.
fn batch_keys(mut keys: Vec<String>) -> Vec<Vec<String>> {
    let mut batches = Vec::new();

    // Peel off full batches from the front until at most one (possibly
    // partial) batch of keys remains.
    while keys.len() > BATCH_KEY_MAX {
        let remainder = keys.split_off(BATCH_KEY_MAX);
        batches.push(keys);
        keys = remainder;
    }

    if !keys.is_empty() {
        batches.push(keys);
    }

    batches
}
6 changes: 2 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ fn run() -> Result<(), failure::Error> {
.long("env")
.takes_value(true)
)
.arg(verbose_arg.clone())
.arg(silent_verbose_arg.clone())
.arg(
Arg::with_name("release")
.hidden(true)
Expand Down Expand Up @@ -757,9 +757,7 @@ fn run() -> Result<(), failure::Error> {
let mut target = manifest.get_target(env, is_preview)?;
let deploy_config = manifest.deploy_config(env)?;

let verbose = matches.is_present("verbose");
ashleymichal marked this conversation as resolved.
Show resolved Hide resolved

commands::publish(&user, &mut target, deploy_config, verbose)?;
commands::publish(&user, &mut target, deploy_config)?;
} else if let Some(matches) = matches.subcommand_matches("subdomain") {
log::info!("Getting project settings");
let manifest = settings::toml::Manifest::new(config_path)?;
Expand Down
Loading