Skip to content

Commit

Permalink
refactor(bin/ofs): Fuse API (#4637)
Browse files Browse the repository at this point in the history
* fix: ofs write behavior [wip]

* fix: set `written` to end of the file when `is_append`

* fix: ofs behavior tests

* fix: clippy

* ci: add timeout for ofs tests

* ci

* ci

* fix: check flags

* feat: file truncate test

* fix

* fix

* refactor: `Fuse` APIs

* fix: mount on freebsd

* refactor: integration tests

* chore

* chore

* Update file.rs

* fix: remove unnecessary sleep
  • Loading branch information
ho-229 authored May 25, 2024
1 parent 90a8e16 commit f7f9d20
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 176 deletions.
85 changes: 83 additions & 2 deletions bin/ofs/src/bin/ofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,92 @@

use anyhow::Result;
use clap::Parser;
use url::Url;

#[derive(Parser, Debug)]
#[command(version, about)]
struct Config {
/// fuse mount path
#[arg(env = "OFS_MOUNT_PATH", index = 1)]
mount_path: String,

/// location of opendal service
/// format: <scheme>://?<key>=<value>&<key>=<value>
/// example: fs://?root=/tmp
#[arg(env = "OFS_BACKEND", index = 2)]
backend: Url,
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
let cfg = ofs::Config::parse();
let cfg = Config::parse();

env_logger::init();
ofs::execute(cfg).await
execute(cfg).await
}

#[cfg(any(target_os = "linux", target_os = "freebsd"))]
async fn execute(cfg: Config) -> Result<()> {
use std::{collections::HashMap, env, str::FromStr};

use anyhow::anyhow;
use ofs::fuse::Fuse;
use opendal::{Operator, Scheme};

if cfg.backend.has_host() {
log::warn!("backend host will be ignored");
}

let scheme_str = cfg.backend.scheme();
let op_args = cfg
.backend
.query_pairs()
.into_owned()
.collect::<HashMap<String, String>>();

let scheme = match Scheme::from_str(scheme_str) {
Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)),
Ok(s) => Ok(s),
}?;
let backend = Operator::via_map(scheme, op_args)?;

#[cfg(target_os = "linux")]
let mut mount_handle = if nix::unistd::getuid().is_root() {
let mut fuse = Fuse::new();
if let Some(gid) = env::var("SUDO_GID")
.ok()
.and_then(|gid_str| gid_str.parse::<u32>().ok())
{
fuse = fuse.gid(gid);
}
if let Some(uid) = env::var("SUDO_UID")
.ok()
.and_then(|gid_str| gid_str.parse::<u32>().ok())
{
fuse = fuse.uid(uid);
}
fuse.mount(cfg.mount_path, backend).await?
} else {
Fuse::new()
.mount_with_unprivileged(cfg.mount_path, backend)
.await?
};

#[cfg(target_os = "freebsd")]
let mut mount_handle = Fuse::new().mount(cfg.mount_path, backend).await?;

let handle = &mut mount_handle;
tokio::select! {
res = handle => res?,
_ = tokio::signal::ctrl_c() => {
mount_handle.unmount().await?
}
}

Ok(())
}

#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
async fn execute(_cfg: Config) -> Result<()> {
Err(anyhow::anyhow!("platform not supported"))
}
33 changes: 0 additions & 33 deletions bin/ofs/src/config.rs

This file was deleted.

6 changes: 3 additions & 3 deletions bin/ofs/src/fuse.rs → bin/ofs/src/fuse/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ impl FileKey {
}
}

