Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Named Pipes: Support Message Read Mode #5350

Merged
merged 4 commits into from
Feb 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions tokio/src/net/windows/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1701,14 +1701,18 @@ impl ServerOptions {
/// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
/// documentation of what each mode means.
///
/// This corresponding to specifying [`dwPipeMode`].
/// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in [`dwPipeMode`].
///
/// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
let is_msg = matches!(pipe_mode, PipeMode::Message);
// Pipe mode is implemented as a bit flag 0x4. Set is message and unset
// is byte.
bool_flag!(self.pipe_mode, is_msg, windows_sys::PIPE_TYPE_MESSAGE);
bool_flag!(
self.pipe_mode,
is_msg,
windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
);
self
}

Expand Down Expand Up @@ -2268,6 +2272,7 @@ impl ServerOptions {
pub struct ClientOptions {
desired_access: u32,
security_qos_flags: u32,
pipe_mode: PipeMode,
}

impl ClientOptions {
Expand All @@ -2289,6 +2294,7 @@ impl ClientOptions {
desired_access: windows_sys::GENERIC_READ | windows_sys::GENERIC_WRITE,
security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
| windows_sys::SECURITY_SQOS_PRESENT,
pipe_mode: PipeMode::Byte,
}
}

Expand Down Expand Up @@ -2341,6 +2347,15 @@ impl ClientOptions {
self
}

/// The pipe mode.
///
/// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
/// documentation of what each mode means.
pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
self.pipe_mode = pipe_mode;
self
}

/// Opens the named pipe identified by `addr`.
///
/// This opens the client using [`CreateFile`] with the
Expand Down Expand Up @@ -2437,6 +2452,20 @@ impl ClientOptions {
return Err(io::Error::last_os_error());
}

if matches!(self.pipe_mode, PipeMode::Message) {
let mut mode = windows_sys::PIPE_READMODE_MESSAGE;
let result = windows_sys::SetNamedPipeHandleState(
h,
&mut mode,
ptr::null_mut(),
ptr::null_mut(),
);

if result == 0 {
return Err(io::Error::last_os_error());
}
}

NamedPipeClient::from_raw_handle(h as _)
}

Expand Down Expand Up @@ -2556,7 +2585,9 @@ unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {

#[cfg(test)]
mod test {
use self::windows_sys::{PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE};
use self::windows_sys::{
PIPE_READMODE_MESSAGE, PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE,
};
use super::*;

#[test]
Expand Down Expand Up @@ -2588,13 +2619,13 @@ mod test {

opts.reject_remote_clients(false);
opts.pipe_mode(PipeMode::Message);
assert_eq!(opts.pipe_mode, PIPE_TYPE_MESSAGE);
assert_eq!(opts.pipe_mode, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE);

opts.reject_remote_clients(true);
opts.pipe_mode(PipeMode::Message);
assert_eq!(
opts.pipe_mode,
PIPE_TYPE_MESSAGE | PIPE_REJECT_REMOTE_CLIENTS
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_REJECT_REMOTE_CLIENTS
);
Comment on lines 2620 to 2629
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if we change the pipe mode back to byte? Is PIPE_READMODE_MESSAGE still set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PIPE_READMODE_MESSAGE is cleared. I've pushed c582f04 to add that to the test. :)

I have another commit (658c3f8) queued after this to refactor the code to not use bitfields (#5359).

}
}
48 changes: 41 additions & 7 deletions tokio/tests/net_named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::io;
use std::mem;
use std::os::windows::io::AsRawHandle;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions};
use tokio::time;
use windows_sys::Win32::Foundation::{ERROR_NO_DATA, ERROR_PIPE_BUSY, NO_ERROR, UNICODE_STRING};
Expand Down Expand Up @@ -327,17 +327,51 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
Ok(())
}

// This tests what happens when a client tries to disconnect.
// This tests that message mode works as expected.
#[tokio::test]
async fn test_named_pipe_mode_message() -> io::Result<()> {
const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message";
// it's easy to accidentally get a seemingly working test here because byte pipes
// often return contents at write boundaries. to make sure we're doing the right thing we
// explicitly test that it doesn't work in byte mode.
_named_pipe_mode_message(PipeMode::Message).await?;
_named_pipe_mode_message(PipeMode::Byte).await
}

async fn _named_pipe_mode_message(mode: PipeMode) -> io::Result<()> {
let pipe_name = format!(
r"\\.\pipe\test-named-pipe-mode-message-{}",
matches!(mode, PipeMode::Message)
);
let mut buf = [0u8; 32];

let server = ServerOptions::new()
.pipe_mode(PipeMode::Message)
.create(PIPE_NAME)?;
let mut server = ServerOptions::new()
.first_pipe_instance(true)
.pipe_mode(mode)
.create(&pipe_name)?;

let mut client = ClientOptions::new().pipe_mode(mode).open(&pipe_name)?;

let _ = ClientOptions::new().open(PIPE_NAME)?;
server.connect().await?;

// this needs a few iterations, presumably Windows waits for a few calls before merging buffers
for _ in 0..10 {
client.write_all(b"hello").await?;
server.write_all(b"world").await?;
}
for _ in 0..10 {
let n = server.read(&mut buf).await?;
if buf[..n] != b"hello"[..] {
assert!(matches!(mode, PipeMode::Byte));
return Ok(());
}
let n = client.read(&mut buf).await?;
if buf[..n] != b"world"[..] {
assert!(matches!(mode, PipeMode::Byte));
return Ok(());
}
}
// byte mode should have errored before.
assert!(matches!(mode, PipeMode::Message));
Ok(())
}

Expand Down