Skip to content

Commit

Permalink
Use io_uring in rmz
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Saveau <[email protected]>
  • Loading branch information
SUPERCILEX committed Mar 22, 2023
1 parent e034263 commit a01a22b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 12 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fuc_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ thiserror = "1.0.40"
typed-builder = "0.14.0"

[target.'cfg(target_os = "linux")'.dependencies]
io-uring = "0.5.13"
rustix = { version = "0.37.3", features = ["fs", "linux_latest"] }

[target.'cfg(not(target_os = "linux"))'.dependencies]
Expand Down
77 changes: 65 additions & 12 deletions fuc_engine/src/ops/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,23 @@ mod compat {
use std::{
borrow::Cow,
ffi::{CStr, CString},
io,
mem::MaybeUninit,
num::NonZeroUsize,
os::fd::{AsRawFd, RawFd},
path::Path,
sync::Arc,
thread,
thread::JoinHandle,
};

use crossbeam_channel::{Receiver, Sender};
use io_uring::{opcode::UnlinkAt, squeue::Flags, types::Fd, IoUring};
use rustix::fs::{cwd, openat, unlinkat, AtFlags, FileType, Mode, OFlags, RawDir};

use crate::{
ops::{
compat::DirectoryOp, concat_cstrs, get_file_type, join_cstr_paths, path_buf_to_cstring,
IoErr, LazyCell,
compat::DirectoryOp, concat_cstrs, get_file_type, path_buf_to_cstring, IoErr, LazyCell,
},
Error,
};
Expand Down Expand Up @@ -145,6 +147,8 @@ mod compat {
}
}

const URING_ENTRIES: u16 = 128;

#[allow(clippy::needless_pass_by_value)]
fn root_worker_thread(tasks: Receiver<Message>) -> Result<(), Error> {
let mut available_parallelism = thread::available_parallelism()
Expand All @@ -154,20 +158,27 @@ mod compat {

thread::scope(|scope| {
let mut threads = Vec::with_capacity(available_parallelism);
let mut io_uring = IoUring::builder()
.setup_coop_taskrun()
.setup_single_issuer()
.build(URING_ENTRIES.into())
.map_io_err(|| "Failed to create io_uring".to_string())?;

{
let io_uring_fd = io_uring.as_raw_fd();
let mut buf = [MaybeUninit::<u8>::uninit(); 8192];

for message in &tasks {
if available_parallelism > 0 && !tasks.is_empty() {
available_parallelism -= 1;
threads.push(scope.spawn({
let tasks = tasks.clone();
|| worker_thread(tasks)
move || worker_thread(tasks, io_uring_fd)
}));
}

match message {
Message::Node(node) => delete_dir(node, &mut buf)?,
Message::Node(node) => delete_dir(node, &mut buf, &mut io_uring)?,
Message::Error(e) => return Err(e),
}
}
Expand All @@ -180,18 +191,48 @@ mod compat {
})
}

fn worker_thread(tasks: Receiver<Message>) -> Result<(), Error> {
fn worker_thread(tasks: Receiver<Message>, io_uring_fd: RawFd) -> Result<(), Error> {
let mut io_uring = IoUring::builder()
.setup_attach_wq(io_uring_fd)
.setup_coop_taskrun()
.setup_single_issuer()
.build(URING_ENTRIES.into())
.map_io_err(|| "Failed to create io_uring".to_string())?;
let mut buf = [MaybeUninit::<u8>::uninit(); 8192];

for message in tasks {
match message {
Message::Node(node) => delete_dir(node, &mut buf)?,
Message::Node(node) => delete_dir(node, &mut buf, &mut io_uring)?,
Message::Error(e) => return Err(e),
}
}
Ok(())
}

fn delete_dir(node: TreeNode, buf: &mut [MaybeUninit<u8>]) -> Result<(), Error> {
fn delete_dir(
node: TreeNode,
buf: &mut [MaybeUninit<u8>],
io_uring: &mut IoUring,
) -> Result<(), Error> {
fn flush_uring(io_uring: &mut IoUring, entries: usize) -> Result<(), Error> {
if entries == 0 {
return Ok(());
}

io_uring
.submit_and_wait(entries)
.map_io_err(|| "Failed to submit io_uring queue.".to_string())?;
for entry in io_uring.completion() {
if entry.result() != 0 {
return Err(io::Error::from_raw_os_error(entry.result())).map_io_err(|| {
// TODO need to pass index into user_data
"Failed to delete file".to_string()
});
}
}
Ok(())
}

let dir = openat(
cwd(),
node.path.as_c_str(),
Expand All @@ -202,6 +243,7 @@ mod compat {

let node = LazyCell::new(|| Arc::new(node));
let mut raw_dir = RawDir::new(&dir, buf);
let mut pending = 0;
while let Some(file) = raw_dir.next() {
// TODO here and other uses: https://github.com/rust-lang/rust/issues/105723
const DOT: &CStr = CStr::from_bytes_with_nul(b".\0").ok().unwrap();
Expand All @@ -225,14 +267,25 @@ mod compat {
}))
.map_err(|_| Error::Internal)?;
} else {
unlinkat(&dir, file.file_name(), AtFlags::empty()).map_io_err(|| {
format!(
"Failed to delete file: {:?}",
join_cstr_paths(&node.path, file.file_name())
pending += 1;
unsafe {
io_uring
.submission()
.push(
&UnlinkAt::new(Fd(dir.as_raw_fd()), file.file_name().as_ptr())
.build()
.flags(Flags::IO_LINK),
)
})?;
.map_err(|_| Error::Internal)?;
}
}

if raw_dir.is_buffer_empty() || pending == usize::from(URING_ENTRIES) {
flush_uring(io_uring, pending)?;
pending = 0;
}
}
flush_uring(io_uring, pending)?;
Ok(())
}

Expand Down

0 comments on commit a01a22b

Please sign in to comment.