diff --git a/apps/c/httpserver/httpserver.c b/apps/c/httpserver/httpserver.c index 2c38211dd5..4480785aae 100644 --- a/apps/c/httpserver/httpserver.c +++ b/apps/c/httpserver/httpserver.c @@ -36,7 +36,7 @@ int main() struct sockaddr_in local, remote; int addr_len = sizeof(remote); local.sin_family = AF_INET; - if (inet_pton(AF_INET, "10.0.2.15", &(local.sin_addr)) != 1) { + if (inet_pton(AF_INET, "0.0.0.0", &(local.sin_addr)) != 1) { perror("inet_pton() error"); return -1; } @@ -54,7 +54,7 @@ int main() perror("listen() error"); return -1; } - puts("listen on: http://10.0.2.15:5555/"); + puts("listen on: http://0.0.0.0:5555/"); char buf[1024] = {}; int client; char response[1024] = {}; diff --git a/apps/c/iperf/axbuild.mk b/apps/c/iperf/axbuild.mk index ddf7699d23..842dbb4482 100644 --- a/apps/c/iperf/axbuild.mk +++ b/apps/c/iperf/axbuild.mk @@ -25,7 +25,7 @@ app-objs := $(patsubst %.c,$(iperf_pkg)/src/%.o,$(iperf_src)) .PRECIOUS: $(APP)/%.c $(APP)/%.c: - echo "Download iperf source code" + @echo "Download iperf source code" wget https://downloads.es.net/pub/iperf/$(iperf_pkg).tar.gz -P $(APP) tar -zxvf $(APP)/$(iperf_pkg).tar.gz -C $(APP) && rm -f $(APP)/$(iperf_pkg).tar.gz patch -p1 -N -d $(iperf_dir) --no-backup-if-mismatch -r - < $(APP)/iperf.patch diff --git a/apps/c/iperf/iperf.patch b/apps/c/iperf/iperf.patch index 7292adf098..07994d1a70 100644 --- a/apps/c/iperf/iperf.patch +++ b/apps/c/iperf/iperf.patch @@ -417,7 +417,7 @@ index 73dc362..4bfe6f9 100644 } diff --git a/src/main_server.c b/src/main_server.c new file mode 100644 -index 0000000..9d31a66 +index 0000000..c9964ba --- /dev/null +++ b/src/main_server.c @@ -0,0 +1,42 @@ @@ -445,7 +445,7 @@ index 0000000..9d31a66 + iperf_defaults(test); + iperf_set_test_role(test, 's'); + iperf_set_test_server_port(test, port); -+ iperf_set_test_bind_address(test, "10.0.2.15"); ++ iperf_set_test_bind_address(test, "0.0.0.0"); + consecutive_errors = 0; + for (;;) { + if (iperf_run_server(test) < 0) { diff --git a/apps/c/udpserver/udpserver.c b/apps/c/udpserver/udpserver.c index fdb190bdbd..cff15139c6 100644 --- a/apps/c/udpserver/udpserver.c +++ b/apps/c/udpserver/udpserver.c @@ -12,7 +12,7 @@ int main() struct sockaddr_in local, remote; int addr_len = sizeof(remote); local.sin_family = AF_INET; - if (inet_pton(AF_INET, "10.0.2.15", &(local.sin_addr)) != 1) { + if (inet_pton(AF_INET, "0.0.0.0", &(local.sin_addr)) != 1) { perror("inet_pton() error"); return -1; } @@ -26,7 +26,7 @@ int main() perror("bind() error"); return -1; } - puts("listen on: 10.0.2.15:5555"); + puts("listen on: 0.0.0.0:5555"); char buf[1024] = {}; for (;;) { ssize_t l = diff --git a/apps/net/echoserver/src/main.rs b/apps/net/echoserver/src/main.rs index b695f4109d..0067dff046 100644 --- a/apps/net/echoserver/src/main.rs +++ b/apps/net/echoserver/src/main.rs @@ -12,7 +12,7 @@ use libax::io::{self, prelude::*}; use libax::net::{IpAddr, TcpListener, TcpStream}; use libax::thread; -const LOCAL_IP: &str = "10.0.2.15"; +const LOCAL_IP: &str = "0.0.0.0"; const LOCAL_PORT: u16 = 5555; fn reverse(buf: &[u8]) -> Vec { @@ -39,7 +39,7 @@ fn echo_server(mut stream: TcpStream) -> io::Result { fn accept_loop() -> io::Result { let (addr, port) = (IpAddr::from_str(LOCAL_IP).unwrap(), LOCAL_PORT); - let mut listener = TcpListener::bind((addr, port).into())?; + let listener = TcpListener::bind((addr, port).into())?; println!("listen on: {}", listener.local_addr().unwrap()); let mut i = 0; diff --git a/apps/net/httpserver/src/main.rs b/apps/net/httpserver/src/main.rs index 7b372f343d..e08abf3dc4 100644 --- a/apps/net/httpserver/src/main.rs +++ b/apps/net/httpserver/src/main.rs @@ -19,7 +19,7 @@ use libax::io::{self, prelude::*}; use libax::net::{IpAddr, TcpListener, TcpStream}; use libax::thread; -const LOCAL_IP: &str = "10.0.2.15"; +const LOCAL_IP: &str = "0.0.0.0"; const LOCAL_PORT: u16 = 5555; macro_rules! header { @@ -62,7 +62,7 @@ fn http_server(mut stream: TcpStream) -> io::Result { fn accept_loop() -> io::Result { let (addr, port) = (IpAddr::from_str(LOCAL_IP).unwrap(), LOCAL_PORT); - let mut listener = TcpListener::bind((addr, port).into())?; + let listener = TcpListener::bind((addr, port).into())?; println!("listen on: http://{}/", listener.local_addr().unwrap()); let mut i = 0; diff --git a/apps/net/udpserver/src/main.rs b/apps/net/udpserver/src/main.rs index 84e6d3eeb7..67e21bb3e4 100644 --- a/apps/net/udpserver/src/main.rs +++ b/apps/net/udpserver/src/main.rs @@ -10,7 +10,7 @@ use core::str::FromStr; use libax::io; use libax::net::{IpAddr, UdpSocket}; -const LOCAL_IP: &str = "10.0.2.15"; +const LOCAL_IP: &str = "0.0.0.0"; const LOCAL_PORT: u16 = 5555; fn receive_loop() -> io::Result { diff --git a/crates/axerrno/src/lib.rs b/crates/axerrno/src/lib.rs index c1688293d6..986040cb4d 100644 --- a/crates/axerrno/src/lib.rs +++ b/crates/axerrno/src/lib.rs @@ -10,6 +10,9 @@ //! [`std::io::ErrorKind`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html #![no_std] +#![feature(variant_count)] + +use core::fmt; mod linux_errno { include!(concat!(env!("OUT_DIR"), "/linux_errno.rs")); @@ -27,7 +30,7 @@ pub use linux_errno::LinuxError; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum AxError { /// A socket address could not be bound because the address is already in use elsewhere. - AddrInUse, + AddrInUse = 1, /// An entity already exists, often a file. AlreadyExists, /// Bad address. @@ -36,6 +39,8 @@ pub enum AxError { BadState, /// The connection was refused by the remote server, ConnectionRefused, + /// The connection was reset by the remote server. + ConnectionReset, /// A non-empty directory was specified where an empty directory was expected. DirectoryNotEmpty, /// Data not valid for the operation were encountered. @@ -183,14 +188,54 @@ impl AxError { pub fn as_str(&self) -> &'static str { use AxError::*; match *self { + AddrInUse => "Address in use", + BadAddress => "Bad address", BadState => "Bad internal state", + AlreadyExists => "Entity already exists", + ConnectionRefused => "Connection refused", + ConnectionReset => "Connection reset", + DirectoryNotEmpty => "Directory not empty", InvalidData => "Invalid data", - Unsupported => "Operation not supported", + InvalidInput => "Invalid input parameter", + Io => "I/O error", + IsADirectory => "Is a directory", + NoMemory => "Out of memory", + NotADirectory => "Not a directory", + NotConnected => "Not connected", + NotFound => "Entity not found", + PermissionDenied => "Permission denied", + ResourceBusy => "Resource busy", + StorageFull => "No storage space", UnexpectedEof => "Unexpected end of file", + Unsupported => "Operation not supported", + WouldBlock => "Operation would block", WriteZero => "Write zero", - _ => LinuxError::from(*self).as_str(), } } + + /// Returns the error code value in `i32`. + pub const fn code(self) -> i32 { + self as i32 + } +} + +impl TryFrom for AxError { + type Error = i32; + + #[inline] + fn try_from(value: i32) -> Result { + if value > 0 && value <= core::mem::variant_count::() as i32 { + Ok(unsafe { core::mem::transmute(value) }) + } else { + Err(value) + } + } +} + +impl fmt::Display for AxError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } } impl From for LinuxError { @@ -201,6 +246,7 @@ impl From for LinuxError { AlreadyExists => LinuxError::EEXIST, BadAddress | BadState => LinuxError::EFAULT, ConnectionRefused => LinuxError::ECONNREFUSED, + ConnectionReset => LinuxError::ECONNRESET, DirectoryNotEmpty => LinuxError::ENOTEMPTY, InvalidInput | InvalidData => LinuxError::EINVAL, Io => LinuxError::EIO, @@ -219,7 +265,34 @@ impl From for LinuxError { } } +impl fmt::Display for LinuxError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + #[doc(hidden)] pub mod __priv { pub use log::warn; } + +#[cfg(test)] +mod tests { + use crate::AxError; + + #[test] + fn test_try_from() { + let max_code = core::mem::variant_count::() as i32; + assert_eq!(max_code, 22); + assert_eq!(max_code, AxError::WriteZero.code()); + + assert_eq!(AxError::AddrInUse.code(), 1); + assert_eq!(Ok(AxError::AddrInUse), AxError::try_from(1)); + assert_eq!(Ok(AxError::AlreadyExists), AxError::try_from(2)); + assert_eq!(Ok(AxError::WriteZero), AxError::try_from(max_code)); + assert_eq!(Err(max_code + 1), AxError::try_from(max_code + 1)); + assert_eq!(Err(0), AxError::try_from(0)); + assert_eq!(Err(-1), AxError::try_from(-1)); + assert_eq!(Err(i32::MAX), AxError::try_from(i32::MAX)); + } +} diff --git a/modules/axnet/src/lib.rs b/modules/axnet/src/lib.rs index e699e485d9..7cbfb61889 100644 --- a/modules/axnet/src/lib.rs +++ b/modules/axnet/src/lib.rs @@ -33,9 +33,9 @@ cfg_if::cfg_if! { } } -pub use self::net_impl::resolve_socket_addr; pub use self::net_impl::TcpSocket; pub use self::net_impl::UdpSocket; +pub use self::net_impl::{poll_interfaces, resolve_socket_addr}; pub use smoltcp::wire::{IpAddress as IpAddr, IpEndpoint as SocketAddr, Ipv4Address as Ipv4Addr}; use axdriver::{prelude::*, AxDeviceContainer}; diff --git a/modules/axnet/src/smoltcp_impl/listen_table.rs b/modules/axnet/src/smoltcp_impl/listen_table.rs index 8dc836ef23..d1c4524c68 100644 --- a/modules/axnet/src/smoltcp_impl/listen_table.rs +++ b/modules/axnet/src/smoltcp_impl/listen_table.rs @@ -5,6 +5,7 @@ use axerrno::{ax_err, AxError, AxResult}; use axsync::Mutex; use smoltcp::iface::{SocketHandle, SocketSet}; use smoltcp::socket::tcp::{self, State}; +use smoltcp::wire::{IpAddress, IpListenEndpoint}; use super::{SocketSetWrapper, LISTEN_QUEUE_SIZE, SOCKET_SET}; use crate::SocketAddr; @@ -12,15 +13,25 @@ use crate::SocketAddr; const PORT_NUM: usize = 65536; struct ListenTableEntry { + listen_addr: IpListenEndpoint, syn_queue: VecDeque, } impl ListenTableEntry { - pub fn new() -> Self { + pub fn new(listen_addr: IpListenEndpoint) -> Self { Self { + listen_addr, syn_queue: VecDeque::with_capacity(LISTEN_QUEUE_SIZE), } } + + #[inline] + fn can_accept(&self, dst: IpAddress) -> bool { + match self.listen_addr.addr { + Some(addr) => addr == dst, + None => true, + } + } } impl Drop for ListenTableEntry { @@ -51,13 +62,12 @@ impl ListenTable { self.tcp[port as usize].lock().is_none() } - pub fn listen(&self, port: u16) -> AxResult { - if port == 0 { - return ax_err!(InvalidInput, "socket listen() failed"); - } + pub fn listen(&self, listen_addr: IpListenEndpoint) -> AxResult { + let port = listen_addr.port; + assert_ne!(port, 0); let mut entry = self.tcp[port as usize].lock(); if entry.is_none() { - *entry = Some(Box::new(ListenTableEntry::new())); + *entry = Some(Box::new(ListenTableEntry::new(listen_addr))); Ok(()) } else { ax_err!(AddrInUse, "socket listen() failed") @@ -65,67 +75,37 @@ impl ListenTable { } pub fn unlisten(&self, port: u16) { - debug!("socket unlisten on {}", port); + debug!("TCP socket unlisten on {}", port); *self.tcp[port as usize].lock() = None; } pub fn can_accept(&self, port: u16) -> AxResult { if let Some(entry) = self.tcp[port as usize].lock().deref() { - if entry.syn_queue.iter().any(|&handle| { - let (connected, _) = get_socket_info(handle); - connected - }) { - Ok(true) - } else { - Ok(false) - } + Ok(entry.syn_queue.iter().any(|&handle| is_connected(handle))) } else { ax_err!(InvalidInput, "socket accept() failed: not listen") } } - pub fn accept(&self, port: u16) -> AxResult<(SocketHandle, Option)> { + pub fn accept(&self, port: u16) -> AxResult<(SocketHandle, (SocketAddr, SocketAddr))> { if let Some(entry) = self.tcp[port as usize].lock().deref_mut() { let syn_queue = &mut entry.syn_queue; - if let Some(&handle) = syn_queue.front() { - // In most cases, the order in which sockets establish connections - // is the same as the order in which they join the SYN queue. That - // is, the front of the queue connects first. At this point, we can - // use `pop_front` to speed up queue deletion. - let (connected, peer_addr) = get_socket_info(handle); - if connected { - syn_queue.pop_front(); - return Ok((handle, peer_addr)); - } - } else { - return Err(AxError::WouldBlock); - } - if let Some((idx, peer_addr)) = - syn_queue - .iter() - .enumerate() - .skip(1) - .find_map(|(idx, &handle)| { - let (connected, peer_addr) = get_socket_info(handle); - if connected { - Some((idx, peer_addr)) - } else { - None - } - }) - { + let (idx, addr_tuple) = syn_queue + .iter() + .enumerate() + .find_map(|(idx, &handle)| { + is_connected(handle).then(|| (idx, get_addr_tuple(handle))) + }) + .ok_or(AxError::WouldBlock)?; // wait for connection + if idx > 0 { warn!( - "slow removal in SYN queue: index = {}, len = {}!", + "slow SYN queue enumeration: index = {}, len = {}!", idx, syn_queue.len() ); - // this removal can be slow - let handle = syn_queue.remove(idx).unwrap(); - Ok((handle, peer_addr)) - } else { - // wait for connection - Err(AxError::WouldBlock) } + let handle = syn_queue.swap_remove_front(idx).unwrap(); + Ok((handle, addr_tuple)) } else { ax_err!(InvalidInput, "socket accept() failed: not listen") } @@ -138,17 +118,21 @@ impl ListenTable { sockets: &mut SocketSet<'_>, ) { if let Some(entry) = self.tcp[dst.port as usize].lock().deref_mut() { + if !entry.can_accept(dst.addr) { + // not listening on this address + return; + } if entry.syn_queue.len() >= LISTEN_QUEUE_SIZE { // SYN queue is full, drop the packet warn!("SYN queue overflow!"); return; } let mut socket = SocketSetWrapper::new_tcp_socket(); - if socket.listen(dst).is_ok() { + if socket.listen(entry.listen_addr).is_ok() { let handle = sockets.add(socket); debug!( - "socket {}: prepare for connection {} -> {}", - handle, src, dst + "TCP socket {}: prepare for connection {} -> {}", + handle, src, entry.listen_addr ); entry.syn_queue.push_back(handle); } @@ -156,12 +140,17 @@ impl ListenTable { } } -fn get_socket_info(handle: SocketHandle) -> (bool, Option) { - let (connected, peer_addr) = SOCKET_SET.with_socket::(handle, |socket| { +fn is_connected(handle: SocketHandle) -> bool { + SOCKET_SET.with_socket::(handle, |socket| { + !matches!(socket.state(), State::Listen | State::SynReceived) + }) +} + +fn get_addr_tuple(handle: SocketHandle) -> (SocketAddr, SocketAddr) { + SOCKET_SET.with_socket::(handle, |socket| { ( - !matches!(socket.state(), State::Listen | State::SynReceived), - socket.remote_endpoint(), + socket.local_endpoint().unwrap(), + socket.remote_endpoint().unwrap(), ) - }); - (connected, peer_addr) + }) } diff --git a/modules/axnet/src/smoltcp_impl/mod.rs b/modules/axnet/src/smoltcp_impl/mod.rs index bb8989a1f5..4013e82867 100644 --- a/modules/axnet/src/smoltcp_impl/mod.rs +++ b/modules/axnet/src/smoltcp_impl/mod.rs @@ -279,6 +279,14 @@ fn snoop_tcp_packet(buf: &[u8], sockets: &mut SocketSet<'_>) -> Result<(), smolt Ok(()) } +/// Poll the network stack. +/// +/// It may receive packets from the NIC and process them, and transmit queued +/// packets to the NIC. +pub fn poll_interfaces() { + SOCKET_SET.poll_interfaces(); +} + pub(crate) fn init(net_dev: AxNetDevice) { let ether_addr = EthernetAddress(net_dev.mac_address().0); let eth0 = InterfaceWrapper::new("eth0", net_dev, ether_addr); diff --git a/modules/axnet/src/smoltcp_impl/tcp.rs b/modules/axnet/src/smoltcp_impl/tcp.rs index 2a2185b9fd..2d05f9e7d6 100644 --- a/modules/axnet/src/smoltcp_impl/tcp.rs +++ b/modules/axnet/src/smoltcp_impl/tcp.rs @@ -1,14 +1,32 @@ +use core::cell::UnsafeCell; +use core::sync::atomic::{AtomicBool, AtomicU8, Ordering}; + use axerrno::{ax_err, ax_err_type, AxError, AxResult}; use axio::PollState; use axsync::Mutex; use smoltcp::iface::SocketHandle; -use smoltcp::socket::tcp::{self, ConnectError, RecvError, State}; -use smoltcp::wire::IpAddress; +use smoltcp::socket::tcp::{self, ConnectError, State}; +use smoltcp::wire::{IpAddress, IpListenEndpoint}; use super::{SocketSetWrapper, ETH0, LISTEN_TABLE, SOCKET_SET}; use crate::SocketAddr; +const UNSPECIFIED_IP: IpAddress = IpAddress::v4(0, 0, 0, 0); +const UNSPECIFIED: SocketAddr = SocketAddr::new(UNSPECIFIED_IP, 0); + +// State transitions: +// CLOSED -(connect)-> BUSY -> CONNECTING -> CONNECTED -(shutdown)-> BUSY -> CLOSED +// | +// |-(listen)-> BUSY -> LISTENING -(shutdown)-> BUSY -> CLOSED +// | +// -(bind)-> BUSY -> CLOSED +const STATE_CLOSED: u8 = 0; +const STATE_BUSY: u8 = 1; +const STATE_CONNECTING: u8 = 2; +const STATE_CONNECTED: u8 = 3; +const STATE_LISTENING: u8 = 4; + /// A TCP socket that provides POSIX-like APIs. /// /// - [`connect`] is for TCP clients. @@ -20,43 +38,66 @@ use crate::SocketAddr; /// [`listen`]: TcpSocket::listen /// [`accept`]: TcpSocket::accept pub struct TcpSocket { - handle: Option, // `None` if is listening - local_addr: Option, - peer_addr: Option, - nonblock: bool, + state: AtomicU8, + handle: UnsafeCell>, + local_addr: UnsafeCell, + peer_addr: UnsafeCell, + nonblock: AtomicBool, } +unsafe impl Sync for TcpSocket {} + impl TcpSocket { /// Creates a new TCP socket. - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - let socket = SocketSetWrapper::new_tcp_socket(); - let handle = Some(SOCKET_SET.add(socket)); + pub const fn new() -> Self { Self { - handle, - local_addr: None, - peer_addr: None, - nonblock: false, + state: AtomicU8::new(STATE_CLOSED), + handle: UnsafeCell::new(None), + local_addr: UnsafeCell::new(UNSPECIFIED), + peer_addr: UnsafeCell::new(UNSPECIFIED), + nonblock: AtomicBool::new(false), } } - /// If `handle` is not [`None`], the socket is used for a client to connect - /// to a server. Otherwise, the socket is used for a server to listen for - /// connections. - const fn is_listening(&self) -> bool { - self.handle.is_none() + /// Creates a new TCP socket that is already connected. + const fn new_connected( + handle: SocketHandle, + local_addr: SocketAddr, + peer_addr: SocketAddr, + ) -> Self { + Self { + state: AtomicU8::new(STATE_CONNECTED), + handle: UnsafeCell::new(Some(handle)), + local_addr: UnsafeCell::new(local_addr), + peer_addr: UnsafeCell::new(peer_addr), + nonblock: AtomicBool::new(false), + } } /// Returns the local address and port, or /// [`Err(NotConnected)`](AxError::NotConnected) if not connected. + #[inline] pub fn local_addr(&self) -> AxResult { - self.local_addr.ok_or(AxError::NotConnected) + match self.get_state() { + STATE_CONNECTED | STATE_LISTENING => unsafe { Ok(self.local_addr.get().read()) }, + _ => Err(AxError::NotConnected), + } } /// Returns the remote address and port, or /// [`Err(NotConnected)`](AxError::NotConnected) if not connected. + #[inline] pub fn peer_addr(&self) -> AxResult { - self.peer_addr.ok_or(AxError::NotConnected) + match self.get_state() { + STATE_CONNECTED | STATE_LISTENING => unsafe { Ok(self.peer_addr.get().read()) }, + _ => Err(AxError::NotConnected), + } + } + + /// Returns whether this socket is in nonblocking mode. + #[inline] + pub fn is_nonblocking(&self) -> bool { + self.nonblock.load(Ordering::Acquire) } /// Moves this TCP stream into or out of nonblocking mode. @@ -67,52 +108,65 @@ impl TcpSocket { /// action is required. If the IO operation could not be completed and needs /// to be retried, an error with kind [`Err(WouldBlock)`](AxError::WouldBlock) is /// returned. - pub fn set_nonblocking(&mut self, nonblocking: bool) { - self.nonblock = nonblocking; + #[inline] + pub fn set_nonblocking(&self, nonblocking: bool) { + self.nonblock.store(nonblocking, Ordering::Release); } /// Connects to the given address and port. /// /// The local port is generated automatically. - pub fn connect(&mut self, addr: SocketAddr) -> AxResult { - let handle = if self.is_listening() { - return ax_err!(AlreadyExists, "socket connect() failed: already connected"); - } else { - self.handle.unwrap() - }; + pub fn connect(&self, remote_addr: SocketAddr) -> AxResult { + self.update_state(STATE_CLOSED, STATE_CONNECTING, || { + // SAFETY: no other threads can read or write these fields. + let handle = unsafe { self.handle.get().read() } + .unwrap_or_else(|| SOCKET_SET.add(SocketSetWrapper::new_tcp_socket())); - // TODO: check host unreachable - let local_port = get_ephemeral_port()?; - let iface = Ð0.iface; - let (local_addr, peer_addr) = - SOCKET_SET.with_socket_mut::(handle, |socket| { - socket - .connect(iface.lock().context(), addr, local_port) - .or_else(|e| match e { - ConnectError::InvalidState => { - ax_err!(AlreadyExists, "socket connect() failed") - } - ConnectError::Unaddressable => { - ax_err!(InvalidInput, "socket connect() failed") - } - })?; - Ok((socket.local_endpoint(), socket.remote_endpoint())) - })?; - - loop { - SOCKET_SET.poll_interfaces(); - let (state, may_recv) = SOCKET_SET.with_socket::(handle, |socket| { - (socket.state(), socket.may_recv()) - }); - if may_recv || state == State::Established { - self.local_addr = local_addr; - self.peer_addr = peer_addr; - return Ok(()); - } else if state == State::SynSent { - axtask::yield_now(); - } else { - return ax_err!(ConnectionRefused, "socket connect() failed"); + // TODO: check remote addr unreachable + let bound_addr = self.bound_addr()?; + let iface = Ð0.iface; + let (local_addr, remote_addr) = + SOCKET_SET.with_socket_mut::(handle, |socket| { + socket + .connect(iface.lock().context(), remote_addr, bound_addr) + .or_else(|e| match e { + ConnectError::InvalidState => { + ax_err!(BadState, "socket connect() failed") + } + ConnectError::Unaddressable => { + ax_err!(ConnectionRefused, "socket connect() failed") + } + })?; + Ok(( + socket.local_endpoint().unwrap(), + socket.remote_endpoint().unwrap(), + )) + })?; + unsafe { + // SAFETY: no other threads can read or write these fields as we + // have changed the state to `BUSY`. + self.local_addr.get().write(local_addr); + self.peer_addr.get().write(remote_addr); + self.handle.get().write(Some(handle)); } + Ok(()) + }) + .unwrap_or_else(|_| ax_err!(AlreadyExists, "socket connect() failed: already connected"))?; // EISCONN + + // Here our state must be `CONNECTING`, and only one thread can run here. + if self.is_nonblocking() { + Err(AxError::WouldBlock) + } else { + self.block_on(|| { + let PollState { writable, .. } = self.poll_connect()?; + if !writable { + Err(AxError::WouldBlock) + } else if self.get_state() == STATE_CONNECTED { + Ok(()) + } else { + ax_err!(ConnectionRefused, "socket connect() failed") + } + }) } } @@ -122,43 +176,44 @@ impl TcpSocket { /// /// It's must be called before [`listen`](Self::listen) and /// [`accept`](Self::accept). - pub fn bind(&mut self, addr: SocketAddr) -> AxResult { - if self.local_addr.is_some() { - return ax_err!(InvalidInput, "socket bind() failed: already bound"); - } - - // TODO: check addr is valid - let mut addr = addr; - if addr.port == 0 { - addr.port = get_ephemeral_port()?; - } - self.local_addr = Some(addr); - Ok(()) + pub fn bind(&self, mut local_addr: SocketAddr) -> AxResult { + self.update_state(STATE_CLOSED, STATE_CLOSED, || { + // TODO: check addr is available + if local_addr.addr.is_unspecified() && local_addr.addr != UNSPECIFIED_IP { + return ax_err!(InvalidInput, "socket bind() failed: invalid addr"); + } + if local_addr.port == 0 { + local_addr.port = get_ephemeral_port()?; + } + // SAFETY: no other threads can read or write `self.local_addr` as we + // have changed the state to `BUSY`. + unsafe { + let old = self.local_addr.get().read(); + if old != UNSPECIFIED { + return ax_err!(InvalidInput, "socket bind() failed: already bound"); + } + self.local_addr.get().write(local_addr); + } + Ok(()) + }) + .unwrap_or_else(|_| ax_err!(InvalidInput, "socket bind() failed: already bound")) } /// Starts listening on the bound address and port. /// /// It's must be called after [`bind`](Self::bind) and before /// [`accept`](Self::accept). - pub fn listen(&mut self) -> AxResult { - if self.is_listening() { - return Ok(()); // already listening - } - - let local_port = if let Some(local_addr) = self.local_addr { - local_addr.port - } else { - let addr = IpAddress::v4(0, 0, 0, 0); - let port = get_ephemeral_port()?; - self.local_addr = Some(SocketAddr::new(addr, port)); - port - }; - - LISTEN_TABLE.listen(local_port)?; - debug!("socket listening on {}", self.local_addr.unwrap()); - let handle = self.handle.take().unwrap(); // should not connect/send/recv any more - SOCKET_SET.remove(handle); - Ok(()) + pub fn listen(&self) -> AxResult { + self.update_state(STATE_CLOSED, STATE_LISTENING, || { + let bound_addr = self.bound_addr()?; + unsafe { + (*self.local_addr.get()).port = bound_addr.port; + } + LISTEN_TABLE.listen(bound_addr)?; + debug!("TCP socket listening on {}", bound_addr); + Ok(()) + }) + .unwrap_or(Ok(())) // ignore simultaneous `listen`s. } /// Accepts a new connection. @@ -167,163 +222,268 @@ impl TcpSocket { /// is established. When established, a new [`TcpSocket`] is returned. /// /// It's must be called after [`bind`](Self::bind) and [`listen`](Self::listen). - pub fn accept(&mut self) -> AxResult { + pub fn accept(&self) -> AxResult { if !self.is_listening() { return ax_err!(InvalidInput, "socket accept() failed: not listen"); } - let local_port = self - .local_addr - .ok_or_else(|| ax_err_type!(InvalidInput, "socket accept() failed: no address bound"))? - .port; - - loop { - SOCKET_SET.poll_interfaces(); - match LISTEN_TABLE.accept(local_port) { - Ok((handle, peer_addr)) => { - debug!("socket accepted a new connection {}", peer_addr.unwrap()); - return Ok(TcpSocket { - handle: Some(handle), - local_addr: self.local_addr, - peer_addr, - nonblock: false, - }); - } - Err(AxError::WouldBlock) => { - if self.nonblock { - return Err(AxError::WouldBlock); - } else { - axtask::yield_now() - } - } - Err(e) => return Err(e), - } - } + // SAFETY: `self.local_addr` should be initialized after `bind()`. + let local_port = unsafe { self.local_addr.get().read().port }; + self.block_on(|| { + let (handle, (local_addr, peer_addr)) = LISTEN_TABLE.accept(local_port)?; + debug!("TCP socket accepted a new connection {}", peer_addr); + Ok(TcpSocket::new_connected(handle, local_addr, peer_addr)) + }) } /// Close the connection. pub fn shutdown(&self) -> AxResult { - if let Some(handle) = self.handle { - // stream + // stream + self.update_state(STATE_CONNECTED, STATE_CLOSED, || { + // SAFETY: `self.handle` should be initialized in a connected socket, and + // no other threads can read or write it. + let handle = unsafe { self.handle.get().read().unwrap() }; SOCKET_SET.with_socket_mut::(handle, |socket| { - debug!("socket {}: shutting down", handle); + debug!("TCP socket {}: shutting down", handle); socket.close(); }); - } else { - // listener - if let Some(local_addr) = self.local_addr { - LISTEN_TABLE.unlisten(local_addr.port); - } - } - SOCKET_SET.poll_interfaces(); + unsafe { self.local_addr.get().write(UNSPECIFIED) }; // clear bound address + SOCKET_SET.poll_interfaces(); + Ok(()) + }) + .unwrap_or(Ok(()))?; + + // listener + self.update_state(STATE_LISTENING, STATE_CLOSED, || { + // SAFETY: `self.local_addr` should be initialized in a listening socket, + // and no other threads can read or write it. + let local_port = unsafe { self.local_addr.get().read().port }; + unsafe { self.local_addr.get().write(UNSPECIFIED) }; // clear bound address + LISTEN_TABLE.unlisten(local_port); + SOCKET_SET.poll_interfaces(); + Ok(()) + }) + .unwrap_or(Ok(()))?; + + // ignore for other states Ok(()) } /// Receives data from the socket, stores it in the given buffer. pub fn recv(&self, buf: &mut [u8]) -> AxResult { - let handle = self - .handle - .ok_or_else(|| ax_err_type!(NotConnected, "socket recv() failed"))?; - loop { - SOCKET_SET.poll_interfaces(); - match SOCKET_SET.with_socket_mut::(handle, |socket| { - if !socket.is_open() { - // not connected - ax_err!(NotConnected, "socket recv() failed") + if self.is_connecting() { + return Err(AxError::WouldBlock); + } else if !self.is_connected() { + return ax_err!(NotConnected, "socket recv() failed"); + } + + // SAFETY: `self.handle` should be initialized in a connected socket. + let handle = unsafe { self.handle.get().read().unwrap() }; + self.block_on(|| { + SOCKET_SET.with_socket_mut::(handle, |socket| { + if !socket.is_active() { + // not open + ax_err!(ConnectionRefused, "socket recv() failed") } else if !socket.may_recv() { // connection closed Ok(0) - } else if socket.can_recv() { + } else if socket.recv_queue() > 0 { // data available // TODO: use socket.recv(|buf| {...}) - match socket.recv_slice(buf) { - Ok(len) => Ok(len), - Err(RecvError::Finished) => Ok(0), - Err(_) => ax_err!(ConnectionRefused, "socket recv() failed"), - } + let len = socket + .recv_slice(buf) + .map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?; + Ok(len) } else { // no more data Err(AxError::WouldBlock) } - }) { - Ok(n) => { - return Ok(n); - } - Err(AxError::WouldBlock) => { - if self.nonblock { - return Err(AxError::WouldBlock); - } else { - axtask::yield_now() - } - } - Err(e) => return Err(e), - } - } + }) + }) } /// Transmits data in the given buffer. pub fn send(&self, buf: &[u8]) -> AxResult { - let handle = self - .handle - .ok_or_else(|| ax_err_type!(NotConnected, "socket send() failed"))?; - loop { - SOCKET_SET.poll_interfaces(); - match SOCKET_SET.with_socket_mut::(handle, |socket| { - if !socket.is_open() || !socket.may_send() { - // not connected - ax_err!(NotConnected, "socket send() failed") + if self.is_connecting() { + return Err(AxError::WouldBlock); + } else if !self.is_connected() { + return ax_err!(NotConnected, "socket recv() failed"); + } + + // SAFETY: `self.handle` should be initialized in a connected socket. + let handle = unsafe { self.handle.get().read().unwrap() }; + self.block_on(|| { + SOCKET_SET.with_socket_mut::(handle, |socket| { + if !socket.is_active() || !socket.may_send() { + // closed by remote + ax_err!(ConnectionReset, "socket send() failed") } else if socket.can_send() { // connected, and the tx buffer is not full // TODO: use socket.send(|buf| {...}) let len = socket .send_slice(buf) - .map_err(|_| ax_err_type!(ConnectionRefused, "socket send() failed"))?; + .map_err(|_| ax_err_type!(BadState, "socket send() failed"))?; Ok(len) } else { // tx buffer is full Err(AxError::WouldBlock) } - }) { - Ok(n) => { - return Ok(n); - } - Err(AxError::WouldBlock) => { - if self.nonblock { - return Err(AxError::WouldBlock); - } else { - axtask::yield_now() - } + }) + }) + } + + /// Whether the socket is readable or writable. + pub fn poll(&self) -> AxResult { + match self.get_state() { + STATE_CONNECTING => self.poll_connect(), + STATE_CONNECTED => self.poll_stream(), + STATE_LISTENING => self.poll_listener(), + _ => Ok(PollState { + readable: false, + writable: false, + }), + } + } +} + +/// Private methods +impl TcpSocket { + #[inline] + fn get_state(&self) -> u8 { + self.state.load(Ordering::Acquire) + } + + #[inline] + fn set_state(&self, state: u8) { + self.state.store(state, Ordering::Release); + } + + /// Update the state of the socket atomically. + /// + /// If the current state is `expect`, it first changes the state to `STATE_BUSY`, + /// then calls the given function. If the function returns `Ok`, it changes the + /// state to `new`, otherwise it changes the state back to `expect`. + /// + /// It returns `Ok` if the current state is `expect`, otherwise it returns + /// the current state in `Err`. + fn update_state(&self, expect: u8, new: u8, f: F) -> Result, u8> + where + F: FnOnce() -> AxResult, + { + match self + .state + .compare_exchange(expect, STATE_BUSY, Ordering::Acquire, Ordering::Acquire) + { + Ok(_) => { + let res = f(); + if res.is_ok() { + self.set_state(new); + } else { + self.set_state(expect); } - Err(e) => return Err(e), + Ok(res) } + Err(old) => Err(old), } } - /// Detect whether the socket needs to receive/can send. - /// - /// Return is - pub fn poll(&self) -> AxResult { - SOCKET_SET.poll_interfaces(); - if let Some(handle) = self.handle { - // stream - SOCKET_SET.with_socket_mut::(handle, |socket| { - Ok(PollState { - readable: socket.is_open() && socket.can_recv(), - writable: socket.is_open() && socket.can_send(), - }) - }) + #[inline] + fn is_connecting(&self) -> bool { + self.get_state() == STATE_CONNECTING + } + + #[inline] + fn is_connected(&self) -> bool { + self.get_state() == STATE_CONNECTED + } + + #[inline] + fn is_listening(&self) -> bool { + self.get_state() == STATE_LISTENING + } + + fn bound_addr(&self) -> AxResult { + // SAFETY: no other threads can read or write `self.local_addr`. + let local_addr = unsafe { self.local_addr.get().read() }; + let port = if local_addr.port != 0 { + local_addr.port + } else { + get_ephemeral_port()? + }; + assert_ne!(port, 0); + let addr = if !local_addr.addr.is_unspecified() { + Some(local_addr.addr) } else { - // listener - let local_port = self - .local_addr - .ok_or_else(|| { - ax_err_type!(InvalidInput, "socket poll() failed: no address bound") - })? - .port; + None + }; + Ok(IpListenEndpoint { addr, port }) + } + + fn poll_connect(&self) -> AxResult { + // SAFETY: `self.handle` should be initialized above. + let handle = unsafe { self.handle.get().read().unwrap() }; + let writable = + SOCKET_SET.with_socket::(handle, |socket| match socket.state() { + State::SynSent => false, // wait for connection + State::Established => { + self.set_state(STATE_CONNECTED); // connected + true + } + _ => { + unsafe { + self.local_addr.get().write(UNSPECIFIED); + self.peer_addr.get().write(UNSPECIFIED); + } + self.set_state(STATE_CLOSED); // connection failed + true + } + }); + Ok(PollState { + readable: false, + writable, + }) + } + + fn poll_stream(&self) -> AxResult { + // SAFETY: `self.handle` should be initialized in a connected socket. + let handle = unsafe { self.handle.get().read().unwrap() }; + SOCKET_SET.with_socket::(handle, |socket| { Ok(PollState { - readable: LISTEN_TABLE.can_accept(local_port)?, - writable: false, + readable: !socket.may_recv() || socket.can_recv(), + writable: !socket.may_send() || socket.can_send(), }) + }) + } + + fn poll_listener(&self) -> AxResult { + // SAFETY: `self.local_addr` should be initialized in a listening socket. + let local_addr = unsafe { self.local_addr.get().read() }; + Ok(PollState { + readable: LISTEN_TABLE.can_accept(local_addr.port)?, + writable: false, + }) + } + + /// Block the current thread until the given function completes or fails. + /// + /// If the socket is non-blocking, it calls the function once and returns + /// immediately. Otherwise, it may call the function multiple times if it + /// returns [`Err(WouldBlock)`](AxError::WouldBlock). + fn block_on(&self, mut f: F) -> AxResult + where + F: FnMut() -> AxResult, + { + if self.is_nonblocking() { + f() + } else { + loop { + SOCKET_SET.poll_interfaces(); + match f() { + Ok(t) => return Ok(t), + Err(AxError::WouldBlock) => axtask::yield_now(), + Err(e) => return Err(e), + } + } } } } @@ -331,7 +491,8 @@ impl TcpSocket { impl Drop for TcpSocket { fn drop(&mut self) { self.shutdown().ok(); - if let Some(handle) = self.handle { + // Safe because we have mut reference to `self`. + if let Some(handle) = unsafe { self.handle.get().read() } { SOCKET_SET.remove(handle); } } @@ -357,5 +518,5 @@ fn get_ephemeral_port() -> AxResult { } tries += 1; } - ax_err!(NoMemory, "no avaliable ports!") + ax_err!(AddrInUse, "no avaliable ports!") } diff --git a/modules/axnet/src/smoltcp_impl/udp.rs b/modules/axnet/src/smoltcp_impl/udp.rs index a16ac2a748..0a8abbca82 100644 --- a/modules/axnet/src/smoltcp_impl/udp.rs +++ b/modules/axnet/src/smoltcp_impl/udp.rs @@ -4,10 +4,13 @@ use axsync::Mutex; use smoltcp::iface::SocketHandle; use smoltcp::socket::udp::{self, BindError, SendError}; +use smoltcp::wire::{IpAddress, IpListenEndpoint}; use super::{SocketSetWrapper, ETH0, SOCKET_SET}; use crate::SocketAddr; +const UNSPECIFIED_IP: IpAddress = IpAddress::v4(0, 0, 0, 0); + /// A UDP socket that provides POSIX-like APIs. pub struct UdpSocket { handle: SocketHandle, @@ -58,34 +61,36 @@ impl UdpSocket { /// /// It's must be called before [`send_to`](Self::send_to) and /// [`recv_from`](Self::recv_from). - pub fn bind(&mut self, addr: SocketAddr) -> AxResult { - let mut addr = addr; - if addr.port == 0 { - addr.port = get_ephemeral_port()?; + pub fn bind(&mut self, mut local_addr: SocketAddr) -> AxResult { + if local_addr.addr.is_unspecified() && local_addr.addr != UNSPECIFIED_IP { + return ax_err!(InvalidInput, "socket bind() failed: invalid addr"); + } + if local_addr.port == 0 { + local_addr.port = get_ephemeral_port()?; } if self.local_addr.is_some() { return ax_err!(InvalidInput, "socket bind() failed: already bound"); } + + let endpoint = IpListenEndpoint { + addr: (!local_addr.addr.is_unspecified()).then_some(local_addr.addr), + port: local_addr.port, + }; SOCKET_SET.with_socket_mut::(self.handle, |socket| { - socket.bind(addr).or_else(|e| match e { - BindError::InvalidState => { - ax_err!(AlreadyExists, "socket bind() failed") - } - BindError::Unaddressable => { - ax_err!(InvalidInput, "socket bind() failed") - } - })?; - Ok(socket.endpoint()) + socket.bind(endpoint).or_else(|e| match e { + BindError::InvalidState => ax_err!(AlreadyExists, "socket bind() failed"), + BindError::Unaddressable => ax_err!(InvalidInput, "socket bind() failed"), + }) })?; - self.local_addr = Some(addr); + self.local_addr = Some(local_addr); + debug!("UDP socket bound on {}", endpoint); Ok(()) } /// Transmits data in the given buffer to the given address. pub fn send_to(&self, buf: &[u8], addr: SocketAddr) -> AxResult { - loop { - SOCKET_SET.poll_interfaces(); - match SOCKET_SET.with_socket_mut::(self.handle, |socket| { + self.block_on(|| { + SOCKET_SET.with_socket_mut::(self.handle, |socket| { if !socket.is_open() { // not bound ax_err!(NotConnected, "socket send() failed") @@ -102,29 +107,16 @@ impl UdpSocket { // tx buffer is full Err(AxError::WouldBlock) } - }) { - Ok(n) => { - return Ok(n); - } - Err(AxError::WouldBlock) => { - if self.nonblock { - return Err(AxError::WouldBlock); - } else { - axtask::yield_now() - } - } - Err(e) => return Err(e), - } - } + }) + }) } fn recv_impl(&self, mut op: F, err: &str) -> AxResult where F: FnMut(&mut udp::Socket) -> AxResult, { - loop { - SOCKET_SET.poll_interfaces(); - match SOCKET_SET.with_socket_mut::(self.handle, |socket| { + self.block_on(|| { + SOCKET_SET.with_socket_mut::(self.handle, |socket| { if !socket.is_open() { // not connected ax_err!(NotConnected, err) @@ -135,20 +127,8 @@ impl UdpSocket { // no more data Err(AxError::WouldBlock) } - }) { - Ok(x) => { - return Ok(x); - } - Err(AxError::WouldBlock) => { - if self.nonblock { - return Err(AxError::WouldBlock); - } else { - axtask::yield_now() - } - } - Err(e) => return Err(e), - } - } + }) + }) } /// Receives data from the socket, stores it in the given buffer. @@ -209,7 +189,7 @@ impl UdpSocket { /// Close the socket. pub fn shutdown(&self) -> AxResult { SOCKET_SET.with_socket_mut::(self.handle, |socket| { - debug!("socket {}: shutting down", self.handle); + debug!("UDP socket {}: shutting down", self.handle); socket.close(); }); SOCKET_SET.poll_interfaces(); @@ -227,21 +207,33 @@ impl UdpSocket { ) } - /// Detect whether the socket needs to receive/can send. - /// - /// Return is + /// Whether the socket is readable or writable. pub fn poll(&self) -> AxResult { - SOCKET_SET.poll_interfaces(); SOCKET_SET.with_socket_mut::(self.handle, |socket| { - if !socket.is_open() { - debug!(" udp socket close"); - } Ok(PollState { readable: socket.is_open() && socket.can_recv(), writable: socket.is_open() && socket.can_send(), }) }) } + + fn block_on(&self, mut f: F) -> AxResult + where + F: FnMut() -> AxResult, + { + if self.nonblock { + f() + } else { + loop { + SOCKET_SET.poll_interfaces(); + match f() { + Ok(t) => return Ok(t), + Err(AxError::WouldBlock) => axtask::yield_now(), + Err(e) => return Err(e), + } + } + } + } } impl Drop for UdpSocket { diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index b7f412b28b..9c31f4384c 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -47,7 +47,7 @@ impl AxRunQueue { pub fn yield_current(&mut self) { let curr = crate::current(); - debug!("task yield: {}", curr.id_name()); + trace!("task yield: {}", curr.id_name()); assert!(curr.is_running()); self.resched(false); } diff --git a/scripts/make/build_c.mk b/scripts/make/build_c.mk index c965364929..88504bf469 100644 --- a/scripts/make/build_c.mk +++ b/scripts/make/build_c.mk @@ -74,4 +74,6 @@ $(OUT_ELF): $(app-objs) $(c_lib) $(rust_lib) @printf " $(CYAN_C)Linking$(END_C) $(OUT_ELF)\n" $(call run_cmd,$(LD),$(LDFLAGS) $^ -o $@) +$(APP)/axbuild: ; + .PHONY: _gen_feat diff --git a/ulib/libax/src/cbindings/io_mpx/epoll.rs b/ulib/libax/src/cbindings/io_mpx/epoll.rs index 3aa39d49df..c6d4bbc370 100644 --- a/ulib/libax/src/cbindings/io_mpx/epoll.rs +++ b/ulib/libax/src/cbindings/io_mpx/epoll.rs @@ -192,6 +192,8 @@ pub unsafe extern "C" fn ax_epoll_wait( .then(|| current_time() + Duration::from_millis(timeout as u64)); let epoll_instance = EpollInstance::from_fd(epfd)?; loop { + #[cfg(feature = "net")] + axnet::poll_interfaces(); let events_num = epoll_instance.poll_all(events)?; if events_num > 0 { return Ok(events_num as c_int); diff --git a/ulib/libax/src/cbindings/io_mpx/select.rs b/ulib/libax/src/cbindings/io_mpx/select.rs index be235aca2c..a17cdb16eb 100644 --- a/ulib/libax/src/cbindings/io_mpx/select.rs +++ b/ulib/libax/src/cbindings/io_mpx/select.rs @@ -132,17 +132,18 @@ pub unsafe extern "C" fn ax_select( zero_fd_set(exceptfds, nfds); loop { - if deadline.map_or(false, |ddl| current_time() >= ddl) { - debug!(" timeout!"); - return Ok(0); - } - + #[cfg(feature = "net")] + axnet::poll_interfaces(); let res = fd_sets.poll_all(readfds, writefds, exceptfds)?; if res > 0 { return Ok(res); - } else { - crate::thread::yield_now(); } + + if deadline.map_or(false, |ddl| current_time() >= ddl) { + debug!(" timeout!"); + return Ok(0); + } + crate::thread::yield_now(); } }) } diff --git a/ulib/libax/src/net/tcp.rs b/ulib/libax/src/net/tcp.rs index ce25a4b757..8a9779b9c5 100644 --- a/ulib/libax/src/net/tcp.rs +++ b/ulib/libax/src/net/tcp.rs @@ -15,7 +15,7 @@ pub struct TcpListener { impl TcpStream { /// Opens a TCP connection to a remote host. pub fn connect(addr: SocketAddr) -> io::Result { - let mut socket = TcpSocket::new(); + let socket = TcpSocket::new(); socket.connect(addr)?; Ok(Self { socket }) } @@ -56,7 +56,7 @@ impl TcpListener { /// Creates a new `TcpListener` which will be bound to the specified /// address. pub fn bind(addr: SocketAddr) -> io::Result { - let mut socket = TcpSocket::new(); + let socket = TcpSocket::new(); socket.bind(addr)?; socket.listen()?; Ok(Self { socket }) @@ -72,7 +72,7 @@ impl TcpListener { /// This function will block the calling thread until a new TCP connection /// is established. When established, the corresponding [`TcpStream`] and the /// remote peer's address will be returned. - pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { + pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let socket = self.socket.accept()?; let addr = socket.peer_addr()?; Ok((TcpStream { socket }, addr))