From 60b6f16debc9288de3a73aa08d06d067373f19ff Mon Sep 17 00:00:00 2001 From: Ji-Xinyou Date: Thu, 13 Jul 2023 03:46:36 +0800 Subject: [PATCH 1/3] impl Sink for Fs --- core/src/services/fs/backend.rs | 1 + core/src/services/fs/writer.rs | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 6fcc9ffa614..505fda60d28 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -264,6 +264,7 @@ impl Accessor for FsBackend { read_with_range: true, write: true, + write_can_sink: true, write_without_content_length: true, create_dir: true, delete: true, diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index afa02fa2e0f..2e6151e115d 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -22,6 +22,7 @@ use std::path::PathBuf; use async_trait::async_trait; use bytes::Bytes; +use futures::StreamExt; use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; @@ -64,11 +65,13 @@ impl oio::Write for FsWriter { Ok(()) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { + while let Some(item) = s.next().await { + let bytes = item?; + self.f.write_all(&bytes).await.map_err(parse_io_error)?; + } + + Ok(()) } async fn abort(&mut self) -> Result<()> { From 73868010654b3dec3509386e94cdf684d8b2c869 Mon Sep 17 00:00:00 2001 From: Ji-Xinyou Date: Thu, 13 Jul 2023 03:59:56 +0800 Subject: [PATCH 2/3] rename --- core/src/services/fs/writer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 2e6151e115d..79de8169bfd 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -66,9 +66,9 @@ impl oio::Write for FsWriter { } async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { - while let Some(item) = s.next().await { - let bytes = item?; - self.f.write_all(&bytes).await.map_err(parse_io_error)?; + while let Some(bytes) = s.next().await { + let b = bytes?; + self.f.write_all(&b).await.map_err(parse_io_error)?; } Ok(()) From 2559434a8f8fb43fdd9770f8cb8fd0b26246bdd2 Mon Sep 17 00:00:00 2001 From: Ji-Xinyou Date: Thu, 13 Jul 2023 15:42:05 +0800 Subject: [PATCH 3/3] upd --- core/src/services/fs/writer.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 79de8169bfd..4d31444af61 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -66,9 +66,14 @@ impl oio::Write for FsWriter { } async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { - while let Some(bytes) = s.next().await { - let b = bytes?; - self.f.write_all(&b).await.map_err(parse_io_error)?; + while let Some(bs) = s.next().await { + let bs = bs?; + self.f + .seek(SeekFrom::Start(self.pos)) + .await + .map_err(parse_io_error)?; + self.f.write_all(&bs).await.map_err(parse_io_error)?; + self.pos += bs.len() as u64; } Ok(())