Skip to content

Commit

Permalink
feat: oss multipart uploads write (#2723)
Browse files Browse the repository at this point in the history
  • Loading branch information
parkma99 authored Jul 27, 2023
1 parent d2b4315 commit b31a584
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 137 deletions.
3 changes: 2 additions & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub struct OssBackend {
impl Accessor for OssBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Writer = OssWriter;
type Writer = oio::MultipartUploadWriter<OssWriter>;
type BlockingWriter = ();
type Appender = OssAppender;
type Pager = OssPager;
Expand All @@ -412,6 +412,7 @@ impl Accessor for OssBackend {
write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
write_without_content_length: true,
delete: true,
create_dir: true,
Expand Down
58 changes: 33 additions & 25 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,27 +487,13 @@ impl OssCore {
}

pub async fn oss_initiate_upload(
&self,
path: &str,
args: &OpWrite,
) -> Result<Response<IncomingAsyncBody>> {
let cache_control = args.cache_control();
let req = self
.oss_initiate_upload_request(path, None, None, cache_control, AsyncBody::Empty, false)
.await?;
self.send(req).await
}

/// Creates a request that initiates multipart upload
async fn oss_initiate_upload_request(
&self,
path: &str,
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
is_presign: bool,
) -> Result<Request<AsyncBody>> {
) -> Result<Response<IncomingAsyncBody>> {
let path = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
let url = format!("{}/{}?uploads", endpoint, percent_encode_path(&path));
Expand All @@ -522,9 +508,11 @@ impl OssCore {
req = req.header(CACHE_CONTROL, cache_control);
}
req = self.insert_sse_headers(req);
let mut req = req.body(body).map_err(new_request_build_error)?;
let mut req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
Ok(req)
self.send(req).await
}

/// Creates a request to upload a part
Expand All @@ -534,9 +522,9 @@ impl OssCore {
upload_id: &str,
part_number: usize,
is_presign: bool,
size: Option<u64>,
size: u64,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);

Expand All @@ -549,21 +537,18 @@ impl OssCore {
);

let mut req = Request::put(&url);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size);
}
req = req.header(CONTENT_LENGTH, size);
let mut req = req.body(body).map_err(new_request_build_error)?;
self.sign(&mut req).await?;
Ok(req)
self.send(req).await
}

