Skip to content

Commit

Permalink
also add tcp_keepalive_probe. prettify code.
Browse files Browse the repository at this point in the history
  • Loading branch information
zephyrchien committed Dec 10, 2023
1 parent 1f7b1b1 commit 771429e
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 43 deletions.
9 changes: 4 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions realm_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ license = "MIT"
[dependencies]
# realm
realm_io = "0.4"
realm_syscall = "0.1.7"
realm_hook = { version = "0.1.4", optional = true }
realm_lb = { version = "0.1.0", optional = true }
realm_syscall = "0.1"
realm_hook = { version = "0.1", optional = true }
realm_lb = { version = "0.1", optional = true }
kaminari = { version = "0.11", features = ["ws", "tls", "mix"], optional = true }

# other
Expand All @@ -27,7 +27,6 @@ pin-project = "1"
trust-dns-resolver = "0.22"
tokio = { version = "1.18", features = ["rt", "net", "time"] }
proxy-protocol = { version = "0.5", optional = true }
socket2 = "0.5.5"

[features]
default = []
Expand Down
10 changes: 6 additions & 4 deletions realm_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl ProxyOpts {
/// Connect or associate options.
#[derive(Debug, Default, Clone)]
pub struct ConnectOpts {
pub tcp_keepalive: u64,
pub connect_timeout: usize,
pub associate_timeout: usize,
pub tcp_keepalive: usize,
pub tcp_keepalive_probe: usize,
pub bind_address: Option<SocketAddr>,
pub bind_interface: Option<String>,

Expand Down Expand Up @@ -87,9 +88,10 @@ impl Display for Endpoint {
impl Display for ConnectOpts {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let ConnectOpts {
tcp_keepalive,
connect_timeout,
associate_timeout,
tcp_keepalive,
tcp_keepalive_probe,
bind_address,
bind_interface,

Expand Down Expand Up @@ -128,8 +130,8 @@ impl Display for ConnectOpts {

write!(
f,
"tcp-keepalive={}s connect-timeout={}s, associate-timeout={}s; ",
tcp_keepalive, connect_timeout, associate_timeout
"tcp-keepalive={}s[{}] connect-timeout={}s, associate-timeout={}s; ",
tcp_keepalive, tcp_keepalive_probe, connect_timeout, associate_timeout
)?;

#[cfg(feature = "transport")]
Expand Down
2 changes: 1 addition & 1 deletion realm_core/src/tcp/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::endpoint::RemoteAddr;
pub async fn pre_connect_hook<'a>(
local: &mut TcpStream,
raddr: &'a RemoteAddr,
extra_raddrs: &'a Vec<RemoteAddr>,
extra_raddrs: &'a [RemoteAddr],
) -> Result<&'a RemoteAddr> {
if !pre_conn::is_loaded() {
return Ok(raddr);
Expand Down
12 changes: 0 additions & 12 deletions realm_core/src/tcp/middle.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::io::Result;
use std::time::Duration;

use tokio::net::TcpStream;

use super::socket;
Expand Down Expand Up @@ -77,16 +75,6 @@ pub async fn connect_and_relay(

// connect!
let mut remote = socket::connect(raddr, conn_opts.as_ref()).await?;

if *tcp_keepalive > 0 {
let sockref = socket2::SockRef::from(&remote);
let mut ka = socket2::TcpKeepalive::new();

ka = ka
.with_time(Duration::from_secs(*tcp_keepalive))
.with_interval(Duration::from_secs(*tcp_keepalive));
let _ = sockref.set_tcp_keepalive(&ka);
}
log::info!("[tcp]{} => {} as {}", local.peer_addr()?, raddr, remote.peer_addr()?);

// after connected
Expand Down
15 changes: 6 additions & 9 deletions realm_core/src/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod proxy;
mod transport;

use std::io::{ErrorKind, Result};
use std::time::Duration;

use crate::trick::Ref;
use crate::endpoint::Endpoint;
Expand All @@ -35,6 +34,7 @@ pub async fn run_tcp(endpoint: Endpoint) -> Result<()> {
let extra_raddrs = Ref::new(&extra_raddrs);

let lis = socket::bind(&laddr).unwrap_or_else(|e| panic!("[tcp]failed to bind {}: {}", &laddr, e));
let keepalive = socket::keepalive::build(&conn_opts);

loop {
let (local, addr) = match lis.accept().await {
Expand All @@ -51,15 +51,12 @@ pub async fn run_tcp(endpoint: Endpoint) -> Result<()> {

// ignore error
let _ = local.set_nodelay(true);
if conn_opts.tcp_keepalive > 0 {
let sockref = socket2::SockRef::from(&local);
let mut ka = socket2::TcpKeepalive::new();
ka = ka
.with_time(Duration::from_secs(conn_opts.tcp_keepalive))
.with_interval(Duration::from_secs(conn_opts.tcp_keepalive));

let _ = sockref.set_tcp_keepalive(&ka);
// set tcp_keepalive
if let Some(kpa) = &keepalive {
use socket::keepalive::SockRef;
SockRef::from(&local).set_tcp_keepalive(kpa)?;
}

tokio::spawn(async move {
match connect_and_relay(local, raddr, conn_opts, extra_raddrs).await {
Ok(..) => log::debug!("[tcp]{} => {}, finish", addr, raddr.as_ref()),
Expand Down
28 changes: 28 additions & 0 deletions realm_core/src/tcp/socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io::{Result, Error, ErrorKind};
use std::net::SocketAddr;
use std::time::Duration;

use realm_syscall::new_tcp_socket;
use tokio::net::{TcpSocket, TcpStream, TcpListener};
Expand Down Expand Up @@ -32,6 +33,7 @@ pub async fn connect(raddr: &RemoteAddr, conn_opts: &ConnectOpts) -> Result<TcpS
} = conn_opts;

let mut last_err = None;
let keepalive = keepalive::build(conn_opts);

for addr in resolve_addr(raddr).await?.iter() {
log::debug!("[tcp]{} resolved as {}", raddr, &addr);
Expand All @@ -51,6 +53,10 @@ pub async fn connect(raddr: &RemoteAddr, conn_opts: &ConnectOpts) -> Result<TcpS
realm_syscall::bind_to_device(&socket, iface)?;
}

if let Some(kpa) = &keepalive {
socket.set_tcp_keepalive(kpa)?;
}

let socket = TcpSocket::from_std_stream(socket.into());

match timeoutfut(socket.connect(addr), *connect_timeout).await {
Expand All @@ -68,3 +74,25 @@ pub async fn connect(raddr: &RemoteAddr, conn_opts: &ConnectOpts) -> Result<TcpS

Err(last_err.unwrap_or_else(|| Error::new(ErrorKind::InvalidInput, "could not connect to any address")))
}

pub(super) mod keepalive {
use super::*;
pub use realm_syscall::socket2::{SockRef, TcpKeepalive};
pub fn build(conn_opts: &ConnectOpts) -> Option<TcpKeepalive> {
let ConnectOpts {
tcp_keepalive,
tcp_keepalive_probe,
..
} = conn_opts;
if *tcp_keepalive == 0 {
return None;
};
let sec = Duration::from_secs(*tcp_keepalive as u64);
let probe = *tcp_keepalive_probe as u32;
let kpa = TcpKeepalive::new()
.with_time(sec)
.with_interval(sec)
.with_retries(probe);
Some(kpa)
}
}
10 changes: 8 additions & 2 deletions src/cmd/flag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ pub fn add_global_options(app: Command) -> Command {
let app = app.next_help_heading("TIMEOUT OPTIONS").args([
Arg::new("tcp_timeout")
.long("tcp-timeout")
.help("override tcp timeout")
.help("override tcp timeout(5s)")
.value_name("second")
.takes_value(true)
.display_order(0),
Arg::new("udp_timeout")
.long("udp-timeout")
.help("override udp timeout")
.help("override udp timeout(30s)")
.value_name("second")
.takes_value(true)
.display_order(1),
Expand All @@ -228,6 +228,12 @@ pub fn add_global_options(app: Command) -> Command {
.value_name("second")
.takes_value(true)
.display_order(2),
Arg::new("tcp_keepalive_probe")
.long("tcp-keepalive-probe")
.help("override default tcp keepalive count(3)")
.value_name("count")
.takes_value(true)
.display_order(3),
]);

app
Expand Down
21 changes: 16 additions & 5 deletions src/conf/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use serde::{Serialize, Deserialize};
use realm_core::endpoint::{ConnectOpts, ProxyOpts};

use super::Config;
use crate::consts::{TCP_KEEPALIVE, TCP_TIMEOUT, UDP_TIMEOUT};
use crate::consts::{TCP_TIMEOUT, UDP_TIMEOUT};
use crate::consts::{TCP_KEEPALIVE, TCP_KEEPALIVE_PROBE};
use crate::consts::PROXY_PROTOCOL_VERSION;
use crate::consts::PROXY_PROTOCOL_TIMEOUT;

Expand Down Expand Up @@ -33,7 +34,11 @@ pub struct NetConf {

#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub tcp_keepalive: Option<u64>,
pub tcp_keepalive: Option<usize>,

#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub tcp_keepalive_probe: Option<usize>,

#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -73,7 +78,8 @@ impl Config for NetConf {

let no_tcp = unbox!(no_tcp);
let use_udp = unbox!(use_udp);
let tcp_keepalive = unbox!(tcp_keepalive, TCP_KEEPALIVE);
let tcp_kpa = unbox!(tcp_keepalive, TCP_KEEPALIVE);
let tcp_kpa_probe = unbox!(tcp_keepalive_probe, TCP_KEEPALIVE_PROBE);
let tcp_timeout = unbox!(tcp_timeout, TCP_TIMEOUT);
let udp_timeout = unbox!(udp_timeout, UDP_TIMEOUT);

Expand All @@ -84,7 +90,8 @@ impl Config for NetConf {
let accept_proxy_timeout = unbox!(accept_proxy_timeout, PROXY_PROTOCOL_TIMEOUT);

let conn_opts = ConnectOpts {
tcp_keepalive: tcp_keepalive,
tcp_keepalive: tcp_kpa,
tcp_keepalive_probe: tcp_kpa_probe,
connect_timeout: tcp_timeout,
associate_timeout: udp_timeout,

Expand Down Expand Up @@ -120,6 +127,7 @@ impl Config for NetConf {
rst!(self, no_tcp, other);
rst!(self, use_udp, other);
rst!(self, tcp_keepalive, other);
rst!(self, tcp_keepalive_probe, other);
rst!(self, tcp_timeout, other);
rst!(self, udp_timeout, other);
rst!(self, send_proxy, other);
Expand All @@ -136,6 +144,7 @@ impl Config for NetConf {
take!(self, no_tcp, other);
take!(self, use_udp, other);
take!(self, tcp_keepalive, other);
take!(self, tcp_keepalive_probe, other);
take!(self, tcp_timeout, other);
take!(self, udp_timeout, other);
take!(self, send_proxy, other);
Expand All @@ -162,7 +171,8 @@ impl Config for NetConf {
let no_tcp = unpack!("no_tcp");
let use_udp = unpack!("use_udp");

let tcp_keepalive = unpack!("tcp_keepalive", u64);
let tcp_keepalive = unpack!("tcp_keepalive", usize);
let tcp_keepalive_probe = unpack!("tcp_keepalive", usize);
let tcp_timeout = unpack!("tcp_timeout", usize);
let udp_timeout = unpack!("udp_timeout", usize);

Expand All @@ -176,6 +186,7 @@ impl Config for NetConf {
no_tcp,
use_udp,
tcp_keepalive,
tcp_keepalive_probe,
tcp_timeout,
udp_timeout,
send_proxy,
Expand Down
3 changes: 2 additions & 1 deletion src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::fmt::{Display, Formatter};
pub const DEFAULT_LOG_FILE: &str = "stdout";

// default timeout
pub const TCP_KEEPALIVE: u64 = 15;
pub const TCP_TIMEOUT: usize = 5;
pub const TCP_KEEPALIVE: usize = 15;
pub const TCP_KEEPALIVE_PROBE: usize = 3;
pub const UDP_TIMEOUT: usize = 30;

// default haproxy proxy-protocol version
Expand Down

0 comments on commit 771429e

Please sign in to comment.