diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index eeb429a3e71..86ae9dba80c 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -35,6 +35,14 @@ use crate::*; /// - `service`: The [`Scheme`] of underlying service. /// - `operation`: The [`Operation`] of this operation /// - `path`: The path of this operation +/// +/// Some operations may have additional context: +/// +/// - `range`: The range the read operation is trying to read. +/// - `read`: The already read size in given reader. +/// - `size`: The size of the current write operation. +/// - `written`: The already written size in given writer. +/// - `listed`: The already listed size in given lister. pub struct ErrorContextLayer; impl Layer for ErrorContextLayer { @@ -87,22 +95,21 @@ impl LayeredAccess for ErrorContextAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let range = args.range(); self.inner .read(path, args) .map_ok(|(rp, r)| { ( rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: r, - }, + ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), r) + .with_range(range), ) }) .map_err(|err| { err.with_operation(Operation::Read) .with_context("service", self.meta.scheme()) .with_context("path", path) + .with_context("range", range.to_string()) }) .await } @@ -113,11 +120,7 @@ impl LayeredAccess for ErrorContextAccessor { .map_ok(|(rp, w)| { ( rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: w, - }, + ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), w), ) }) .map_err(|err| { @@ -180,11 +183,7 @@ impl LayeredAccess for ErrorContextAccessor { .map_ok(|(rp, p)| { ( rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: p, - }, + ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), p), ) }) .map_err(|err| { @@ -238,22 +237,21 @@ impl LayeredAccess for ErrorContextAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + let range = args.range(); self.inner .blocking_read(path, args) .map(|(rp, os)| { ( rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: os, - }, + ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), os) + .with_range(range), ) }) .map_err(|err| { err.with_operation(Operation::BlockingRead) .with_context("service", self.meta.scheme()) .with_context("path", path) + .with_context("range", range.to_string()) }) } @@ -263,11 +261,7 @@ impl LayeredAccess for ErrorContextAccessor { .map(|(rp, os)| { ( rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: os, - }, + ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), os), ) }) .map_err(|err| { @@ -317,11 +311,7 @@ impl LayeredAccess for ErrorContextAccessor { .map(|(rp, os)| { ( rp, - ErrorContextWrapper { - scheme: self.meta.scheme(), - path: path.to_string(), - inner: os, - }, + ErrorContextWrapper::new(self.meta.scheme(), path.to_string(), os), ) }) .map_err(|err| { @@ -336,36 +326,81 @@ pub struct ErrorContextWrapper { scheme: Scheme, path: String, inner: T, + range: BytesRange, + processed: u64, +} + +impl ErrorContextWrapper { + fn new(scheme: Scheme, path: String, inner: T) -> Self { + Self { + scheme, + path, + inner, + range: BytesRange::default(), + processed: 0, + } + } + + fn with_range(mut self, range: BytesRange) -> Self { + self.range = range; + self + } } impl oio::Read for ErrorContextWrapper { async fn read(&mut self) -> Result { - self.inner.read().await.map_err(|err| { - err.with_operation(ReadOperation::Read) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) + self.inner + .read() + .await + .map(|bs| { + self.processed += bs.len() as u64; + bs + }) + .map_err(|err| { + err.with_operation(ReadOperation::Read) + .with_context("service", self.scheme) + .with_context("path", &self.path) + .with_context("range", self.range.to_string()) + .with_context("read", self.processed.to_string()) + }) } } impl oio::BlockingRead for ErrorContextWrapper { fn read(&mut self) -> Result { - self.inner.read().map_err(|err| { - err.with_operation(ReadOperation::BlockingRead) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) + self.inner + .read() + .map(|bs| { + self.processed += bs.len() as u64; + bs + }) + .map_err(|err| { + err.with_operation(ReadOperation::BlockingRead) + .with_context("service", self.scheme) + .with_context("path", &self.path) + .with_context("range", self.range.to_string()) + .with_context("read", self.processed.to_string()) + }) } } impl oio::Write for ErrorContextWrapper { async fn write(&mut self, bs: Buffer) -> Result { - self.inner.write(bs.clone()).await.map_err(|err| { - err.with_operation(WriteOperation::Write) - .with_context("service", self.scheme) - .with_context("path", &self.path) - .with_context("write_buf", bs.len().to_string()) - }) + let size = bs.len(); + self.inner + .write(bs) + .await + .map(|n| { + self.processed += n as u64; + n + }) + .map_err(|err| { + err.with_operation(WriteOperation::Write) + .with_context("service", self.scheme) + .with_context("path", &self.path) + .with_context("size", size.to_string()) + .with_context("written", self.processed.to_string()) + }) } async fn close(&mut self) -> Result<()> { @@ -373,6 +408,7 @@ impl oio::Write for ErrorContextWrapper { err.with_operation(WriteOperation::Close) .with_context("service", self.scheme) .with_context("path", &self.path) + .with_context("written", self.processed.to_string()) }) } @@ -381,18 +417,27 @@ impl oio::Write for ErrorContextWrapper { err.with_operation(WriteOperation::Abort) .with_context("service", self.scheme) .with_context("path", &self.path) + .with_context("processed", self.processed.to_string()) }) } } impl oio::BlockingWrite for ErrorContextWrapper { fn write(&mut self, bs: Buffer) -> Result { - self.inner.write(bs.clone()).map_err(|err| { - err.with_operation(WriteOperation::BlockingWrite) - .with_context("service", self.scheme) - .with_context("path", &self.path) - .with_context("write_buf", bs.len().to_string()) - }) + let size = bs.len(); + self.inner + .write(bs) + .map(|n| { + self.processed += n as u64; + n + }) + .map_err(|err| { + err.with_operation(WriteOperation::BlockingWrite) + .with_context("service", self.scheme) + .with_context("path", &self.path) + .with_context("size", size.to_string()) + .with_context("written", self.processed.to_string()) + }) } fn close(&mut self) -> Result<()> { @@ -400,26 +445,42 @@ impl oio::BlockingWrite for ErrorContextWrapper { err.with_operation(WriteOperation::BlockingClose) .with_context("service", self.scheme) .with_context("path", &self.path) + .with_context("written", self.processed.to_string()) }) } } impl oio::List for ErrorContextWrapper { async fn next(&mut self) -> Result> { - self.inner.next().await.map_err(|err| { - err.with_operation(ListOperation::Next) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) + self.inner + .next() + .await + .map(|bs| { + self.processed += bs.is_some() as u64; + bs + }) + .map_err(|err| { + err.with_operation(ListOperation::Next) + .with_context("service", self.scheme) + .with_context("path", &self.path) + .with_context("listed", self.processed.to_string()) + }) } } impl oio::BlockingList for ErrorContextWrapper { fn next(&mut self) -> Result> { - self.inner.next().map_err(|err| { - err.with_operation(ListOperation::BlockingNext) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) + self.inner + .next() + .map(|bs| { + self.processed += bs.is_some() as u64; + bs + }) + .map_err(|err| { + err.with_operation(ListOperation::BlockingNext) + .with_context("service", self.scheme) + .with_context("path", &self.path) + .with_context("listed", self.processed.to_string()) + }) } }