pub async fn oss_complete_multipart_upload_request(
&self,
path: &str,
upload_id: &str,
is_presign: bool,
parts: &[MultipartUploadPart],
parts: Vec<MultipartUploadPart>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
Expand Down Expand Up @@ -592,6 +577,29 @@ impl OssCore {
self.sign(&mut req).await?;
self.send(req).await
}

/// Abort an on-going multipart upload.
/// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload
pub async fn oss_abort_multipart_upload(
&self,
path: &str,
upload_id: &str,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!(
"{}/{}?uploadId={}",
self.endpoint,
percent_encode_path(&p),
percent_encode_path(upload_id)
);

let mut req = Request::delete(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.send(req).await
}
}

/// Request of DeleteObjects.
Expand Down
184 changes: 73 additions & 111 deletions core/src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Buf;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
Expand All @@ -32,29 +31,28 @@ pub struct OssWriter {

op: OpWrite,
path: String,
upload_id: Option<String>,

parts: Vec<MultipartUploadPart>,
buffer: oio::VectorCursor,
buffer_size: usize,
}

impl OssWriter {
pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
OssWriter {
pub fn new(
core: Arc<OssCore>,
path: &str,
op: OpWrite,
) -> oio::MultipartUploadWriter<OssWriter> {
let write_min_size = core.write_min_size;
let total_size = op.content_length();
let oss_writer = OssWriter {
core,
path: path.to_string(),
op,

upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
buffer_size,
}
};
oio::MultipartUploadWriter::new(oss_writer, total_size).with_write_min_size(write_min_size)
}
}

async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
#[async_trait]
impl oio::MultipartUploadWrite for OssWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.oss_put_object_request(
&self.path,
Some(size),
Expand All @@ -80,38 +78,51 @@ impl OssWriter {
}
}

async fn initiate_upload(&self) -> Result<String> {
let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
match resp.status() {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
.oss_initiate_upload(
&self.path,
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
false,
)
.await?;

let status = resp.status();

match status {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;

let result: InitiateMultipartUploadResult =
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;

Ok(result.upload_id)
}
_ => Err(parse_error(resp).await?),
}
}

async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result<MultipartUploadPart> {
// Aliyun OSS requires part number must between [1..=10000]
let part_number = self.parts.len() + 1;
let mut req = self
async fn write_part(
&self,
upload_id: &str,
part_number: usize,
size: u64,
body: AsyncBody,
) -> Result<oio::MultipartUploadPart> {
// OSS requires part number must between [1..=10000]
let part_number = part_number + 1;

let resp = self
.core
.oss_upload_part_request(
&self.path,
upload_id,
part_number,
false,
Some(bs.len() as u64),
AsyncBody::Bytes(bs),
)
.oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
.await?;

self.core.sign(&mut req).await?;
let status = resp.status();

let resp = self.core.send(req).await?;
match resp.status() {
match status {
StatusCode::OK => {
let etag = parse_etag(resp.headers())?
.ok_or_else(|| {
Expand All @@ -121,103 +132,54 @@ impl OssWriter {
)
})?
.to_string();

resp.into_body().consume().await?;
Ok(MultipartUploadPart { part_number, etag })

Ok(oio::MultipartUploadPart { part_number, etag })
}
_ => Err(parse_error(resp).await?),
}
}
}

#[async_trait]
impl oio::Write for OssWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let upload_id = match &self.upload_id {
Some(upload_id) => upload_id,
None => {
if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
return self
.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
.await;
} else {
let upload_id = self.initiate_upload().await?;
self.upload_id = Some(upload_id);
self.upload_id.as_deref().unwrap()
}
}
};
async fn complete_part(
&self,
upload_id: &str,
parts: &[oio::MultipartUploadPart],
) -> Result<()> {
let parts = parts
.iter()
.map(|p| MultipartUploadPart {
part_number: p.part_number,
etag: p.etag.clone(),
})
.collect();

// Ignore empty bytes
if bs.is_empty() {
return Ok(());
}
let resp = self
.core
.oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
.await?;

self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.buffer_size {
return Ok(());
}
let status = resp.status();

let bs = self.buffer.peak_at_least(self.buffer_size);
let size = bs.len();
match status {
StatusCode::OK => {
resp.into_body().consume().await?;

match self.write_part(upload_id, bs).await {
Ok(part) => {
self.buffer.take(size);
self.parts.push(part);
Ok(())
}
Err(e) => {
// If the upload fails, we should pop the given bs to make sure
// write is re-enter safe.
self.buffer.pop();
Err(e)
}
_ => Err(parse_error(resp).await?),
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.write_oneshot(size, AsyncBody::Stream(s)).await
}

// TODO: we can cancel the upload by sending an abort request.
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"output writer doesn't support abort",
))
}

async fn close(&mut self) -> Result<()> {
let upload_id = if let Some(upload_id) = &self.upload_id {
upload_id
} else {
return Ok(());
};

// Make sure internal buffer has been flushed.
if !self.buffer.is_empty() {
let bs = self.buffer.peak_exact(self.buffer.len());

match self.write_part(upload_id, bs).await {
Ok(part) => {
self.buffer.clear();
self.parts.push(part);
}
Err(e) => {
return Err(e);
}
}
}

async fn abort_part(&self, upload_id: &str) -> Result<()> {
let resp = self
.core
.oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts)
.oss_abort_multipart_upload(&self.path, upload_id)
.await?;
match resp.status() {
StatusCode::OK => {
// OSS returns code 204 if abort succeeds.
StatusCode::NO_CONTENT => {
resp.into_body().consume().await?;

Ok(())
}
_ => Err(parse_error(resp).await?),
Expand Down

0 comments on commit b31a584

Please sign in to comment.