diff --git a/core/src/docs/internals/accessor.rs b/core/src/docs/internals/accessor.rs index d149457efea..b3e8c66384c 100644 --- a/core/src/docs/internals/accessor.rs +++ b/core/src/docs/internals/accessor.rs @@ -155,7 +155,7 @@ //! vendors that provide s3-like RESTful APIs, and our s3 service is //! implemented to support all of them, not just AWS S3. //! -//! Obviously, we can use `duck` as scheme, let's add a new variant in [`Scheme`], and implement all reqired functions like `Scheme::from_str` and `Scheme::into_static`: +//! Obviously, we can use `duck` as scheme, let's add a new variant in [`Scheme`], and implement all required functions like `Scheme::from_str` and `Scheme::into_static`: //! //! ```ignore //! pub enum Scheme { diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 92e7c8147e8..491da8427d3 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -568,6 +568,27 @@ where Ok(()) } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + if let Some(total_size) = self.size { + if self.written + size > total_size { + return Err(Error::new( + ErrorKind::ContentTruncated, + &format!( + "writer got too much data, expect: {size}, actual: {}", + self.written + size + ), + )); + } + } + + let w = self.inner.as_mut().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") + })?; + w.sink(size, s).await?; + self.written += size; + Ok(()) + } + async fn abort(&mut self) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 30003785e83..1e41030cd0d 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -308,6 +308,10 @@ impl oio::Write for ConcurrentLimitWrapper { self.inner.abort().await } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await + } + async fn close(&mut self) -> Result<()> { self.inner.close().await } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index d7c604d907f..3e702417339 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -442,6 +442,14 @@ impl oio::Write for ErrorContextWrapper { }) } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await.map_err(|err| { + err.with_operation(WriteOperation::Sink) + .with_context("service", self.scheme) + .with_context("path", &self.path) + }) + } + async fn close(&mut self) -> Result<()> { self.inner.close().await.map_err(|err| { err.with_operation(WriteOperation::Close) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 6f372c96f71..4ce842af5a7 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1343,6 +1343,38 @@ impl oio::Write for LoggingWriter { } } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + match self.inner.sink(size, s).await { + Ok(_) => { + self.written += size; + trace!( + target: LOGGING_TARGET, + "service={} operation={} path={} written={} -> data sink {}B", + self.scheme, + WriteOperation::Sink, + self.path, + self.written, + size + ); + Ok(()) + } + Err(err) => { + if let Some(lvl) = self.failure_level { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} written={} -> data sink failed: {err:?}", + self.scheme, + WriteOperation::Sink, + self.path, + self.written, + ) + } + Err(err) + } + } + } + async fn abort(&mut self) -> Result<()> { match self.inner.abort().await { Ok(_) => { diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 49794fd6b58..40b18d78580 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -330,6 +330,13 @@ impl oio::Write for MadsimWriter { } } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) + } + async fn abort(&mut self) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 1cda0e8efc2..e3d0c539310 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -864,6 +864,17 @@ impl oio::Write for MetricWrapper { }) } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner + .sink(size, s) + .await + .map(|_| self.bytes += size) + .map_err(|err| { + self.handle.increment_errors_total(self.op, err.kind()); + err + }) + } + async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index c7a8d883690..c6084673158 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -323,6 +323,16 @@ impl oio::Write for MinitraceWrapper { .await } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner + .sink(size, s) + .in_span(Span::enter_with_parent( + WriteOperation::Sink.into_static(), + &self.span, + )) + .await + } + async fn abort(&mut self) -> Result<()> { self.inner .abort() diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index ba17cd25340..c5536201937 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -322,6 +322,10 @@ impl oio::Write for OtelTraceWrapper { self.inner.write(bs).await } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await + } + async fn abort(&mut self) -> Result<()> { self.inner.abort().await } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 1a5842d2334..2ac2d47744a 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -684,6 +684,22 @@ impl oio::Write for PrometheusMetricWrapper { }) } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner + .sink(size, s) + .await + .map(|_| { + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .observe(size as f64) + }) + .map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } + async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index cfafa35f090..55e81980745 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -648,6 +648,11 @@ impl oio::Write for RetryWrapper { } } + /// Sink will move the input stream, so we can't retry it. + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await + } + async fn abort(&mut self) -> Result<()> { let mut backoff = self.builder.build(); diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index d93047e8ac8..fb6967bd5b8 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -349,6 +349,19 @@ impl oio::Write for TimeoutWrapper { })? } + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + let timeout = self.io_timeout(size); + + tokio::time::timeout(timeout, self.inner.sink(size, s)) + .await + .map_err(|_| { + Error::new(ErrorKind::Unexpected, "operation timeout") + .with_operation(WriteOperation::Sink) + .with_context("timeout", timeout.as_secs_f64().to_string()) + .set_temporary() + })? + } + async fn abort(&mut self) -> Result<()> { tokio::time::timeout(self.timeout, self.inner.abort()) .await diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index cc9fa34e320..4191f70edb8 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -333,6 +333,14 @@ impl oio::Write for TracingWrapper { self.inner.write(bs).await } + #[tracing::instrument( + parent = &self.span, + level = "trace", + skip_all)] + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.sink(size, s).await + } + #[tracing::instrument( parent = &self.span, level = "trace", diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 36ca960525a..a53eb3937cf 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -325,6 +325,13 @@ impl oio::Write for KvWriter { Ok(()) } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 2085a23d70a..7f73dff0f2e 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -338,6 +338,13 @@ impl oio::Write for KvWriter { Ok(()) } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { self.buf.clear(); diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index d9937ac25f5..bcbde6db866 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -25,7 +25,6 @@ use std::task::Poll; use bytes::Buf; use bytes::BufMut; use bytes::Bytes; -use futures::Stream; use futures::StreamExt; use crate::raw::*; @@ -41,10 +40,10 @@ pub enum AsyncBody { Empty, /// Body with bytes. Bytes(Bytes), + /// Body with stream. + Stream(oio::Streamer), } -type BytesStream = Box> + Send + Sync + Unpin>; - /// IncomingAsyncBody carries the content returned by remote servers. /// /// # Notes @@ -58,7 +57,7 @@ pub struct IncomingAsyncBody { /// /// After [TAIT](https://rust-lang.github.io/rfcs/2515-type_alias_impl_trait.html) /// has been stable, we can change `IncomingAsyncBody` into `IncomingAsyncBody`. - inner: BytesStream, + inner: oio::Streamer, size: Option, consumed: u64, chunk: Option, @@ -66,7 +65,7 @@ pub struct IncomingAsyncBody { impl IncomingAsyncBody { /// Construct a new incoming async body - pub fn new(s: BytesStream, size: Option) -> Self { + pub fn new(s: oio::Streamer, size: Option) -> Self { Self { inner: s, size, diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index d2f050f5ed6..22a84c7c884 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -27,6 +27,7 @@ use http::Response; use super::body::IncomingAsyncBody; use super::parse_content_length; use super::AsyncBody; +use crate::raw::oio::into_stream; use crate::Error; use crate::ErrorKind; use crate::Result; @@ -93,6 +94,7 @@ impl HttpClient { req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), + AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), }; let mut resp = req_builder.send().await.map_err(|err| { @@ -143,7 +145,10 @@ impl HttpClient { .set_source(err) }); - let body = IncomingAsyncBody::new(Box::new(stream), content_length); + let body = IncomingAsyncBody::new( + Box::new(into_stream::from_futures_stream(stream)), + content_length, + ); let resp = hr.body(body).expect("response must build succeed"); diff --git a/core/src/raw/http_util/error.rs b/core/src/raw/http_util/error.rs index 421970f66cd..3fed55c5e09 100644 --- a/core/src/raw/http_util/error.rs +++ b/core/src/raw/http_util/error.rs @@ -98,7 +98,7 @@ pub fn new_request_build_error(err: http::Error) -> Error { pub fn new_request_credential_error(err: anyhow::Error) -> Error { Error::new( ErrorKind::Unexpected, - "loading credentail to sign http request", + "loading credential to sign http request", ) .set_temporary() .with_operation("reqsign::LoadCredential") diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index d27829e7894..d9c932cdea6 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -38,6 +38,7 @@ use http::Version; use super::new_request_build_error; use super::AsyncBody; use super::IncomingAsyncBody; +use crate::raw::oio::into_stream; use crate::*; /// Multipart is a builder for multipart/form-data. @@ -250,7 +251,7 @@ pub struct MixedPart { } impl MixedPart { - /// Create a new mixed part with gien uri. + /// Create a new mixed part with given uri. pub fn new(uri: &str) -> Self { let mut part_headers = HeaderMap::new(); part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); @@ -283,6 +284,7 @@ impl MixedPart { let content = match body { AsyncBody::Empty => Bytes::new(), AsyncBody::Bytes(bs) => bs, + AsyncBody::Stream(_) => panic!("multipart request can't contain stream body"), }; Self { @@ -317,8 +319,10 @@ impl MixedPart { let bs: Bytes = self.content; let length = bs.len(); - let body = - IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(bs)])), Some(length as u64)); + let body = IncomingAsyncBody::new( + Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok(bs)]))), + Some(length as u64), + ); builder .body(body) diff --git a/core/src/raw/oio/into_stream/from_futures_stream.rs b/core/src/raw/oio/into_stream/from_futures_stream.rs new file mode 100644 index 00000000000..cd014cc909f --- /dev/null +++ b/core/src/raw/oio/into_stream/from_futures_stream.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures::TryStreamExt; + +use crate::raw::*; +use crate::*; + +/// Convert given futures stream into [`oio::Stream`]. +pub fn from_futures_stream(stream: S) -> FromFuturesStream +where + S: futures::Stream> + Send + Sync + Unpin, +{ + FromFuturesStream { inner: stream } +} + +pub struct FromFuturesStream { + inner: S, +} + +impl oio::Stream for FromFuturesStream +where + S: futures::Stream> + Send + Sync + Unpin, +{ + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + self.inner.try_poll_next_unpin(cx) + } +} diff --git a/core/src/raw/oio/into_stream/mod.rs b/core/src/raw/oio/into_stream/mod.rs new file mode 100644 index 00000000000..8bf6bd78e3f --- /dev/null +++ b/core/src/raw/oio/into_stream/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! into_stream will provide different implementations to convert into +//! [`oio::Stream`][crate::raw::oio::Stream] + +mod from_futures_stream; +pub use from_futures_stream::from_futures_stream; diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index ce15d4f1a50..fb1d31efda8 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -30,9 +30,8 @@ pub use read::ReadExt; pub use read::ReadOperation; pub use read::Reader; -pub mod into_reader; - pub mod into_blocking_reader; +pub mod into_reader; mod write; pub use write::BlockingWrite; @@ -46,6 +45,12 @@ pub use append::Append; pub use append::AppendOperation; pub use append::Appender; +mod stream; +pub use stream::Stream; +pub use stream::Streamer; + +pub mod into_stream; + mod cursor; pub use cursor::Cursor; pub use cursor::VectorCursor; diff --git a/core/src/raw/oio/stream.rs b/core/src/raw/oio/stream.rs new file mode 100644 index 00000000000..39a1fd1e4f5 --- /dev/null +++ b/core/src/raw/oio/stream.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Bytes; + +use crate::*; +use std::task::{Context, Poll}; + +/// Streamer is a type erased [`Stream`]. +pub type Streamer = Box; + +/// Stream is the trait that OpenDAL accepts for sinking data. +/// +/// It's nearly the same with [`futures::Stream`], but it satisfied +/// `Unpin` + `Send` + `Sync`. And the item is `Result`. +pub trait Stream: Unpin + Send + Sync { + /// Poll next item `Result` from the stream. + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>>; +} + +impl Stream for () { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let _ = cx; + + unimplemented!("poll_next is required to be implemented for oio::Stream") + } +} + +/// `Box` won't implement `Stream` automatically. +/// To make Streamer work as expected, we must add this impl. +impl Stream for Box { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + (**self).poll_next(cx) + } +} + +impl futures::Stream for dyn Stream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this: &mut dyn Stream = &mut *self; + + this.poll_next(cx) + } +} diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index 15f7b63c576..f2bb025afc8 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -21,6 +21,7 @@ use std::fmt::Formatter; use async_trait::async_trait; use bytes::Bytes; +use crate::raw::*; use crate::*; /// WriteOperation is the name for APIs of Writer. @@ -29,6 +30,8 @@ use crate::*; pub enum WriteOperation { /// Operation for [`Write::write`] Write, + /// Operation for [`Write::sink`] + Sink, /// Operation for [`Write::abort`] Abort, /// Operation for [`Write::close`] @@ -58,6 +61,7 @@ impl From for &'static str { match v { Write => "Writer::write", + Sink => "Writer::sink", Abort => "Writer::abort", Close => "Writer::close", BlockingWrite => "BlockingWriter::write", @@ -83,7 +87,7 @@ pub type Writer = Box; /// the whole data. #[async_trait] pub trait Write: Unpin + Send + Sync { - /// Write given into writer. + /// Write given bytes into writer. /// /// # Notes /// @@ -93,6 +97,9 @@ pub trait Write: Unpin + Send + Sync { /// Please make sure `write` is safe to re-enter. async fn write(&mut self, bs: Bytes) -> Result<()>; + /// Sink given stream into writer. + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; + /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -108,6 +115,13 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } + async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "output writer doesn't support sink", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, @@ -132,6 +146,10 @@ impl Write for Box { (**self).write(bs).await } + async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> { + (**self).sink(n, s).await + } + async fn abort(&mut self) -> Result<()> { (**self).abort().await } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 2b26d3290d6..b751f55909f 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -65,6 +65,13 @@ impl oio::Write for AzblobWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 1cdbf9dba98..3c8db1ac1df 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -86,6 +86,13 @@ impl oio::Write for AzdfsWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index 59fb5602211..d826f754d08 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -65,6 +65,13 @@ impl oio::Write for CosWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 03d6d8247f3..afa02fa2e0f 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -64,6 +64,13 @@ impl oio::Write for FsWriter { Ok(()) } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 653dc003df7..cd4ba0f6a49 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,6 +53,13 @@ impl oio::Write for FtpWriter { Ok(()) } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index dd56ae24611..858c65d932b 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -162,6 +162,13 @@ impl oio::Write for GcsWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { let location = if let Some(location) = &self.location { location diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index d791b04ad0c..ba8f20b8f63 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -62,6 +62,13 @@ impl oio::Write for GdriveWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 8ff12c8bc6f..b2f9599476e 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,6 +62,13 @@ impl oio::Write for GhacWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 9ddd34f8a09..23c5f1d6827 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -56,6 +56,13 @@ impl oio::Write for HdfsWriter { Ok(()) } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index f704dcd0ebf..52847814269 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -52,6 +52,13 @@ impl oio::Write for IpmfsWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 247955f6747..60535e8f944 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -65,6 +65,13 @@ impl oio::Write for ObsWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index fd7d5507b13..6201308474f 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -56,6 +56,13 @@ impl oio::Write for OneDriveWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index b861458d3e3..f7cc7b7e878 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -174,6 +174,13 @@ impl oio::Write for OssWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + // TODO: we can cancel the upload by sending an abort request. async fn abort(&mut self) -> Result<()> { Err(Error::new( diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 056db3e59c6..26da170ebaf 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -414,7 +414,7 @@ impl S3Builder { } /// Allow anonymous will allow opendal to send request without signing - /// when credentail is not loaded. + /// when credential is not loaded. pub fn allow_anonymous(&mut self) -> &mut Self { self.allow_anonymous = true; self diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 325d2ed57fc..320a9ec0ea8 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -192,6 +192,13 @@ impl oio::Write for S3Writer { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 5fa1c17d481..7007d062b2a 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -42,6 +42,13 @@ impl oio::Write for SftpWriter { Ok(()) } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 7075670d3dd..f4c27131313 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -75,6 +75,13 @@ impl oio::Write for SupabaseWriter { self.upload(bs).await } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index e95a86d0913..6f32d67ba50 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -60,6 +60,13 @@ impl oio::Write for VercelArtifactsWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 46d5e74a66a..689c334dcc8 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -63,6 +63,13 @@ impl oio::Write for WasabiWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index dce14d2b235..5ccccba5794 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -62,6 +62,13 @@ impl oio::Write for WebdavWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/webhdfs/error.rs b/core/src/services/webhdfs/error.rs index 56ea0f5b786..222b23e357f 100644 --- a/core/src/services/webhdfs/error.rs +++ b/core/src/services/webhdfs/error.rs @@ -80,6 +80,8 @@ mod tests { use futures::stream; use serde_json::from_reader; + use crate::raw::oio::into_stream; + use super::*; /// Error response example from https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Error%20Responses @@ -97,7 +99,12 @@ mod tests { } "#, ); - let body = IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(ill_args.clone())])), None); + let body = IncomingAsyncBody::new( + Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok( + ill_args.clone(), + )]))), + None, + ); let resp = Response::builder() .status(StatusCode::BAD_REQUEST) .body(body) diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 110f06442e3..97eef2e3d25 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -62,6 +62,13 @@ impl oio::Write for WebhdfsWriter { } } + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) }