Skip to content

Commit

Permalink
feat: Implement RFC-2758 Merge Append Into Write (#2880)
Browse files Browse the repository at this point in the history
* Save work

Signed-off-by: Xuanwo <[email protected]>

* Fix test

Signed-off-by: Xuanwo <[email protected]>

* fix binding

Signed-off-by: Xuanwo <[email protected]>

* allow append for azblob

Signed-off-by: Xuanwo <[email protected]>

* Fix test

Signed-off-by: Xuanwo <[email protected]>

* Fix write test

Signed-off-by: Xuanwo <[email protected]>

* Make sure buffer has been flushed

Signed-off-by: Xuanwo <[email protected]>

* disable hdfs for now

Signed-off-by: Xuanwo <[email protected]>

* ftp doesn't support append yet

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 20, 2023
1 parent 2048ee9 commit 8ea4f22
Show file tree
Hide file tree
Showing 88 changed files with 724 additions and 2,359 deletions.
2 changes: 1 addition & 1 deletion bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn intern_append(
}

async fn do_append(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.append(&path, content).await?)
Ok(op.write_with(&path, content).append(true).await?)
}

/// # Safety
Expand Down
6 changes: 5 additions & 1 deletion bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ impl Operator {
Either::B(s) => s.into_bytes(),
};

self.0.append(&path, c).await.map_err(format_napi_error)
self.0
.write_with(&path, c)
.append(true)
.await
.map_err(format_napi_error)
}

/// Copy file according to given `from` and `to` path.
Expand Down
6 changes: 0 additions & 6 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl<A: Accessor> LayeredAccessor for AsyncBacktraceAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand All @@ -83,11 +82,6 @@ impl<A: Accessor> LayeredAccessor for AsyncBacktraceAccessor<A> {
self.inner.write(path, args).await
}

#[async_backtrace::framed]
async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

#[async_backtrace::framed]
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner.copy(from, to, args).await
Expand Down
8 changes: 0 additions & 8 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl<A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand All @@ -95,13 +94,6 @@ impl<A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> {
.await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.instrument_await(format!("opendal::{}", Operation::Append))
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner()
.copy(from, to, args)
Expand Down
5 changes: 0 additions & 5 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> {
type BlockingReader = BlockingWrapper<A::Reader>;
type Writer = A::Writer;
type BlockingWriter = BlockingWrapper<A::Writer>;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = BlockingWrapper<A::Pager>;

Expand All @@ -97,10 +96,6 @@ impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> {
self.inner.write(path, args).await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner.copy(from, to, args).await
}
Expand Down
5 changes: 0 additions & 5 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
type BlockingReader = ChaosReader<A::BlockingReader>;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand Down Expand Up @@ -135,10 +134,6 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
self.inner.blocking_write(path, args)
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner.list(path, args).await
}
Expand Down
67 changes: 0 additions & 67 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
type BlockingReader = CompleteReader<A, A::BlockingReader>;
type Writer = CompleteWriter<A::Writer>;
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Appender = CompleteAppender<A::Appender>;
type Pager = CompletePager<A, A::Pager>;
type BlockingPager = CompletePager<A, A::BlockingPager>;

Expand Down Expand Up @@ -446,18 +445,6 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
.map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let capability = self.meta.full_capability();
if !capability.append {
return new_capability_unsupported_error(Operation::Append);
}

self.inner
.append(path, args)
.await
.map(|(rp, a)| (rp, CompleteAppender::new(a)))
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let capability = self.meta.full_capability();
if !capability.create_dir {
Expand Down Expand Up @@ -854,54 +841,6 @@ where
}
}

pub struct CompleteAppender<A> {
inner: Option<A>,
}

impl<A> CompleteAppender<A> {
pub fn new(inner: A) -> CompleteAppender<A> {
CompleteAppender { inner: Some(inner) }
}
}

