From 925454e0f2dba053af296b7034f622bb301dca93 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 26 May 2023 14:41:25 +0800 Subject: [PATCH 1/5] Save work Signed-off-by: Xuanwo --- core/src/raw/oio/write.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index 15f7b63c576..24c0924a148 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -93,6 +93,12 @@ pub trait Write: Unpin + Send + Sync { /// Please make sure `write` is safe to re-enter. async fn write(&mut self, bs: Bytes) -> Result<()>; + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()>; + /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -108,6 +114,17 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } + async fn sink( + &mut self, + _: u64, + _: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "output writer doesn't support sink", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, @@ -132,6 +149,14 @@ impl Write for Box { (**self).write(bs).await } + async fn sink( + &mut self, + n: u64, + s: Box> + Send>, + ) -> Result<()> { + (**self).sink(n, s).await + } + async fn abort(&mut self) -> Result<()> { (**self).abort().await } From bbc23d0c5b2019723ef3fa002f16c1211b2762d6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 8 Jun 2023 21:35:10 +0800 Subject: [PATCH 2/5] Add core api Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 25 ++++++++++++++ core/src/layers/concurrent_limit.rs | 8 +++++ core/src/layers/error_context.rs | 12 +++++++ core/src/layers/logging.rs | 36 ++++++++++++++++++++ core/src/layers/madsim.rs | 11 ++++++ core/src/layers/metrics.rs | 15 ++++++++ core/src/layers/minitrace.rs | 14 ++++++++ core/src/layers/oteltrace.rs | 8 +++++ core/src/layers/prometheus.rs | 20 +++++++++++ core/src/layers/retry.rs | 9 +++++ core/src/layers/timeout.rs | 17 +++++++++ core/src/layers/tracing.rs | 12 +++++++ core/src/raw/adapters/kv/backend.rs | 11 ++++++ core/src/raw/adapters/typed_kv/backend.rs | 11 ++++++ core/src/raw/oio/write.rs | 10 ++++-- core/src/services/azblob/writer.rs | 11 ++++++ core/src/services/azdfs/writer.rs | 11 ++++++ core/src/services/cos/writer.rs | 11 ++++++ core/src/services/fs/writer.rs | 11 ++++++ core/src/services/ftp/writer.rs | 11 ++++++ core/src/services/gcs/writer.rs | 11 ++++++ core/src/services/gdrive/writer.rs | 11 ++++++ core/src/services/ghac/writer.rs | 11 ++++++ core/src/services/hdfs/writer.rs | 11 ++++++ core/src/services/ipmfs/writer.rs | 11 ++++++ core/src/services/obs/writer.rs | 11 ++++++ core/src/services/onedrive/writer.rs | 11 ++++++ core/src/services/oss/writer.rs | 11 ++++++ core/src/services/s3/writer.rs | 11 ++++++ core/src/services/sftp/writer.rs | 11 ++++++ core/src/services/supabase/writer.rs | 11 ++++++ core/src/services/vercel_artifacts/writer.rs | 11 ++++++ core/src/services/wasabi/writer.rs | 11 ++++++ core/src/services/webdav/writer.rs | 11 ++++++ core/src/services/webhdfs/writer.rs | 11 ++++++ 35 files changed, 436 insertions(+), 3 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 92e7c8147e8..a147ab76702 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -568,6 +568,31 @@ where Ok(()) } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + if let Some(total_size) = self.size { + if self.written + size > total_size { + return Err(Error::new( + ErrorKind::ContentTruncated, + &format!( + "writer got too much data, expect: {size}, actual: {}", + self.written + size + ), + )); + } + } + + let w = self.inner.as_mut().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") + })?; + w.sink(size, s).await?; + self.written += size; + Ok(()) + } + async fn abort(&mut self) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 30003785e83..5e9c924a376 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -308,6 +308,14 @@ impl oio::Write for ConcurrentLimitWrapper { self.inner.abort().await } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner.sink(size, s).await + } + async fn close(&mut self) -> Result<()> { self.inner.close().await } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index d7c604d907f..c5013803358 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -442,6 +442,18 @@ impl oio::Write for ErrorContextWrapper { }) } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner.sink(size, s).await.map_err(|err| { + err.with_operation(WriteOperation::Sink) + .with_context("service", self.scheme) + .with_context("path", &self.path) + }) + } + async fn close(&mut self) -> Result<()> { self.inner.close().await.map_err(|err| { err.with_operation(WriteOperation::Close) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 6f372c96f71..8afdb6b0799 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1343,6 +1343,42 @@ impl oio::Write for LoggingWriter { } } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + match self.inner.sink(size, s).await { + Ok(_) => { + self.written += size; + trace!( + target: LOGGING_TARGET, + "service={} operation={} path={} written={} -> data sink {}B", + self.scheme, + WriteOperation::Sink, + self.path, + self.written, + size + ); + Ok(()) + } + Err(err) => { + if let Some(lvl) = self.failure_level { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} written={} -> data sink failed: {err:?}", + self.scheme, + WriteOperation::Sink, + self.path, + self.written, + ) + } + Err(err) + } + } + } + async fn abort(&mut self) -> Result<()> { match self.inner.abort().await { Ok(_) => { diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 49794fd6b58..fe304153f4b 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -330,6 +330,17 @@ impl oio::Write for MadsimWriter { } } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> crate::Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) + } + async fn abort(&mut self) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 1cda0e8efc2..a8f0b9e7b59 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -864,6 +864,21 @@ impl oio::Write for MetricWrapper { }) } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner + .sink(size, s) + .await + .map(|_| self.bytes += size) + .map_err(|err| { + self.handle.increment_errors_total(self.op, err.kind()); + err + }) + } + async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index c7a8d883690..3ad037acca1 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -323,6 +323,20 @@ impl oio::Write for MinitraceWrapper { .await } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner + .sink(size, s) + .in_span(Span::enter_with_parent( + WriteOperation::Sink.into_static(), + &self.span, + )) + .await + } + async fn abort(&mut self) -> Result<()> { self.inner .abort() diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index ba17cd25340..dbebe1b53c5 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -322,6 +322,14 @@ impl oio::Write for OtelTraceWrapper { self.inner.write(bs).await } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner.sink(size, s).await + } + async fn abort(&mut self) -> Result<()> { self.inner.abort().await } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 1a5842d2334..f230c0fce48 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -684,6 +684,26 @@ impl oio::Write for PrometheusMetricWrapper { }) } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner + .sink(size, s) + .await + .map(|_| { + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .observe(size as f64) + }) + .map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } + async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index cfafa35f090..2d2a3f8eac9 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -648,6 +648,15 @@ impl oio::Write for RetryWrapper { } } + /// Sink will move the input stream, so we can't retry it. + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner.sink(size, s).await + } + async fn abort(&mut self) -> Result<()> { let mut backoff = self.builder.build(); diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index d93047e8ac8..a60bc4080dc 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -349,6 +349,23 @@ impl oio::Write for TimeoutWrapper { })? } + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + let timeout = self.io_timeout(size); + + tokio::time::timeout(timeout, self.inner.sink(size, s)) + .await + .map_err(|_| { + Error::new(ErrorKind::Unexpected, "operation timeout") + .with_operation(WriteOperation::Sink) + .with_context("timeout", timeout.as_secs_f64().to_string()) + .set_temporary() + })? + } + async fn abort(&mut self) -> Result<()> { tokio::time::timeout(self.timeout, self.inner.abort()) .await diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index cc9fa34e320..7b4f95eb0b7 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -333,6 +333,18 @@ impl oio::Write for TracingWrapper { self.inner.write(bs).await } + #[tracing::instrument( + parent = &self.span, + level = "trace", + skip_all)] + async fn sink( + &mut self, + size: u64, + s: Box> + Send>, + ) -> Result<()> { + self.inner.sink(size, s).await + } + #[tracing::instrument( parent = &self.span, level = "trace", diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 36ca960525a..39ac54ed128 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -325,6 +325,17 @@ impl oio::Write for KvWriter { Ok(()) } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 2085a23d70a..0ba413c7d32 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -338,6 +338,17 @@ impl oio::Write for KvWriter { Ok(()) } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { self.buf.clear(); diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index 24c0924a148..fe14712e8ea 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -29,6 +29,8 @@ use crate::*; pub enum WriteOperation { /// Operation for [`Write::write`] Write, + /// Operation for [`Write::sink`] + Sink, /// Operation for [`Write::abort`] Abort, /// Operation for [`Write::close`] @@ -58,6 +60,7 @@ impl From for &'static str { match v { Write => "Writer::write", + Sink => "Writer::sink", Abort => "Writer::abort", Close => "Writer::close", BlockingWrite => "BlockingWriter::write", @@ -83,7 +86,7 @@ pub type Writer = Box; /// the whole data. #[async_trait] pub trait Write: Unpin + Send + Sync { - /// Write given into writer. + /// Write given bytes into writer. /// /// # Notes /// @@ -93,10 +96,11 @@ pub trait Write: Unpin + Send + Sync { /// Please make sure `write` is safe to re-enter. async fn write(&mut self, bs: Bytes) -> Result<()>; + /// Sink given stream into writer. async fn sink( &mut self, - size: u64, - s: Box> + Send>, + _size: u64, + _s: Box> + Send>, ) -> Result<()>; /// Abort the pending writer. diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 2b26d3290d6..cd7003d8919 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -65,6 +65,17 @@ impl oio::Write for AzblobWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 1cdbf9dba98..6e16e264422 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -86,6 +86,17 @@ impl oio::Write for AzdfsWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index 59fb5602211..d7429d42753 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -65,6 +65,17 @@ impl oio::Write for CosWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 03d6d8247f3..35386a09fc6 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -64,6 +64,17 @@ impl oio::Write for FsWriter { Ok(()) } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 653dc003df7..8ea440bb71a 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,6 +53,17 @@ impl oio::Write for FtpWriter { Ok(()) } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index dd56ae24611..1ade48c622b 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -162,6 +162,17 @@ impl oio::Write for GcsWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { let location = if let Some(location) = &self.location { location diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index d791b04ad0c..dc17f57bb21 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -62,6 +62,17 @@ impl oio::Write for GdriveWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 8ff12c8bc6f..584c4378146 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,6 +62,17 @@ impl oio::Write for GhacWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 9ddd34f8a09..9161705541e 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -56,6 +56,17 @@ impl oio::Write for HdfsWriter { Ok(()) } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index f704dcd0ebf..8fd642d6009 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -52,6 +52,17 @@ impl oio::Write for IpmfsWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 247955f6747..4ded9ead461 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -65,6 +65,17 @@ impl oio::Write for ObsWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index fd7d5507b13..098f2e168a7 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -56,6 +56,17 @@ impl oio::Write for OneDriveWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index b861458d3e3..9a5f8509e9f 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -174,6 +174,17 @@ impl oio::Write for OssWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + // TODO: we can cancel the upload by sending an abort request. async fn abort(&mut self) -> Result<()> { Err(Error::new( diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 325d2ed57fc..85417b27178 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -192,6 +192,17 @@ impl oio::Write for S3Writer { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 5fa1c17d481..be3487e5b44 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -42,6 +42,17 @@ impl oio::Write for SftpWriter { Ok(()) } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 7075670d3dd..dd697bf384b 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -75,6 +75,17 @@ impl oio::Write for SupabaseWriter { self.upload(bs).await } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index e95a86d0913..50ec452eab0 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -60,6 +60,17 @@ impl oio::Write for VercelArtifactsWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 46d5e74a66a..8e93a2f49cf 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -63,6 +63,17 @@ impl oio::Write for WasabiWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index dce14d2b235..fae8408a1eb 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -62,6 +62,17 @@ impl oio::Write for WebdavWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 110f06442e3..ba822411f68 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -62,6 +62,17 @@ impl oio::Write for WebhdfsWriter { } } + async fn sink( + &mut self, + _size: u64, + _s: Box> + Send>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "Write::sink is not supported", + )) + } + async fn abort(&mut self) -> Result<()> { Ok(()) } From 1ebbfbe8eb8c188bfe57d2b784c64a8cc6fbf86d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 9 Jun 2023 12:00:29 +0800 Subject: [PATCH 3/5] Save work Signed-off-by: Xuanwo --- core/src/raw/http_util/body.rs | 6 +- core/src/raw/http_util/client.rs | 1 + core/src/raw/http_util/multipart.rs | 1 + core/src/raw/oio/into_stream/from_response.rs | 33 +++++++++++ core/src/raw/oio/into_stream/mod.rs | 18 ++++++ core/src/raw/oio/mod.rs | 9 ++- core/src/raw/oio/stream.rs | 58 +++++++++++++++++++ 7 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 core/src/raw/oio/into_stream/from_response.rs create mode 100644 core/src/raw/oio/into_stream/mod.rs create mode 100644 core/src/raw/oio/stream.rs diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index d9937ac25f5..e1e9a3de074 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -41,6 +41,8 @@ pub enum AsyncBody { Empty, /// Body with bytes. Bytes(Bytes), + /// Body with stream. + Stream(Box> + Send + Sync + Unpin>), } type BytesStream = Box> + Send + Sync + Unpin>; @@ -58,7 +60,7 @@ pub struct IncomingAsyncBody { /// /// After [TAIT](https://rust-lang.github.io/rfcs/2515-type_alias_impl_trait.html) /// has been stable, we can change `IncomingAsyncBody` into `IncomingAsyncBody`. - inner: BytesStream, + inner: oio::Streamer, size: Option, consumed: u64, chunk: Option, @@ -66,7 +68,7 @@ pub struct IncomingAsyncBody { impl IncomingAsyncBody { /// Construct a new incoming async body - pub fn new(s: BytesStream, size: Option) -> Self { + pub fn new(s: oio::Streamer, size: Option) -> Self { Self { inner: s, size, diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index d2f050f5ed6..68bd0977ea5 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -93,6 +93,7 @@ impl HttpClient { req_builder = match body { AsyncBody::Empty => req_builder.body(reqwest::Body::from("")), AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)), + AsyncBody::Stream(s) => req_builder.body(reqwest::Body::wrap_stream(s)), }; let mut resp = req_builder.send().await.map_err(|err| { diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index d27829e7894..a0f8c5dd4b2 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -283,6 +283,7 @@ impl MixedPart { let content = match body { AsyncBody::Empty => Bytes::new(), AsyncBody::Bytes(bs) => bs, + AsyncBody::Stream(_) => panic!("multipart request can't contain stream body"), }; Self { diff --git a/core/src/raw/oio/into_stream/from_response.rs b/core/src/raw/oio/into_stream/from_response.rs new file mode 100644 index 00000000000..6274b1bddb4 --- /dev/null +++ b/core/src/raw/oio/into_stream/from_response.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::task::{Context, Poll}; + +use bytes::Bytes; + +use crate::raw::*; +use crate::*; + +pub struct ResponseStream { + inner: reqwest::Response, +} + +impl oio::Stream for ResponseStream { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + self.inner.poll_next(cx) + } +} diff --git a/core/src/raw/oio/into_stream/mod.rs b/core/src/raw/oio/into_stream/mod.rs new file mode 100644 index 00000000000..e3010591dbd --- /dev/null +++ b/core/src/raw/oio/into_stream/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod from_response; diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index ce15d4f1a50..fb1d31efda8 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -30,9 +30,8 @@ pub use read::ReadExt; pub use read::ReadOperation; pub use read::Reader; -pub mod into_reader; - pub mod into_blocking_reader; +pub mod into_reader; mod write; pub use write::BlockingWrite; @@ -46,6 +45,12 @@ pub use append::Append; pub use append::AppendOperation; pub use append::Appender; +mod stream; +pub use stream::Stream; +pub use stream::Streamer; + +pub mod into_stream; + mod cursor; pub use cursor::Cursor; pub use cursor::VectorCursor; diff --git a/core/src/raw/oio/stream.rs b/core/src/raw/oio/stream.rs new file mode 100644 index 00000000000..a7068d3df89 --- /dev/null +++ b/core/src/raw/oio/stream.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Bytes; + +use crate::raw::*; +use crate::*; +use std::task::{Context, Poll}; + +/// Streamer is a type erased [`Stream`]. +pub type Streamer = Box; + +pub trait Stream: Unpin + Send + Sync { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>>; +} + +impl Stream for () { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let _ = cx; + + unimplemented!("poll_next is required to be implemented for oio::Stream") + } +} + +/// `Box` won't implement `Stream` automatically. +/// To make Streamer work as expected, we must add this impl. +impl Stream for Box { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + (**self).poll_next(cx) + } +} + +impl futures::Stream for dyn Stream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this: &mut dyn Stream = &mut *self; + + this.poll_next(cx) + } +} From f1f29c83a4da046d5f20853022b78bcc5ce3aaf1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 9 Jun 2023 16:14:04 +0800 Subject: [PATCH 4/5] Save work Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 6 +----- core/src/layers/concurrent_limit.rs | 6 +----- core/src/layers/error_context.rs | 6 +----- core/src/layers/logging.rs | 6 +----- core/src/layers/madsim.rs | 6 +----- core/src/layers/metrics.rs | 6 +----- core/src/layers/minitrace.rs | 6 +----- core/src/layers/oteltrace.rs | 6 +----- core/src/layers/prometheus.rs | 6 +----- core/src/layers/retry.rs | 6 +----- core/src/layers/timeout.rs | 6 +----- core/src/layers/tracing.rs | 6 +----- core/src/raw/adapters/kv/backend.rs | 6 +----- core/src/raw/adapters/typed_kv/backend.rs | 6 +----- core/src/raw/http_util/body.rs | 5 +---- core/src/raw/http_util/client.rs | 6 +++++- core/src/raw/http_util/multipart.rs | 7 +++++-- ...rom_response.rs => from_futures_stream.rs} | 20 +++++++++++++++---- core/src/raw/oio/into_stream/mod.rs | 6 +++++- core/src/raw/oio/stream.rs | 6 +++++- core/src/raw/oio/write.rs | 19 ++++-------------- core/src/services/azblob/writer.rs | 6 +----- core/src/services/azdfs/writer.rs | 6 +----- core/src/services/cos/writer.rs | 6 +----- core/src/services/fs/writer.rs | 6 +----- core/src/services/ftp/writer.rs | 6 +----- core/src/services/gcs/writer.rs | 6 +----- core/src/services/gdrive/writer.rs | 6 +----- core/src/services/ghac/writer.rs | 6 +----- core/src/services/hdfs/writer.rs | 6 +----- core/src/services/ipmfs/writer.rs | 6 +----- core/src/services/obs/writer.rs | 6 +----- core/src/services/onedrive/writer.rs | 6 +----- core/src/services/oss/writer.rs | 6 +----- core/src/services/s3/writer.rs | 6 +----- core/src/services/sftp/writer.rs | 6 +----- core/src/services/supabase/writer.rs | 6 +----- core/src/services/vercel_artifacts/writer.rs | 6 +----- core/src/services/wasabi/writer.rs | 6 +----- core/src/services/webdav/writer.rs | 6 +----- core/src/services/webhdfs/error.rs | 9 ++++++++- core/src/services/webhdfs/writer.rs | 6 +----- 42 files changed, 83 insertions(+), 199 deletions(-) rename core/src/raw/oio/into_stream/{from_response.rs => from_futures_stream.rs} (67%) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index a147ab76702..491da8427d3 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -568,11 +568,7 @@ where Ok(()) } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { if let Some(total_size) = self.size { if self.written + size > total_size { return Err(Error::new( diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 5e9c924a376..1e41030cd0d 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -308,11 +308,7 @@ impl oio::Write for ConcurrentLimitWrapper { self.inner.abort().await } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.sink(size, s).await } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index c5013803358..3e702417339 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -442,11 +442,7 @@ impl oio::Write for ErrorContextWrapper { }) } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.sink(size, s).await.map_err(|err| { err.with_operation(WriteOperation::Sink) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 8afdb6b0799..4ce842af5a7 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1343,11 +1343,7 @@ impl oio::Write for LoggingWriter { } } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { match self.inner.sink(size, s).await { Ok(_) => { self.written += size; diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index fe304153f4b..40b18d78580 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -330,11 +330,7 @@ impl oio::Write for MadsimWriter { } } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> crate::Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> crate::Result<()> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index a8f0b9e7b59..e3d0c539310 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -864,11 +864,7 @@ impl oio::Write for MetricWrapper { }) } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner .sink(size, s) .await diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 3ad037acca1..c6084673158 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -323,11 +323,7 @@ impl oio::Write for MinitraceWrapper { .await } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner .sink(size, s) .in_span(Span::enter_with_parent( diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index dbebe1b53c5..c5536201937 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -322,11 +322,7 @@ impl oio::Write for OtelTraceWrapper { self.inner.write(bs).await } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.sink(size, s).await } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index f230c0fce48..2ac2d47744a 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -684,11 +684,7 @@ impl oio::Write for PrometheusMetricWrapper { }) } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner .sink(size, s) .await diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 2d2a3f8eac9..55e81980745 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -649,11 +649,7 @@ impl oio::Write for RetryWrapper { } /// Sink will move the input stream, so we can't retry it. - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.sink(size, s).await } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index a60bc4080dc..fb6967bd5b8 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -349,11 +349,7 @@ impl oio::Write for TimeoutWrapper { })? } - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { let timeout = self.io_timeout(size); tokio::time::timeout(timeout, self.inner.sink(size, s)) diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 7b4f95eb0b7..4191f70edb8 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -337,11 +337,7 @@ impl oio::Write for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - async fn sink( - &mut self, - size: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { self.inner.sink(size, s).await } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 39ac54ed128..a53eb3937cf 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -325,11 +325,7 @@ impl oio::Write for KvWriter { Ok(()) } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 0ba413c7d32..7f73dff0f2e 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -338,11 +338,7 @@ impl oio::Write for KvWriter { Ok(()) } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index e1e9a3de074..bcbde6db866 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -25,7 +25,6 @@ use std::task::Poll; use bytes::Buf; use bytes::BufMut; use bytes::Bytes; -use futures::Stream; use futures::StreamExt; use crate::raw::*; @@ -42,11 +41,9 @@ pub enum AsyncBody { /// Body with bytes. Bytes(Bytes), /// Body with stream. - Stream(Box> + Send + Sync + Unpin>), + Stream(oio::Streamer), } -type BytesStream = Box> + Send + Sync + Unpin>; - /// IncomingAsyncBody carries the content returned by remote servers. /// /// # Notes diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 68bd0977ea5..22a84c7c884 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -27,6 +27,7 @@ use http::Response; use super::body::IncomingAsyncBody; use super::parse_content_length; use super::AsyncBody; +use crate::raw::oio::into_stream; use crate::Error; use crate::ErrorKind; use crate::Result; @@ -144,7 +145,10 @@ impl HttpClient { .set_source(err) }); - let body = IncomingAsyncBody::new(Box::new(stream), content_length); + let body = IncomingAsyncBody::new( + Box::new(into_stream::from_futures_stream(stream)), + content_length, + ); let resp = hr.body(body).expect("response must build succeed"); diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index a0f8c5dd4b2..f9051cd7dd5 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -38,6 +38,7 @@ use http::Version; use super::new_request_build_error; use super::AsyncBody; use super::IncomingAsyncBody; +use crate::raw::oio::into_stream; use crate::*; /// Multipart is a builder for multipart/form-data. @@ -318,8 +319,10 @@ impl MixedPart { let bs: Bytes = self.content; let length = bs.len(); - let body = - IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(bs)])), Some(length as u64)); + let body = IncomingAsyncBody::new( + Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok(bs)]))), + Some(length as u64), + ); builder .body(body) diff --git a/core/src/raw/oio/into_stream/from_response.rs b/core/src/raw/oio/into_stream/from_futures_stream.rs similarity index 67% rename from core/src/raw/oio/into_stream/from_response.rs rename to core/src/raw/oio/into_stream/from_futures_stream.rs index 6274b1bddb4..cd014cc909f 100644 --- a/core/src/raw/oio/into_stream/from_response.rs +++ b/core/src/raw/oio/into_stream/from_futures_stream.rs @@ -18,16 +18,28 @@ use std::task::{Context, Poll}; use bytes::Bytes; +use futures::TryStreamExt; use crate::raw::*; use crate::*; -pub struct ResponseStream { - inner: reqwest::Response, +/// Convert given futures stream into [`oio::Stream`]. +pub fn from_futures_stream(stream: S) -> FromFuturesStream +where + S: futures::Stream> + Send + Sync + Unpin, +{ + FromFuturesStream { inner: stream } } -impl oio::Stream for ResponseStream { +pub struct FromFuturesStream { + inner: S, +} + +impl oio::Stream for FromFuturesStream +where + S: futures::Stream> + Send + Sync + Unpin, +{ fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.inner.poll_next(cx) + self.inner.try_poll_next_unpin(cx) } } diff --git a/core/src/raw/oio/into_stream/mod.rs b/core/src/raw/oio/into_stream/mod.rs index e3010591dbd..8bf6bd78e3f 100644 --- a/core/src/raw/oio/into_stream/mod.rs +++ b/core/src/raw/oio/into_stream/mod.rs @@ -15,4 +15,8 @@ // specific language governing permissions and limitations // under the License. -mod from_response; +//! into_stream will provide different implementations to convert into +//! [`oio::Stream`][crate::raw::oio::Stream] + +mod from_futures_stream; +pub use from_futures_stream::from_futures_stream; diff --git a/core/src/raw/oio/stream.rs b/core/src/raw/oio/stream.rs index a7068d3df89..677591199bb 100644 --- a/core/src/raw/oio/stream.rs +++ b/core/src/raw/oio/stream.rs @@ -17,14 +17,18 @@ use bytes::Bytes; -use crate::raw::*; use crate::*; use std::task::{Context, Poll}; /// Streamer is a type erased [`Stream`]. pub type Streamer = Box; +/// Stream is the trait that OpenDAL accepts for sinking data. +/// +/// It's nearly the same with [`futures::Stream`], but it satified +/// `Unpin` + `Send` + `Sync`. And the item is `Result`. pub trait Stream: Unpin + Send + Sync { + /// Poll next item `Result` from the stream. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>>; } diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index fe14712e8ea..f2bb025afc8 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -21,6 +21,7 @@ use std::fmt::Formatter; use async_trait::async_trait; use bytes::Bytes; +use crate::raw::*; use crate::*; /// WriteOperation is the name for APIs of Writer. @@ -97,11 +98,7 @@ pub trait Write: Unpin + Send + Sync { async fn write(&mut self, bs: Bytes) -> Result<()>; /// Sink given stream into writer. - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()>; + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()>; /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -118,11 +115,7 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } - async fn sink( - &mut self, - _: u64, - _: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _: u64, _: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -153,11 +146,7 @@ impl Write for Box { (**self).write(bs).await } - async fn sink( - &mut self, - n: u64, - s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, n: u64, s: oio::Streamer) -> Result<()> { (**self).sink(n, s).await } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index cd7003d8919..b751f55909f 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -65,11 +65,7 @@ impl oio::Write for AzblobWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 6e16e264422..3c8db1ac1df 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -86,11 +86,7 @@ impl oio::Write for AzdfsWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index d7429d42753..d826f754d08 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -65,11 +65,7 @@ impl oio::Write for CosWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 35386a09fc6..afa02fa2e0f 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -64,11 +64,7 @@ impl oio::Write for FsWriter { Ok(()) } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 8ea440bb71a..cd4ba0f6a49 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,11 +53,7 @@ impl oio::Write for FtpWriter { Ok(()) } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 1ade48c622b..858c65d932b 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -162,11 +162,7 @@ impl oio::Write for GcsWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index dc17f57bb21..ba8f20b8f63 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -62,11 +62,7 @@ impl oio::Write for GdriveWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 584c4378146..b2f9599476e 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,11 +62,7 @@ impl oio::Write for GhacWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 9161705541e..23c5f1d6827 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -56,11 +56,7 @@ impl oio::Write for HdfsWriter { Ok(()) } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 8fd642d6009..52847814269 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -52,11 +52,7 @@ impl oio::Write for IpmfsWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 4ded9ead461..60535e8f944 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -65,11 +65,7 @@ impl oio::Write for ObsWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 098f2e168a7..6201308474f 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -56,11 +56,7 @@ impl oio::Write for OneDriveWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 9a5f8509e9f..f7cc7b7e878 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -174,11 +174,7 @@ impl oio::Write for OssWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 85417b27178..320a9ec0ea8 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -192,11 +192,7 @@ impl oio::Write for S3Writer { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index be3487e5b44..7007d062b2a 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -42,11 +42,7 @@ impl oio::Write for SftpWriter { Ok(()) } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index dd697bf384b..f4c27131313 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -75,11 +75,7 @@ impl oio::Write for SupabaseWriter { self.upload(bs).await } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 50ec452eab0..6f32d67ba50 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -60,11 +60,7 @@ impl oio::Write for VercelArtifactsWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 8e93a2f49cf..689c334dcc8 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -63,11 +63,7 @@ impl oio::Write for WasabiWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index fae8408a1eb..5ccccba5794 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -62,11 +62,7 @@ impl oio::Write for WebdavWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webhdfs/error.rs b/core/src/services/webhdfs/error.rs index 56ea0f5b786..222b23e357f 100644 --- a/core/src/services/webhdfs/error.rs +++ b/core/src/services/webhdfs/error.rs @@ -80,6 +80,8 @@ mod tests { use futures::stream; use serde_json::from_reader; + use crate::raw::oio::into_stream; + use super::*; /// Error response example from https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Error%20Responses @@ -97,7 +99,12 @@ mod tests { } "#, ); - let body = IncomingAsyncBody::new(Box::new(stream::iter(vec![Ok(ill_args.clone())])), None); + let body = IncomingAsyncBody::new( + Box::new(into_stream::from_futures_stream(stream::iter(vec![Ok( + ill_args.clone(), + )]))), + None, + ); let resp = Response::builder() .status(StatusCode::BAD_REQUEST) .body(body) diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index ba822411f68..97eef2e3d25 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -62,11 +62,7 @@ impl oio::Write for WebhdfsWriter { } } - async fn sink( - &mut self, - _size: u64, - _s: Box> + Send>, - ) -> Result<()> { + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", From 3d28adad65474460eb5eee4a2dcb8e91fc02fb63 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 9 Jun 2023 16:17:42 +0800 Subject: [PATCH 5/5] Fix typo Signed-off-by: Xuanwo --- core/src/docs/internals/accessor.rs | 2 +- core/src/raw/http_util/error.rs | 2 +- core/src/raw/http_util/multipart.rs | 2 +- core/src/raw/oio/stream.rs | 2 +- core/src/services/s3/backend.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/docs/internals/accessor.rs b/core/src/docs/internals/accessor.rs index d149457efea..b3e8c66384c 100644 --- a/core/src/docs/internals/accessor.rs +++ b/core/src/docs/internals/accessor.rs @@ -155,7 +155,7 @@ //! vendors that provide s3-like RESTful APIs, and our s3 service is //! implemented to support all of them, not just AWS S3. //! -//! Obviously, we can use `duck` as scheme, let's add a new variant in [`Scheme`], and implement all reqired functions like `Scheme::from_str` and `Scheme::into_static`: +//! Obviously, we can use `duck` as scheme, let's add a new variant in [`Scheme`], and implement all required functions like `Scheme::from_str` and `Scheme::into_static`: //! //! ```ignore //! pub enum Scheme { diff --git a/core/src/raw/http_util/error.rs b/core/src/raw/http_util/error.rs index 421970f66cd..3fed55c5e09 100644 --- a/core/src/raw/http_util/error.rs +++ b/core/src/raw/http_util/error.rs @@ -98,7 +98,7 @@ pub fn new_request_build_error(err: http::Error) -> Error { pub fn new_request_credential_error(err: anyhow::Error) -> Error { Error::new( ErrorKind::Unexpected, - "loading credentail to sign http request", + "loading credential to sign http request", ) .set_temporary() .with_operation("reqsign::LoadCredential") diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index f9051cd7dd5..d9c932cdea6 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -251,7 +251,7 @@ pub struct MixedPart { } impl MixedPart { - /// Create a new mixed part with gien uri. + /// Create a new mixed part with given uri. pub fn new(uri: &str) -> Self { let mut part_headers = HeaderMap::new(); part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); diff --git a/core/src/raw/oio/stream.rs b/core/src/raw/oio/stream.rs index 677591199bb..39a1fd1e4f5 100644 --- a/core/src/raw/oio/stream.rs +++ b/core/src/raw/oio/stream.rs @@ -25,7 +25,7 @@ pub type Streamer = Box; /// Stream is the trait that OpenDAL accepts for sinking data. /// -/// It's nearly the same with [`futures::Stream`], but it satified +/// It's nearly the same with [`futures::Stream`], but it satisfied /// `Unpin` + `Send` + `Sync`. And the item is `Result`. pub trait Stream: Unpin + Send + Sync { /// Poll next item `Result` from the stream. diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 056db3e59c6..26da170ebaf 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -414,7 +414,7 @@ impl S3Builder { } /// Allow anonymous will allow opendal to send request without signing - /// when credentail is not loaded. + /// when credential is not loaded. pub fn allow_anonymous(&mut self) -> &mut Self { self.allow_anonymous = true; self