Skip to content

Commit

Permalink
refactor: Add RpStat for stat operation (#984)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Nov 22, 2022
1 parent 68efedb commit 28b2fe1
Show file tree
Hide file tree
Showing 27 changed files with 89 additions and 84 deletions.
4 changes: 2 additions & 2 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub trait Accessor: Send + Sync + Debug + 'static {
/// - `stat` empty path means stat backend's root path.
/// - `stat` a path endswith "/" means stating a dir.
/// - `mode` and `content_length` must be set.
async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
match self.inner() {
Some(inner) => inner.stat(path, args).await,
None => Err(Error::new(
Expand Down Expand Up @@ -386,7 +386,7 @@ impl<T: Accessor> Accessor for Arc<T> {
async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<RpWrite> {
self.as_ref().write(path, args, r).await
}
async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.as_ref().stat(path, args).await
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<()> {
Expand Down
10 changes: 5 additions & 5 deletions src/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ where
Ok(args.size())
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
if path.ends_with('/') {
Ok(ObjectMetadata::new(ObjectMode::DIR))
Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)))
} else {
let bs = self.kv.get(path).await?;
match bs {
Some(bs) => {
Ok(ObjectMetadata::new(ObjectMode::FILE).with_content_length(bs.len() as u64))
}
Some(bs) => Ok(RpStat::new(
ObjectMetadata::new(ObjectMode::FILE).with_content_length(bs.len() as u64),
)),
None => Err(Error::new(
ErrorKind::ObjectNotFound,
"kv doesn't have this path",
Expand Down
4 changes: 2 additions & 2 deletions src/io_util/seekable_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct SeekableReader {
enum State {
Idle,
Sending(BoxFuture<'static, Result<(RpRead, BytesReader)>>),
Seeking(BoxFuture<'static, Result<ObjectMetadata>>),
Seeking(BoxFuture<'static, Result<RpStat>>),
Reading(BytesReader),
}

Expand Down Expand Up @@ -138,7 +138,7 @@ impl AsyncSeek for SeekableReader {
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
if let State::Seeking(future) = &mut self.state {
let meta = ready!(Pin::new(future).poll(cx))?;
let meta = ready!(Pin::new(future).poll(cx))?.into_metadata();
self.size = Some(meta.content_length() - self.offset.unwrap_or_default())
}

Expand Down
2 changes: 1 addition & 1 deletion src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Accessor for ConcurrentLimitAccessor {
self.inner.write(path, args, r).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let _permit = self
.semaphore
.acquire()
Expand Down
2 changes: 1 addition & 1 deletion src/layers/content_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl ContentCacheAccessor {
let it = match (range.offset(), range.size()) {
(Some(offset), Some(size)) => FixedCacheRangeIterator::new(offset, size, step),
_ => {
let meta = self.inner.stat(path, OpStat::new()).await?;
let meta = self.inner.stat(path, OpStat::new()).await?.into_metadata();
let bcr = BytesContentRange::from_bytes_range(meta.content_length(), range);
let br = bcr.to_bytes_range().expect("bytes range must be valid");
FixedCacheRangeIterator::new(
Expand Down
2 changes: 1 addition & 1 deletion src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Accessor for LoggingAccessor {
})
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
debug!(
target: "opendal::services",
"service={} operation={} path={} -> started",
Expand Down
8 changes: 4 additions & 4 deletions src/layers/metadata_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Accessor for MetadataCacheAccessor {
self.inner.write(path, args, r).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
match self.cache.read(path, OpRead::new()).await {
Ok((_, r)) => {
let buffer = Vec::with_capacity(1024);
Expand All @@ -122,10 +122,10 @@ impl Accessor for MetadataCacheAccessor {
.with_context("path", path)
.set_source(err)
})?;
Ok(meta)
Ok(RpStat::new(meta))
}
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
let meta = self.inner.stat(path, args).await?;
let meta = self.inner.stat(path, args).await?.into_metadata();
let bs = bincode::serde::encode_to_vec(&meta, bincode::config::standard())
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "encode object metadata into cache")
Expand All @@ -140,7 +140,7 @@ impl Accessor for MetadataCacheAccessor {
Box::new(Cursor::new(bs)),
)
.await?;
Ok(meta)
Ok(RpStat::new(meta))
}
Err(err) => Err(err),
}
Expand Down
2 changes: 1 addition & 1 deletion src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ impl Accessor for MetricsAccessor {
})
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.handle.requests_total_stat.increment(1);

let start = Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ where
.await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
{ || self.inner.stat(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.is_temporary())
Expand Down
2 changes: 1 addition & 1 deletion src/layers/subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Accessor for SubdirAccessor {
self.inner.write(&path, args, r).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path = self.prepend_subdir(path);

self.inner.stat(&path, args).await
Expand Down
2 changes: 1 addition & 1 deletion src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Accessor for TracingAccessor {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.inner.stat(path, args).await
}

Expand Down
4 changes: 2 additions & 2 deletions src/object/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ impl ObjectEntry {
pub async fn metadata(&self) -> ObjectMetadata {
if !self.complete.load(Ordering::Relaxed) {
// We will ignore all errors happened during inner metadata.
if let Ok(meta) = self.acc.stat(self.path(), OpStat::new()).await {
self.set_metadata(meta);
if let Ok(rp) = self.acc.stat(self.path(), OpStat::new()).await {
self.set_metadata(rp.into_metadata());
self.complete.store(true, Ordering::Relaxed);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/object/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,8 @@ impl Object {
/// # }
/// ```
pub async fn metadata(&self) -> Result<ObjectMetadata> {
self.acc.stat(self.path(), OpStat::new()).await
let rp = self.acc.stat(self.path(), OpStat::new()).await?;
Ok(rp.into_metadata())
}

/// Get current object's metadata.
Expand Down
1 change: 1 addition & 0 deletions src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use op_read::OpRead;
pub use op_read::RpRead;
mod op_stat;
pub use op_stat::OpStat;
pub use op_stat::RpStat;
mod op_write;
pub use op_write::OpWrite;
pub use op_write::RpWrite;
Expand Down
20 changes: 20 additions & 0 deletions src/ops/op_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::ObjectMetadata;

/// Args for `stat` operation.
#[derive(Debug, Clone, Default)]
pub struct OpStat {}
Expand All @@ -22,3 +24,21 @@ impl OpStat {
Self {}
}
}

/// Reply for `stat` operation.
#[derive(Debug, Clone)]
pub struct RpStat {
meta: ObjectMetadata,
}

impl RpStat {
/// Create a new reply for stat.
pub fn new(meta: ObjectMetadata) -> Self {
RpStat { meta }
}

/// Consume RpStat to get the inner metadata.
pub fn into_metadata(self) -> ObjectMetadata {
self.meta
}
}
8 changes: 4 additions & 4 deletions src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,20 +296,20 @@ impl Accessor for Backend {
}
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
return Ok(ObjectMetadata::new(ObjectMode::DIR));
return Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)));
}

let resp = self.azblob_get_blob_properties(path).await?;

let status = resp.status();

match status {
StatusCode::OK => parse_into_object_metadata(path, resp.headers()),
StatusCode::OK => parse_into_object_metadata(path, resp.headers()).map(RpStat::new),
StatusCode::NOT_FOUND if path.ends_with('/') => {
Ok(ObjectMetadata::new(ObjectMode::DIR))
Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)))
}
_ => {
let er = parse_error_response(resp).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl Accessor for Backend {
Ok(RpWrite::new(size))
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let meta = Self::fs_metadata(&p).await?;
Expand All @@ -298,7 +298,7 @@ impl Accessor for Backend {
.map_err(parse_io_error)?,
);

Ok(m)
Ok(RpStat::new(m))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions src/services/ftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,10 @@ impl Accessor for Backend {
Ok(RpWrite::new(bytes))
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
// root dir, return default ObjectMetadata with Dir ObjectMode.
if path == "/" {
return Ok(ObjectMetadata::new(ObjectMode::DIR));
return Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)));
}

let file = self.ftp_stat(path).await?;
Expand All @@ -384,7 +384,7 @@ impl Accessor for Backend {
meta.set_content_length(file.size() as u64);
meta.set_last_modified(OffsetDateTime::from(file.modified()));

Ok(meta)
Ok(RpStat::new(meta))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<()> {
Expand Down
8 changes: 4 additions & 4 deletions src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ impl Accessor for Backend {
}
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
return Ok(ObjectMetadata::new(ObjectMode::DIR));
return Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)));
}

let resp = self.gcs_get_object_metadata(path).await?;
Expand Down Expand Up @@ -308,9 +308,9 @@ impl Accessor for Backend {
})?;
m.set_last_modified(datetime);

Ok(m)
Ok(RpStat::new(m))
} else if resp.status() == StatusCode::NOT_FOUND && path.ends_with('/') {
Ok(ObjectMetadata::new(ObjectMode::DIR))
Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)))
} else {
let er = parse_error_response(resp).await?;
let e = parse_error(er);
Expand Down
4 changes: 2 additions & 2 deletions src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Accessor for Backend {
Ok(RpWrite::new(n))
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let meta = self.client.metadata(&p).map_err(parse_io_error)?;
Expand All @@ -268,7 +268,7 @@ impl Accessor for Backend {
m.set_content_length(meta.len());
m.set_last_modified(OffsetDateTime::from(meta.modified()));

Ok(m)
Ok(RpStat::new(m))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<()> {
Expand Down
31 changes: 7 additions & 24 deletions src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,12 @@ use log::debug;

use super::error::parse_error;
use crate::accessor::AccessorCapability;
use crate::http_util::new_request_build_error;
use crate::http_util::parse_error_response;
use crate::http_util::parse_into_object_metadata;
use crate::http_util::percent_encode_path;
use crate::http_util::AsyncBody;
use crate::http_util::HttpClient;
use crate::http_util::IncomingAsyncBody;
use crate::ops::BytesRange;
use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::RpRead;
use crate::http_util::*;
use crate::ops::*;
use crate::path::build_rooted_abs_path;
use crate::path::normalize_root;
use crate::wrappers::wrapper;
use crate::Accessor;
use crate::AccessorMetadata;
use crate::BytesReader;
use crate::Error;
use crate::ErrorKind;
use crate::ObjectMetadata;
use crate::ObjectMode;
use crate::Result;
use crate::Scheme;
use crate::*;

/// Builder for http backend.
#[derive(Default)]
Expand Down Expand Up @@ -179,22 +162,22 @@ impl Accessor for Backend {
}
}

async fn stat(&self, path: &str, _: OpStat) -> Result<ObjectMetadata> {
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
return Ok(ObjectMetadata::new(ObjectMode::DIR));
return Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)));
}

let resp = self.http_head(path).await?;

let status = resp.status();

match status {
StatusCode::OK => parse_into_object_metadata(path, resp.headers()),
StatusCode::OK => parse_into_object_metadata(path, resp.headers()).map(RpStat::new),
// HTTP Server like nginx could return FORBIDDEN if auto-index
// is not enabled, we should ignore them.
StatusCode::NOT_FOUND | StatusCode::FORBIDDEN if path.ends_with('/') => {
Ok(ObjectMetadata::new(ObjectMode::DIR))
Ok(RpStat::new(ObjectMetadata::new(ObjectMode::DIR)))
}
_ => {
let er = parse_error_response(resp).await?;
Expand Down
Loading

1 comment on commit 28b2fe1

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-rcud4myv5-databend.vercel.app

Built with commit 28b2fe1.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.