Skip to content

Commit

Permalink
feat: implement concurrent MultipartUploadWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 4, 2024
1 parent dd2e68f commit aec2a10
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 108 deletions.
261 changes: 159 additions & 102 deletions core/src/raw/oio/write/multipart_upload_write.rs

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ impl OpStat {
pub struct OpWrite {
append: bool,
buffer: Option<usize>,
concurrent: usize,

content_type: Option<String>,
content_disposition: Option<String>,
Expand Down Expand Up @@ -601,6 +602,17 @@ impl OpWrite {
self.cache_control = Some(cache_control.to_string());
self
}

/// Get the concurrent.
pub fn concurrent(&self) -> usize {
self.concurrent
}

/// Set the maximum concurrent write task amount.
pub fn with_concurrent(mut self, concurrent: usize) -> Self {
self.concurrent = concurrent;
self
}
}

/// Args for `copy` operation.
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/b2/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,10 @@ impl Accessor for B2Backend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let concurrent = args.concurrent();
let writer = B2Writer::new(self.core.clone(), path, args);

let w = oio::MultipartUploadWriter::new(writer);
let w = oio::MultipartUploadWriter::new(writer, concurrent);

Ok((RpWrite::default(), w))
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl Accessor for CosBackend {
let w = if args.append() {
CosWriters::Two(oio::AppendObjectWriter::new(writer))
} else {
CosWriters::One(oio::MultipartUploadWriter::new(writer))
CosWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/obs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl Accessor for ObsBackend {
let w = if args.append() {
ObsWriters::Two(oio::AppendObjectWriter::new(writer))
} else {
ObsWriters::One(oio::MultipartUploadWriter::new(writer))
ObsWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ impl Accessor for OssBackend {
let w = if args.append() {
OssWriters::Two(oio::AppendObjectWriter::new(writer))
} else {
OssWriters::One(oio::MultipartUploadWriter::new(writer))
OssWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent()))
};

Ok((RpWrite::default(), w))
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,9 +1085,10 @@ impl Accessor for S3Backend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let concurrent = args.concurrent();
let writer = S3Writer::new(self.core.clone(), path, args);

let w = oio::MultipartUploadWriter::new(writer);
let w = oio::MultipartUploadWriter::new(writer, concurrent);

Ok((RpWrite::default(), w))
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/upyun/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,10 @@ impl Accessor for UpyunBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let concurrent = args.concurrent();
let writer = UpyunWriter::new(self.core.clone(), args, path.to_string());

let w = oio::MultipartUploadWriter::new(writer);
let w = oio::MultipartUploadWriter::new(writer, concurrent);

Ok((RpWrite::default(), w))
}
Expand Down

0 comments on commit aec2a10

Please sign in to comment.