pub(super) struct Fuse {
pub(super) struct FuseAdapter {
op: Operator,
gid: u32,
uid: u32,
opened_files: Slab<OpenedFile>,
}

impl Fuse {
impl FuseAdapter {
pub fn new(op: Operator, uid: u32, gid: u32) -> Self {
Self {
op,
Expand Down Expand Up @@ -152,7 +152,7 @@ impl Fuse {
}
}

impl PathFilesystem for Fuse {
impl PathFilesystem for FuseAdapter {
type DirEntryStream<'a> = BoxStream<'a, Result<DirectoryEntry>>;
type DirEntryPlusStream<'a> = BoxStream<'a, Result<DirectoryEntryPlus>>;

Expand Down
133 changes: 133 additions & 0 deletions bin/ofs/src/fuse/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod adapter;

use adapter::FuseAdapter;

pub use fuse3::raw::MountHandle;

use fuse3::{path::Session, MountOptions};
use opendal::Operator;
use std::{io, path::Path};

/// Ofs fuse filesystem mounter.
#[derive(Debug, Clone)]
pub struct Fuse {
mount_options: MountOptions,
uid: Option<u32>,
gid: Option<u32>,
}

impl Fuse {
pub fn new() -> Self {
Fuse::default()
}

/// Set fuse filesystem mount user_id, default is current uid.
pub fn uid(mut self, uid: u32) -> Self {
self.uid.replace(uid);
self.mount_options.uid(uid);
self
}

/// Set fuse filesystem mount group_id, default is current gid.
pub fn gid(mut self, gid: u32) -> Self {
self.gid.replace(gid);
self.mount_options.gid(gid);
self
}

/// Set fuse filesystem name, default is __OpenDAL Filesystem__.
pub fn fs_name(mut self, name: impl Into<String>) -> Self {
self.mount_options.fs_name(name);
self
}

/// Set fuse filesystem `allow_root` mount option, default is disable.
pub fn allow_root(mut self, allow_root: bool) -> Self {
self.mount_options.allow_root(allow_root);
self
}

/// Set fuse filesystem allow_other mount option, default is disable.
pub fn allow_other(mut self, allow_other: bool) -> Self {
self.mount_options.allow_other(allow_other);
self
}

/// Set fuse filesystem `ro` mount option, default is disable.
pub fn read_only(mut self, read_only: bool) -> Self {
self.mount_options.read_only(read_only);
self
}

/// Allow fuse filesystem mount on a non-empty directory, default is not allowed.
pub fn allow_non_empty(mut self, allow_non_empty: bool) -> Self {
self.mount_options.nonempty(allow_non_empty);
self
}

/// Mount the filesystem with root permission.
pub async fn mount(
self,
mount_point: impl AsRef<Path>,
op: Operator,
) -> io::Result<MountHandle> {
let adapter = FuseAdapter::new(
op,
self.uid.unwrap_or_else(|| nix::unistd::getuid().into()),
self.gid.unwrap_or_else(|| nix::unistd::getgid().into()),
);
Session::new(self.mount_options)
.mount(adapter, mount_point)
.await
}

/// Mount the filesystem without root permission.
pub async fn mount_with_unprivileged(
self,
mount_point: impl AsRef<Path>,
op: Operator,
) -> io::Result<MountHandle> {
log::warn!("unprivileged mount may not detect external unmount, tracking issue: https://github.com/Sherlock-Holo/fuse3/issues/72");

let adapter = FuseAdapter::new(
op,
self.uid.unwrap_or_else(|| nix::unistd::getuid().into()),
self.gid.unwrap_or_else(|| nix::unistd::getgid().into()),
);
Session::new(self.mount_options)
.mount_with_unprivileged(adapter, mount_point)
.await
}
}

impl Default for Fuse {
fn default() -> Self {
let mut mount_options = MountOptions::default();
mount_options
.fs_name("OpenDAL Filesystem")
.no_open_dir_support(true);

Self {
mount_options,
uid: None,
gid: None,
}
}
}
87 changes: 1 addition & 86 deletions bin/ofs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,90 +15,5 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::str::FromStr;

use anyhow::anyhow;
use anyhow::Result;
use opendal::Operator;
use opendal::Scheme;
use tokio::signal;

pub mod config;
pub use config::Config;

mod fuse;

pub async fn execute(cfg: Config) -> Result<()> {
if cfg.backend.has_host() {
log::warn!("backend host will be ignored");
}

let scheme_str = cfg.backend.scheme();
let op_args = cfg
.backend
.query_pairs()
.into_owned()
.collect::<HashMap<String, String>>();

let scheme = match Scheme::from_str(scheme_str) {
Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)),
Ok(s) => Ok(s),
}?;
let backend = Operator::via_map(scheme, op_args)?;

let args = Args {
mount_path: cfg.mount_path,
backend,
};
execute_inner(args).await
}

#[derive(Debug)]
struct Args {
mount_path: String,
backend: Operator,
}
#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
async fn execute_inner(args: Args) -> Result<()> {
_ = args.backend;
Err(anyhow::anyhow!("platform not supported"))
}

#[cfg(any(target_os = "linux", target_os = "freebsd"))]
async fn execute_inner(args: Args) -> Result<()> {
use fuse3::path::Session;
use fuse3::MountOptions;

let uid = nix::unistd::getuid();
let gid = nix::unistd::getgid();

let mut mount_option = MountOptions::default();
mount_option.uid(uid.into());
mount_option.gid(gid.into());
mount_option.no_open_dir_support(true);

let adapter = fuse::Fuse::new(args.backend, uid.into(), gid.into());

let session = Session::new(mount_option);

let mut mount_handle = if uid.is_root() {
session.mount(adapter, args.mount_path).await?
} else {
log::warn!("unprivileged mount may not detect external unmount, tracking issue: https://github.com/Sherlock-Holo/fuse3/issues/72");
session
.mount_with_unprivileged(adapter, args.mount_path)
.await?
};

let handle = &mut mount_handle;

tokio::select! {
res = handle => res?,
_ = signal::ctrl_c() => {
mount_handle.unmount().await?
}
}

Ok(())
}
pub mod fuse;
Loading

0 comments on commit f7f9d20

Please sign in to comment.