Skip to content

Commit

Permalink
Fix HUP notifications on windows (#1370)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche authored Oct 19, 2020
1 parent 872ceda commit 50c299a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ntapi = "0.3"
[dev-dependencies]
env_logger = { version = "0.6.2", default-features = false }
rand = "0.4"
socket2 = "0.3.15"

[package.metadata.docs.rs]
all-features = true
Expand Down
1 change: 1 addition & 0 deletions ci/azure-test-stable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
displayName: cargo ${{ parameters.cmd }} --all-features
env:
CI: "True"
RUST_TEST_THREADS: "1"

- ${{ if eq(parameters.cmd, 'test') }}:
- script: cargo doc --no-deps
Expand Down
43 changes: 20 additions & 23 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use super::afd::{self, Afd, AfdPollInfo};
use super::io_status_block::IoStatusBlock;
use super::Event;
use crate::sys::event::{
ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
};
use crate::sys::Events;
use crate::Interest;

cfg_net! {
use crate::sys::event::{
ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
};
use crate::Interest;
}

use miow::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
Expand Down Expand Up @@ -226,15 +229,7 @@ impl SockState {
// In mio, we have to simulate Edge-triggered behavior to match API usage.
// The strategy here is to intercept all read/write from user that could cause WouldBlock usage,
// then reregister the socket to reset the interests.

// Reset readable event
if (afd_events & interests_to_afd_flags(Interest::READABLE)) != 0 {
self.user_evts &= !(interests_to_afd_flags(Interest::READABLE));
}
// Reset writable event
if (afd_events & interests_to_afd_flags(Interest::WRITABLE)) != 0 {
self.user_evts &= !interests_to_afd_flags(Interest::WRITABLE);
}
self.user_evts &= !afd_events;

Some(Event {
data: self.user_data,
Expand Down Expand Up @@ -730,16 +725,18 @@ impl Drop for SelectorInner {
}
}

fn interests_to_afd_flags(interests: Interest) -> u32 {
let mut flags = 0;
cfg_net! {
fn interests_to_afd_flags(interests: Interest) -> u32 {
let mut flags = 0;

if interests.is_readable() {
flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
}
if interests.is_readable() {
flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
}

if interests.is_writable() {
flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
}
if interests.is_writable() {
flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
}

flags
}
flags
}
}
59 changes: 59 additions & 0 deletions tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,62 @@ fn start_listener(
});
(thread_handle, receiver.recv().unwrap())
}

#[test]
fn hup() {
use mio::net::TcpListener;

let (mut poll, mut events) = init_with_poll();
let addr = "127.0.0.1:0".parse().unwrap();

let mut listener = TcpListener::bind(addr).unwrap();
let addr = listener.local_addr().unwrap();
poll.registry()
.register(&mut listener, Token(0), Interest::READABLE)
.unwrap();

let mut stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(
&mut stream,
Token(1),
Interest::READABLE | Interest::WRITABLE,
)
.unwrap();

expect_events(
&mut poll,
&mut events,
vec![
ExpectEvent::new(Token(0), Interest::READABLE),
ExpectEvent::new(Token(1), Interest::WRITABLE),
],
);

let (sock, _) = listener.accept().unwrap();
set_linger_zero(&sock);
drop(sock);

expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(Token(1), Interest::READABLE)],
);
}

#[cfg(windows)]
fn set_linger_zero(socket: &TcpStream) {
use socket2::Socket;
use std::os::windows::io::{AsRawSocket, FromRawSocket};

let s = unsafe { Socket::from_raw_socket(socket.as_raw_socket()) };
s.set_linger(Some(Duration::from_millis(0))).unwrap();
}

#[cfg(unix)]
fn set_linger_zero(socket: &TcpStream) {
use socket2::Socket;

let s = unsafe { Socket::from_raw_fd(socket.as_raw_fd()) };
s.set_linger(Some(Duration::from_millis(0))).unwrap();
}

0 comments on commit 50c299a

Please sign in to comment.