Skip to content

Commit

Permalink
feat(services/fs): impl Sink for Fs (#2626)
Browse files Browse the repository at this point in the history
* impl Sink for Fs

* rename

* upd
  • Loading branch information
xyjixyjixyji authored Jul 13, 2023
1 parent eaffb47 commit c0205fd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
1 change: 1 addition & 0 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Accessor for FsBackend {
read_with_range: true,

write: true,
write_can_sink: true,
write_without_content_length: true,
create_dir: true,
delete: true,
Expand Down
18 changes: 13 additions & 5 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::path::PathBuf;

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;

Expand Down Expand Up @@ -64,11 +65,18 @@ impl oio::Write for FsWriter<tokio::fs::File> {
Ok(())
}

async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
))
async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
while let Some(bs) = s.next().await {
let bs = bs?;
self.f
.seek(SeekFrom::Start(self.pos))
.await
.map_err(parse_io_error)?;
self.f.write_all(&bs).await.map_err(parse_io_error)?;
self.pos += bs.len() as u64;
}

Ok(())
}

async fn abort(&mut self) -> Result<()> {
Expand Down

0 comments on commit c0205fd

Please sign in to comment.