From 8ea4f225e40ed0209d8d420105b4de54cff6e74b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 20 Aug 2023 16:01:29 +0800 Subject: [PATCH] feat: Implement RFC-2758 Merge Append Into Write (#2880) * Save work Signed-off-by: Xuanwo * Fix test Signed-off-by: Xuanwo * fix binding Signed-off-by: Xuanwo * allow append for azblob Signed-off-by: Xuanwo * Fix test Signed-off-by: Xuanwo * Fix write test Signed-off-by: Xuanwo * Make sure buffer has been flushed Signed-off-by: Xuanwo * disable hdfs for now Signed-off-by: Xuanwo * ftp doesn't support append yet Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- bindings/java/src/operator.rs | 2 +- bindings/nodejs/src/lib.rs | 6 +- core/src/layers/async_backtrace.rs | 6 - core/src/layers/await_tree.rs | 8 - core/src/layers/blocking.rs | 5 - core/src/layers/chaos.rs | 5 - core/src/layers/complete.rs | 67 ----- core/src/layers/concurrent_limit.rs | 26 -- core/src/layers/error_context.rs | 42 --- core/src/layers/immutable_index.rs | 5 - core/src/layers/logging.rs | 124 --------- core/src/layers/madsim.rs | 12 - core/src/layers/metrics.rs | 5 - core/src/layers/minitrace.rs | 40 --- core/src/layers/oteltrace.rs | 5 - core/src/layers/prometheus.rs | 5 - core/src/layers/retry.rs | 84 ------ core/src/layers/throttle.rs | 41 --- core/src/layers/timeout.rs | 43 +-- core/src/layers/tracing.rs | 5 - core/src/layers/type_eraser.rs | 8 - core/src/raw/accessor.rs | 26 -- core/src/raw/adapters/kv/backend.rs | 1 - core/src/raw/adapters/typed_kv/backend.rs | 1 - core/src/raw/layer.rs | 14 - core/src/raw/oio/append/api.rs | 108 -------- core/src/raw/oio/append/mod.rs | 19 -- core/src/raw/oio/cursor.rs | 2 + core/src/raw/oio/mod.rs | 3 - core/src/raw/oio/page/into_flat_page.rs | 1 - .../oio/read/into_seekable_read_by_range.rs | 1 - core/src/raw/oio/write/append_object_write.rs | 176 ++++++++++++ core/src/raw/oio/write/mod.rs | 7 + core/src/raw/oio/write/two_ways_write.rs | 64 +++++ core/src/raw/operation.rs | 3 - core/src/raw/ops.rs | 69 ++--- core/src/raw/rps.rs | 11 - core/src/services/azblob/appender.rs | 145 ---------- core/src/services/azblob/backend.rs | 21 +- core/src/services/azblob/core.rs | 2 +- core/src/services/azblob/mod.rs | 1 - core/src/services/azblob/writer.rs | 127 ++++++++- core/src/services/azdfs/backend.rs | 1 - core/src/services/cos/appender.rs | 162 ----------- core/src/services/cos/backend.rs | 37 +-- core/src/services/cos/core.rs | 4 +- core/src/services/cos/mod.rs | 1 - core/src/services/cos/writer.rs | 48 +++- core/src/services/dropbox/backend.rs | 1 - core/src/services/fs/appender.rs | 49 ---- core/src/services/fs/backend.rs | 35 +-- core/src/services/fs/mod.rs | 1 - core/src/services/ftp/backend.rs | 2 +- core/src/services/gcs/backend.rs | 1 - core/src/services/gdrive/backend.rs | 1 - core/src/services/ghac/backend.rs | 1 - core/src/services/hdfs/appender.rs | 48 ---- core/src/services/hdfs/backend.rs | 53 +--- core/src/services/hdfs/mod.rs | 1 - core/src/services/http/backend.rs | 1 - core/src/services/ipfs/backend.rs | 1 - core/src/services/ipmfs/backend.rs | 1 - core/src/services/obs/appender.rs | 141 ---------- core/src/services/obs/backend.rs | 37 +-- core/src/services/obs/core.rs | 6 +- core/src/services/obs/mod.rs | 1 - core/src/services/obs/writer.rs | 168 ++++-------- core/src/services/onedrive/backend.rs | 1 - core/src/services/oss/appender.rs | 142 ---------- core/src/services/oss/backend.rs | 37 +-- core/src/services/oss/core.rs | 4 +- core/src/services/oss/mod.rs | 1 - core/src/services/oss/writer.rs | 52 +++- core/src/services/s3/backend.rs | 1 - core/src/services/sftp/backend.rs | 29 +- core/src/services/sftp/writer.rs | 13 - core/src/services/supabase/backend.rs | 1 - core/src/services/vercel_artifacts/backend.rs | 1 - core/src/services/wasabi/backend.rs | 1 - core/src/services/webdav/backend.rs | 1 - core/src/services/webhdfs/backend.rs | 1 - core/src/types/appender.rs | 252 ------------------ core/src/types/capability.rs | 70 +++-- core/src/types/mod.rs | 3 - core/src/types/operator/operator.rs | 160 ----------- core/src/types/operator/operator_futures.rs | 100 ++----- core/tests/behavior/append.rs | 39 ++- core/tests/behavior/write.rs | 26 +- 88 files changed, 724 insertions(+), 2359 deletions(-) delete mode 100644 core/src/raw/oio/append/api.rs delete mode 100644 core/src/raw/oio/append/mod.rs create mode 100644 core/src/raw/oio/write/append_object_write.rs create mode 100644 core/src/raw/oio/write/two_ways_write.rs delete mode 100644 core/src/services/azblob/appender.rs delete mode 100644 core/src/services/cos/appender.rs delete mode 100644 core/src/services/fs/appender.rs delete mode 100644 core/src/services/hdfs/appender.rs delete mode 100644 core/src/services/obs/appender.rs delete mode 100644 core/src/services/oss/appender.rs delete mode 100644 core/src/types/appender.rs diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 0fc184d2795..7730bd0386b 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -144,7 +144,7 @@ fn intern_append( } async fn do_append(op: &mut Operator, path: String, content: Vec) -> Result<()> { - Ok(op.append(&path, content).await?) + Ok(op.write_with(&path, content).append(true).await?) } /// # Safety diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index b615bc7e92f..7ecd68ee8df 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -224,7 +224,11 @@ impl Operator { Either::B(s) => s.into_bytes(), }; - self.0.append(&path, c).await.map_err(format_napi_error) + self.0 + .write_with(&path, c) + .append(true) + .await + .map_err(format_napi_error) } /// Copy file according to given `from` and `to` path. diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index 7ce2d875700..6fc5d3a1a81 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -65,7 +65,6 @@ impl LayeredAccessor for AsyncBacktraceAccessor { type BlockingReader = A::BlockingReader; type Writer = A::Writer; type BlockingWriter = A::BlockingWriter; - type Appender = A::Appender; type Pager = A::Pager; type BlockingPager = A::BlockingPager; @@ -83,11 +82,6 @@ impl LayeredAccessor for AsyncBacktraceAccessor { self.inner.write(path, args).await } - #[async_backtrace::framed] - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - #[async_backtrace::framed] async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner.copy(from, to, args).await diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 6f6d32b0b87..dc50656f6d2 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -73,7 +73,6 @@ impl LayeredAccessor for AwaitTreeAccessor { type BlockingReader = A::BlockingReader; type Writer = A::Writer; type BlockingWriter = A::BlockingWriter; - type Appender = A::Appender; type Pager = A::Pager; type BlockingPager = A::BlockingPager; @@ -95,13 +94,6 @@ impl LayeredAccessor for AwaitTreeAccessor { .await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner - .append(path, args) - .instrument_await(format!("opendal::{}", Operation::Append)) - .await - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner() .copy(from, to, args) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 47dbcba40c1..7b80b595642 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -71,7 +71,6 @@ impl LayeredAccessor for BlockingAccessor { type BlockingReader = BlockingWrapper; type Writer = A::Writer; type BlockingWriter = BlockingWrapper; - type Appender = A::Appender; type Pager = A::Pager; type BlockingPager = BlockingWrapper; @@ -97,10 +96,6 @@ impl LayeredAccessor for BlockingAccessor { self.inner.write(path, args).await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner.copy(from, to, args).await } diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs index 618c5e612f1..02d7fb9a5a6 100644 --- a/core/src/layers/chaos.rs +++ b/core/src/layers/chaos.rs @@ -106,7 +106,6 @@ impl LayeredAccessor for ChaosAccessor { type BlockingReader = ChaosReader; type Writer = A::Writer; type BlockingWriter = A::BlockingWriter; - type Appender = A::Appender; type Pager = A::Pager; type BlockingPager = A::BlockingPager; @@ -135,10 +134,6 @@ impl LayeredAccessor for ChaosAccessor { self.inner.blocking_write(path, args) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.inner.list(path, args).await } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 0dda5965c55..f95ba08510d 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -367,7 +367,6 @@ impl LayeredAccessor for CompleteReaderAccessor { type BlockingReader = CompleteReader; type Writer = CompleteWriter; type BlockingWriter = CompleteWriter; - type Appender = CompleteAppender; type Pager = CompletePager; type BlockingPager = CompletePager; @@ -446,18 +445,6 @@ impl LayeredAccessor for CompleteReaderAccessor { .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - let capability = self.meta.full_capability(); - if !capability.append { - return new_capability_unsupported_error(Operation::Append); - } - - self.inner - .append(path, args) - .await - .map(|(rp, a)| (rp, CompleteAppender::new(a))) - } - async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { let capability = self.meta.full_capability(); if !capability.create_dir { @@ -854,54 +841,6 @@ where } } -pub struct CompleteAppender { - inner: Option, -} - -impl CompleteAppender { - pub fn new(inner: A) -> CompleteAppender { - CompleteAppender { inner: Some(inner) } - } -} - -/// Check if the appender has been closed while debug_assertions enabled. -/// This code will never be executed in release mode. -#[cfg(debug_assertions)] -impl Drop for CompleteAppender { - fn drop(&mut self) { - if self.inner.is_some() { - // Do we need to panic here? - log::warn!("appender has not been closed, must be a bug") - } - } -} - -#[async_trait] -impl oio::Append for CompleteAppender -where - A: oio::Append, -{ - async fn append(&mut self, bs: Bytes) -> Result<()> { - let a = self - .inner - .as_mut() - .ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?; - - a.append(bs).await - } - - async fn close(&mut self) -> Result<()> { - let a = self - .inner - .as_mut() - .ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?; - - a.close().await?; - self.inner = None; - Ok(()) - } -} - fn new_capability_unsupported_error(operation: Operation) -> Result { Err(Error::new(ErrorKind::Unsupported, "operation is not supported").with_operation(operation)) } @@ -955,7 +894,6 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); @@ -978,10 +916,6 @@ mod tests { Ok((RpWrite::new(), ())) } - async fn append(&self, _: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> { - Ok((RpAppend::new(), ())) - } - async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result { Ok(RpCopy {}) } @@ -1045,7 +979,6 @@ mod tests { capability_test!(stat, |op| { op.stat("/path/to/mock_file") }); capability_test!(read, |op| { op.read("/path/to/mock_file") }); capability_test!(write, |op| { op.writer("/path/to/mock_file") }); - capability_test!(append, |op| { op.appender("/path/to/mock_file") }); capability_test!(create_dir, |op| { op.create_dir("/path/to/mock_dir/") }); capability_test!(delete, |op| { op.delete("/path/to/mock_file") }); capability_test!(copy, |op| { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 1e41030cd0d..9cef0fb9b62 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -86,7 +86,6 @@ impl LayeredAccessor for ConcurrentLimitAccessor { type BlockingReader = ConcurrentLimitWrapper; type Writer = ConcurrentLimitWrapper; type BlockingWriter = ConcurrentLimitWrapper; - type Appender = ConcurrentLimitWrapper; type Pager = ConcurrentLimitWrapper; type BlockingPager = ConcurrentLimitWrapper; @@ -132,20 +131,6 @@ impl LayeredAccessor for ConcurrentLimitAccessor { .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit))) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - let permit = self - .semaphore - .clone() - .acquire_owned() - .await - .expect("semaphore must be valid"); - - self.inner - .append(path, args) - .await - .map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit))) - } - async fn stat(&self, path: &str, args: OpStat) -> Result { let _permit = self .semaphore @@ -327,17 +312,6 @@ impl oio::BlockingWrite for ConcurrentLimitWrapper { } } -#[async_trait] -impl oio::Append for ConcurrentLimitWrapper { - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).await - } - - async fn close(&mut self) -> Result<()> { - self.inner.close().await - } -} - #[async_trait] impl oio::Page for ConcurrentLimitWrapper { async fn next(&mut self) -> Result>> { diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 3e702417339..bfe9be4dfef 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -25,7 +25,6 @@ use async_trait::async_trait; use bytes::Bytes; use futures::TryFutureExt; -use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -71,7 +70,6 @@ impl LayeredAccessor for ErrorContextAccessor { type BlockingReader = ErrorContextWrapper; type Writer = ErrorContextWrapper; type BlockingWriter = ErrorContextWrapper; - type Appender = ErrorContextWrapper; type Pager = ErrorContextWrapper; type BlockingPager = ErrorContextWrapper; @@ -139,27 +137,6 @@ impl LayeredAccessor for ErrorContextAccessor { .await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner - .append(path, args) - .map_ok(|(rp, os)| { - ( - rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: os, - }, - ) - }) - .map_err(|err| { - err.with_operation(Operation::Append) - .with_context("service", self.meta.scheme()) - .with_context("path", path) - }) - .await - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner .copy(from, to, args) @@ -477,25 +454,6 @@ impl oio::BlockingWrite for ErrorContextWrapper { } } -#[async_trait::async_trait] -impl oio::Append for ErrorContextWrapper { - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).await.map_err(|err| { - err.with_operation(AppendOperation::Append) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) - } - - async fn close(&mut self) -> Result<()> { - self.inner.close().await.map_err(|err| { - err.with_operation(AppendOperation::Close) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) - } -} - #[async_trait::async_trait] impl oio::Page for ErrorContextWrapper { async fn next(&mut self) -> Result>> { diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index dc167fbb5b4..31858211c21 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -140,7 +140,6 @@ impl LayeredAccessor for ImmutableIndexAccessor { type BlockingReader = A::BlockingReader; type Writer = A::Writer; type BlockingWriter = A::BlockingWriter; - type Appender = A::Appender; type Pager = ImmutableDir; type BlockingPager = ImmutableDir; @@ -196,10 +195,6 @@ impl LayeredAccessor for ImmutableIndexAccessor { self.inner.blocking_write(path, args) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { let mut path = path; if path == "/" { diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 73fa17b8fa6..9a09cc8d93e 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -217,7 +217,6 @@ impl LayeredAccessor for LoggingAccessor { type BlockingReader = LoggingReader; type Writer = LoggingWriter; type BlockingWriter = LoggingWriter; - type Appender = LoggingAppender; type Pager = LoggingPager; type BlockingPager = LoggingPager; @@ -367,45 +366,6 @@ impl LayeredAccessor for LoggingAccessor { }) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> started", - self.ctx.scheme, - Operation::Append, - path - ); - - self.inner - .append(path, args) - .await - .map(|(rp, a)| { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> start appending", - self.ctx.scheme, - Operation::Append, - path, - ); - let a = LoggingAppender::new(self.ctx.clone(), Operation::Append, path, a); - (rp, a) - }) - .map_err(|err| { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> {}", - self.ctx.scheme, - Operation::Append, - path, - self.ctx.error_print(&err) - ) - }; - err - }) - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { debug!( target: LOGGING_TARGET, @@ -1490,90 +1450,6 @@ impl oio::BlockingWrite for LoggingWriter { } } -pub struct LoggingAppender { - ctx: LoggingContext, - op: Operation, - path: String, - - inner: A, -} - -impl LoggingAppender { - fn new(ctx: LoggingContext, op: Operation, path: &str, appender: A) -> Self { - Self { - ctx, - op, - path: path.to_string(), - - inner: appender, - } - } -} - -#[async_trait] -impl oio::Append for LoggingAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - let len = bs.len(); - - match self.inner.append(bs).await { - Ok(_) => { - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> data append {}B", - self.ctx.scheme, - self.op, - self.path, - len - ); - Ok(()) - } - Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> data append failed: {}", - self.ctx.scheme, - self.op, - self.path, - self.ctx.error_print(&err) - ) - } - Err(err) - } - } - } - - async fn close(&mut self) -> Result<()> { - match self.inner.close().await { - Ok(_) => { - debug!( - target: LOGGING_TARGET, - "service={} operation={} path={} -> data appended finished", - self.ctx.scheme, - self.op, - self.path, - ); - Ok(()) - } - Err(err) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} -> data appender close failed: {}", - self.ctx.scheme, - self.op, - self.path, - self.ctx.error_print(&err), - ) - } - Err(err) - } - } - } -} - pub struct LoggingPager

{ ctx: LoggingContext, path: String, diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 87d726c9961..d7d483989ee 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -149,7 +149,6 @@ impl LayeredAccessor for MadsimAccessor { type BlockingReader = (); type Writer = MadsimWriter; type BlockingWriter = (); - type Appender = (); type Pager = MadsimPager; type BlockingPager = (); @@ -221,17 +220,6 @@ impl LayeredAccessor for MadsimAccessor { } } - async fn append( - &self, - path: &str, - args: OpAppend, - ) -> crate::Result<(RpAppend, Self::Appender)> { - Err(Error::new( - ErrorKind::Unsupported, - "will not be supported in MadsimLayer", - )) - } - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index e3d0c539310..1eade833bd3 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -412,7 +412,6 @@ impl LayeredAccessor for MetricsAccessor { type BlockingReader = MetricWrapper; type Writer = MetricWrapper; type BlockingWriter = MetricWrapper; - type Appender = A::Appender; type Pager = A::Pager; type BlockingPager = A::BlockingPager; @@ -510,10 +509,6 @@ impl LayeredAccessor for MetricsAccessor { .await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - async fn stat(&self, path: &str, args: OpStat) -> Result { self.handle.requests_total_stat.increment(1); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 6f574e7c1bf..1213d692ebf 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -25,7 +25,6 @@ use bytes::Bytes; use futures::FutureExt; use minitrace::prelude::*; -use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -137,7 +136,6 @@ impl LayeredAccessor for MinitraceAccessor { type BlockingReader = MinitraceWrapper; type Writer = MinitraceWrapper; type BlockingWriter = MinitraceWrapper; - type Appender = MinitraceWrapper; type Pager = MinitraceWrapper; type BlockingPager = MinitraceWrapper; @@ -185,21 +183,6 @@ impl LayeredAccessor for MinitraceAccessor { .await } - #[trace(enter_on_poll = true)] - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner - .append(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - MinitraceWrapper::new(Span::enter_with_local_parent("AppendOperation"), r), - ) - }) - }) - .await - } - #[trace(enter_on_poll = true)] async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner().copy(from, to, args).await @@ -409,29 +392,6 @@ impl oio::BlockingWrite for MinitraceWrapper { } } -#[async_trait] -impl oio::Append for MinitraceWrapper { - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner - .append(bs) - .in_span(Span::enter_with_parent( - AppendOperation::Append.into_static(), - &self.span, - )) - .await - } - - async fn close(&mut self) -> Result<()> { - self.inner - .close() - .in_span(Span::enter_with_parent( - AppendOperation::Close.into_static(), - &self.span, - )) - .await - } -} - #[async_trait] impl oio::Page for MinitraceWrapper { async fn next(&mut self) -> Result>> { diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index c5536201937..2ae39b05c03 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -72,7 +72,6 @@ impl LayeredAccessor for OtelTraceAccessor { type BlockingReader = OtelTraceWrapper; type Writer = OtelTraceWrapper; type BlockingWriter = OtelTraceWrapper; - type Appender = A::Appender; type Pager = OtelTraceWrapper; type BlockingPager = OtelTraceWrapper; @@ -116,10 +115,6 @@ impl LayeredAccessor for OtelTraceAccessor { .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r))) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { let tracer = global::tracer("opendal"); let mut span = tracer.start("copy"); diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 2ac2d47744a..005d6aa974d 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -192,7 +192,6 @@ impl LayeredAccessor for PrometheusAccessor { type BlockingReader = PrometheusMetricWrapper; type Writer = PrometheusMetricWrapper; type BlockingWriter = PrometheusMetricWrapper; - type Appender = A::Appender; type Pager = A::Pager; type BlockingPager = A::BlockingPager; @@ -298,10 +297,6 @@ impl LayeredAccessor for PrometheusAccessor { }) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - async fn stat(&self, path: &str, args: OpStat) -> Result { self.stats .requests_total diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index d9febab02d3..a240e9e62f1 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -35,7 +35,6 @@ use bytes::Bytes; use futures::FutureExt; use log::warn; -use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -285,7 +284,6 @@ impl LayeredAccessor for RetryAccessor { type BlockingReader = RetryWrapper; type Writer = RetryWrapper; type BlockingWriter = RetryWrapper; - type Appender = RetryWrapper; type Pager = RetryWrapper; type BlockingPager = RetryWrapper; @@ -363,32 +361,6 @@ impl LayeredAccessor for RetryAccessor { .await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - { || self.inner.append(path, args.clone()) } - .retry(&self.builder) - .when(|e| e.is_temporary()) - .notify(|err, dur| { - self.notify.intercept( - err, - dur, - &[ - ("operation", Operation::Append.into_static()), - ("path", path), - ], - ) - }) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - RetryWrapper::new(r, self.notify.clone(), path, self.builder.clone()), - ) - }) - .map_err(|e| e.set_persistent()) - }) - .await - } - async fn stat(&self, path: &str, args: OpStat) -> Result { { || self.inner.stat(path, args.clone()) } .retry(&self.builder) @@ -1022,61 +994,6 @@ impl oio::BlockingWrite for RetryWra } } -#[async_trait] -impl oio::Append for RetryWrapper { - async fn append(&mut self, bs: Bytes) -> Result<()> { - let mut backoff = self.builder.build(); - - loop { - match self.inner.append(bs.clone()).await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), - Some(dur) => { - self.notify.intercept( - &e, - dur, - &[ - ("operation", AppendOperation::Append.into_static()), - ("path", &self.path), - ], - ); - tokio::time::sleep(dur).await; - continue; - } - }, - } - } - } - - async fn close(&mut self) -> Result<()> { - let mut backoff = self.builder.build(); - - loop { - match self.inner.close().await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), - Some(dur) => { - self.notify.intercept( - &e, - dur, - &[ - ("operation", AppendOperation::Close.into_static()), - ("path", &self.path), - ], - ); - tokio::time::sleep(dur).await; - continue; - } - }, - } - } - } -} - #[async_trait] impl oio::Page for RetryWrapper { async fn next(&mut self) -> Result>> { @@ -1173,7 +1090,6 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = MockPager; type BlockingPager = (); diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 871d4ec18a0..d929226df81 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -121,7 +121,6 @@ impl LayeredAccessor for ThrottleAccessor { type BlockingReader = ThrottleWrapper; type Writer = ThrottleWrapper; type BlockingWriter = ThrottleWrapper; - type Appender = ThrottleWrapper; type Pager = A::Pager; type BlockingPager = A::BlockingPager; @@ -147,15 +146,6 @@ impl LayeredAccessor for ThrottleAccessor { .map(|(rp, w)| (rp, ThrottleWrapper::new(w, limiter))) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - let limiter = self.rate_limiter.clone(); - - self.inner - .append(path, args) - .await - .map(|(rp, a)| (rp, ThrottleWrapper::new(a, limiter))) - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.inner.list(path, args).await } @@ -294,34 +284,3 @@ impl oio::BlockingWrite for ThrottleWrapper { self.inner.close() } } - -#[async_trait] -impl oio::Append for ThrottleWrapper { - async fn append(&mut self, bs: Bytes) -> Result<()> { - let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); - - loop { - match self.limiter.check_n(buf_length) { - Ok(_) => return self.inner.append(bs).await, - Err(negative) => match negative { - // the query is valid but the Decider can not accommodate them. - NegativeMultiDecision::BatchNonConforming(_, not_until) => { - let wait_time = not_until.wait_time_from(DefaultClock::default().now()); - tokio::time::sleep(wait_time).await; - } - // the query was invalid as the rate limit parameters can "never" accommodate the number of cells queried for. - NegativeMultiDecision::InsufficientCapacity(_) => { - return Err(Error::new( - ErrorKind::RateLimited, - "InsufficientCapacity due to burst size being smaller than the request size", - )) - } - }, - } - } - } - - async fn close(&mut self) -> Result<()> { - self.inner.close().await - } -} diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index fb6967bd5b8..c0fb739aa13 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -24,7 +24,6 @@ use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -35,7 +34,7 @@ use crate::*; /// /// # Notes /// -/// - For IO operations like `read`, `write`, and `append`, we will set a timeout +/// - For IO operations like `read`, `write`, we will set a timeout /// for each single IO operation. /// - For other operations like `stat`, and `delete`, the timeout is for the whole /// operation. @@ -147,7 +146,6 @@ impl LayeredAccessor for TimeoutAccessor { type BlockingReader = A::BlockingReader; type Writer = TimeoutWrapper; type BlockingWriter = A::BlockingWriter; - type Appender = TimeoutWrapper; type Pager = TimeoutWrapper; type BlockingPager = A::BlockingPager; @@ -179,18 +177,6 @@ impl LayeredAccessor for TimeoutAccessor { .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - tokio::time::timeout(self.timeout, self.inner.append(path, args)) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Append) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { tokio::time::timeout(self.timeout, self.inner.list(path, args)) .await @@ -385,33 +371,6 @@ impl oio::Write for TimeoutWrapper { } } -#[async_trait] -impl oio::Append for TimeoutWrapper { - async fn append(&mut self, bs: Bytes) -> Result<()> { - let timeout = self.io_timeout(bs.len() as u64); - - tokio::time::timeout(timeout, self.inner.append(bs)) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(AppendOperation::Append) - .with_context("timeout", timeout.as_secs_f64().to_string()) - .set_temporary() - })? - } - - async fn close(&mut self) -> Result<()> { - tokio::time::timeout(self.timeout, self.inner.close()) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(AppendOperation::Close) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - } -} - #[async_trait] impl oio::Page for TimeoutWrapper { async fn next(&mut self) -> Result>> { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 4191f70edb8..9042e6e35f2 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -137,7 +137,6 @@ impl LayeredAccessor for TracingAccessor { type BlockingReader = TracingWrapper; type Writer = TracingWrapper; type BlockingWriter = TracingWrapper; - type Appender = A::Appender; type Pager = TracingWrapper; type BlockingPager = TracingWrapper; @@ -171,10 +170,6 @@ impl LayeredAccessor for TracingAccessor { .map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r))) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - #[tracing::instrument(level = "debug", skip(self))] async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner().copy(from, to, args).await diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs index f5aaa389f97..8d58221eacd 100644 --- a/core/src/layers/type_eraser.rs +++ b/core/src/layers/type_eraser.rs @@ -57,7 +57,6 @@ impl LayeredAccessor for TypeEraseAccessor { type BlockingReader = oio::BlockingReader; type Writer = oio::Writer; type BlockingWriter = oio::BlockingWriter; - type Appender = oio::Appender; type Pager = oio::Pager; type BlockingPager = oio::BlockingPager; @@ -91,13 +90,6 @@ impl LayeredAccessor for TypeEraseAccessor { .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter)) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner - .append(path, args) - .await - .map(|(rp, a)| (rp, Box::new(a) as oio::Appender)) - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.inner .list(path, args) diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs index ad8d3b095ae..9c890548295 100644 --- a/core/src/raw/accessor.rs +++ b/core/src/raw/accessor.rs @@ -70,8 +70,6 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// BlockingPager is the associated pager that could return in /// `blocking_list` operation. type BlockingPager: oio::BlockingPage; - /// Appender is the associated appender that could return in `append` operation. - type Appender: oio::Append; /// Invoke the `info` operation to get metadata of accessor. /// @@ -138,23 +136,6 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { )) } - /// Invoke the `append` operation on the specified path, returns a - /// appended size if operate successful. - /// - /// Require [`Capability::append`] - /// - /// # Behavior - /// - /// - Input path MUST be file path, DON'T NEED to check mode. - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - let (_, _) = (path, args); - - Err(Error::new( - ErrorKind::Unsupported, - "operation is not supported", - )) - } - /// Invoke the `copy` operation on the specified `from` path and `to` path. /// /// Require [Capability::copy] @@ -389,7 +370,6 @@ impl Accessor for () { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); @@ -412,7 +392,6 @@ impl Accessor for Arc { type BlockingReader = T::BlockingReader; type Writer = T::Writer; type BlockingWriter = T::BlockingWriter; - type Appender = T::Appender; type Pager = T::Pager; type BlockingPager = T::BlockingPager; @@ -431,10 +410,6 @@ impl Accessor for Arc { self.as_ref().write(path, args).await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.as_ref().append(path, args).await - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.as_ref().copy(from, to, args).await } @@ -497,7 +472,6 @@ pub type FusedAccessor = Arc< BlockingReader = oio::BlockingReader, Writer = oio::Writer, BlockingWriter = oio::BlockingWriter, - Appender = oio::Appender, Pager = oio::Pager, BlockingPager = oio::BlockingPager, >, diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 91417d59585..94b110a1808 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -62,7 +62,6 @@ impl Accessor for Backend { type BlockingReader = oio::Cursor; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Appender = (); type Pager = KvPager; type BlockingPager = KvPager; diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index b4353027495..99062955374 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -58,7 +58,6 @@ impl Accessor for Backend { type BlockingReader = oio::Cursor; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Appender = (); type Pager = KvPager; type BlockingPager = KvPager; diff --git a/core/src/raw/layer.rs b/core/src/raw/layer.rs index 511ca074d38..7d0c40a2088 100644 --- a/core/src/raw/layer.rs +++ b/core/src/raw/layer.rs @@ -61,7 +61,6 @@ use crate::*; /// type BlockingReader = A::BlockingReader; /// type Writer = A::Writer; /// type BlockingWriter = A::BlockingWriter; -/// type Appender = A::Appender; /// type Pager = A::Pager; /// type BlockingPager = A::BlockingPager; /// @@ -93,10 +92,6 @@ use crate::*; /// self.inner.blocking_write(path, args) /// } /// -/// async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { -/// self.inner.append(path, args).await -/// } -/// /// async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { /// self.inner.list(path, args).await /// } @@ -137,7 +132,6 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { type BlockingReader: oio::BlockingRead; type Writer: oio::Write; type BlockingWriter: oio::BlockingWrite; - type Appender: oio::Append; type Pager: oio::Page; type BlockingPager: oio::BlockingPage; @@ -155,8 +149,6 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)>; - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)>; - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { self.inner().copy(from, to, args).await } @@ -216,7 +208,6 @@ impl Accessor for L { type BlockingReader = L::BlockingReader; type Writer = L::Writer; type BlockingWriter = L::BlockingWriter; - type Appender = L::Appender; type Pager = L::Pager; type BlockingPager = L::BlockingPager; @@ -236,10 +227,6 @@ impl Accessor for L { (self as &L).write(path, args).await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - (self as &L).append(path, args).await - } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { (self as &L).copy(from, to, args).await } @@ -334,7 +321,6 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/raw/oio/append/api.rs b/core/src/raw/oio/append/api.rs deleted file mode 100644 index 6b5d576d6c2..00000000000 --- a/core/src/raw/oio/append/api.rs +++ /dev/null @@ -1,108 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt::Display; -use std::fmt::Formatter; - -use async_trait::async_trait; -use bytes::Bytes; - -use crate::*; - -/// AppendOperation is the name for APIs of Append. -#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] -#[non_exhaustive] -pub enum AppendOperation { - /// Operation for [`Append::append`] - Append, - /// Operation for [`Append::close`] - Close, -} - -impl AppendOperation { - /// Convert self into static str. - pub fn into_static(self) -> &'static str { - self.into() - } -} - -impl Display for AppendOperation { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.into_static()) - } -} - -impl From for &'static str { - fn from(v: AppendOperation) -> &'static str { - use AppendOperation::*; - - match v { - Append => "Append::append", - Close => "Append::close", - } - } -} - -/// Appender is a type erased [`Append`] -pub type Appender = Box; - -/// Append is the trait that OpenDAL returns to callers. -/// -/// # Notes -/// -/// Users will call `append` multiple times. -#[async_trait] -pub trait Append: Unpin + Send + Sync { - /// Append data to the end of file. - /// - /// Users will call `append` multiple times. - /// Please make sure `append` is safe to re-enter. - async fn append(&mut self, bs: Bytes) -> Result<()>; - - /// Seal the file to mark it as unmodifiable. - async fn close(&mut self) -> Result<()>; -} - -#[async_trait] -impl Append for () { - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - unimplemented!("append is required to be implemented for oio::Append") - } - - async fn close(&mut self) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "output appender doesn't support close", - )) - } -} - -/// `Box` won't implement `Append` automatically. -/// -/// To make Appender work as expected, we must add this impl. -#[async_trait] -impl Append for Box { - async fn append(&mut self, bs: Bytes) -> Result<()> { - (**self).append(bs).await - } - - async fn close(&mut self) -> Result<()> { - (**self).close().await - } -} diff --git a/core/src/raw/oio/append/mod.rs b/core/src/raw/oio/append/mod.rs deleted file mode 100644 index defb32e6b56..00000000000 --- a/core/src/raw/oio/append/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod api; -pub use api::*; diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 363719a3ffd..1940c40cda3 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -267,6 +267,8 @@ impl VectorCursor { /// peak all will read and copy all bytes from current cursor /// without change it's content. + /// + /// TODO: we need to find a way to avoid copy all content here. pub fn peak_all(&self) -> Bytes { // Avoid data copy if we only have one bytes. if self.inner.len() == 1 { diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index 2f76090dc0a..5a4729fe8c4 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -28,9 +28,6 @@ pub use read::*; mod write; pub use write::*; -mod append; -pub use append::*; - mod stream; pub use stream::*; diff --git a/core/src/raw/oio/page/into_flat_page.rs b/core/src/raw/oio/page/into_flat_page.rs index 5699b14d783..0efdea6bec9 100644 --- a/core/src/raw/oio/page/into_flat_page.rs +++ b/core/src/raw/oio/page/into_flat_page.rs @@ -248,7 +248,6 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = MockPager; diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs b/core/src/raw/oio/read/into_seekable_read_by_range.rs index 18fd8a33bbd..052b300ec48 100644 --- a/core/src/raw/oio/read/into_seekable_read_by_range.rs +++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs @@ -415,7 +415,6 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs new file mode 100644 index 00000000000..5d6eda9dae8 --- /dev/null +++ b/core/src/raw/oio/write/append_object_write.rs @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::oio::Streamer; +use crate::raw::*; +use crate::*; + +const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024; + +/// AppendObjectWrite is used to implement [`Write`] based on append +/// object. By implementing AppendObjectWrite, services don't need to +/// care about the details of buffering and uploading parts. +/// +/// The layout after adopting [`AppendObjectWrite`]: +/// +/// - Services impl `AppendObjectWrite` +/// - `AppendObjectWriter` impl `Write` +/// - Expose `AppendObjectWriter` as `Accessor::Writer` +#[async_trait] +pub trait AppendObjectWrite: Send + Sync + Unpin { + /// Get the current offset of the append object. + /// + /// Returns `0` if the object is not exist. + async fn offset(&self) -> Result; + + /// Append the data to the end of this object. + async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()>; +} + +/// AppendObjectWriter will implements [`Write`] based on append object. +/// +/// ## TODO +/// +/// - Allow users to switch to un-buffered mode if users write 16MiB every time. +pub struct AppendObjectWriter { + inner: W, + + offset: Option, + buffer: oio::VectorCursor, + buffer_size: usize, +} + +impl AppendObjectWriter { + /// Create a new AppendObjectWriter. + pub fn new(inner: W) -> Self { + Self { + inner, + offset: None, + buffer: oio::VectorCursor::new(), + buffer_size: DEFAULT_WRITE_MIN_SIZE, + } + } + + /// Configure the write_min_size. + /// + /// write_min_size is used to control the size of internal buffer. + /// + /// AppendObjectWriter will flush the buffer to storage when + /// the size of buffer is larger than write_min_size. + /// + /// This value is default to 8 MiB. + pub fn with_write_min_size(mut self, v: usize) -> Self { + self.buffer_size = v; + self + } + + async fn offset(&mut self) -> Result { + if let Some(offset) = self.offset { + return Ok(offset); + } + + let offset = self.inner.offset().await?; + self.offset = Some(offset); + + Ok(offset) + } +} + +#[async_trait] +impl oio::Write for AppendObjectWriter +where + W: AppendObjectWrite, +{ + async fn write(&mut self, bs: Bytes) -> Result<()> { + let offset = self.offset().await?; + + // Ignore empty bytes + if bs.is_empty() { + return Ok(()); + } + + self.buffer.push(bs); + // Return directly if the buffer is not full + if self.buffer.len() <= self.buffer_size { + return Ok(()); + } + + let bs = self.buffer.peak_all(); + let size = bs.len(); + + match self + .inner + .append(offset, size as u64, AsyncBody::Bytes(bs)) + .await + { + Ok(_) => { + self.buffer.take(size); + self.offset = Some(offset + size as u64); + Ok(()) + } + Err(e) => { + // If the upload fails, we should pop the given bs to make sure + // write is re-enter safe. + self.buffer.pop(); + Err(e) + } + } + } + + async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + if !self.buffer.is_empty() { + return Err(Error::new( + ErrorKind::InvalidInput, + "Writer::sink should not be used mixed with existing buffer", + )); + } + + let offset = self.offset().await?; + + self.inner + .append(offset, size, AsyncBody::Stream(s)) + .await?; + self.offset = Some(offset + size); + + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + // Make sure internal buffer has been flushed. + if !self.buffer.is_empty() { + let bs = self.buffer.peak_exact(self.buffer.len()); + + let offset = self.offset().await?; + let size = bs.len() as u64; + self.inner + .append(offset, size, AsyncBody::Bytes(bs)) + .await?; + + self.buffer.clear(); + self.offset = Some(offset + size); + } + + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index 490136e2930..b13b3e2c756 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -22,7 +22,14 @@ pub use api::Write; pub use api::WriteOperation; pub use api::Writer; +mod two_ways_write; +pub use two_ways_write::TwoWaysWriter; + mod multipart_upload_write; pub use multipart_upload_write::MultipartUploadPart; pub use multipart_upload_write::MultipartUploadWrite; pub use multipart_upload_write::MultipartUploadWriter; + +mod append_object_write; +pub use append_object_write::AppendObjectWrite; +pub use append_object_write::AppendObjectWriter; diff --git a/core/src/raw/oio/write/two_ways_write.rs b/core/src/raw/oio/write/two_ways_write.rs new file mode 100644 index 00000000000..470655783f1 --- /dev/null +++ b/core/src/raw/oio/write/two_ways_write.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::oio::Streamer; +use crate::raw::*; +use crate::*; + +/// TwoWaysWrite is used to implement [`Write`] based on two ways. +/// +/// Users can wrap two different writers together. +pub enum TwoWaysWriter { + /// The left side for the [`TwoWaysWriter`]. + Left(L), + /// The right side for the [`TwoWaysWriter`]. + Right(R), +} + +#[async_trait] +impl oio::Write for TwoWaysWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + match self { + Self::Left(l) => l.write(bs).await, + Self::Right(r) => r.write(bs).await, + } + } + + async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + match self { + Self::Left(l) => l.sink(size, s).await, + Self::Right(r) => r.sink(size, s).await, + } + } + + async fn abort(&mut self) -> Result<()> { + match self { + Self::Left(l) => l.abort().await, + Self::Right(r) => r.abort().await, + } + } + + async fn close(&mut self) -> Result<()> { + match self { + Self::Left(l) => l.close().await, + Self::Right(r) => r.close().await, + } + } +} diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs index d0f61b478bf..a7b81c625a1 100644 --- a/core/src/raw/operation.rs +++ b/core/src/raw/operation.rs @@ -31,8 +31,6 @@ pub enum Operation { Read, /// Operation for [`crate::raw::Accessor::write`] Write, - /// Operation for [`crate::raw::Accessor::append`] - Append, /// Operation for [`crate::raw::Accessor::copy`] Copy, /// Operation for [`crate::raw::Accessor::rename`] @@ -85,7 +83,6 @@ impl From for &'static str { Operation::CreateDir => "create_dir", Operation::Read => "read", Operation::Write => "write", - Operation::Append => "append", Operation::Copy => "copy", Operation::Rename => "rename", Operation::Stat => "stat", diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 715892f0c47..2617768da8d 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -404,6 +404,8 @@ impl OpStat { /// Args for `write` operation. #[derive(Debug, Clone, Default)] pub struct OpWrite { + append: bool, + content_length: Option, content_type: Option, content_disposition: Option, @@ -418,6 +420,25 @@ impl OpWrite { Self::default() } + /// Get the append from op. + /// + /// The append is the flag to indicate that this write operation is an append operation. + pub fn append(&self) -> bool { + self.append + } + + /// Set the append mode of op. + /// + /// If the append mode is set, the data will be appended to the end of the file. + /// + /// # Notes + /// + /// Service could return `Unsupported` if the underlying storage does not support append. + pub fn with_append(mut self, append: bool) -> Self { + self.append = append; + self + } + /// Get the content length from op. /// /// The content length is the total length of the data to be written. @@ -468,54 +489,6 @@ impl OpWrite { } } -/// Args for `append` operation. -#[derive(Debug, Clone, Default)] -pub struct OpAppend { - content_type: Option, - content_disposition: Option, - cache_control: Option, -} - -impl OpAppend { - /// Create a new `OpAppend`. - pub fn new() -> Self { - Self::default() - } - - /// Get the content type from option - pub fn content_type(&self) -> Option<&str> { - self.content_type.as_deref() - } - - /// Set the content type of option - pub fn with_content_type(mut self, content_type: &str) -> Self { - self.content_type = Some(content_type.to_string()); - self - } - - /// Get the content disposition from option - pub fn content_disposition(&self) -> Option<&str> { - self.content_disposition.as_deref() - } - - /// Set the content disposition of option - pub fn with_content_disposition(mut self, content_disposition: &str) -> Self { - self.content_disposition = Some(content_disposition.to_string()); - self - } - - /// Get the cache control from option - pub fn cache_control(&self) -> Option<&str> { - self.cache_control.as_deref() - } - - /// Set the cache control of option - pub fn with_cache_control(mut self, cache_control: &str) -> Self { - self.cache_control = Some(cache_control.to_string()); - self - } -} - /// Args for `copy` operation. #[derive(Debug, Clone, Default)] pub struct OpCopy {} diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs index 7cd246117c7..bc45d1457b6 100644 --- a/core/src/raw/rps.rs +++ b/core/src/raw/rps.rs @@ -195,17 +195,6 @@ impl RpWrite { } } -/// Reply for `append` operation. -#[derive(Debug, Clone, Default)] -pub struct RpAppend {} - -impl RpAppend { - /// Create a new reply for `append`. - pub fn new() -> Self { - Self {} - } -} - /// Reply for `copy` operation. #[derive(Debug, Clone, Default)] pub struct RpCopy {} diff --git a/core/src/services/azblob/appender.rs b/core/src/services/azblob/appender.rs deleted file mode 100644 index a32e3409a56..00000000000 --- a/core/src/services/azblob/appender.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use bytes::Bytes; -use http::StatusCode; - -use super::core::*; -use super::error::parse_error; -use crate::raw::*; -use crate::*; - -const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; -const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset"; - -pub struct AzblobAppender { - core: Arc, - - op: OpAppend, - path: String, - - position: Option, -} - -impl AzblobAppender { - pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { - Self { - core, - op, - path: path.to_string(), - position: None, - } - } -} - -#[async_trait] -impl oio::Append for AzblobAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - // If the position is not set, we need to check the blob. - // Any successful append operation will set the position. - if self.position.is_none() { - let resp = self - .core - .azblob_get_blob_properties(&self.path, None, None) - .await?; - - let status = resp.status(); - - match status { - // Just check the blob type. - // If it is not an appendable blob, return an error. - // We can not get the append position of the blob here. - StatusCode::OK => { - let headers = resp.headers(); - let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok()); - if blob_type != Some("AppendBlob") { - return Err(Error::new( - ErrorKind::ConditionNotMatch, - "the blob is not an appendable blob.", - )); - } - } - - // If the blob is not existing, we need to create one. - StatusCode::NOT_FOUND => { - let mut req = self.core.azblob_init_appendable_blob_request( - &self.path, - self.op.content_type(), - self.op.cache_control(), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.client.send(req).await?; - - let status = resp.status(); - match status { - StatusCode::CREATED => { - // do nothing - } - _ => { - return Err(parse_error(resp).await?); - } - } - - self.position = Some(0); - } - - _ => { - return Err(parse_error(resp).await?); - } - } - } - - let size = bs.len(); - - let mut req = self.core.azblob_append_blob_request( - &self.path, - size, - self.position, - AsyncBody::Bytes(bs), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - match status { - StatusCode::CREATED => { - let headers = resp.headers(); - let position = headers - .get(X_MS_BLOB_APPEND_OFFSET) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()); - self.position = position.map(|v| v + size as u64); - } - _ => { - return Err(parse_error(resp).await?); - } - } - - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -} diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 5ec983cb760..e773d419955 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -32,7 +32,6 @@ use reqsign::AzureStorageSigner; use sha2::Digest; use sha2::Sha256; -use super::appender::AzblobAppender; use super::batch::parse_batch_delete_response; use super::error::parse_error; use super::pager::AzblobPager; @@ -509,7 +508,6 @@ impl Accessor for AzblobBackend { type BlockingReader = (); type Writer = AzblobWriter; type BlockingWriter = (); - type Appender = AzblobAppender; type Pager = AzblobPager; type BlockingPager = (); @@ -531,14 +529,11 @@ impl Accessor for AzblobBackend { read_with_override_content_disposition: true, write: true, + write_can_append: true, write_can_sink: true, write_with_cache_control: true, write_with_content_type: true, - append: true, - append_with_cache_control: true, - append_with_content_type: true, - delete: true, create_dir: true, copy: true, @@ -607,26 +602,12 @@ impl Accessor for AzblobBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - Ok(( RpWrite::default(), AzblobWriter::new(self.core.clone(), args, path.to_string()), )) } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - Ok(( - RpAppend::default(), - AzblobAppender::new(self.core.clone(), path, args), - )) - } - async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { let resp = self.core.azblob_copy_blob(from, to).await?; diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index e1591082814..6382817bb63 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -365,7 +365,7 @@ impl AzblobCore { pub fn azblob_append_blob_request( &self, path: &str, - size: usize, + size: u64, position: Option, body: AsyncBody, ) -> Result> { diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs index 868d088a595..7edcfde3d42 100644 --- a/core/src/services/azblob/mod.rs +++ b/core/src/services/azblob/mod.rs @@ -18,7 +18,6 @@ mod backend; pub use backend::AzblobBuilder as Azblob; -mod appender; mod batch; mod core; mod error; diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index b8d4dc6444e..31a56f27a92 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -26,16 +26,26 @@ use super::error::parse_error; use crate::raw::*; use crate::*; +const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; +const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset"; + pub struct AzblobWriter { core: Arc, op: OpWrite, path: String, + + position: Option, } impl AzblobWriter { pub fn new(core: Arc, op: OpWrite, path: String) -> Self { - AzblobWriter { core, op, path } + AzblobWriter { + core, + op, + path, + position: None, + } } async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { @@ -61,17 +71,126 @@ impl AzblobWriter { _ => Err(parse_error(resp).await?), } } + + async fn current_position(&mut self) -> Result> { + if let Some(v) = self.position { + return Ok(Some(v)); + } + + // TODO: we should check with current etag to make sure file not changed. + let resp = self + .core + .azblob_get_blob_properties(&self.path, None, None) + .await?; + + let status = resp.status(); + + match status { + // Just check the blob type. + // If it is not an appendable blob, return an error. + // We can not get the append position of the blob here. + StatusCode::OK => { + let headers = resp.headers(); + let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok()); + if blob_type != Some("AppendBlob") { + return Err(Error::new( + ErrorKind::ConditionNotMatch, + "the blob is not an appendable blob.", + )); + } + Ok(None) + } + // If the blob is not existing, we need to create one. + StatusCode::NOT_FOUND => { + let mut req = self.core.azblob_init_appendable_blob_request( + &self.path, + self.op.content_type(), + self.op.cache_control(), + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.client.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::CREATED => { + // do nothing + } + _ => { + return Err(parse_error(resp).await?); + } + } + + self.position = Some(0); + Ok(Some(0)) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn append_oneshot(&mut self, size: u64, body: AsyncBody) -> Result<()> { + let _ = self.current_position().await?; + + let mut req = + self.core + .azblob_append_blob_request(&self.path, size, self.position, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::CREATED => { + let headers = resp.headers(); + let position = headers + .get(X_MS_BLOB_APPEND_OFFSET) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()); + self.position = position.map(|v| v + size); + } + _ => { + return Err(parse_error(resp).await?); + } + } + + Ok(()) + } } #[async_trait] impl oio::Write for AzblobWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await + if self.op.append() { + self.append_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + .await + } else { + if self.op.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + + self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + .await + } } async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - self.write_oneshot(size, AsyncBody::Stream(s)).await + if self.op.append() { + self.append_oneshot(size, AsyncBody::Stream(s)).await + } else { + if self.op.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + + self.write_oneshot(size, AsyncBody::Stream(s)).await + } } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs index 5320a308951..0d567266b6e 100644 --- a/core/src/services/azdfs/backend.rs +++ b/core/src/services/azdfs/backend.rs @@ -232,7 +232,6 @@ impl Accessor for AzdfsBackend { type BlockingReader = (); type Writer = AzdfsWriter; type BlockingWriter = (); - type Appender = (); type Pager = AzdfsPager; type BlockingPager = (); diff --git a/core/src/services/cos/appender.rs b/core/src/services/cos/appender.rs deleted file mode 100644 index c49753f6204..00000000000 --- a/core/src/services/cos/appender.rs +++ /dev/null @@ -1,162 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use bytes::Bytes; -use http::header::CONTENT_LENGTH; -use http::StatusCode; - -use super::core::*; -use super::error::parse_error; -use crate::raw::*; -use crate::*; - -pub const X_COS_OBJECT_TYPE: &str = "x-cos-object-type"; -pub const X_COS_NEXT_APPEND_POSITION: &str = "x-cos-next-append-position"; - -pub struct CosAppender { - core: Arc, - - op: OpAppend, - path: String, - - position: Option, -} - -impl CosAppender { - pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { - Self { - core, - op, - path: path.to_string(), - position: None, - } - } -} - -#[async_trait] -impl oio::Append for CosAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - // If the position is not set, we need to get the current position. - if self.position.is_none() { - let resp = self.core.cos_head_object(&self.path, None, None).await?; - - let status = resp.status(); - match status { - StatusCode::OK => { - let object_type = resp - .headers() - .get(X_COS_OBJECT_TYPE) - .and_then(|v| v.to_str().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing x-cos-object-type, the object may not be appendable", - ) - })?; - - if object_type != "appendable" { - return Err(Error::new( - ErrorKind::Unexpected, - "object_type mismatch. the object may not be appendable", - )); - } - - let position = resp - .headers() - .get(CONTENT_LENGTH) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing content-length, the object may not be appendable", - ) - })?; - self.position = Some(position); - } - StatusCode::NOT_FOUND => { - self.position = Some(0); - } - _ => { - return Err(parse_error(resp).await?); - } - } - } - - let mut req = self.core.cos_append_object_request( - &self.path, - self.position.expect("position is not set"), - bs.len(), - &self.op, - AsyncBody::Bytes(bs), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - let position = resp - .headers() - .get(X_COS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "append ok but missing x-cos-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - Ok(()) - } - StatusCode::CONFLICT => { - // The object is not appendable or the position is not match with the object's length. - // If the position is not match, we could get the current position and retry. - let position = resp - .headers() - .get(X_COS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "append conflict. missing x-cos-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - - // Then return the error to the caller, so the caller could retry. - Err(Error::new( - ErrorKind::ConditionNotMatch, - "the position is not match with the object's length. position has been updated.", - )) - } - _ => Err(parse_error(resp).await?), - } - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -} diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 1ec1caccb63..9dc4b1b6607 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -32,7 +32,6 @@ use super::error::parse_error; use super::pager::CosPager; use super::writer::CosWriter; use crate::raw::*; -use crate::services::cos::appender::CosAppender; use crate::*; const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024; @@ -268,9 +267,11 @@ pub struct CosBackend { impl Accessor for CosBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::MultipartUploadWriter; + type Writer = oio::TwoWaysWriter< + oio::MultipartUploadWriter, + oio::AppendObjectWriter, + >; type BlockingWriter = (); - type Appender = CosAppender; type Pager = CosPager; type BlockingPager = (); @@ -291,17 +292,13 @@ impl Accessor for CosBackend { read_with_if_none_match: true, write: true, + write_can_append: true, write_can_sink: true, write_with_content_type: true, write_with_cache_control: true, write_with_content_disposition: true, write_without_content_length: true, - append: true, - append_with_cache_control: true, - append_with_content_disposition: true, - append_with_content_type: true, - delete: true, create_dir: true, copy: true, @@ -359,17 +356,21 @@ impl Accessor for CosBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - CosWriter::new(self.core.clone(), path, args), - )) - } + let writer = CosWriter::new(self.core.clone(), path, args.clone()); - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - Ok(( - RpAppend::default(), - CosAppender::new(self.core.clone(), path, args), - )) + let tw = if args.append() { + let w = + oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); + + oio::TwoWaysWriter::Right(w) + } else { + let w = oio::MultipartUploadWriter::new(writer, args.content_length()) + .with_write_min_size(self.core.write_min_size); + + oio::TwoWaysWriter::Left(w) + }; + + return Ok((RpWrite::default(), tw)); } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs index 6a81c7d7ef7..e288a447c71 100644 --- a/core/src/services/cos/core.rs +++ b/core/src/services/cos/core.rs @@ -241,8 +241,8 @@ impl CosCore { &self, path: &str, position: u64, - size: usize, - args: &OpAppend, + size: u64, + args: &OpWrite, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); diff --git a/core/src/services/cos/mod.rs b/core/src/services/cos/mod.rs index ee38708ae46..b51de4b7e2b 100644 --- a/core/src/services/cos/mod.rs +++ b/core/src/services/cos/mod.rs @@ -18,7 +18,6 @@ mod backend; pub use backend::CosBuilder as Cos; -mod appender; mod core; mod error; mod pager; diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index 702680ea367..e64bc23a1ed 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -34,15 +34,12 @@ pub struct CosWriter { } impl CosWriter { - pub fn new(core: Arc, path: &str, op: OpWrite) -> oio::MultipartUploadWriter { - let write_min_size = core.write_min_size; - let total_size = op.content_length(); - let cos_writer = CosWriter { + pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { + CosWriter { core, path: path.to_string(), op, - }; - oio::MultipartUploadWriter::new(cos_writer, total_size).with_write_min_size(write_min_size) + } } } @@ -181,3 +178,42 @@ impl oio::MultipartUploadWrite for CosWriter { } } } + +#[async_trait] +impl oio::AppendObjectWrite for CosWriter { + async fn offset(&self) -> Result { + let resp = self.core.cos_head_object(&self.path, None, None).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let content_length = parse_content_length(resp.headers())?.ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Content-Length not present in returning response", + ) + })?; + Ok(content_length) + } + StatusCode::NOT_FOUND => Ok(0), + _ => Err(parse_error(resp).await?), + } + } + + async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self + .core + .cos_append_object_request(&self.path, offset, size, &self.op, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index 29b4f12acda..e24f0b36fd1 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -53,7 +53,6 @@ impl Accessor for DropboxBackend { type BlockingWriter = (); type Pager = (); type BlockingPager = (); - type Appender = (); fn info(&self) -> AccessorInfo { let mut ma = AccessorInfo::default(); diff --git a/core/src/services/fs/appender.rs b/core/src/services/fs/appender.rs deleted file mode 100644 index 86a3e82fb36..00000000000 --- a/core/src/services/fs/appender.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use async_trait::async_trait; -use bytes::Bytes; -use tokio::io::AsyncWriteExt; - -use super::error::parse_io_error; -use crate::raw::*; -use crate::*; - -pub struct FsAppender { - f: F, -} - -impl FsAppender { - pub fn new(f: F) -> Self { - Self { f } - } -} - -#[async_trait] -impl oio::Append for FsAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.f.write_all(&bs).await.map_err(parse_io_error)?; - - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - self.f.sync_all().await.map_err(parse_io_error)?; - - Ok(()) - } -} diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 632e9fff400..b8fac181342 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -27,7 +27,6 @@ use chrono::DateTime; use log::debug; use uuid::Uuid; -use super::appender::FsAppender; use super::error::parse_io_error; use super::pager::FsPager; use super::writer::FsWriter; @@ -248,7 +247,6 @@ impl Accessor for FsBackend { type BlockingReader = oio::FromFileReader; type Writer = FsWriter; type BlockingWriter = FsWriter; - type Appender = FsAppender; type Pager = Option>; type BlockingPager = Option>; @@ -264,13 +262,12 @@ impl Accessor for FsBackend { read_with_range: true, write: true, + write_can_append: true, write_can_sink: true, write_without_content_length: true, create_dir: true, delete: true, - append: true, - list: true, list_with_delimiter_slash: true, @@ -366,7 +363,7 @@ impl Accessor for FsBackend { Ok((RpRead::new(end - start), r)) } - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { let target_path = Self::ensure_write_abs_path(&self.root, path).await?; let tmp_path = @@ -378,29 +375,21 @@ impl Accessor for FsBackend { (p, None) }; - let f = tokio::fs::OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(tmp_path.as_ref().unwrap_or(&target_path)) - .await - .map_err(parse_io_error)?; - - Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) - } + let mut open_options = tokio::fs::OpenOptions::new(); - async fn append(&self, path: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> { - let path = Self::ensure_write_abs_path(&self.root, path).await?; + open_options.create(true).write(true); + if op.append() { + open_options.append(true); + } else { + open_options.truncate(true); + } - let f = tokio::fs::OpenOptions::new() - .create(true) - .write(true) - .append(true) - .open(&path) + let f = open_options + .open(tmp_path.as_ref().unwrap_or(&target_path)) .await .map_err(parse_io_error)?; - Ok((RpAppend::new(), FsAppender::new(f))) + Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs index 81e92090162..aa2a5fca1f6 100644 --- a/core/src/services/fs/mod.rs +++ b/core/src/services/fs/mod.rs @@ -18,7 +18,6 @@ mod backend; pub use backend::FsBuilder as Fs; -mod appender; mod error; mod pager; mod writer; diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 2e643d35ffc..0c9d2755794 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -266,7 +266,6 @@ impl Accessor for FtpBackend { type BlockingReader = (); type Writer = FtpWriter; type BlockingWriter = (); - type Appender = (); type Pager = FtpPager; type BlockingPager = (); @@ -281,6 +280,7 @@ impl Accessor for FtpBackend { read_with_range: true, write: true, + delete: true, create_dir: true, diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index db0318a26fd..62470ab2475 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -395,7 +395,6 @@ impl Accessor for GcsBackend { type BlockingReader = (); type Writer = GcsWriter; type BlockingWriter = (); - type Appender = (); type Pager = GcsPager; type BlockingPager = (); diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 2aa1c4246b6..72d5740e880 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -54,7 +54,6 @@ impl Accessor for GdriveBackend { type BlockingReader = (); type Writer = GdriveWriter; type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs index db24d063ad2..396b1400049 100644 --- a/core/src/services/ghac/backend.rs +++ b/core/src/services/ghac/backend.rs @@ -295,7 +295,6 @@ impl Accessor for GhacBackend { type BlockingReader = (); type Writer = GhacWriter; type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/hdfs/appender.rs b/core/src/services/hdfs/appender.rs deleted file mode 100644 index 2ef4843fc6a..00000000000 --- a/core/src/services/hdfs/appender.rs +++ /dev/null @@ -1,48 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use async_trait::async_trait; -use bytes::Bytes; -use futures::AsyncWriteExt; - -use super::error::parse_io_error; -use crate::raw::*; -use crate::*; - -pub struct HdfsAppender { - f: F, -} - -impl HdfsAppender { - pub fn new(f: F) -> Self { - Self { f } - } -} - -#[async_trait] -impl oio::Append for HdfsAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.f.write_all(&bs).await.map_err(parse_io_error)?; - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - self.f.flush().await.map_err(parse_io_error)?; - self.f.close().await.map_err(parse_io_error)?; - Ok(()) - } -} diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 1e28b68d8bf..0888589c79f 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use async_trait::async_trait; use log::debug; -use super::appender::HdfsAppender; use super::error::parse_io_error; use super::pager::HdfsPager; use super::writer::HdfsWriter; @@ -165,7 +164,6 @@ impl Accessor for HdfsBackend { type BlockingReader = oio::FromFileReader; type Writer = HdfsWriter; type BlockingWriter = HdfsWriter; - type Appender = HdfsAppender; type Pager = Option; type BlockingPager = Option; @@ -181,6 +179,9 @@ impl Accessor for HdfsBackend { read_with_range: true, write: true, + // TODO: wait for https://github.com/apache/incubator-opendal/pull/2715 + write_can_append: false, + create_dir: true, delete: true, @@ -238,36 +239,7 @@ impl Accessor for HdfsBackend { Ok((RpRead::new(end - start), r)) } - async fn append(&self, path: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> { - let p = build_rooted_abs_path(&self.root, path); - - let parent = PathBuf::from(&p) - .parent() - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "path should have parent but not, it must be malformed", - ) - .with_context("input", &p) - })? - .to_path_buf(); - self.client - .create_dir(&parent.to_string_lossy()) - .map_err(parse_io_error)?; - - let f = self - .client - .open_file() - .create(true) - .append(true) - .async_open(&p) - .await - .map_err(parse_io_error)?; - - Ok((RpAppend::new(), HdfsAppender::new(f))) - } - - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { let p = build_rooted_abs_path(&self.root, path); let parent = PathBuf::from(&p) @@ -285,14 +257,15 @@ impl Accessor for HdfsBackend { .create_dir(&parent.to_string_lossy()) .map_err(parse_io_error)?; - let f = self - .client - .open_file() - .create(true) - .write(true) - .async_open(&p) - .await - .map_err(parse_io_error)?; + let mut open_options = self.client.open_file(); + open_options.create(true); + if op.append() { + open_options.append(true); + } else { + open_options.write(true); + } + + let f = open_options.async_open(&p).await.map_err(parse_io_error)?; Ok((RpWrite::new(), HdfsWriter::new(f))) } diff --git a/core/src/services/hdfs/mod.rs b/core/src/services/hdfs/mod.rs index 3bc6cfc8a3c..c1e98a3b628 100644 --- a/core/src/services/hdfs/mod.rs +++ b/core/src/services/hdfs/mod.rs @@ -18,7 +18,6 @@ mod backend; pub use backend::HdfsBuilder as Hdfs; -mod appender; mod error; mod pager; mod writer; diff --git a/core/src/services/http/backend.rs b/core/src/services/http/backend.rs index 9135e723de1..25f06f993f4 100644 --- a/core/src/services/http/backend.rs +++ b/core/src/services/http/backend.rs @@ -255,7 +255,6 @@ impl Accessor for HttpBackend { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/ipfs/backend.rs b/core/src/services/ipfs/backend.rs index 2a1e0b09a1d..7fd9b4eabd9 100644 --- a/core/src/services/ipfs/backend.rs +++ b/core/src/services/ipfs/backend.rs @@ -165,7 +165,6 @@ impl Accessor for IpfsBackend { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Appender = (); type Pager = DirStream; type BlockingPager = (); diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index 5d92d20f7d7..95163f2c218 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -67,7 +67,6 @@ impl Accessor for IpmfsBackend { type BlockingReader = (); type Writer = IpmfsWriter; type BlockingWriter = (); - type Appender = (); type Pager = IpmfsPager; type BlockingPager = (); diff --git a/core/src/services/obs/appender.rs b/core/src/services/obs/appender.rs deleted file mode 100644 index 1031910d576..00000000000 --- a/core/src/services/obs/appender.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use bytes::Bytes; -use http::StatusCode; - -use super::core::*; -use super::error::parse_error; -use crate::raw::*; -use crate::*; - -pub const X_OBS_NEXT_APPEND_POSITION: &str = "x-obs-next-append-position"; - -pub struct ObsAppender { - core: Arc, - - op: OpAppend, - path: String, - - position: Option, -} - -impl ObsAppender { - pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { - Self { - core, - op, - path: path.to_string(), - position: None, - } - } -} - -#[async_trait] -impl oio::Append for ObsAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - // If the position is not set, we need to get the current position. - if self.position.is_none() { - let resp = self.core.obs_head_object(&self.path, None, None).await?; - - let status = resp.status(); - match status { - StatusCode::OK => { - let position = resp - .headers() - .get( X_OBS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v|v.parse::().ok()) - .ok_or_else(|| Error::new(ErrorKind::Unexpected, "missing x-obs-next-append-position, the object may not be appendable"))?; - self.position = Some(position); - } - - StatusCode::NOT_FOUND => { - self.position = Some(0); - } - - _ => { - return Err(parse_error(resp).await?); - } - } - } - - let mut req = self.core.obs_append_object_request( - &self.path, - self.position.expect("position is not set"), - bs.len(), - &self.op, - AsyncBody::Bytes(bs), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - let position = resp - .headers() - .get(X_OBS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing x-obs-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - Ok(()) - } - - StatusCode::CONFLICT => { - // The object is not appendable or the position is not match with the object's length. - // If the position is not match, we could get the current position and retry. - let position = resp - .headers() - .get(X_OBS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing x-obs-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - - // Then return the error to the caller, so the caller could retry. - Err(Error::new( - ErrorKind::ConditionNotMatch, - "the position is not match with the object's length. position has been updated.", - )) - } - - _ => Err(parse_error(resp).await?), - } - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -} diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 56957a899ba..e90308eda8e 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -27,7 +27,6 @@ use reqsign::HuaweicloudObsConfig; use reqsign::HuaweicloudObsCredentialLoader; use reqsign::HuaweicloudObsSigner; -use super::appender::ObsAppender; use super::core::ObsCore; use super::error::parse_error; use super::pager::ObsPager; @@ -331,9 +330,11 @@ pub struct ObsBackend { impl Accessor for ObsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = ObsWriter; + type Writer = oio::TwoWaysWriter< + oio::MultipartUploadWriter, + oio::AppendObjectWriter, + >; type BlockingWriter = (); - type Appender = ObsAppender; type Pager = ObsPager; type BlockingPager = (); @@ -354,16 +355,12 @@ impl Accessor for ObsBackend { read_with_if_none_match: true, write: true, + write_can_append: true, write_can_sink: true, write_with_content_type: true, write_with_cache_control: true, write_without_content_length: true, - append: true, - append_with_cache_control: true, - append_with_content_type: true, - append_with_content_disposition: true, - delete: true, create_dir: true, copy: true, @@ -453,17 +450,21 @@ impl Accessor for ObsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - ObsWriter::new(self.core.clone(), path, args), - )) - } + let writer = ObsWriter::new(self.core.clone(), path, args.clone()); - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - Ok(( - RpAppend::default(), - ObsAppender::new(self.core.clone(), path, args), - )) + let tw = if args.append() { + let w = + oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); + + oio::TwoWaysWriter::Right(w) + } else { + let w = oio::MultipartUploadWriter::new(writer, args.content_length()) + .with_write_min_size(self.core.write_min_size); + + oio::TwoWaysWriter::Left(w) + }; + + return Ok((RpWrite::default(), tw)); } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs index ca214b831fd..52604fd5056 100644 --- a/core/src/services/obs/core.rs +++ b/core/src/services/obs/core.rs @@ -241,8 +241,8 @@ impl ObsCore { &self, path: &str, position: u64, - size: usize, - args: &OpAppend, + size: u64, + args: &OpWrite, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); @@ -388,7 +388,7 @@ impl ObsCore { &self, path: &str, upload_id: &str, - parts: &[CompleteMultipartUploadRequestPart], + parts: Vec, ) -> Result> { let p = build_abs_path(&self.root, path); let url = format!( diff --git a/core/src/services/obs/mod.rs b/core/src/services/obs/mod.rs index f6fc0606f47..9e44b65ef0c 100644 --- a/core/src/services/obs/mod.rs +++ b/core/src/services/obs/mod.rs @@ -18,7 +18,6 @@ mod backend; pub use backend::ObsBuilder as Obs; -mod appender; mod core; mod error; mod pager; diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 7ba992f3ab5..efcfd3ab6cf 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -19,11 +19,11 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Buf; -use bytes::Bytes; use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::MultipartUploadPart; use crate::raw::*; use crate::*; @@ -32,29 +32,20 @@ pub struct ObsWriter { op: OpWrite, path: String, - upload_id: Option, - - parts: Vec, - buffer: oio::VectorCursor, - buffer_size: usize, } impl ObsWriter { pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { - let buffer_size = core.write_min_size; ObsWriter { core, path: path.to_string(), op, - - upload_id: None, - parts: vec![], - buffer: oio::VectorCursor::new(), - buffer_size, } } - - async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { +} +#[async_trait] +impl oio::MultipartUploadWrite for ObsWriter { + async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.obs_put_object_request( &self.path, Some(size), @@ -78,7 +69,7 @@ impl ObsWriter { } } - async fn initiate_upload(&self) -> Result { + async fn initiate_part(&self) -> Result { let resp = self .core .obs_initiate_multipart_upload(&self.path, self.op.content_type()) @@ -102,20 +93,16 @@ impl ObsWriter { async fn write_part( &self, upload_id: &str, - bs: Bytes, - ) -> Result { + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result { // Obs service requires part number must between [1..=10000] - let part_number = self.parts.len() + 1; + let part_number = part_number + 1; let resp = self .core - .obs_upload_part_request( - &self.path, - upload_id, - part_number, - Some(bs.len() as u64), - AsyncBody::Bytes(bs), - ) + .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body) .await?; let status = resp.status(); @@ -133,78 +120,39 @@ impl ObsWriter { resp.into_body().consume().await?; - Ok(CompleteMultipartUploadRequestPart { part_number, etag }) + Ok(MultipartUploadPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } -} -#[async_trait] -impl oio::Write for ObsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - let upload_id = match &self.upload_id { - Some(upload_id) => upload_id, - None => { - if self.op.content_length().unwrap_or_default() == bs.len() as u64 { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; - } else { - let upload_id = self.initiate_upload().await?; - self.upload_id = Some(upload_id); - self.upload_id.as_deref().unwrap() - } - } - }; + async fn complete_part(&self, upload_id: &str, parts: &[MultipartUploadPart]) -> Result<()> { + let parts = parts + .iter() + .map(|p| CompleteMultipartUploadRequestPart { + part_number: p.part_number, + etag: p.etag.clone(), + }) + .collect(); - // Ignore empty bytes - if bs.is_empty() { - return Ok(()); - } + let resp = self + .core + .obs_complete_multipart_upload(&self.path, upload_id, parts) + .await?; - self.buffer.push(bs); - // Return directly if the buffer is not full - if self.buffer.len() <= self.buffer_size { - return Ok(()); - } + let status = resp.status(); - let bs = self.buffer.peak_at_least(self.buffer_size); - let size = bs.len(); + match status { + StatusCode::OK => { + resp.into_body().consume().await?; - match self.write_part(upload_id, bs).await { - Ok(part) => { - self.buffer.take(size); - self.parts.push(part); Ok(()) } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. - self.buffer.pop(); - Err(e) - } - } - } - - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - if self.op.content_length().unwrap_or_default() == size { - return self.write_oneshot(size, AsyncBody::Stream(s)).await; - } else { - return Err(Error::new( - ErrorKind::Unsupported, - "Obs does not support streaming multipart upload", - )); + _ => Err(parse_error(resp).await?), } } - async fn abort(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; - + async fn abort_part(&self, upload_id: &str) -> Result<()> { let resp = self .core .obs_abort_multipart_upload(&self.path, upload_id) @@ -219,42 +167,42 @@ impl oio::Write for ObsWriter { _ => Err(parse_error(resp).await?), } } +} - async fn close(&mut self) -> Result<()> { - let upload_id = if let Some(upload_id) = &self.upload_id { - upload_id - } else { - return Ok(()); - }; - - // Make sure internal buffer has been flushed. - if !self.buffer.is_empty() { - let bs = self.buffer.peak_exact(self.buffer.len()); - - match self.write_part(upload_id, bs).await { - Ok(part) => { - self.buffer.clear(); - self.parts.push(part); - } - Err(e) => { - return Err(e); - } +#[async_trait] +impl oio::AppendObjectWrite for ObsWriter { + async fn offset(&self) -> Result { + let resp = self.core.obs_head_object(&self.path, None, None).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let content_length = parse_content_length(resp.headers())?.ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Content-Length not present in returning response", + ) + })?; + Ok(content_length) } + StatusCode::NOT_FOUND => Ok(0), + _ => Err(parse_error(resp).await?), } + } - let resp = self + async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self .core - .obs_complete_multipart_upload(&self.path, upload_id, &self.parts) - .await?; + .obs_append_object_request(&self.path, offset, size, &self.op, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; let status = resp.status(); match status { - StatusCode::OK => { - resp.into_body().consume().await?; - - Ok(()) - } + StatusCode::OK => Ok(()), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 236dae03b18..5590a7a374d 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -66,7 +66,6 @@ impl Accessor for OnedriveBackend { type BlockingReader = (); type Writer = OneDriveWriter; type BlockingWriter = (); - type Appender = (); type Pager = OnedrivePager; type BlockingPager = (); diff --git a/core/src/services/oss/appender.rs b/core/src/services/oss/appender.rs deleted file mode 100644 index 2edbda5fb85..00000000000 --- a/core/src/services/oss/appender.rs +++ /dev/null @@ -1,142 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use bytes::Bytes; -use http::StatusCode; - -use super::core::*; -use super::error::parse_error; -use crate::raw::*; -use crate::*; - -pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position"; - -pub struct OssAppender { - core: Arc, - - op: OpAppend, - path: String, - - position: Option, -} - -impl OssAppender { - pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { - Self { - core, - op, - path: path.to_string(), - position: None, - } - } -} - -#[async_trait] -impl oio::Append for OssAppender { - async fn append(&mut self, bs: Bytes) -> Result<()> { - // If the position is not set, we need to get the current position. - if self.position.is_none() { - let resp = self.core.oss_head_object(&self.path, None, None).await?; - - let status = resp.status(); - match status { - StatusCode::OK => { - let position = resp - .headers() - .get(X_OSS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing x-oss-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - } - StatusCode::NOT_FOUND => { - self.position = Some(0); - } - _ => { - return Err(parse_error(resp).await?); - } - } - } - - let mut req = self.core.oss_append_object_request( - &self.path, - self.position.expect("position is not set"), - bs.len(), - &self.op, - AsyncBody::Bytes(bs), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - let position = resp - .headers() - .get(X_OSS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing x-oss-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - Ok(()) - } - StatusCode::CONFLICT => { - // The object is not appendable or the position is not match with the object's length. - // If the position is not match, we could get the current position and retry. - let position = resp - .headers() - .get(X_OSS_NEXT_APPEND_POSITION) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::().ok()) - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "missing x-oss-next-append-position, the object may not be appendable", - ) - })?; - self.position = Some(position); - - // Then return the error to the caller, so the caller could retry. - Err(Error::new( - ErrorKind::ConditionNotMatch, - "the position is not match with the object's length. position has been updated.", - )) - } - _ => Err(parse_error(resp).await?), - } - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -} diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 65156ccc393..6704d158b79 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -30,7 +30,6 @@ use reqsign::AliyunConfig; use reqsign::AliyunLoader; use reqsign::AliyunOssSigner; -use super::appender::OssAppender; use super::core::*; use super::error::parse_error; use super::pager::OssPager; @@ -386,9 +385,11 @@ pub struct OssBackend { impl Accessor for OssBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = oio::MultipartUploadWriter; + type Writer = oio::TwoWaysWriter< + oio::MultipartUploadWriter, + oio::AppendObjectWriter, + >; type BlockingWriter = (); - type Appender = OssAppender; type Pager = OssPager; type BlockingPager = (); @@ -409,6 +410,7 @@ impl Accessor for OssBackend { read_with_if_none_match: true, write: true, + write_can_append: true, write_can_sink: true, write_with_cache_control: true, write_with_content_type: true, @@ -418,11 +420,6 @@ impl Accessor for OssBackend { create_dir: true, copy: true, - append: true, - append_with_cache_control: true, - append_with_content_type: true, - append_with_content_disposition: true, - list: true, list_with_delimiter_slash: true, list_without_delimiter: true, @@ -481,17 +478,21 @@ impl Accessor for OssBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - OssWriter::new(self.core.clone(), path, args), - )) - } + let writer = OssWriter::new(self.core.clone(), path, args.clone()); - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - Ok(( - RpAppend::default(), - OssAppender::new(self.core.clone(), path, args), - )) + let tw = if args.append() { + let w = + oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); + + oio::TwoWaysWriter::Right(w) + } else { + let w = oio::MultipartUploadWriter::new(writer, args.content_length()) + .with_write_min_size(self.core.write_min_size); + + oio::TwoWaysWriter::Left(w) + }; + + return Ok((RpWrite::default(), tw)); } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 11279f9e115..6c570fb1770 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -190,8 +190,8 @@ impl OssCore { &self, path: &str, position: u64, - size: usize, - args: &OpAppend, + size: u64, + args: &OpWrite, body: AsyncBody, ) -> Result> { let p = build_abs_path(&self.root, path); diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs index 21829ea51dd..9bfd1692c7d 100644 --- a/core/src/services/oss/mod.rs +++ b/core/src/services/oss/mod.rs @@ -18,7 +18,6 @@ mod backend; pub use backend::OssBuilder as Oss; -mod appender; mod core; mod error; mod pager; diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index f2f737d65e7..9faad249f02 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -34,19 +34,12 @@ pub struct OssWriter { } impl OssWriter { - pub fn new( - core: Arc, - path: &str, - op: OpWrite, - ) -> oio::MultipartUploadWriter { - let write_min_size = core.write_min_size; - let total_size = op.content_length(); - let oss_writer = OssWriter { + pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { + OssWriter { core, path: path.to_string(), op, - }; - oio::MultipartUploadWriter::new(oss_writer, total_size).with_write_min_size(write_min_size) + } } } @@ -186,3 +179,42 @@ impl oio::MultipartUploadWrite for OssWriter { } } } + +#[async_trait] +impl oio::AppendObjectWrite for OssWriter { + async fn offset(&self) -> Result { + let resp = self.core.oss_head_object(&self.path, None, None).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let content_length = parse_content_length(resp.headers())?.ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Content-Length not present in returning response", + ) + })?; + Ok(content_length) + } + StatusCode::NOT_FOUND => Ok(0), + _ => Err(parse_error(resp).await?), + } + } + + async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self + .core + .oss_append_object_request(&self.path, offset, size, &self.op, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(()), + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 54220b71144..83ff8a7d0ac 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -893,7 +893,6 @@ impl Accessor for S3Backend { type BlockingReader = (); type Writer = oio::MultipartUploadWriter; type BlockingWriter = (); - type Appender = (); type Pager = S3Pager; type BlockingPager = (); diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 425568d1384..6094960125c 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -225,7 +225,6 @@ impl Accessor for SftpBackend { type BlockingReader = (); type Writer = SftpWriter; type BlockingWriter = (); - type Appender = SftpWriter; type Pager = Option; type BlockingPager = (); @@ -251,7 +250,6 @@ impl Accessor for SftpBackend { copy: self.copyable, rename: true, - append: true, ..Default::default() }); @@ -320,23 +318,7 @@ impl Accessor for SftpBackend { Ok((RpRead::new(end - start), r)) } - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if let Some((dir, _)) = path.rsplit_once('/') { - self.create_dir(dir, OpCreateDir::default()).await?; - } - - let client = self.connect().await?; - - let mut fs = client.fs(); - fs.set_cwd(&self.root); - let path = fs.canonicalize(path).await?; - - let file = client.create(&path).await?; - - Ok((RpWrite::new(), SftpWriter::new(file))) - } - - async fn append(&self, path: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> { + async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { if let Some((dir, _)) = path.rsplit_once('/') { self.create_dir(dir, OpCreateDir::default()).await?; } @@ -348,11 +330,16 @@ impl Accessor for SftpBackend { let path = fs.canonicalize(path).await?; let mut option = client.options(); - option.append(true).create(true); + option.create(true); + if op.append() { + option.append(true); + } else { + option.write(true); + } let file = option.open(path).await?; - Ok((RpAppend::new(), SftpWriter::new(file))) + Ok((RpWrite::new(), SftpWriter::new(file))) } async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result { diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 7007d062b2a..76da70da3f8 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -60,16 +60,3 @@ impl oio::Write for SftpWriter { Ok(()) } } - -#[async_trait] -impl oio::Append for SftpWriter { - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.file.write_all(&bs).await?; - - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -} diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs index 18ef035e846..3872623c2c0 100644 --- a/core/src/services/supabase/backend.rs +++ b/core/src/services/supabase/backend.rs @@ -208,7 +208,6 @@ impl Accessor for SupabaseBackend { type BlockingReader = (); type Writer = SupabaseWriter; type BlockingWriter = (); - type Appender = (); // todo: implement Pager to support list and scan type Pager = (); type BlockingPager = (); diff --git a/core/src/services/vercel_artifacts/backend.rs b/core/src/services/vercel_artifacts/backend.rs index 866cb28cd33..34501827bd2 100644 --- a/core/src/services/vercel_artifacts/backend.rs +++ b/core/src/services/vercel_artifacts/backend.rs @@ -48,7 +48,6 @@ impl Accessor for VercelArtifactsBackend { type BlockingReader = (); type Writer = VercelArtifactsWriter; type BlockingWriter = (); - type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index 7aabc55c44d..a8a8da91cc8 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -893,7 +893,6 @@ impl Accessor for WasabiBackend { type BlockingReader = (); type Writer = WasabiWriter; type BlockingWriter = (); - type Appender = (); type Pager = WasabiPager; type BlockingPager = (); diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index d472153fabf..b4fef1db597 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -274,7 +274,6 @@ impl Accessor for WebdavBackend { type BlockingReader = (); type Writer = WebdavWriter; type BlockingWriter = (); - type Appender = (); type Pager = Option; type BlockingPager = (); diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index a8a17f0689a..4167e1aee1d 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -401,7 +401,6 @@ impl Accessor for WebhdfsBackend { type BlockingReader = (); type Writer = WebhdfsWriter; type BlockingWriter = (); - type Appender = (); type Pager = WebhdfsPager; type BlockingPager = (); diff --git a/core/src/types/appender.rs b/core/src/types/appender.rs deleted file mode 100644 index 7ebe3358b35..00000000000 --- a/core/src/types/appender.rs +++ /dev/null @@ -1,252 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt::Display; -use std::io; -use std::pin::Pin; -use std::task::ready; -use std::task::Context; -use std::task::Poll; - -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::AsyncWrite; -use futures::FutureExt; - -use crate::raw::oio::Append; -use crate::raw::*; -use crate::*; - -/// Appender is designed to append data into given path in an asynchronous -/// manner. -/// -/// ## Notes -/// -/// Please make sure either `close`` has been called before -/// dropping the appender otherwise the data could be lost. -/// -/// ## Notes -/// -/// Appender always append data into the end of the file, so it's not -/// possible to overwrite existing data. -/// -/// Appender did not know the size of the data to be written, so it will -/// always be `unsized`. -pub struct Appender { - state: State, -} - -/// # Safety -/// -/// Appender will only be accessed by `&mut Self` -unsafe impl Sync for Appender {} - -impl Appender { - /// Create a new appender. - /// - /// Create will use internal information to decide the most suitable - /// implementation for users. - /// - /// We don't want to expose those details to users so keep this function - /// in crate only. - pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpAppend) -> Result { - let (_, a) = acc.append(path, op).await?; - - Ok(Appender { - state: State::Idle(Some(a)), - }) - } - - /// Write into inner appender. - pub async fn append(&mut self, bs: impl Into) -> Result<()> { - if let State::Idle(Some(a)) = &mut self.state { - a.append(bs.into()).await - } else { - unreachable!( - "appender state invalid while write, expect Idle, actual {}", - self.state - ); - } - } - - /// Close the appender and make sure all data have been committed. - /// - /// ## Notes - /// - /// Close should only be called when the appender is not closed, - /// otherwise an unexpected error could be returned. - pub async fn close(&mut self) -> Result<()> { - if let State::Idle(Some(a)) = &mut self.state { - a.close().await - } else { - unreachable!( - "appender state invalid while close, expect Idle, actual {}", - self.state - ); - } - } -} - -#[allow(dead_code)] -enum State { - Idle(Option), - Write(BoxFuture<'static, Result<(usize, oio::Appender)>>), - Close(BoxFuture<'static, Result>), -} - -impl Display for State { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - State::Idle(_) => write!(f, "Idle"), - State::Write(_) => write!(f, "Write"), - State::Close(_) => write!(f, "Close"), - } - } -} - -impl AsyncWrite for Appender { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - loop { - match &mut self.state { - State::Idle(a) => { - let mut a = a - .take() - .expect("invalid state of appender: poll_write with State::Idle"); - let bs = Bytes::from(buf.to_vec()); - let size = bs.len(); - let ut = async move { - a.append(bs).await?; - Ok((size, a)) - }; - self.state = State::Write(Box::pin(ut)); - } - State::Write(fut) => match ready!(fut.poll_unpin(cx)) { - Ok((size, a)) => { - self.state = State::Idle(Some(a)); - return Poll::Ready(Ok(size)); - } - Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))), - }, - - State::Close(_) => { - unreachable!("invalid state of appender: poll_write with State::Close") - } - } - } - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - match &mut self.state { - State::Idle(a) => { - let mut a = a - .take() - .expect("invalid state of appender: Idle state with empty append"); - let fut = async move { - a.close().await?; - Ok(a) - }; - self.state = State::Close(Box::pin(fut)); - } - State::Write(_) => { - unreachable!("invalid state of appender: poll_close with State::Write") - } - State::Close(fut) => match ready!(fut.poll_unpin(cx)) { - Ok(a) => { - self.state = State::Idle(Some(a)); - return Poll::Ready(Ok(())); - } - Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))), - }, - } - } - } -} - -impl tokio::io::AsyncWrite for Appender { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - loop { - match &mut self.state { - State::Idle(a) => { - let mut a = a - .take() - .expect("invalid state of appender: Idle state with empty append"); - let bs = Bytes::from(buf.to_vec()); - let size = bs.len(); - let fut = async move { - a.append(bs).await?; - Ok((size, a)) - }; - self.state = State::Write(Box::pin(fut)); - } - State::Write(fut) => match ready!(fut.poll_unpin(cx)) { - Ok((size, a)) => { - self.state = State::Idle(Some(a)); - return Poll::Ready(Ok(size)); - } - Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))), - }, - State::Close(_) => { - unreachable!("invalid state of appender: poll_write with State::Close") - } - }; - } - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - match &mut self.state { - State::Idle(a) => { - let mut a = a - .take() - .expect("invalid state of appender: Idle state with empty append"); - let fut = async move { - a.close().await?; - Ok(a) - }; - self.state = State::Close(Box::pin(fut)); - } - State::Write(_) => { - unreachable!("invalid state of appender: poll_close with State::Write") - } - State::Close(fut) => match ready!(fut.poll_unpin(cx)) { - Ok(a) => { - self.state = State::Idle(Some(a)); - return Poll::Ready(Ok(())); - } - Err(err) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))), - }, - } - } - } -} diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index de91bc0765f..d8baab96c50 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -46,36 +46,38 @@ use std::fmt::Debug; /// - Operation with limitations should be named like `batch_max_operations`. #[derive(Copy, Clone, Default)] pub struct Capability { - /// If operator supports stat natively, it will be true. + /// If operator supports stat , it will be true. pub stat: bool, - /// If operator supports stat with if match natively, it will be true. + /// If operator supports stat with if match , it will be true. pub stat_with_if_match: bool, - /// If operator supports stat with if none match natively, it will be true. + /// If operator supports stat with if none match , it will be true. pub stat_with_if_none_match: bool, - /// If operator supports read natively, it will be true. + /// If operator supports read , it will be true. pub read: bool, - /// If operator supports seek on returning reader natively, it will + /// If operator supports seek on returning reader , it will /// be true. pub read_can_seek: bool, - /// If operator supports next on returning reader natively, it will + /// If operator supports next on returning reader , it will /// be true. pub read_can_next: bool, - /// If operator supports read with range natively, it will be true. + /// If operator supports read with range , it will be true. pub read_with_range: bool, - /// If operator supports read with if match natively, it will be true. + /// If operator supports read with if match , it will be true. pub read_with_if_match: bool, - /// If operator supports read with if none match natively, it will be true. + /// If operator supports read with if none match , it will be true. pub read_with_if_none_match: bool, - /// if operator supports read with override cache control natively, it will be true. + /// if operator supports read with override cache control , it will be true. pub read_with_override_cache_control: bool, - /// if operator supports read with override content disposition natively, it will be true. + /// if operator supports read with override content disposition , it will be true. pub read_with_override_content_disposition: bool, - /// if operator supports read with override content type natively, it will be true. + /// if operator supports read with override content type , it will be true. pub read_with_override_content_type: bool, - /// If operator supports write natively, it will be true. + /// If operator supports write , it will be true. pub write: bool, + /// If operator supports write by append, it will be true. + pub write_can_append: bool, /// If operator supports write by sink a stream into, it will be true. pub write_can_sink: bool, /// If operator supports write with without content length, it will @@ -83,35 +85,26 @@ pub struct Capability { /// /// This feature also be called as `Unsized` write or streaming write. pub write_without_content_length: bool, - /// If operator supports write with content type natively, it will be true. + /// If operator supports write with content type , it will be true. pub write_with_content_type: bool, - /// If operator supports write with content disposition natively, it will be true. + /// If operator supports write with content disposition , it will be true. pub write_with_content_disposition: bool, - /// If operator supports write with cache control natively, it will be true. + /// If operator supports write with cache control , it will be true. pub write_with_cache_control: bool, - /// If operator supports append natively, it will be true. - pub append: bool, - /// If operator supports append with content type natively, it will be true. - pub append_with_content_type: bool, - /// If operator supports append with content disposition natively, it will be true. - pub append_with_content_disposition: bool, - /// If operator supports append with cache control natively, it will be true. - pub append_with_cache_control: bool, - - /// If operator supports create dir natively, it will be true. + /// If operator supports create dir , it will be true. pub create_dir: bool, - /// If operator supports delete natively, it will be true. + /// If operator supports delete , it will be true. pub delete: bool, - /// If operator supports copy natively, it will be true. + /// If operator supports copy , it will be true. pub copy: bool, - /// If operator supports rename natively, it will be true. + /// If operator supports rename , it will be true. pub rename: bool, - /// If operator supports list natively, it will be true. + /// If operator supports list , it will be true. pub list: bool, /// If backend supports list with limit, it will be true. pub list_with_limit: bool, @@ -122,23 +115,23 @@ pub struct Capability { /// If backend supports list without delimiter. pub list_without_delimiter: bool, - /// If operator supports presign natively, it will be true. + /// If operator supports presign , it will be true. pub presign: bool, - /// If operator supports presign read natively, it will be true. + /// If operator supports presign read , it will be true. pub presign_read: bool, - /// If operator supports presign stat natively, it will be true. + /// If operator supports presign stat , it will be true. pub presign_stat: bool, - /// If operator supports presign write natively, it will be true. + /// If operator supports presign write , it will be true. pub presign_write: bool, - /// If operator supports batch natively, it will be true. + /// If operator supports batch , it will be true. pub batch: bool, - /// If operator supports batch delete natively, it will be true. + /// If operator supports batch delete , it will be true. pub batch_delete: bool, /// The max operations that operator supports in batch. pub batch_max_operations: Option, - /// If operator supports blocking natively, it will be true. + /// If operator supports blocking , it will be true. pub blocking: bool, } @@ -155,9 +148,6 @@ impl Debug for Capability { if self.write { s.push("Write"); } - if self.append { - s.push("Append"); - } if self.create_dir { s.push("CreateDir"); } diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index 11dd12683e5..0f75fa50c84 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -33,9 +33,6 @@ mod writer; pub use writer::BlockingWriter; pub use writer::Writer; -mod appender; -pub use appender::Appender; - mod list; pub use list::BlockingLister; pub use list::Lister; diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 9df7c7bbe42..b65eb8698c5 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -536,32 +536,6 @@ impl Operator { self.write_with(path, bs).await } - /// Append bytes into path. - /// - /// # Notes - /// - /// - Append will make sure all bytes has been written, or an error will be returned. - /// - Append will create the file if it does not exist. - /// - Append always write bytes to the end of the file. - /// - /// # Examples - /// - /// ``` - /// # use std::io::Result; - /// # use opendal::Operator; - /// use bytes::Bytes; - /// - /// # #[tokio::main] - /// # async fn test(op: Operator) -> Result<()> { - /// op.append("path/to/file", vec![0; 4096]).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn append(&self, path: &str, bs: impl Into) -> Result<()> { - let bs = bs.into(); - self.append_with(path, bs).await - } - /// Copy a file from `from` to `to`. /// /// # Notes @@ -813,140 +787,6 @@ impl Operator { fut } - /// Append multiple bytes into path. - /// - /// Refer to [`Appender`] for more details. - /// - /// # Examples - /// - /// ``` - /// # use std::io::Result; - /// # use opendal::Operator; - /// use bytes::Bytes; - /// - /// # #[tokio::main] - /// # async fn test(op: Operator) -> Result<()> { - /// let mut a = op.appender("path/to/file").await?; - /// a.append(vec![0; 4096]).await?; - /// a.append(vec![1; 4096]).await?; - /// a.close().await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn appender(&self, path: &str) -> Result { - self.appender_with(path).await - } - - /// Append multiple bytes into path with extra options. - /// - /// Refer to [`Appender`] for more details. - /// - /// # Examples - /// - /// ``` - /// # use std::io::Result; - /// # use opendal::Operator; - /// use bytes::Bytes; - /// - /// # #[tokio::main] - /// # async fn test(op: Operator) -> Result<()> { - /// let mut a = op - /// .appender_with("path/to/file") - /// .content_type("application/octet-stream") - /// .await?; - /// a.append(vec![0; 4096]).await?; - /// a.append(vec![1; 4096]).await?; - /// a.close().await?; - /// # Ok(()) - /// # } - /// ``` - pub fn appender_with(&self, path: &str) -> FutureAppender { - let path = normalize_path(path); - - let fut = FutureAppender(OperatorFuture::new( - self.inner().clone(), - path, - OpAppend::default(), - |inner, path, args| { - let fut = async move { - if !validate_path(&path, EntryMode::FILE) { - return Err(Error::new( - ErrorKind::IsADirectory, - "append path is a directory", - ) - .with_operation("Operator::appender") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path)); - } - let ap = Appender::create(inner, &path, args).await?; - Ok(ap) - }; - - Box::pin(fut) - }, - )); - - fut - } - - /// Append bytes with extra options. - /// - /// # Notes - /// - /// - Append will make sure all bytes has been written, or an error will be returned. - /// - Append will create the file if it does not exist. - /// - Append always write bytes to the end of the file. - /// - /// # Examples - /// - /// ``` - /// # use std::io::Result; - /// # use opendal::Operator; - /// use bytes::Bytes; - /// - /// # #[tokio::main] - /// # async fn test(op: Operator) -> Result<()> { - /// let bs = b"hello, world!".to_vec(); - /// let _ = op - /// .append_with("path/to/file", bs) - /// .content_type("text/plain") - /// .await?; - /// # Ok(()) - /// # } - /// ``` - pub fn append_with(&self, path: &str, bs: impl Into) -> FutureAppend { - let path = normalize_path(path); - let bs = bs.into(); - - let fut = FutureAppend(OperatorFuture::new( - self.inner().clone(), - path, - (OpAppend::default(), bs), - |inner, path, (args, bs)| { - let fut = async move { - if !validate_path(&path, EntryMode::FILE) { - return Err(Error::new( - ErrorKind::IsADirectory, - "append path is a directory", - ) - .with_operation("Operator::append_with") - .with_context("service", inner.info().scheme().into_static()) - .with_context("path", &path)); - } - let (_, mut a) = inner.append(&path, args).await?; - a.append(bs).await?; - a.close().await?; - - Ok(()) - }; - - Box::pin(fut) - }, - )); - - fut - } - /// Delete the given path. /// /// # Notes diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 42ec83311d9..d3676f1ce2a 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -148,82 +148,6 @@ impl Future for FutureStat { } } -/// Future that generated by [`Operator::append_with`]. -/// -/// Users can add more options by public functions provided by this struct. -pub struct FutureAppend(pub(crate) OperatorFuture<(OpAppend, Bytes), ()>); - -impl FutureAppend { - /// Set the content type of option - pub fn content_type(mut self, v: &str) -> Self { - self.0 = self - .0 - .map_args(|(args, bs)| (args.with_content_type(v), bs)); - self - } - - /// Set the content disposition of option - pub fn content_disposition(mut self, v: &str) -> Self { - self.0 = self - .0 - .map_args(|(args, bs)| (args.with_content_disposition(v), bs)); - self - } - - /// Set the cache control of option - pub fn cache_control(mut self, v: &str) -> Self { - self.0 = self - .0 - .map_args(|(args, bs)| (args.with_cache_control(v), bs)); - self - } -} - -impl Future for FutureAppend { - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_unpin(cx) - } -} - -/// Future that generated by [`Operation.appender_with`]. -/// -/// Users can add more options by public functions provided by this struct. -pub struct FutureAppender(pub(crate) OperatorFuture); - -impl FutureAppender { - /// Set the content type for this operation. - pub fn content_type(mut self, content_type: &str) -> Self { - self.0 = self.0.map_args(|args| args.with_content_type(content_type)); - self - } - - /// Set the content disposition for this operation. - pub fn content_disposition(mut self, content_disposition: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_content_disposition(content_disposition)); - self - } - - /// Set the cache control for this operation. - pub fn cache_control(mut self, cache_control: &str) -> Self { - self.0 = self - .0 - .map_args(|args| args.with_cache_control(cache_control)); - self - } -} - -impl Future for FutureAppender { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_unpin(cx) - } -} - /// Future that generated by [`Operator::presign_read_with`]. /// /// Users can add more options by public functions provided by this struct. @@ -459,6 +383,18 @@ impl Future for FutureReader { pub struct FutureWrite(pub(crate) OperatorFuture<(OpWrite, Bytes), ()>); impl FutureWrite { + /// Set the append mode of op. + /// + /// If the append mode is set, the data will be appended to the end of the file. + /// + /// # Notes + /// + /// Service could return `Unsupported` if the underlying storage does not support append. + pub fn append(mut self, v: bool) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_append(v), bs)); + self + } + /// Set the content length of op. /// /// If the content length is not set, the content length will be @@ -509,6 +445,18 @@ impl Future for FutureWrite { pub struct FutureWriter(pub(crate) OperatorFuture); impl FutureWriter { + /// Set the append mode of op. + /// + /// If the append mode is set, the data will be appended to the end of the file. + /// + /// # Notes + /// + /// Service could return `Unsupported` if the underlying storage does not support append. + pub fn append(mut self, v: bool) -> Self { + self.0 = self.0.map_args(|args| args.with_append(v)); + self + } + /// Set the content length of op. /// /// If the content length is not set, the content length will be diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs index 782a323c52b..533ca15b7a1 100644 --- a/core/tests/behavior/append.rs +++ b/core/tests/behavior/append.rs @@ -20,7 +20,6 @@ use std::vec; use anyhow::Result; use futures::io::BufReader; use futures::io::Cursor; -use log::warn; use sha2::Digest; use sha2::Sha256; @@ -29,7 +28,7 @@ use crate::*; pub fn behavior_append_tests(op: &Operator) -> Vec { let cap = op.info().full_capability(); - if !(cap.read && cap.write && cap.append) { + if !(cap.read && cap.write && cap.write_can_append) { return vec![]; } @@ -51,11 +50,13 @@ pub async fn test_append_create_append(op: Operator) -> Result<()> { let (content_one, size_one) = gen_bytes(); let (content_two, size_two) = gen_bytes(); - op.append(&path, content_one.clone()) + op.write_with(&path, content_one.clone()) + .append(true) .await .expect("append file first time must success"); - op.append(&path, content_two.clone()) + op.write_with(&path, content_two.clone()) + .append(true) .await .expect("append to an existing file must success"); @@ -75,7 +76,7 @@ pub async fn test_append_with_dir_path(op: Operator) -> Result<()> { let path = format!("{}/", uuid::Uuid::new_v4()); let (content, _) = gen_bytes(); - let res = op.append(&path, content).await; + let res = op.write_with(&path, content).append(true).await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::IsADirectory); @@ -84,7 +85,7 @@ pub async fn test_append_with_dir_path(op: Operator) -> Result<()> { /// Test append with cache control must success. pub async fn test_append_with_cache_control(op: Operator) -> Result<()> { - if !op.info().full_capability().append_with_cache_control { + if !op.info().full_capability().write_with_cache_control { return Ok(()); } @@ -92,7 +93,8 @@ pub async fn test_append_with_cache_control(op: Operator) -> Result<()> { let (content, _) = gen_bytes(); let target_cache_control = "no-cache, no-store, max-age=300"; - op.append_with(&path, content) + op.write_with(&path, content) + .append(true) .cache_control(target_cache_control) .await?; @@ -110,7 +112,7 @@ pub async fn test_append_with_cache_control(op: Operator) -> Result<()> { /// Test append with content type must success. pub async fn test_append_with_content_type(op: Operator) -> Result<()> { - if !op.info().full_capability().append_with_content_type { + if !op.info().full_capability().write_with_content_type { return Ok(()); } @@ -118,7 +120,8 @@ pub async fn test_append_with_content_type(op: Operator) -> Result<()> { let (content, size) = gen_bytes(); let target_content_type = "application/json"; - op.append_with(&path, content) + op.write_with(&path, content) + .append(true) .content_type(target_content_type) .await?; @@ -137,7 +140,7 @@ pub async fn test_append_with_content_type(op: Operator) -> Result<()> { /// Write a single file with content disposition should succeed. pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> { - if !op.info().full_capability().append_with_content_disposition { + if !op.info().full_capability().write_with_content_disposition { return Ok(()); } @@ -145,7 +148,8 @@ pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> { let (content, size) = gen_bytes(); let target_content_disposition = "attachment; filename=\"filename.jpg\""; - op.append_with(&path, content) + op.write_with(&path, content) + .append(true) .content_disposition(target_content_disposition) .await?; @@ -168,14 +172,7 @@ pub async fn test_appender_futures_copy(op: Operator) -> Result<()> { let (content, size): (Vec, usize) = gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); - let mut a = match op.appender(&path).await { - Ok(a) => a, - Err(err) if err.kind() == ErrorKind::Unsupported => { - warn!("service doesn't support write with append"); - return Ok(()); - } - Err(err) => return Err(err.into()), - }; + let mut a = op.writer_with(&path).append(true).await?; // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone())); @@ -203,12 +200,12 @@ pub async fn test_fuzz_appender(op: Operator) -> Result<()> { let mut fuzzer = ObjectWriterFuzzer::new(&path, None); - let mut a = op.appender(&path).await?; + let mut a = op.writer_with(&path).append(true).await?; for _ in 0..100 { match fuzzer.fuzz() { ObjectWriterAction::Write(bs) => { - a.append(bs).await?; + a.write(bs).await?; } } } diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 590ae6c067f..f60b6673bdc 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -1109,19 +1109,16 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> { /// Append data into writer pub async fn test_writer_write(op: Operator) -> Result<()> { + if !(op.info().full_capability().write_without_content_length) { + return Ok(()); + } + let path = uuid::Uuid::new_v4().to_string(); let size = 5 * 1024 * 1024; // write file with 5 MiB let content_a = gen_fixed_bytes(size); let content_b = gen_fixed_bytes(size); - let mut w = match op.writer(&path).await { - Ok(w) => w, - Err(err) if err.kind() == ErrorKind::Unsupported => { - warn!("service doesn't support write with append"); - return Ok(()); - } - Err(err) => return Err(err.into()), - }; + let mut w = op.writer(&path).await?; w.write(content_a.clone()).await?; w.write(content_b.clone()).await?; w.close().await?; @@ -1228,18 +1225,15 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> { /// Copy data from reader to writer pub async fn test_writer_futures_copy(op: Operator) -> Result<()> { + if !(op.info().full_capability().write_without_content_length) { + return Ok(()); + } + let path = uuid::Uuid::new_v4().to_string(); let (content, size): (Vec, usize) = gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); - let mut w = match op.writer(&path).await { - Ok(w) => w, - Err(err) if err.kind() == ErrorKind::Unsupported => { - warn!("service doesn't support write with append"); - return Ok(()); - } - Err(err) => return Err(err.into()), - }; + let mut w = op.writer(&path).await?; // Wrap a buf reader here to make sure content is read in 1MiB chunks. let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));