diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 6fcc9ffa614..505fda60d28 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -264,6 +264,7 @@ impl Accessor for FsBackend { read_with_range: true, write: true, + write_can_sink: true, write_without_content_length: true, create_dir: true, delete: true, diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index afa02fa2e0f..4d31444af61 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -22,6 +22,7 @@ use std::path::PathBuf; use async_trait::async_trait; use bytes::Bytes; +use futures::StreamExt; use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; @@ -64,11 +65,18 @@ impl oio::Write for FsWriter { Ok(()) } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> { + while let Some(bs) = s.next().await { + let bs = bs?; + self.f + .seek(SeekFrom::Start(self.pos)) + .await + .map_err(parse_io_error)?; + self.f.write_all(&bs).await.map_err(parse_io_error)?; + self.pos += bs.len() as u64; + } + + Ok(()) } async fn abort(&mut self) -> Result<()> {