Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Allow reusing the same operator to speed up tests #2068

Merged
merged 18 commits into from
Apr 23, 2023
Merged
2 changes: 2 additions & 0 deletions .github/workflows/service_test_http.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ jobs:
RUST_LOG: debug
OPENDAL_HTTP_TEST: on
OPENDAL_HTTP_ENDPOINT: http://127.0.0.1:8080
OPENDAL_DISABLE_RANDOM_ROOT: true

caddy:
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -102,3 +103,4 @@ jobs:
RUST_LOG: debug
OPENDAL_HTTP_TEST: on
OPENDAL_HTTP_ENDPOINT: http://127.0.0.1:8080
OPENDAL_DISABLE_RANDOM_ROOT: true
2 changes: 2 additions & 0 deletions .github/workflows/service_test_ipfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ jobs:
OPENDAL_IPFS_TEST: on
OPENDAL_IPFS_ROOT: /ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/
OPENDAL_IPFS_ENDPOINT: "http://127.0.0.1:8080"
OPENDAL_DISABLE_RANDOM_ROOT: true

# # ipfs.io can't pass our test by now, we should address them in the future.
# ipfs-io:
# runs-on: ubuntu-latest
Expand Down
28 changes: 28 additions & 0 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,34 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
{ || self.inner.copy(from, to, args.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Copy, dur.as_secs_f64(), err)
})
.map(|v| v.map_err(|e| e.set_persistent()))
.await
}

