Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

use bulk api client directly in bulk action code #1391

Merged
merged 1 commit into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions src/commands/kv/bulk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,3 @@ pub mod put;

pub use delete::run as delete;
pub use put::run as put;

use std::time::Duration;

use cloudflare::framework::auth::Credentials;
use cloudflare::framework::{Environment, HttpApiClient, HttpApiClientConfig};

use crate::http::feature::headers;
use crate::settings::global_user::GlobalUser;

// Create a special API client that has a longer timeout than usual, given that KV operations
// can be lengthy if payloads are large.
fn bulk_api_client(user: &GlobalUser) -> Result<HttpApiClient, failure::Error> {
let config = HttpApiClientConfig {
http_timeout: Duration::from_secs(5 * 60),
default_headers: headers(None),
};

HttpApiClient::new(
Credentials::from(user.to_owned()),
config,
Environment::Production,
)
}
5 changes: 1 addition & 4 deletions src/commands/kv/bulk/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
use crate::terminal::message;

use super::bulk_api_client;

pub fn run(
target: &Target,
user: &GlobalUser,
Expand Down Expand Up @@ -52,15 +50,14 @@ pub fn run(
None
};

let client = bulk_api_client(user)?;
while !pairs.is_empty() {
let p: Vec<KeyValuePair> = if pairs.len() > MAX_PAIRS {
pairs.drain(0..MAX_PAIRS).collect()
} else {
pairs.drain(0..).collect()
};

put(&client, target, namespace_id, &p)?;
put(target, &user, namespace_id, &p)?;

if let Some(pb) = &progress_bar {
pb.inc(p.len() as u64);
Expand Down
27 changes: 24 additions & 3 deletions src/kv/bulk.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
use std::time::Duration;

use cloudflare::endpoints::workerskv::delete_bulk::DeleteBulk;
use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair;
use cloudflare::endpoints::workerskv::write_bulk::WriteBulk;
use cloudflare::framework::apiclient::ApiClient;
use cloudflare::framework::auth::Credentials;
use cloudflare::framework::{Environment, HttpApiClient, HttpApiClientConfig};

use crate::commands::kv::format_error;
use crate::http;
use crate::http::feature::headers;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;

pub const MAX_PAIRS: usize = 10000;

// Create a special API client that has a longer timeout than usual, given that KV operations
// can be lengthy if payloads are large.
fn bulk_api_client(user: &GlobalUser) -> Result<HttpApiClient, failure::Error> {
let config = HttpApiClientConfig {
http_timeout: Duration::from_secs(5 * 60),
default_headers: headers(None),
};

HttpApiClient::new(
Credentials::from(user.to_owned()),
config,
Environment::Production,
)
}

pub fn put(
client: &impl ApiClient,
target: &Target,
user: &GlobalUser,
namespace_id: &str,
pairs: &[KeyValuePair],
) -> Result<(), failure::Error> {
let client = bulk_api_client(user)?;

match client.request(&WriteBulk {
account_identifier: &target.account_id,
namespace_identifier: namespace_id,
Expand All @@ -32,7 +53,7 @@ pub fn delete(
namespace_id: &str,
keys: Vec<String>,
) -> Result<(), failure::Error> {
let client = http::cf_v4_client(user)?;
let client = bulk_api_client(user)?;

let response = client.request(&DeleteBulk {
account_identifier: &target.account_id,
Expand Down
11 changes: 4 additions & 7 deletions src/sites/upload.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use indicatif::{ProgressBar, ProgressStyle};

use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair;
use cloudflare::framework::apiclient::ApiClient;

use crate::http;
use crate::kv::bulk::put;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
Expand All @@ -21,7 +19,6 @@ pub fn upload_files(
mut pairs: Vec<KeyValuePair>,
) -> Result<(), failure::Error> {
if !pairs.is_empty() {
let client = http::cf_v4_client(user)?;
// Iterate over all key-value pairs and create batches of uploads, each of which are
// maximum 5K key-value pairs in size OR maximum ~50MB in size. Upload each batch
// as it is created.
Expand All @@ -40,14 +37,14 @@ pub fn upload_files(
while !(pairs.is_empty() && key_value_batch.is_empty()) {
if pairs.is_empty() {
// Last batch to upload
upload_batch(&client, target, namespace_id, &mut key_value_batch)?;
upload_batch(target, &user, namespace_id, &mut key_value_batch)?;
} else {
let pair = pairs.pop().unwrap();
if key_count + 1 > PAIRS_MAX_COUNT
// Keep upload size small to keep KV bulk API happy
|| key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE
{
upload_batch(&client, target, namespace_id, &mut key_value_batch)?;
upload_batch(target, &user, namespace_id, &mut key_value_batch)?;
if let Some(p) = &pb {
p.inc(key_value_batch.len() as u64);
}
Expand All @@ -72,13 +69,13 @@ pub fn upload_files(
}

fn upload_batch(
client: &impl ApiClient,
target: &Target,
user: &GlobalUser,
namespace_id: &str,
key_value_batch: &mut Vec<KeyValuePair>,
) -> Result<(), failure::Error> {
// If partial upload fails (e.g. server error), return that error message
put(client, target, namespace_id, &key_value_batch)?;
put(target, user, namespace_id, &key_value_batch)?;
// Can clear batch now that we've uploaded it
key_value_batch.clear();
Ok(())
Expand Down