Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace AsyncWrite with Upload trait and rename MultiPartStore to MultipartStore (#5458) #5500

Merged
merged 16 commits into from
Mar 19, 2024
104 changes: 59 additions & 45 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@

//! An object store implementation for S3
//!
//! ## Multi-part uploads
//! ## Multipart uploads
//!
//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method.
//! Data passed to the writer is automatically buffered to meet the minimum size
//! requirements for a part. Multiple parts are uploaded concurrently.
//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method.
//!
//! If the writer fails for any reason, you may have parts uploaded to AWS but not
//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing [automatic cleanup] of unused parts that are older than one
//! week.
//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
//! these unneeded parts, however, it is recommended that you consider implementing
//! [automatic cleanup] of unused parts that are older than some threshold.
//!
//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/

Expand All @@ -38,18 +35,17 @@ use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
use url::Url;

use crate::aws::client::{RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode,
PutOptions, PutResult, Result,
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
Expand Down Expand Up @@ -85,6 +81,7 @@ const STORE: &str = "S3";

/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};

/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
Expand Down Expand Up @@ -211,25 +208,18 @@ impl ObjectStore for AmazonS3 {
}
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let id = self.client.create_multipart(location).await?;

let upload = S3MultiPartUpload {
location: location.clone(),
upload_id: id.clone(),
client: Arc::clone(&self.client),
};

Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.client
.delete_request(location, &[("uploadId", multipart_id)])
.await
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
let upload_id = self.client.create_multipart(location).await?;

Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
upload_id: upload_id.clone(),
parts: Default::default(),
}),
}))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -319,30 +309,55 @@ impl ObjectStore for AmazonS3 {
}
}

#[derive(Debug)]
struct S3MultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
parts: Parts,
location: Path,
upload_id: String,
client: Arc<S3Client>,
}

#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
self.client
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
impl MultipartUpload for S3MultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
.await
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
self.client
.complete_multipart(&self.location, &self.upload_id, completed_parts)
.await?;
Ok(())
async fn abort(&mut self) -> Result<()> {
self.state
.client
.delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)])
.await
}
}

#[async_trait]
impl MultiPartStore for AmazonS3 {
impl MultipartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.client.create_multipart(path).await
}
Expand Down Expand Up @@ -377,7 +392,6 @@ mod tests {
use crate::{client::get::GetClient, tests::*};
use bytes::Bytes;
use hyper::HeaderMap;
use tokio::io::AsyncWriteExt;

const NON_EXISTENT_NAME: &str = "nonexistentname";

Expand Down Expand Up @@ -542,9 +556,9 @@ mod tests {
store.put(&locations[0], data.clone()).await.unwrap();
store.copy(&locations[0], &locations[1]).await.unwrap();

let (_, mut writer) = store.put_multipart(&locations[2]).await.unwrap();
writer.write_all(&data).await.unwrap();
writer.shutdown().await.unwrap();
let mut upload = store.put_multipart(&locations[2]).await.unwrap();
upload.put_part(data.clone()).await.unwrap();
upload.complete().await.unwrap();

for location in &locations {
let res = store
Expand Down
80 changes: 47 additions & 33 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@
//!
//! ## Streaming uploads
//!
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those
//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks
//! are uploaded concurrently.
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks.
//!
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead unused blocks are automatically cleaned up
//! after 7 days.
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Result,
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -40,7 +36,6 @@ use reqwest::Method;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::get::GetClientExt;
Expand All @@ -54,6 +49,8 @@ mod credential;

/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::azure::client::AzureClient;
use crate::client::parts::Parts;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -94,21 +91,15 @@ impl ObjectStore for MicrosoftAzure {
self.client.put_blob(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let inner = AzureMultiPartUpload {
client: Arc::clone(&self.client),
location: location.to_owned(),
};
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
Ok(Box::new(AzureMultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
parts: Default::default(),
}),
}))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -197,26 +188,49 @@ impl Signer for MicrosoftAzure {
/// put_multipart_part -> PUT block
/// complete -> PUT block list
/// abort -> No equivalent; blocks are simply dropped after 7 days
#[derive(Debug, Clone)]
#[derive(Debug)]
struct AzureMultiPartUpload {
client: Arc<client::AzureClient>,
part_idx: usize,
state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
location: Path,
parts: Parts,
client: Arc<AzureClient>,
}

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
self.client.put_block(&self.location, idx, buf.into()).await
impl MultipartUpload for AzureMultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state.client.put_block(&state.location, idx, data).await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.put_block_list(&self.state.location, parts)
.await
}

async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
self.client.put_block_list(&self.location, parts).await?;
async fn abort(&mut self) -> Result<()> {
// Nothing to do
Ok(())
}
}

#[async_trait]
impl MultiPartStore for MicrosoftAzure {
impl MultipartStore for MicrosoftAzure {
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
Ok(String::new())
}
Expand Down
Loading
Loading