From f7f9d205db4d8a92adcf14e026e53b251057d4bc Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Sat, 25 May 2024 14:39:52 +0800 Subject: [PATCH] refactor(bin/ofs): Fuse API (#4637) * fix: ofs write behavior [wip] * fix: set `written` to end of the file when `is_append` * fix: ofs behavior tests * fix: clippy * ci: add timeout for ofs tests * ci * ci * fix: check flags * feat: file truncate test * fix * fix * refactor: `Fuse` APIs * fix: mount on freebsd * refactor: integration tests * chore * chore * Update file.rs * fix: remove unnecessary sleep --- bin/ofs/src/bin/ofs.rs | 85 ++++++++++++++- bin/ofs/src/config.rs | 33 ------ bin/ofs/src/{fuse.rs => fuse/adapter.rs} | 6 +- bin/ofs/src/fuse/mod.rs | 133 +++++++++++++++++++++++ bin/ofs/src/lib.rs | 87 +-------------- bin/ofs/tests/common/mod.rs | 60 ++++------ bin/ofs/tests/file.rs | 19 +--- 7 files changed, 247 insertions(+), 176 deletions(-) delete mode 100644 bin/ofs/src/config.rs rename bin/ofs/src/{fuse.rs => fuse/adapter.rs} (99%) create mode 100644 bin/ofs/src/fuse/mod.rs diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/bin/ofs.rs index ffc237f2ba9..44b2d298e20 100644 --- a/bin/ofs/src/bin/ofs.rs +++ b/bin/ofs/src/bin/ofs.rs @@ -17,11 +17,92 @@ use anyhow::Result; use clap::Parser; +use url::Url; + +#[derive(Parser, Debug)] +#[command(version, about)] +struct Config { + /// fuse mount path + #[arg(env = "OFS_MOUNT_PATH", index = 1)] + mount_path: String, + + /// location of opendal service + /// format: ://?=&= + /// example: fs://?root=/tmp + #[arg(env = "OFS_BACKEND", index = 2)] + backend: Url, +} #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { - let cfg = ofs::Config::parse(); + let cfg = Config::parse(); env_logger::init(); - ofs::execute(cfg).await + execute(cfg).await +} + +#[cfg(any(target_os = "linux", target_os = "freebsd"))] +async fn execute(cfg: Config) -> Result<()> { + use std::{collections::HashMap, env, str::FromStr}; + + use anyhow::anyhow; + use ofs::fuse::Fuse; + use opendal::{Operator, Scheme}; + + if cfg.backend.has_host() { + log::warn!("backend host will be ignored"); + } + + let scheme_str = cfg.backend.scheme(); + let op_args = cfg + .backend + .query_pairs() + .into_owned() + .collect::>(); + + let scheme = match Scheme::from_str(scheme_str) { + Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)), + Ok(s) => Ok(s), + }?; + let backend = Operator::via_map(scheme, op_args)?; + + #[cfg(target_os = "linux")] + let mut mount_handle = if nix::unistd::getuid().is_root() { + let mut fuse = Fuse::new(); + if let Some(gid) = env::var("SUDO_GID") + .ok() + .and_then(|gid_str| gid_str.parse::().ok()) + { + fuse = fuse.gid(gid); + } + if let Some(uid) = env::var("SUDO_UID") + .ok() + .and_then(|gid_str| gid_str.parse::().ok()) + { + fuse = fuse.uid(uid); + } + fuse.mount(cfg.mount_path, backend).await? + } else { + Fuse::new() + .mount_with_unprivileged(cfg.mount_path, backend) + .await? + }; + + #[cfg(target_os = "freebsd")] + let mut mount_handle = Fuse::new().mount(cfg.mount_path, backend).await?; + + let handle = &mut mount_handle; + tokio::select! { + res = handle => res?, + _ = tokio::signal::ctrl_c() => { + mount_handle.unmount().await? + } + } + + Ok(()) +} + +#[cfg(not(any(target_os = "linux", target_os = "freebsd")))] +async fn execute(_cfg: Config) -> Result<()> { + Err(anyhow::anyhow!("platform not supported")) } diff --git a/bin/ofs/src/config.rs b/bin/ofs/src/config.rs deleted file mode 100644 index 120d667ad74..00000000000 --- a/bin/ofs/src/config.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use clap::Parser; -use url::Url; - -#[derive(Parser, Debug)] -#[command(version, about)] -pub struct Config { - /// fuse mount path - #[arg(env = "OFS_MOUNT_PATH", index = 1)] - pub mount_path: String, - - /// location of opendal service - /// format: ://?=&= - /// example: fs://?root=/tmp - #[arg(env = "OFS_BACKEND", index = 2)] - pub backend: Url, -} diff --git a/bin/ofs/src/fuse.rs b/bin/ofs/src/fuse/adapter.rs similarity index 99% rename from bin/ofs/src/fuse.rs rename to bin/ofs/src/fuse/adapter.rs index 85e76c69ee9..45ebf79b31c 100644 --- a/bin/ofs/src/fuse.rs +++ b/bin/ofs/src/fuse/adapter.rs @@ -74,14 +74,14 @@ impl FileKey { } } -pub(super) struct Fuse { +pub(super) struct FuseAdapter { op: Operator, gid: u32, uid: u32, opened_files: Slab, } -impl Fuse { +impl FuseAdapter { pub fn new(op: Operator, uid: u32, gid: u32) -> Self { Self { op, @@ -152,7 +152,7 @@ impl Fuse { } } -impl PathFilesystem for Fuse { +impl PathFilesystem for FuseAdapter { type DirEntryStream<'a> = BoxStream<'a, Result>; type DirEntryPlusStream<'a> = BoxStream<'a, Result>; diff --git a/bin/ofs/src/fuse/mod.rs b/bin/ofs/src/fuse/mod.rs new file mode 100644 index 00000000000..21f97148cba --- /dev/null +++ b/bin/ofs/src/fuse/mod.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod adapter; + +use adapter::FuseAdapter; + +pub use fuse3::raw::MountHandle; + +use fuse3::{path::Session, MountOptions}; +use opendal::Operator; +use std::{io, path::Path}; + +/// Ofs fuse filesystem mounter. +#[derive(Debug, Clone)] +pub struct Fuse { + mount_options: MountOptions, + uid: Option, + gid: Option, +} + +impl Fuse { + pub fn new() -> Self { + Fuse::default() + } + + /// Set fuse filesystem mount user_id, default is current uid. + pub fn uid(mut self, uid: u32) -> Self { + self.uid.replace(uid); + self.mount_options.uid(uid); + self + } + + /// Set fuse filesystem mount group_id, default is current gid. + pub fn gid(mut self, gid: u32) -> Self { + self.gid.replace(gid); + self.mount_options.gid(gid); + self + } + + /// Set fuse filesystem name, default is __OpenDAL Filesystem__. + pub fn fs_name(mut self, name: impl Into) -> Self { + self.mount_options.fs_name(name); + self + } + + /// Set fuse filesystem `allow_root` mount option, default is disable. + pub fn allow_root(mut self, allow_root: bool) -> Self { + self.mount_options.allow_root(allow_root); + self + } + + /// Set fuse filesystem allow_other mount option, default is disable. + pub fn allow_other(mut self, allow_other: bool) -> Self { + self.mount_options.allow_other(allow_other); + self + } + + /// Set fuse filesystem `ro` mount option, default is disable. + pub fn read_only(mut self, read_only: bool) -> Self { + self.mount_options.read_only(read_only); + self + } + + /// Allow fuse filesystem mount on a non-empty directory, default is not allowed. + pub fn allow_non_empty(mut self, allow_non_empty: bool) -> Self { + self.mount_options.nonempty(allow_non_empty); + self + } + + /// Mount the filesystem with root permission. + pub async fn mount( + self, + mount_point: impl AsRef, + op: Operator, + ) -> io::Result { + let adapter = FuseAdapter::new( + op, + self.uid.unwrap_or_else(|| nix::unistd::getuid().into()), + self.gid.unwrap_or_else(|| nix::unistd::getgid().into()), + ); + Session::new(self.mount_options) + .mount(adapter, mount_point) + .await + } + + /// Mount the filesystem without root permission. + pub async fn mount_with_unprivileged( + self, + mount_point: impl AsRef, + op: Operator, + ) -> io::Result { + log::warn!("unprivileged mount may not detect external unmount, tracking issue: https://github.com/Sherlock-Holo/fuse3/issues/72"); + + let adapter = FuseAdapter::new( + op, + self.uid.unwrap_or_else(|| nix::unistd::getuid().into()), + self.gid.unwrap_or_else(|| nix::unistd::getgid().into()), + ); + Session::new(self.mount_options) + .mount_with_unprivileged(adapter, mount_point) + .await + } +} + +impl Default for Fuse { + fn default() -> Self { + let mut mount_options = MountOptions::default(); + mount_options + .fs_name("OpenDAL Filesystem") + .no_open_dir_support(true); + + Self { + mount_options, + uid: None, + gid: None, + } + } +} diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs index 7fcf9641082..599e58249c6 100644 --- a/bin/ofs/src/lib.rs +++ b/bin/ofs/src/lib.rs @@ -15,90 +15,5 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; -use std::str::FromStr; - -use anyhow::anyhow; -use anyhow::Result; -use opendal::Operator; -use opendal::Scheme; -use tokio::signal; - -pub mod config; -pub use config::Config; - -mod fuse; - -pub async fn execute(cfg: Config) -> Result<()> { - if cfg.backend.has_host() { - log::warn!("backend host will be ignored"); - } - - let scheme_str = cfg.backend.scheme(); - let op_args = cfg - .backend - .query_pairs() - .into_owned() - .collect::>(); - - let scheme = match Scheme::from_str(scheme_str) { - Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)), - Ok(s) => Ok(s), - }?; - let backend = Operator::via_map(scheme, op_args)?; - - let args = Args { - mount_path: cfg.mount_path, - backend, - }; - execute_inner(args).await -} - -#[derive(Debug)] -struct Args { - mount_path: String, - backend: Operator, -} -#[cfg(not(any(target_os = "linux", target_os = "freebsd")))] -async fn execute_inner(args: Args) -> Result<()> { - _ = args.backend; - Err(anyhow::anyhow!("platform not supported")) -} - #[cfg(any(target_os = "linux", target_os = "freebsd"))] -async fn execute_inner(args: Args) -> Result<()> { - use fuse3::path::Session; - use fuse3::MountOptions; - - let uid = nix::unistd::getuid(); - let gid = nix::unistd::getgid(); - - let mut mount_option = MountOptions::default(); - mount_option.uid(uid.into()); - mount_option.gid(gid.into()); - mount_option.no_open_dir_support(true); - - let adapter = fuse::Fuse::new(args.backend, uid.into(), gid.into()); - - let session = Session::new(mount_option); - - let mut mount_handle = if uid.is_root() { - session.mount(adapter, args.mount_path).await? - } else { - log::warn!("unprivileged mount may not detect external unmount, tracking issue: https://github.com/Sherlock-Holo/fuse3/issues/72"); - session - .mount_with_unprivileged(adapter, args.mount_path) - .await? - }; - - let handle = &mut mount_handle; - - tokio::select! { - res = handle => res?, - _ = signal::ctrl_c() => { - mount_handle.unmount().await? - } - } - - Ok(()) -} +pub mod fuse; diff --git a/bin/ofs/tests/common/mod.rs b/bin/ofs/tests/common/mod.rs index df977cbd1ff..f2df4a12938 100644 --- a/bin/ofs/tests/common/mod.rs +++ b/bin/ofs/tests/common/mod.rs @@ -15,70 +15,64 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashMap, env, process::Command, sync::OnceLock, thread, time::Duration}; +use std::{collections::HashMap, env, sync::OnceLock}; +use opendal::{Capability, Operator}; use tempfile::TempDir; use test_context::TestContext; -use tokio::{ - runtime::{self, Runtime}, - task::JoinHandle, -}; +use tokio::runtime::{self, Runtime}; static INIT_LOGGER: OnceLock<()> = OnceLock::new(); static RUNTIME: OnceLock = OnceLock::new(); -pub(crate) struct OfsTestContext { +pub struct OfsTestContext { pub mount_point: TempDir, - ofs_task: JoinHandle<()>, + pub capability: Capability, + mount_handle: ofs::fuse::MountHandle, } impl TestContext for OfsTestContext { fn setup() -> Self { - let backend = backend_scheme().unwrap(); + let backend = backend(); + let capability = backend.info().full_capability(); INIT_LOGGER.get_or_init(env_logger::init); let mount_point = tempfile::tempdir().unwrap(); let mount_point_str = mount_point.path().to_string_lossy().to_string(); - let ofs_task = RUNTIME + let mount_handle = RUNTIME .get_or_init(|| { runtime::Builder::new_multi_thread() .enable_all() .build() .expect("build runtime") }) - .spawn(async move { - ofs::execute(ofs::Config { - mount_path: mount_point_str, - backend: backend.parse().unwrap(), - }) - .await - .unwrap(); - }); - - // wait for ofs to start - thread::sleep(Duration::from_secs(1)); + .block_on(async move { + ofs::fuse::Fuse::new() + .mount_with_unprivileged(mount_point_str, backend) + .await + }) + .unwrap(); OfsTestContext { mount_point, - ofs_task, + capability, + mount_handle, } } fn teardown(self) { - // FIXME: ofs could not unmount - Command::new("fusermount3") - .args(["-u", self.mount_point.path().to_str().unwrap()]) - .output() + RUNTIME + .get() + .expect("runtime") + .block_on(async move { self.mount_handle.unmount().await }) .unwrap(); - - self.ofs_task.abort(); self.mount_point.close().unwrap(); } } -fn backend_scheme() -> Option { - let scheme = env::var("OPENDAL_TEST").ok()?; +fn backend() -> Operator { + let scheme = env::var("OPENDAL_TEST").unwrap().parse().unwrap(); let prefix = format!("opendal_{scheme}_"); let mut cfg = env::vars() @@ -100,11 +94,5 @@ fn backend_scheme() -> Option { cfg.insert("root".to_string(), root); } - let params = cfg - .into_iter() - .map(|(k, v)| format!("{}={}", urlencoding::encode(&k), urlencoding::encode(&v))) - .collect::>() - .join("&"); - - Some(format!("{scheme}://?{params}")) + Operator::via_map(scheme, cfg).unwrap() } diff --git a/bin/ofs/tests/file.rs b/bin/ofs/tests/file.rs index 6e55351e1b7..e6575e37697 100644 --- a/bin/ofs/tests/file.rs +++ b/bin/ofs/tests/file.rs @@ -18,7 +18,6 @@ mod common; use std::{ - env, fs::{self, File, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, thread, @@ -40,8 +39,6 @@ fn test_file(ctx: &mut OfsTestContext) { file.write_all(TEST_TEXT.as_bytes()).unwrap(); drop(file); - thread::sleep(Duration::from_secs(1)); - let mut file = File::open(&path).unwrap(); let mut buf = String::new(); file.read_to_string(&mut buf).unwrap(); @@ -54,7 +51,9 @@ fn test_file(ctx: &mut OfsTestContext) { #[test_context(OfsTestContext)] #[test] fn test_file_append(ctx: &mut OfsTestContext) { - if env::var("OPENDAL_TEST").unwrap() == "s3" { + if !ctx.capability.write_can_append { + // wait for ofs to be ready + thread::sleep(Duration::from_secs(1)); return; } @@ -64,14 +63,10 @@ fn test_file_append(ctx: &mut OfsTestContext) { file.write_all(TEST_TEXT.as_bytes()).unwrap(); drop(file); - thread::sleep(Duration::from_secs(1)); - let mut file = File::options().append(true).open(&path).unwrap(); file.write_all(b"test").unwrap(); drop(file); - thread::sleep(Duration::from_secs(1)); - let mut file = File::open(&path).unwrap(); let mut buf = String::new(); file.read_to_string(&mut buf).unwrap(); @@ -90,8 +85,6 @@ fn test_file_seek(ctx: &mut OfsTestContext) { file.write_all(TEST_TEXT.as_bytes()).unwrap(); drop(file); - thread::sleep(Duration::from_secs(1)); - let mut file = File::open(&path).unwrap(); file.seek(SeekFrom::Start(TEST_TEXT.len() as u64 / 2)) .unwrap(); @@ -100,8 +93,6 @@ fn test_file_seek(ctx: &mut OfsTestContext) { assert_eq!(buf, TEST_TEXT[TEST_TEXT.len() / 2..]); drop(file); - thread::sleep(Duration::from_secs(1)); - fs::remove_file(path).unwrap(); } @@ -113,8 +104,6 @@ fn test_file_truncate(ctx: &mut OfsTestContext) { file.write_all(TEST_TEXT.as_bytes()).unwrap(); drop(file); - thread::sleep(Duration::from_secs(1)); - let mut file = OpenOptions::new() .write(true) .truncate(true) @@ -124,8 +113,6 @@ fn test_file_truncate(ctx: &mut OfsTestContext) { .unwrap(); drop(file); - thread::sleep(Duration::from_secs(1)); - assert_eq!( fs::read_to_string(&path).unwrap(), TEST_TEXT[..TEST_TEXT.len() / 2]