Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ofs): Close file during flush #4680

Merged
merged 5 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/actions/test_behavior_bin_ofs/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ runs:
run: cargo test --features ${{ inputs.feature }} --no-default-features -- --nocapture
env:
OPENDAL_TEST: ${{ inputs.service }}
# Remove me after we find out what happened
RUST_TEST_THREADS: 1
EOF
- name: Run
uses: ./dynamic_test_bin_ofs
9 changes: 9 additions & 0 deletions bin/ofs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/ofs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ test-context = "0.3.0"
urlencoding = "2.1.3"
uuid = "1.7.0"
walkdir = "2.5.0"
opendal = { version = "0.46.0", path = "../../core", features = ["tests"] }
50 changes: 33 additions & 17 deletions bin/ofs/src/fuse/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,12 @@ impl PathFilesystem for FuseAdapter {
let (is_read, is_trunc, is_append) = self.check_flags(flags | libc::O_CREAT as u32)?;

let path = PathBuf::from(parent).join(name);
self.op
.write(&path.to_string_lossy(), Bytes::new())
.await
.map_err(opendal_error2errno)?;

let inner_writer = if is_trunc || is_append {
let writer = self
.op
.writer_with(&path.to_string_lossy())
.chunk(4 * 1024 * 1024)
.append(is_append)
.await
.map_err(opendal_error2errno)?;
Expand Down Expand Up @@ -425,22 +422,22 @@ impl PathFilesystem for FuseAdapter {
})
}

async fn release(
/// In design, flush could be called multiple times for a single open. But there is the only
/// place that we can handle the write operations.
///
/// So we only support the use case that flush only be called once.
async fn flush(
&self,
_req: Request,
path: Option<&OsStr>,
fh: u64,
flags: u32,
lock_owner: u64,
flush: bool,
) -> Result<()> {
log::debug!(
"release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})",
"flush(path={:?}, fh={}, lock_owner={})",
path,
fh,
flags,
lock_owner,
flush
);

let file = self
Expand All @@ -449,13 +446,9 @@ impl PathFilesystem for FuseAdapter {
.ok_or(Errno::from(libc::EBADF))?;

if let Some(inner_writer) = file.inner_writer {
inner_writer
.lock_owned()
.await
.writer
.close()
.await
.map_err(opendal_error2errno)?;
let mut lock = inner_writer.lock().await;
let res = lock.writer.close().await.map_err(opendal_error2errno);
return res;
}

if matches!(path, Some(ref p) if p != &file.path) {
Expand All @@ -465,6 +458,29 @@ impl PathFilesystem for FuseAdapter {
Ok(())
}

async fn release(
&self,
_req: Request,
path: Option<&OsStr>,
fh: u64,
flags: u32,
lock_owner: u64,
flush: bool,
) -> Result<()> {
log::debug!(
"release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})",
path,
fh,
flags,
lock_owner,
flush
);

// Just take and forget it.
let _ = self.opened_files.take(FileKey::try_from(fh)?.0);
Ok(())
}

async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result<ReplyOpen> {
log::debug!("open(path={:?}, flags=0x{:x})", path, flags);

Expand Down
43 changes: 10 additions & 33 deletions bin/ofs/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::{collections::HashMap, env, sync::OnceLock};
use std::sync::OnceLock;

use opendal::{Capability, Operator};
use opendal::raw::tests;
use opendal::Capability;
use tempfile::TempDir;
use test_context::TestContext;
use tokio::runtime::{self, Runtime};
Expand All @@ -33,7 +34,9 @@ pub struct OfsTestContext {

impl TestContext for OfsTestContext {
fn setup() -> Self {
let backend = backend();
let backend = tests::init_test_service()
.expect("init test services failed")
.expect("no test services has been configured");
let capability = backend.info().full_capability();

INIT_LOGGER.get_or_init(env_logger::init);
Expand Down Expand Up @@ -61,38 +64,12 @@ impl TestContext for OfsTestContext {
}
}

// We don't care if the unmount fails, so we ignore the result.
fn teardown(self) {
RUNTIME
let _ = RUNTIME
.get()
.expect("runtime")
.block_on(async move { self.mount_handle.unmount().await })
.unwrap();
self.mount_point.close().unwrap();
.block_on(async move { self.mount_handle.unmount().await });
let _ = self.mount_point.close();
}
}

fn backend() -> Operator {
let scheme = env::var("OPENDAL_TEST").unwrap().parse().unwrap();
let prefix = format!("opendal_{scheme}_");

let mut cfg = env::vars()
.filter_map(|(k, v)| {
k.to_lowercase()
.strip_prefix(&prefix)
.map(|k| (k.to_string(), v))
})
.collect::<HashMap<String, String>>();

// Use random root unless OPENDAL_DISABLE_RANDOM_ROOT is set to true.
let disable_random_root = env::var("OPENDAL_DISABLE_RANDOM_ROOT").unwrap_or_default() == "true";
if !disable_random_root {
let root = format!(
"{}{}/",
cfg.get("root").cloned().unwrap_or_else(|| "/".to_string()),
uuid::Uuid::new_v4()
);
cfg.insert("root".to_string(), root);
}

Operator::via_map(scheme, cfg).unwrap()
}