async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
{ || self.inner.rename(from, to, args.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Rename, dur.as_secs_f64(), err)
})
.map(|v| v.map_err(|e| e.set_persistent()))
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
{ || self.inner.list(path, args.clone()) }
.retry(&self.builder)
Expand Down
6 changes: 4 additions & 2 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ impl IncomingAsyncBody {
Ordering::Less => Err(Error::new(
ErrorKind::ContentIncomplete,
&format!("reader got too less data, expect: {expect}, actual: {actual}"),
)),
)
.set_temporary()),
Ordering::Greater => Err(Error::new(
ErrorKind::ContentTruncated,
&format!("reader got too much data, expect: {expect}, actual: {actual}"),
)),
)
.set_temporary()),
}
}
}
Expand Down
46 changes: 31 additions & 15 deletions core/src/services/ftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,22 +335,16 @@ impl Accessor for FtpBackend {

for path in paths {
curr_path.push_str(path);
// try to create directory
if curr_path.ends_with('/') {
match ftp_stream.mkdir(&curr_path).await {
// Do nothing if status is FileUnavailable or OK(()) is return.
Err(FtpError::UnexpectedResponse(Response {
status: Status::FileUnavailable,
..
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
}
match ftp_stream.mkdir(&curr_path).await {
// Do nothing if status is FileUnavailable or OK(()) is return.
Err(FtpError::UnexpectedResponse(Response {
status: Status::FileUnavailable,
..
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
}
} else {
// else, create file
ftp_stream.put_file(&curr_path, &mut "".as_bytes()).await?;
}
}

Expand Down Expand Up @@ -398,6 +392,28 @@ impl Accessor for FtpBackend {
));
}

// Ensure the parent dir exists.
let parent = get_parent(path);
let paths: Vec<&str> = parent.split('/').collect();

// TODO: we can optimize this by checking dir existence first.
let mut ftp_stream = self.ftp_connect(Operation::Write).await?;
let mut curr_path = String::new();
for path in paths {
curr_path.push_str(path);
match ftp_stream.mkdir(&curr_path).await {
// Do nothing if status is FileUnavailable or OK(()) is return.
Err(FtpError::UnexpectedResponse(Response {
status: Status::FileUnavailable,
..
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
}
}
}

Ok((
RpWrite::new(),
FtpWriter::new(self.clone(), path.to_string()),
Expand Down
4 changes: 3 additions & 1 deletion core/src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ impl kv::Adapter for Adapter {

impl From<RedisError> for Error {
fn from(e: RedisError) -> Self {
Error::new(ErrorKind::Unexpected, e.category()).set_source(e)
Error::new(ErrorKind::Unexpected, e.category())
.set_source(e)
.set_temporary()
}
}
4 changes: 4 additions & 0 deletions core/src/services/webdav/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,10 @@ impl WebdavBackend {
}

async fn ensure_parent_path(&self, path: &str) -> Result<()> {
if path == "/" {
return Ok(());
}

// create dir recursively, split path by `/` and create each dir except the last one
let abs_path = build_abs_path(&self.root, path);
let abs_path = abs_path.as_str();
Expand Down
11 changes: 6 additions & 5 deletions core/src/services/webdav/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ impl oio::Page for WebdavPager {
.into_iter()
.filter_map(|de| {
let path = de.href;
let normalized_path = if self.root != path {
build_rel_path(&self.root, &path)
} else {
path
};

// Ignore the root path itself.
if self.root == path {
return None;
}

let normalized_path = build_rel_path(&self.root, &path);
if normalized_path == self.path {
// WebDav server may return the current path as an entry.
return None;
Expand Down
49 changes: 48 additions & 1 deletion core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;

/// Capability is used to describe what operations are supported
/// by current Operator.
///
Expand Down Expand Up @@ -42,7 +44,7 @@
/// - Operation with variants should be named like `read_can_seek`.
/// - Operation with arguments should be named like `read_with_range`.
/// - Operation with limtations should be named like `batch_max_operations`.
#[derive(Copy, Clone, Debug, Default)]
#[derive(Copy, Clone, Default)]
pub struct Capability {
/// If operator supports stat natively, it will be true.
pub stat: bool,
Expand Down Expand Up @@ -127,3 +129,48 @@ pub struct Capability {
/// If operator supports blocking natively, it will be true.
pub blocking: bool,
}

impl Debug for Capability {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = vec![];

if self.read {
s.push("Read");
}
if self.stat {
s.push("Stat");
}
if self.write {
s.push("Write");
}
if self.create_dir {
s.push("CreateDir");
}
if self.delete {
s.push("Delete");
}
if self.list {
s.push("List");
}
if self.scan {
s.push("Scan");
}
if self.copy {
s.push("Copy");
}
if self.rename {
s.push("Rename");
}
if self.presign {
s.push("Presign");
}
if self.batch {
s.push("Batch");
}
if self.blocking {
s.push("Blocking");
}

write!(f, "{{ {} }}", s.join(" | "))
}
}
41 changes: 19 additions & 22 deletions core/tests/behavior/blocking_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,28 @@ use super::utils::*;
macro_rules! behavior_blocking_copy_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<services_ $service:lower _blocking_copy>] {
$(
#[test]
$(
#[test]
$(
#[$meta]
)*
fn [< $test >]() -> anyhow::Result<()> {
let op = $crate::utils::init_service::<opendal::services::$service>(true);
match op {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_copy()
&& op.info().can_blocking() => $crate::blocking_copy::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_copy, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
#[$meta]
)*
fn [<blocking_copy_ $test >]() -> anyhow::Result<()> {
match OPERATOR.as_ref() {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_copy()
&& op.info().can_blocking() => $crate::blocking_copy::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_copy, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
}
)*
}
}
)*
}
};
}
Expand Down
60 changes: 33 additions & 27 deletions core/tests/behavior/blocking_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,27 @@ use super::utils::*;
macro_rules! behavior_blocking_list_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<services_ $service:lower _blocking_list>] {
$(
#[test]
$(
#[test]
$(
#[$meta]
)*
fn [< $test >]() -> anyhow::Result<()> {
let op = $crate::utils::init_service::<opendal::services::$service>(true);
match op {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_blocking() && (op.info().can_list()||op.info().can_scan()) => $crate::blocking_list::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_list, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
#[$meta]
)*
fn [<blocking_list_ $test >]() -> anyhow::Result<()> {
match OPERATOR.as_ref() {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_blocking() && (op.info().can_list()||op.info().can_scan()) => $crate::blocking_list::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_list, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
}
)*
}
}
)*
}
};
}
Expand All @@ -79,13 +76,14 @@ macro_rules! behavior_blocking_list_tests {

/// List dir should return newly created file.
pub fn test_list_dir(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let parent = uuid::Uuid::new_v4().to_string();
let path = format!("{parent}/{}", uuid::Uuid::new_v4());
debug!("Generate a random file: {}", &path);
let (content, size) = gen_bytes();

op.write(&path, content).expect("write must succeed");

let obs = op.list("/")?;
let obs = op.list(&format!("{parent}/"))?;
let mut found = false;
for de in obs {
let de = de?;
Expand Down Expand Up @@ -122,22 +120,30 @@ pub fn test_list_non_exist_dir(op: BlockingOperator) -> Result<()> {

// Walk top down should output as expected
pub fn test_scan(op: BlockingOperator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();

let expected = vec![
"x/", "x/y", "x/x/", "x/x/y", "x/x/x/", "x/x/x/y", "x/x/x/x/",
];
for path in expected.iter() {
if path.ends_with('/') {
op.create_dir(path)?;
op.create_dir(&format!("{parent}/{path}"))?;
} else {
op.write(path, "test_scan")?;
op.write(&format!("{parent}/{path}"), "test_scan")?;
}
}

let w = op.scan("x/")?;
let w = op.scan(&format!("{parent}/x/"))?;
let actual = w
.collect::<Vec<_>>()
.into_iter()
.map(|v| v.unwrap().path().to_string())
.map(|v| {
v.unwrap()
.path()
.strip_prefix(&format!("{parent}/"))
.unwrap()
.to_string()
})
.collect::<HashSet<_>>();

debug!("walk top down: {:?}", actual);
Expand Down
Loading