/// Check if the appender has been closed while debug_assertions enabled.
/// This code will never be executed in release mode.
#[cfg(debug_assertions)]
impl<A> Drop for CompleteAppender<A> {
fn drop(&mut self) {
if self.inner.is_some() {
// Do we need to panic here?
log::warn!("appender has not been closed, must be a bug")
}
}
}

#[async_trait]
impl<A> oio::Append for CompleteAppender<A>
where
A: oio::Append,
{
async fn append(&mut self, bs: Bytes) -> Result<()> {
let a = self
.inner
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?;

a.append(bs).await
}

async fn close(&mut self) -> Result<()> {
let a = self
.inner
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?;

a.close().await?;
self.inner = None;
Ok(())
}
}

fn new_capability_unsupported_error<R>(operation: Operation) -> Result<R> {
Err(Error::new(ErrorKind::Unsupported, "operation is not supported").with_operation(operation))
}
Expand Down Expand Up @@ -955,7 +894,6 @@ mod tests {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
type Appender = ();
type Pager = ();
type BlockingPager = ();

Expand All @@ -978,10 +916,6 @@ mod tests {
Ok((RpWrite::new(), ()))
}

async fn append(&self, _: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> {
Ok((RpAppend::new(), ()))
}

async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result<RpCopy> {
Ok(RpCopy {})
}
Expand Down Expand Up @@ -1045,7 +979,6 @@ mod tests {
capability_test!(stat, |op| { op.stat("/path/to/mock_file") });
capability_test!(read, |op| { op.read("/path/to/mock_file") });
capability_test!(write, |op| { op.writer("/path/to/mock_file") });
capability_test!(append, |op| { op.appender("/path/to/mock_file") });
capability_test!(create_dir, |op| { op.create_dir("/path/to/mock_dir/") });
capability_test!(delete, |op| { op.delete("/path/to/mock_file") });
capability_test!(copy, |op| {
Expand Down
26 changes: 0 additions & 26 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
type Writer = ConcurrentLimitWrapper<A::Writer>;
type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
type Appender = ConcurrentLimitWrapper<A::Appender>;
type Pager = ConcurrentLimitWrapper<A::Pager>;
type BlockingPager = ConcurrentLimitWrapper<A::BlockingPager>;

Expand Down Expand Up @@ -132,20 +131,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore must be valid");

self.inner
.append(path, args)
.await
.map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit)))
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let _permit = self
.semaphore
Expand Down Expand Up @@ -327,17 +312,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
}
}

#[async_trait]
impl<R: oio::Append> oio::Append for ConcurrentLimitWrapper<R> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await
}
}

#[async_trait]
impl<R: oio::Page> oio::Page for ConcurrentLimitWrapper<R> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Expand Down
42 changes: 0 additions & 42 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::TryFutureExt;

use crate::raw::oio::AppendOperation;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
Expand Down Expand Up @@ -71,7 +70,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
type Writer = ErrorContextWrapper<A::Writer>;
type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
type Appender = ErrorContextWrapper<A::Appender>;
type Pager = ErrorContextWrapper<A::Pager>;
type BlockingPager = ErrorContextWrapper<A::BlockingPager>;

Expand Down Expand Up @@ -139,27 +137,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.map_ok(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::Append)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner
.copy(from, to, args)
Expand Down Expand Up @@ -477,25 +454,6 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
}
}

#[async_trait::async_trait]
impl<T: oio::Append> oio::Append for ErrorContextWrapper<T> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await.map_err(|err| {
err.with_operation(AppendOperation::Append)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(AppendOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}
}

#[async_trait::async_trait]
impl<T: oio::Page> oio::Page for ErrorContextWrapper<T> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Expand Down
5 changes: 0 additions & 5 deletions core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = ImmutableDir;
type BlockingPager = ImmutableDir;

Expand Down Expand Up @@ -196,10 +195,6 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
self.inner.blocking_write(path, args)
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
let mut path = path;
if path == "/" {
Expand Down
Loading

0 comments on commit 8ea4f22

Please sign in to comment.