Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add more context in error context #4673

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 123 additions & 62 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ use crate::*;
/// - `service`: The [`Scheme`] of underlying service.
/// - `operation`: The [`Operation`] of this operation
/// - `path`: The path of this operation
///
/// Some operations may have additional context:
///
/// - `range`: The range the read operation is trying to read.
/// - `read`: The already read size in given reader.
/// - `size`: The size of the current write operation.
/// - `written`: The already written size in given writer.
/// - `listed`: The already listed size in given lister.
pub struct ErrorContextLayer;

impl<A: Access> Layer<A> for ErrorContextLayer {
Expand Down Expand Up @@ -87,22 +95,21 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let range = args.range();
self.inner
.read(path, args)
.map_ok(|(rp, r)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: r,
},
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), r)
.with_range(range),
)
})
.map_err(|err| {
err.with_operation(Operation::Read)
.with_context("service", self.meta.scheme())
.with_context("path", path)
.with_context("range", range.to_string())
})
.await
}
Expand All @@ -113,11 +120,7 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.map_ok(|(rp, w)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: w,
},
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), w),
)
})
.map_err(|err| {
Expand Down Expand Up @@ -180,11 +183,7 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.map_ok(|(rp, p)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: p,
},
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), p),
)
})
.map_err(|err| {
Expand Down Expand Up @@ -238,22 +237,21 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let range = args.range();
self.inner
.blocking_read(path, args)
.map(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), os)
.with_range(range),
)
})
.map_err(|err| {
err.with_operation(Operation::BlockingRead)
.with_context("service", self.meta.scheme())
.with_context("path", path)
.with_context("range", range.to_string())
})
}

Expand All @@ -263,11 +261,7 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.map(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), os),
)
})
.map_err(|err| {
Expand Down Expand Up @@ -317,11 +311,7 @@ impl<A: Access> LayeredAccess for ErrorContextAccessor<A> {
.map(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), os),
)
})
.map_err(|err| {
Expand All @@ -336,43 +326,89 @@ pub struct ErrorContextWrapper<T> {
scheme: Scheme,
path: String,
inner: T,
range: BytesRange,
processed: u64,
}

impl<T> ErrorContextWrapper<T> {
fn new(scheme: Scheme, path: String, inner: T) -> Self {
Self {
scheme,
path,
inner,
range: BytesRange::default(),
processed: 0,
}
}

fn with_range(mut self, range: BytesRange) -> Self {
self.range = range;
self
}
}

impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
self.inner
.read()
.await
.map(|bs| {
self.processed += bs.len() as u64;
bs
})
.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("range", self.range.to_string())
.with_context("read", self.processed.to_string())
})
}
}

impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
fn read(&mut self) -> Result<Buffer> {
self.inner.read().map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
self.inner
.read()
.map(|bs| {
self.processed += bs.len() as u64;
bs
})
.map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("range", self.range.to_string())
.with_context("read", self.processed.to_string())
})
}
}

impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs.clone()).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("write_buf", bs.len().to_string())
})
let size = bs.len();
self.inner
.write(bs)
.await
.map(|n| {
self.processed += n as u64;
n
})
.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("size", size.to_string())
.with_context("written", self.processed.to_string())
})
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(WriteOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("written", self.processed.to_string())
})
}

Expand All @@ -381,45 +417,70 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
err.with_operation(WriteOperation::Abort)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("processed", self.processed.to_string())
})
}
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
self.inner.write(bs.clone()).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("write_buf", bs.len().to_string())
})
let size = bs.len();
self.inner
.write(bs)
.map(|n| {
self.processed += n as u64;
n
})
.map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("size", size.to_string())
.with_context("written", self.processed.to_string())
})
}

fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
err.with_operation(WriteOperation::BlockingClose)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("written", self.processed.to_string())
})
}
}

impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().await.map_err(|err| {
err.with_operation(ListOperation::Next)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
self.inner
.next()
.await
.map(|bs| {
self.processed += bs.is_some() as u64;
bs
})
.map_err(|err| {
err.with_operation(ListOperation::Next)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("listed", self.processed.to_string())
})
}
}

impl<T: oio::BlockingList> oio::BlockingList for ErrorContextWrapper<T> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner.next().map_err(|err| {
err.with_operation(ListOperation::BlockingNext)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
self.inner
.next()
.map(|bs| {
self.processed += bs.is_some() as u64;
bs
})
.map_err(|err| {
err.with_operation(ListOperation::BlockingNext)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("listed", self.processed.to_string())
})
}
}
Loading