Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: oss multipart uploads write #2723

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub struct OssBackend {
impl Accessor for OssBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Writer = OssWriter;
type Writer = oio::MultipartUploadWriter<OssWriter>;
type BlockingWriter = ();
type Appender = OssAppender;
type Pager = OssPager;
Expand All @@ -412,6 +412,7 @@ impl Accessor for OssBackend {
write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
write_without_content_length: true,
delete: true,
create_dir: true,
Expand Down
58 changes: 33 additions & 25 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,27 +487,13 @@ impl OssCore {
}

pub async fn oss_initiate_upload(
&self,
path: &str,
args: &OpWrite,
) -> Result<Response<IncomingAsyncBody>> {
let cache_control = args.cache_control();
let req = self
.oss_initiate_upload_request(path, None, None, cache_control, AsyncBody::Empty, false)
.await?;
self.send(req).await
}

/// Creates a request that initiates multipart upload
async fn oss_initiate_upload_request(
&self,
path: &str,
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
is_presign: bool,
) -> Result<Request<AsyncBody>> {
) -> Result<Response<IncomingAsyncBody>> {
let path = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
let url = format!("{}/{}?uploads", endpoint, percent_encode_path(&path));
Expand All @@ -522,9 +508,11 @@ impl OssCore {
req = req.header(CACHE_CONTROL, cache_control);
}
req = self.insert_sse_headers(req);
let mut req = req.body(body).map_err(new_request_build_error)?;
let mut req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
Ok(req)
self.send(req).await
}

/// Creates a request to upload a part
Expand All @@ -534,9 +522,9 @@ impl OssCore {
upload_id: &str,
part_number: usize,
is_presign: bool,
size: Option<u64>,
size: u64,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);

Expand All @@ -549,21 +537,18 @@ impl OssCore {
);

let mut req = Request::put(&url);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size);
}
req = req.header(CONTENT_LENGTH, size);
let mut req = req.body(body).map_err(new_request_build_error)?;
self.sign(&mut req).await?;
Ok(req)
self.send(req).await
}

pub async fn oss_complete_multipart_upload_request(
&self,
path: &str,
upload_id: &str,
is_presign: bool,
parts: &[MultipartUploadPart],
parts: Vec<MultipartUploadPart>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
Expand Down Expand Up @@ -592,6 +577,29 @@ impl OssCore {
self.sign(&mut req).await?;
self.send(req).await
}

/// Abort an on-going multipart upload.
/// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload
pub async fn oss_abort_multipart_upload(
&self,
path: &str,
upload_id: &str,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!(
"{}/{}?uploadId={}",
self.endpoint,
percent_encode_path(&p),
percent_encode_path(upload_id)
);

let mut req = Request::delete(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.send(req).await
}
}

/// Request of DeleteObjects.
Expand Down
184 changes: 73 additions & 111 deletions core/src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Buf;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
Expand All @@ -32,29 +31,28 @@ pub struct OssWriter {

op: OpWrite,
path: String,
upload_id: Option<String>,

parts: Vec<MultipartUploadPart>,
buffer: oio::VectorCursor,
buffer_size: usize,
}

impl OssWriter {
pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
OssWriter {
pub fn new(
core: Arc<OssCore>,
path: &str,
op: OpWrite,
) -> oio::MultipartUploadWriter<OssWriter> {
let write_min_size = core.write_min_size;
let total_size = op.content_length();
let oss_writer = OssWriter {
core,
path: path.to_string(),
op,

upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
buffer_size,
}
};
oio::MultipartUploadWriter::new(oss_writer, total_size).with_write_min_size(write_min_size)
}
}

async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
#[async_trait]
impl oio::MultipartUploadWrite for OssWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.oss_put_object_request(
&self.path,
Some(size),
Expand All @@ -80,38 +78,51 @@ impl OssWriter {
}
}

async fn initiate_upload(&self) -> Result<String> {
let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
match resp.status() {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
.oss_initiate_upload(
&self.path,
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
false,
)
.await?;

let status = resp.status();

match status {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;

let result: InitiateMultipartUploadResult =
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;

Ok(result.upload_id)
}
_ => Err(parse_error(resp).await?),
}
}

async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result<MultipartUploadPart> {
// Aliyun OSS requires part number must between [1..=10000]
let part_number = self.parts.len() + 1;
let mut req = self
async fn write_part(
&self,
upload_id: &str,
part_number: usize,
size: u64,
body: AsyncBody,
) -> Result<oio::MultipartUploadPart> {
// OSS requires part number must between [1..=10000]
let part_number = part_number + 1;

let resp = self
.core
.oss_upload_part_request(
&self.path,
upload_id,
part_number,
false,
Some(bs.len() as u64),
AsyncBody::Bytes(bs),
)
.oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
.await?;

self.core.sign(&mut req).await?;
let status = resp.status();

let resp = self.core.send(req).await?;
match resp.status() {
match status {
StatusCode::OK => {
let etag = parse_etag(resp.headers())?
.ok_or_else(|| {
Expand All @@ -121,103 +132,54 @@ impl OssWriter {
)
})?
.to_string();

resp.into_body().consume().await?;
Ok(MultipartUploadPart { part_number, etag })

Ok(oio::MultipartUploadPart { part_number, etag })
}
_ => Err(parse_error(resp).await?),
}
}
}

#[async_trait]
impl oio::Write for OssWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let upload_id = match &self.upload_id {
Some(upload_id) => upload_id,
None => {
if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
return self
.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
.await;
} else {
let upload_id = self.initiate_upload().await?;
self.upload_id = Some(upload_id);
self.upload_id.as_deref().unwrap()
}
}
};
async fn complete_part(
&self,
upload_id: &str,
parts: &[oio::MultipartUploadPart],
) -> Result<()> {
let parts = parts
.iter()
.map(|p| MultipartUploadPart {
part_number: p.part_number,
etag: p.etag.clone(),
})
.collect();

// Ignore empty bytes
if bs.is_empty() {
return Ok(());
}
let resp = self
.core
.oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
.await?;

self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.buffer_size {
return Ok(());
}
let status = resp.status();

let bs = self.buffer.peak_at_least(self.buffer_size);
let size = bs.len();
match status {
StatusCode::OK => {
resp.into_body().consume().await?;

match self.write_part(upload_id, bs).await {
Ok(part) => {
self.buffer.take(size);
self.parts.push(part);
Ok(())
}
Err(e) => {
// If the upload fails, we should pop the given bs to make sure
// write is re-enter safe.
self.buffer.pop();
Err(e)
}
_ => Err(parse_error(resp).await?),
}
}

async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.write_oneshot(size, AsyncBody::Stream(s)).await
}

// TODO: we can cancel the upload by sending an abort request.
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"output writer doesn't support abort",
))
}

async fn close(&mut self) -> Result<()> {
let upload_id = if let Some(upload_id) = &self.upload_id {
upload_id
} else {
return Ok(());
};

// Make sure internal buffer has been flushed.
if !self.buffer.is_empty() {
let bs = self.buffer.peak_exact(self.buffer.len());

match self.write_part(upload_id, bs).await {
Ok(part) => {
self.buffer.clear();
self.parts.push(part);
}
Err(e) => {
return Err(e);
}
}
}

async fn abort_part(&self, upload_id: &str) -> Result<()> {
let resp = self
.core
.oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts)
.oss_abort_multipart_upload(&self.path, upload_id)
.await?;
match resp.status() {
StatusCode::OK => {
// OSS returns code 204 if abort succeeds.
StatusCode::NO_CONTENT => {
resp.into_body().consume().await?;

Ok(())
}
_ => Err(parse_error(resp).await?),
Expand Down