Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1391 from cloudflare/bug/bulk-timeout
Browse files Browse the repository at this point in the history
use bulk api client directly in bulk action code
  • Loading branch information
ashleymichal authored Jun 16, 2020
2 parents 5dcf0bb + dd6d63a commit 496dc2f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 37 deletions.
23 changes: 0 additions & 23 deletions src/commands/kv/bulk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,3 @@ pub mod put;

pub use delete::run as delete;
pub use put::run as put;

use std::time::Duration;

use cloudflare::framework::auth::Credentials;
use cloudflare::framework::{Environment, HttpApiClient, HttpApiClientConfig};

use crate::http::feature::headers;
use crate::settings::global_user::GlobalUser;

// Create a special API client that has a longer timeout than usual, given that KV operations
// can be lengthy if payloads are large.
fn bulk_api_client(user: &GlobalUser) -> Result<HttpApiClient, failure::Error> {
let config = HttpApiClientConfig {
http_timeout: Duration::from_secs(5 * 60),
default_headers: headers(None),
};

HttpApiClient::new(
Credentials::from(user.to_owned()),
config,
Environment::Production,
)
}
5 changes: 1 addition & 4 deletions src/commands/kv/bulk/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
use crate::terminal::message;

use super::bulk_api_client;

pub fn run(
target: &Target,
user: &GlobalUser,
Expand Down Expand Up @@ -52,15 +50,14 @@ pub fn run(
None
};

let client = bulk_api_client(user)?;
while !pairs.is_empty() {
let p: Vec<KeyValuePair> = if pairs.len() > MAX_PAIRS {
pairs.drain(0..MAX_PAIRS).collect()
} else {
pairs.drain(0..).collect()
};

put(&client, target, namespace_id, &p)?;
put(target, &user, namespace_id, &p)?;

if let Some(pb) = &progress_bar {
pb.inc(p.len() as u64);
Expand Down
27 changes: 24 additions & 3 deletions src/kv/bulk.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
use std::time::Duration;

use cloudflare::endpoints::workerskv::delete_bulk::DeleteBulk;
use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair;
use cloudflare::endpoints::workerskv::write_bulk::WriteBulk;
use cloudflare::framework::apiclient::ApiClient;
use cloudflare::framework::auth::Credentials;
use cloudflare::framework::{Environment, HttpApiClient, HttpApiClientConfig};

use crate::commands::kv::format_error;
use crate::http;
use crate::http::feature::headers;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;

pub const MAX_PAIRS: usize = 10000;

// Create a special API client that has a longer timeout than usual, given that KV operations
// can be lengthy if payloads are large.
fn bulk_api_client(user: &GlobalUser) -> Result<HttpApiClient, failure::Error> {
let config = HttpApiClientConfig {
http_timeout: Duration::from_secs(5 * 60),
default_headers: headers(None),
};

HttpApiClient::new(
Credentials::from(user.to_owned()),
config,
Environment::Production,
)
}

pub fn put(
client: &impl ApiClient,
target: &Target,
user: &GlobalUser,
namespace_id: &str,
pairs: &[KeyValuePair],
) -> Result<(), failure::Error> {
let client = bulk_api_client(user)?;

match client.request(&WriteBulk {
account_identifier: &target.account_id,
namespace_identifier: namespace_id,
Expand All @@ -32,7 +53,7 @@ pub fn delete(
namespace_id: &str,
keys: Vec<String>,
) -> Result<(), failure::Error> {
let client = http::cf_v4_client(user)?;
let client = bulk_api_client(user)?;

let response = client.request(&DeleteBulk {
account_identifier: &target.account_id,
Expand Down
11 changes: 4 additions & 7 deletions src/sites/upload.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use indicatif::{ProgressBar, ProgressStyle};

use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair;
use cloudflare::framework::apiclient::ApiClient;

use crate::http;
use crate::kv::bulk::put;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
Expand All @@ -21,7 +19,6 @@ pub fn upload_files(
mut pairs: Vec<KeyValuePair>,
) -> Result<(), failure::Error> {
if !pairs.is_empty() {
let client = http::cf_v4_client(user)?;
// Iterate over all key-value pairs and create batches of uploads, each of which are
// maximum 5K key-value pairs in size OR maximum ~50MB in size. Upload each batch
// as it is created.
Expand All @@ -40,14 +37,14 @@ pub fn upload_files(
while !(pairs.is_empty() && key_value_batch.is_empty()) {
if pairs.is_empty() {
// Last batch to upload
upload_batch(&client, target, namespace_id, &mut key_value_batch)?;
upload_batch(target, &user, namespace_id, &mut key_value_batch)?;
} else {
let pair = pairs.pop().unwrap();
if key_count + 1 > PAIRS_MAX_COUNT
// Keep upload size small to keep KV bulk API happy
|| key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE
{
upload_batch(&client, target, namespace_id, &mut key_value_batch)?;
upload_batch(target, &user, namespace_id, &mut key_value_batch)?;
if let Some(p) = &pb {
p.inc(key_value_batch.len() as u64);
}
Expand All @@ -72,13 +69,13 @@ pub fn upload_files(
}

fn upload_batch(
client: &impl ApiClient,
target: &Target,
user: &GlobalUser,
namespace_id: &str,
key_value_batch: &mut Vec<KeyValuePair>,
) -> Result<(), failure::Error> {
// If partial upload fails (e.g. server error), return that error message
put(client, target, namespace_id, &key_value_batch)?;
put(target, user, namespace_id, &key_value_batch)?;
// Can clear batch now that we've uploaded it
key_value_batch.clear();
Ok(())
Expand Down

0 comments on commit 496dc2f

Please sign in to comment.