Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow returning ready on push #82

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ windows-sys = { version = "0.48", features = [
"Win32_System_Console",
"Win32_System_IO",
"Win32_System_Pipes",
"Win32_System_SystemServices",
"Win32_System_Threading",
] }

Expand Down
70 changes: 14 additions & 56 deletions compio-driver/src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@ use slab::Slab;
use windows_sys::Win32::{
Foundation::{
RtlNtStatusToDosError, ERROR_HANDLE_EOF, ERROR_IO_INCOMPLETE, ERROR_NO_DATA,
ERROR_OPERATION_ABORTED, FACILITY_NTWIN32, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING,
STATUS_SUCCESS,
ERROR_OPERATION_ABORTED, INVALID_HANDLE_VALUE, NTSTATUS, STATUS_PENDING, STATUS_SUCCESS,
},
System::{
SystemServices::ERROR_SEVERITY_ERROR,
Threading::INFINITE,
IO::{
CreateIoCompletionPort, GetQueuedCompletionStatusEx, PostQueuedCompletionStatus,
OVERLAPPED, OVERLAPPED_ENTRY,
},
IO::{CreateIoCompletionPort, GetQueuedCompletionStatusEx, OVERLAPPED, OVERLAPPED_ENTRY},
},
};

Expand Down Expand Up @@ -222,28 +217,23 @@ impl Driver {
}
}

pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
if self.cancelled.remove(&user_data) {
Poll::Ready(Err(io::Error::from_raw_os_error(
ERROR_OPERATION_ABORTED as _,
)))
} else {
let optr = op.as_mut_ptr();
unsafe { op.as_op_pin().operate(optr.cast()) }
}
}

pub unsafe fn poll(
&mut self,
timeout: Option<Duration>,
ops: &mut impl Iterator<Item = usize>,
entries: &mut impl Extend<Entry>,
registry: &mut Slab<RawOp>,
_registry: &mut Slab<RawOp>,
) -> io::Result<()> {
for user_data in ops {
let overlapped_ptr = registry[user_data].as_mut_ptr();
let op = registry[user_data].as_op_pin();
let result = if self.cancelled.remove(&user_data) {
Poll::Ready(Err(io::Error::from_raw_os_error(
ERROR_OPERATION_ABORTED as _,
)))
} else {
op.operate(overlapped_ptr.cast())
};
if let Poll::Ready(result) = result {
post_driver_raw(self.port.as_raw_handle(), result, overlapped_ptr.cast())?;
}
}

// Prevent stack growth.
let mut iocp_entries = ArrayVec::<OVERLAPPED_ENTRY, { Self::DEFAULT_CAPACITY }>::new();
self.poll_impl(timeout, &mut iocp_entries)?;
Expand Down Expand Up @@ -272,38 +262,6 @@ impl AsRawFd for Driver {
}
}

/// # Safety
///
/// * The handle should be valid.
/// * The overlapped_ptr should be non-null.
unsafe fn post_driver_raw(
handle: RawFd,
result: io::Result<usize>,
overlapped_ptr: *mut OVERLAPPED,
) -> io::Result<()> {
if let Err(e) = &result {
(*overlapped_ptr).Internal = ntstatus_from_win32(e.raw_os_error().unwrap_or_default()) as _;
}
syscall!(
BOOL,
PostQueuedCompletionStatus(
handle as _,
result.unwrap_or_default() as _,
0,
overlapped_ptr,
)
)?;
Ok(())
}

fn ntstatus_from_win32(x: i32) -> NTSTATUS {
if x <= 0 {
x
} else {
(x & 0x0000FFFF) | (FACILITY_NTWIN32 << 16) as NTSTATUS | ERROR_SEVERITY_ERROR as NTSTATUS
}
}

