Skip to content

Commit

Permalink
feat(net): 实现阻塞式unix stream socket 读写机制 (#930)
Browse files Browse the repository at this point in the history
* 修复mprotect系统调用未正确设置vm_flags的错误 (#847)

* fix(time): modify update wall time (#836)

更改了时间子系统的update_wall_time函数,通过读取当前周期数,计算delta值进行更新,而不是通过传入delta值进行更新

* chore: 调整triagebot.toml以适应新的组织架构 (#848)

* doc: 完善README.md (#849)

* doc: 完善README.md

* chore: 更新sphinx相关配置,适应read the docs的更新 (#850)

根据read the docs在7月15日blog,进行此修改

https://about.readthedocs.com/blog/2024/07/addons-by-default/

* feat(driver/net): 实现Loopback网卡接口 (#845)

* 初步实现loopback设备

* fix: build-scripts和tools目录下的make check指定工具链版本 (#861)

* fix: tcp poll没有正确处理posix socket的listen状态的问题 (#859)

* chore: 将工具链更新到2024-07-23 (#864)

* chore: 将工具链更新到2024-07-23

* feat(fs): add eventfd syscall support (#858)

* feat(fs): add eventfd syscall support

* refactor: 删除过时的va-pa转换函数,改为统一使用MMArch (#862)

* 默认nightly-2024-07-23 & config改为config.toml (#872)

* fix: 修复由于升级到2024-07-23工具链之后,某些机器上面内核运行一直fault的问题。 (#870)

* fix: 修复由于升级到2024-07-23工具链之后,某些机器上面内核运行一直fault的问题。

* feat(cred): 初步实现Cred (#846)

* 初步实现Cred

* 添加seteuid和setegid

* 添加cred测试程序

* 修改Cred::fscmp返回结果为CredFsCmp枚举

* 完善root用户相关信息

* fix: 修复键盘码解析器没能正确处理类似ctrl C的控制字符的问题 (#877)

* fix: 修复键盘码解析器没能正确处理类似ctrl C的控制字符的问题

* fix: 解决ntty潜在的panic问题

* ci: enable ci workflow on branches other than master (#891)

* 修复unlink、unlinkat系统调用的路径错误 (#892)

* fix: socket shutdown wrong implement (#893)

* feat: 增加tokio异步运行时支持 (#894)

* fix the EventFdFlags error

* feat: support tokio (Single thread version)

Fix deadlock issue on closing file.
Add function for PipeInode and EventFdInode.

* feat: 实现unix stream sock和其状态

* 握手支持

* feat:建立连接功能实现

* feat:unix stream socket 初版

* fix: 消除标红

* feat: 阻塞式读写buffer

* feat: 实现unix socket buffer

* 不小心改了inode

* fix: 修复客户端和服务端buffer不互通的问题

* 111

* fix: 修改建立连接逻辑

* merge net

* feat: 解决bing问题

* fix: 消除红码

* feat: unix stream socket 阻塞式读写机制实现

---------

Co-authored-by: MemoryShore <[email protected]>
Co-authored-by: 黄铭涛 <[email protected]>
Co-authored-by: LoGin <[email protected]>
Co-authored-by: linfeng <[email protected]>
Co-authored-by: Jomo <[email protected]>
Co-authored-by: Chiichen <[email protected]>
Co-authored-by: Samuel Dai <[email protected]>
  • Loading branch information
8 people authored Sep 19, 2024
1 parent 3ab8d05 commit 21e5982
Show file tree
Hide file tree
Showing 17 changed files with 637 additions and 239 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,5 @@
],
"rust-analyzer.showUnlinkedFileNotification": false,
"editor.accessibilityPageSize": 15,
"makefile.configureOnOpen": false,
}
4 changes: 2 additions & 2 deletions kernel/src/filesystem/vfs/open.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloc::sync::Arc;
use log::warn;
use log::{debug, warn};
use system_error::SystemError;

use super::{
Expand Down Expand Up @@ -81,7 +81,7 @@ fn do_sys_openat2(
how: OpenHow,
follow_symlink: bool,
) -> Result<usize, SystemError> {
// debug!("open path: {}, how: {:?}", path, how);
//debug!("open path: {}, how: {:?}", path, how);
let path = path.trim();

let (inode_begin, path) = user_path_at(&ProcessManager::current_pcb(), dirfd, path)?;
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/filesystem/vfs/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::ffi::c_void;
use core::mem::size_of;

use alloc::{string::String, sync::Arc, vec::Vec};
use log::warn;
use log::{debug, warn};
use system_error::SystemError;

use crate::producefs;
Expand Down Expand Up @@ -483,10 +483,10 @@ impl Syscall {
mode: u32,
follow_symlink: bool,
) -> Result<usize, SystemError> {
debug!("path: {:?}", path);
let path = check_and_clone_cstr(path, Some(MAX_PATHLEN))?
.into_string()
.map_err(|_| SystemError::EINVAL)?;

let open_flags: FileMode = FileMode::from_bits(o_flags).ok_or(SystemError::EINVAL)?;
let mode = ModeType::from_bits(mode).ok_or(SystemError::EINVAL)?;
return do_sys_open(
Expand Down
17 changes: 17 additions & 0 deletions kernel/src/net/socket/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use alloc::vec::Vec;

use alloc::{sync::Arc,string::String};
use log::debug;
use system_error::SystemError;

use crate::libs::spinlock::SpinLock;
Expand Down Expand Up @@ -33,6 +34,10 @@ impl Buffer {
return self.write_buffer.lock().is_empty();
}

pub fn is_write_buf_full(&self) -> bool {
return self.write_buffer.lock().len() >= self.metadata.buf_size;
}

pub fn read_read_buffer(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
let mut read_buffer = self.read_buffer.lock_irqsave();
let len = core::cmp::min(buf.len(), read_buffer.len());
Expand All @@ -54,6 +59,18 @@ impl Buffer {

Ok(len)
}

pub fn write_write_buffer(&self, buf: &[u8]) -> Result<usize, SystemError> {
let mut buffer = self.write_buffer.lock_irqsave();

let len = buf.len();
if self.metadata.buf_size - buffer.len() < len {
return Err(SystemError::ENOBUFS);
}
buffer.extend_from_slice(buf);

Ok(len)
}
}

#[derive(Debug)]
Expand Down
6 changes: 5 additions & 1 deletion kernel/src/net/socket/inet/datagram/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ impl UdpSocket {
let mut inner = self.inner.write();
if let Some(UdpInner::Unbound(unbound)) = inner.take() {
let bound = unbound.bind(local_endpoint)?;
bound.inner().iface().common().bind_socket(self.self_ref.upgrade().unwrap());
bound
.inner()
.iface()
.common()
.bind_socket(self.self_ref.upgrade().unwrap());
*inner = Some(UdpInner::Bound(bound));
return Ok(());
}
Expand Down
7 changes: 4 additions & 3 deletions kernel/src/net/socket/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ fn create_unix_socket(
sock_type: Type,
) -> Result<Arc<Inode>, SystemError> {
match sock_type {
// Type::Stream => {
// Ok(stream::StreamSocket::new())
// },
Type::Stream |Type::Datagram=> {
stream::StreamSocket::new_inode()
},
Type::SeqPacket |Type::Datagram=>{
// Ok(seqpacket::SeqpacketSocket::new(false))
seqpacket::SeqpacketSocket::new_inode(false)
},
_ => {
Err(EPROTONOSUPPORT)
}
_ => Err(EPROTONOSUPPORT),
}
}

Expand Down
158 changes: 132 additions & 26 deletions kernel/src/net/socket/unix/stream/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use log::debug;
use system_error::SystemError;

use crate::libs::mutex::Mutex;
use crate::net::socket::{Endpoint, ShutdownTemp};
use crate::net::socket::buffer::Buffer;
use crate::net::socket::unix::stream::StreamSocket;
use crate::net::socket::{Endpoint, Inode, ShutdownTemp};

use alloc::collections::VecDeque;
use alloc::sync::Arc;

#[derive(Debug)]
pub enum Inner {
Expand All @@ -15,50 +18,136 @@ pub enum Inner {
Listener(Listener),
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Init {
addr: Option<Endpoint>,
}

impl Init {
pub(super) fn new() -> Self {
Self { addr: None }
Self { addr: None}
}

pub(super) fn bind(&mut self, endpoint_to_bind: Endpoint) -> Result<(), SystemError> {
self.addr = Some(endpoint_to_bind);
Ok(())
if self.addr.is_some() {
log::error!("the socket is already bound");
return Err(SystemError::EINVAL);
}

match endpoint_to_bind {
Endpoint::Inode(_) => self.addr = Some(endpoint_to_bind),
_ => return Err(SystemError::EINVAL),
}

return Ok(())
}

pub(super) fn addr(&self) -> Option<Endpoint> {
self.addr.clone()
pub(super) fn endpoint(&self) -> Option<&Endpoint> {
self.addr.as_ref().clone()
}
}

#[derive(Debug, Clone)]
pub struct Connected {
addr: Option<Endpoint>,
peer_addr: Option<Endpoint>,
buffer: Arc<Buffer>,
}

impl Connected {
pub fn new(addr: Option<Endpoint>, peer_addr: Option<Endpoint>) -> Self {
Self { addr, peer_addr }
pub fn new_pair(addr: Option<Endpoint>, peer_addr: Option<Endpoint>) -> (Self, Self) {
let this = Connected{
addr: addr.clone(),
peer_addr: peer_addr.clone(),
buffer: Buffer::new()
};
let peer = Connected{
addr: peer_addr,
peer_addr: addr,
buffer: Buffer::new()
};

return (this, peer);
}

pub fn new_pair(addr: Option<Endpoint>, peer_addr: Option<Endpoint>) -> (Connected, Connected) {
let this = Connected::new(addr.clone(), peer_addr.clone());
let peer = Connected::new(peer_addr.clone(), addr.clone());
pub fn endpoint(&self) -> Option<&Endpoint> {
self.addr.as_ref()
}

return (this, peer);
pub fn set_addr(&mut self, addr: Option<Endpoint>) {
self.addr = addr;
}

pub fn peer_endpoint(&self) -> Option<&Endpoint> {
self.peer_addr.as_ref()
}

pub fn set_peer_addr(&mut self, peer: Option<Endpoint>) {
self.peer_addr = peer;
}

pub(super) fn addr(&self) -> Option<Endpoint> {
self.addr.clone()
fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
//写入对端buffer
let peer_inode = match self.peer_addr.as_ref().unwrap() {
Endpoint::Inode(inode) => inode,
_ => return Err(SystemError::EINVAL),
};
let peer_socket = Arc::downcast::<StreamSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
let usize= match &* peer_socket.inner.read() {
Inner::Connected(conntected) => {
let usize = conntected.buffer.write_read_buffer(buf)?;
usize
},
_ => {
debug!("no! is not connested!");
return Err(SystemError::EINVAL);
}
};
Ok(usize)
}

pub fn peer_addr(&self) -> Option<Endpoint> {
self.peer_addr.clone()
fn can_send(&self) -> Result<bool, SystemError>{
//查看连接体里的buf是否非满
let peer_inode = match self.peer_addr.as_ref().unwrap() {
Endpoint::Inode(inode) => inode,
_ => return Err(SystemError::EINVAL),
};
let peer_socket = Arc::downcast::<StreamSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
let is_full = match &* peer_socket.inner.read() {
Inner::Connected(connected) => {
connected.buffer.is_read_buf_full()
},
_ => {
return Err(SystemError::EINVAL)
},
};
debug!("can send? :{}", !is_full);
Ok(!is_full)
}

fn can_recv(&self) -> bool {
//查看连接体里的buf是否非空
return !self.buffer.is_read_buf_empty();
}

pub fn try_send(&self, buf: &[u8]) -> Result<usize, SystemError> {
if self.can_send()? {
return self.send_slice(buf);
} else {
return Err(SystemError::ENOBUFS);
}
}

fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
return self.buffer.read_read_buffer(buf);
}

pub fn try_recv(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
if self.can_recv() {
return self.recv_slice(buf);
} else {
return Err(SystemError::EINVAL);
}
}

pub fn shutdown(&self, how: ShutdownTemp) -> Result<(), SystemError> {
Expand All @@ -77,7 +166,7 @@ impl Connected {
#[derive(Debug)]
pub struct Listener {
addr: Option<Endpoint>,
incoming_connects: Mutex<VecDeque<Connected>>,
incoming_connects: Mutex<VecDeque<Arc<Inode>>>,
backlog: AtomicUsize,
}

Expand All @@ -95,28 +184,45 @@ impl Listener {
return Ok(());
}

pub fn push_incoming(&self, client_addr: Option<Endpoint>) -> Result<Connected, SystemError> {
pub fn push_incoming(&self, server_inode: Arc<Inode>) -> Result<(), SystemError> {
let mut incoming_connects = self.incoming_connects.lock();

if incoming_connects.len() >= self.backlog.load(Ordering::Relaxed) {
debug!("unix stream listen socket connected queue is full!");
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}

let (server_conn, client_conn) = Connected::new_pair(self.addr.clone(), client_addr);
incoming_connects.push_back(server_inode);

incoming_connects.push_back(server_conn);

return Ok(client_conn);
return Ok(());
}

pub fn pop_incoming(&self) -> Option<Connected> {
pub fn pop_incoming(&self) -> Option<Arc<Inode>> {
let mut incoming_connects = self.incoming_connects.lock();

return incoming_connects.pop_front();
}

pub(super) fn addr(&self) -> Option<Endpoint> {
self.addr.clone()
pub(super) fn endpoint(&self) -> Option<&Endpoint> {
self.addr.as_ref()
}

pub(super) fn is_acceptable(&self) -> bool {
return self.incoming_connects.lock().len() != 0
}

pub(super) fn try_accept(&self) -> Result<(Arc<Inode>, Endpoint), SystemError> {
let mut incoming_connecteds = self.incoming_connects.lock();
debug!("incom len {}", incoming_connecteds.len());
let connected = incoming_connecteds.pop_front().ok_or_else(|| SystemError::EAGAIN_OR_EWOULDBLOCK)?;
let socket = Arc::downcast::<StreamSocket>(connected.inner()).map_err(|_| SystemError::EINVAL)?;
let peer = match & *socket.inner.read() {
Inner::Connected(connected) => connected.peer_endpoint().unwrap().clone(),
_ => {
return Err(SystemError::ENOTCONN)
}
};
debug!("server accept!");
return Ok((Inode::new(socket), peer))
}
}
Loading

0 comments on commit 21e5982

Please sign in to comment.