From b31a58438c591af63bcec3cf7d67e1ef30f8ac1e Mon Sep 17 00:00:00 2001 From: parkma99 <84610851+parkma99@users.noreply.github.com> Date: Thu, 27 Jul 2023 23:09:09 +0800 Subject: [PATCH] feat: oss multipart uploads write (#2723) --- core/src/services/oss/backend.rs | 3 +- core/src/services/oss/core.rs | 58 +++++----- core/src/services/oss/writer.rs | 184 ++++++++++++------------------- 3 files changed, 108 insertions(+), 137 deletions(-) diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 370c7d2e7fc..14cc08c2f93 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -386,7 +386,7 @@ pub struct OssBackend { impl Accessor for OssBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = OssWriter; + type Writer = oio::MultipartUploadWriter; type BlockingWriter = (); type Appender = OssAppender; type Pager = OssPager; @@ -412,6 +412,7 @@ impl Accessor for OssBackend { write_can_sink: true, write_with_cache_control: true, write_with_content_type: true, + write_with_content_disposition: true, write_without_content_length: true, delete: true, create_dir: true, diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index da2628a3adb..11279f9e115 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -487,27 +487,13 @@ impl OssCore { } pub async fn oss_initiate_upload( - &self, - path: &str, - args: &OpWrite, - ) -> Result> { - let cache_control = args.cache_control(); - let req = self - .oss_initiate_upload_request(path, None, None, cache_control, AsyncBody::Empty, false) - .await?; - self.send(req).await - } - - /// Creates a request that initiates multipart upload - async fn oss_initiate_upload_request( &self, path: &str, content_type: Option<&str>, content_disposition: Option<&str>, cache_control: Option<&str>, - body: AsyncBody, is_presign: bool, - ) -> Result> { + ) -> Result> { let path = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(is_presign); let url = format!("{}/{}?uploads", endpoint, percent_encode_path(&path)); @@ -522,9 +508,11 @@ impl OssCore { req = req.header(CACHE_CONTROL, cache_control); } req = self.insert_sse_headers(req); - let mut req = req.body(body).map_err(new_request_build_error)?; + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; self.sign(&mut req).await?; - Ok(req) + self.send(req).await } /// Creates a request to upload a part @@ -534,9 +522,9 @@ impl OssCore { upload_id: &str, part_number: usize, is_presign: bool, - size: Option, + size: u64, body: AsyncBody, - ) -> Result> { + ) -> Result> { let p = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(is_presign); @@ -549,13 +537,10 @@ impl OssCore { ); let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size); - } + req = req.header(CONTENT_LENGTH, size); let mut req = req.body(body).map_err(new_request_build_error)?; self.sign(&mut req).await?; - Ok(req) + self.send(req).await } pub async fn oss_complete_multipart_upload_request( @@ -563,7 +548,7 @@ impl OssCore { path: &str, upload_id: &str, is_presign: bool, - parts: &[MultipartUploadPart], + parts: Vec, ) -> Result> { let p = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(is_presign); @@ -592,6 +577,29 @@ impl OssCore { self.sign(&mut req).await?; self.send(req).await } + + /// Abort an on-going multipart upload. + /// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload + pub async fn oss_abort_multipart_upload( + &self, + path: &str, + upload_id: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}?uploadId={}", + self.endpoint, + percent_encode_path(&p), + percent_encode_path(upload_id) + ); + + let mut req = Request::delete(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + self.sign(&mut req).await?; + self.send(req).await + } } /// Request of DeleteObjects. diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index d81fbebf13a..f2f737d65e7 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Buf; -use bytes::Bytes; use http::StatusCode; use super::core::*; @@ -32,29 +31,28 @@ pub struct OssWriter { op: OpWrite, path: String, - upload_id: Option, - - parts: Vec, - buffer: oio::VectorCursor, - buffer_size: usize, } impl OssWriter { - pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { - let buffer_size = core.write_min_size; - OssWriter { + pub fn new( + core: Arc, + path: &str, + op: OpWrite, + ) -> oio::MultipartUploadWriter { + let write_min_size = core.write_min_size; + let total_size = op.content_length(); + let oss_writer = OssWriter { core, path: path.to_string(), op, - - upload_id: None, - parts: vec![], - buffer: oio::VectorCursor::new(), - buffer_size, - } + }; + oio::MultipartUploadWriter::new(oss_writer, total_size).with_write_min_size(write_min_size) } +} - async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { +#[async_trait] +impl oio::MultipartUploadWrite for OssWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.oss_put_object_request( &self.path, Some(size), @@ -80,38 +78,51 @@ impl OssWriter { } } - async fn initiate_upload(&self) -> Result { - let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?; - match resp.status() { + async fn initiate_part(&self) -> Result { + let resp = self + .core + .oss_initiate_upload( + &self.path, + self.op.content_type(), + self.op.content_disposition(), + self.op.cache_control(), + false, + ) + .await?; + + let status = resp.status(); + + match status { StatusCode::OK => { let bs = resp.into_body().bytes().await?; + let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + Ok(result.upload_id) } _ => Err(parse_error(resp).await?), } } - async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result { - // Aliyun OSS requires part number must between [1..=10000] - let part_number = self.parts.len() + 1; - let mut req = self + async fn write_part( + &self, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result { + // OSS requires part number must between [1..=10000] + let part_number = part_number + 1; + + let resp = self .core - .oss_upload_part_request( - &self.path, - upload_id, - part_number, - false, - Some(bs.len() as u64), - AsyncBody::Bytes(bs), - ) + .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body) .await?; - self.core.sign(&mut req).await?; + let status = resp.status(); - let resp = self.core.send(req).await?; - match resp.status() { + match status { StatusCode::OK => { let etag = parse_etag(resp.headers())? .ok_or_else(|| { @@ -121,103 +132,54 @@ impl OssWriter { ) })? .to_string(); + resp.into_body().consume().await?; - Ok(MultipartUploadPart { part_number, etag }) + + Ok(oio::MultipartUploadPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for OssWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let upload_id = match &self.upload_id { - Some(upload_id) => upload_id, - None => { - if self.op.content_length().unwrap_or_default() == bs.len() as u64 { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; - } else { - let upload_id = self.initiate_upload().await?; - self.upload_id = Some(upload_id); - self.upload_id.as_deref().unwrap() - } - } - }; + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartUploadPart], + ) -> Result<()> { + let parts = parts + .iter() + .map(|p| MultipartUploadPart { + part_number: p.part_number, + etag: p.etag.clone(), + }) + .collect(); - // Ignore empty bytes - if bs.is_empty() { - return Ok(()); - } + let resp = self + .core + .oss_complete_multipart_upload_request(&self.path, upload_id, false, parts) + .await?; - self.buffer.push(bs); - // Return directly if the buffer is not full - if self.buffer.len() <= self.buffer_size { - return Ok(()); - } + let status = resp.status(); - let bs = self.buffer.peak_at_least(self.buffer_size); - let size = bs.len(); + match status { + StatusCode::OK => { + resp.into_body().consume().await?; - match self.write_part(upload_id, bs).await { - Ok(part) => { - self.buffer.take(size); - self.parts.push(part); Ok(()) } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. - self.buffer.pop(); - Err(e) - } + _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await - } - - // TODO: we can cancel the upload by sending an abort request. - async fn abort(&mut self) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support abort", - )) - } - - async fn close(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; - - // Make sure internal buffer has been flushed. - if !self.buffer.is_empty() { - let bs = self.buffer.peak_exact(self.buffer.len()); - - match self.write_part(upload_id, bs).await { - Ok(part) => { - self.buffer.clear(); - self.parts.push(part); - } - Err(e) => { - return Err(e); - } - } - } - + async fn abort_part(&self, upload_id: &str) -> Result<()> { let resp = self .core - .oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts) + .oss_abort_multipart_upload(&self.path, upload_id) .await?; match resp.status() { - StatusCode::OK => { + // OSS returns code 204 if abort succeeds. + StatusCode::NO_CONTENT => { resp.into_body().consume().await?; - Ok(()) } _ => Err(parse_error(resp).await?),