/// The overlapped struct we actually used for IOCP.
#[repr(C)]
pub struct Overlapped<T: ?Sized> {
Expand Down
21 changes: 12 additions & 9 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[doc(no_inline)]
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{collections::VecDeque, io, pin::Pin, time::Duration};
use std::{collections::VecDeque, io, pin::Pin, task::Poll, time::Duration};

use io_uring::{
cqueue,
Expand Down Expand Up @@ -28,6 +28,7 @@ pub trait OpCode {
pub(crate) struct Driver {
inner: IoUring,
cancel_queue: VecDeque<u64>,
squeue: VecDeque<usize>,
}

impl Driver {
Expand All @@ -37,6 +38,7 @@ impl Driver {
Ok(Self {
inner: IoUring::new(entries)?,
cancel_queue: VecDeque::default(),
squeue: VecDeque::with_capacity(entries as usize),
})
}

Expand Down Expand Up @@ -64,11 +66,9 @@ impl Driver {
}
}

fn flush_submissions(
&mut self,
ops: &mut impl Iterator<Item = usize>,
registry: &mut Slab<RawOp>,
) -> bool {
fn flush_submissions(&mut self, registry: &mut Slab<RawOp>) -> bool {
let mut ops = std::iter::from_fn(|| self.squeue.pop_front()).fuse();

let mut ended_ops = false;
let mut ended_cancel = false;

Expand Down Expand Up @@ -118,17 +118,20 @@ impl Driver {
self.cancel_queue.push_back(user_data as _);
}

pub fn push(&mut self, user_data: usize, _op: &mut RawOp) -> Poll<io::Result<usize>> {
self.squeue.push_back(user_data);
Poll::Pending
}

pub unsafe fn poll(
&mut self,
timeout: Option<Duration>,
ops: &mut impl Iterator<Item = usize>,
entries: &mut impl Extend<Entry>,
registry: &mut Slab<RawOp>,
) -> io::Result<()> {
let mut ops = ops.fuse();
// Anyway we need to submit once, no matter there are entries in squeue.
loop {
let ended = self.flush_submissions(&mut ops, registry);
let ended = self.flush_submissions(registry);

self.submit_auto(timeout, ended)?;

Expand Down
47 changes: 37 additions & 10 deletions compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
))]
compile_error!("You must choose one of these features: [\"io-uring\", \"polling\"]");

use std::{collections::VecDeque, io, time::Duration};
use std::{io, task::Poll, time::Duration};

use compio_buf::BufResult;
use slab::Slab;
Expand Down Expand Up @@ -115,12 +115,37 @@ macro_rules! impl_raw_fd {
};
}

/// The return type of [`Proactor::push`].
pub enum PushEntry<K, R> {
/// The operation is pushed to the submission queue.
Pending(K),
/// The operation is ready and returns.
Ready(R),
}

impl<K, R> PushEntry<K, R> {
/// Map the [`PushEntry::Pending`] branch.
pub fn map_pending<L>(self, f: impl FnOnce(K) -> L) -> PushEntry<L, R> {
match self {
Self::Pending(k) => PushEntry::Pending(f(k)),
Self::Ready(r) => PushEntry::Ready(r),
}
}

/// Map the [`PushEntry::Ready`] branch.
pub fn map_ready<S>(self, f: impl FnOnce(R) -> S) -> PushEntry<K, S> {
match self {
Self::Pending(k) => PushEntry::Pending(k),
Self::Ready(r) => PushEntry::Ready(f(r)),
}
}
}

/// Low-level actions of completion-based IO.
/// It owns the operations to keep the driver safe.
pub struct Proactor {
driver: Driver,
ops: Slab<RawOp>,
squeue: VecDeque<usize>,
}

impl Proactor {
Expand All @@ -134,7 +159,6 @@ impl Proactor {
Ok(Self {
driver: Driver::new(entries)?,
ops: Slab::with_capacity(entries as _),
squeue: VecDeque::with_capacity(entries as _),
})
}

Expand Down Expand Up @@ -167,13 +191,18 @@ impl Proactor {

/// Push an operation into the driver, and return the unique key, called
/// user-defined data, associated with it.
pub fn push(&mut self, op: impl OpCode + 'static) -> usize {
pub fn push<T: OpCode + 'static>(&mut self, op: T) -> PushEntry<usize, BufResult<usize, T>> {
let entry = self.ops.vacant_entry();
let user_data = entry.key();
let op = RawOp::new(user_data, op);
entry.insert(op);
self.squeue.push_back(user_data);
user_data
let op = entry.insert(op);
match self.driver.push(user_data, op) {
Poll::Pending => PushEntry::Pending(user_data),
Poll::Ready(res) => {
let op = self.ops.remove(user_data);
PushEntry::Ready(BufResult(res, unsafe { op.into_inner::<T>() }))
}
}
}

/// Poll the driver and get completed entries.
Expand All @@ -183,10 +212,8 @@ impl Proactor {
timeout: Option<Duration>,
entries: &mut impl Extend<Entry>,
) -> io::Result<()> {
let mut iter = std::iter::from_fn(|| self.squeue.pop_front());
unsafe {
self.driver
.poll(timeout, &mut iter, entries, &mut self.ops)?;
self.driver.poll(timeout, entries, &mut self.ops)?;
}
Ok(())
}
Expand Down
Loading