Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: simplify and improve rpc TCP listener #618

Merged
merged 1 commit into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/rpc-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ tokio = { version = "1.15", default-features = false, features = ["rt-multi-thre
bytes-v10 = { version = "1.0", package = "bytes" }
async-trait = "0.1"
lru = "0.7"
socket2 = "0.4"
socket2 = { version = "0.4", features = ["all"] }
pprof = { version = "0.6", features = ["flamegraph", "cpp", "protobuf"]}
once_cell = "1.8"
jemalloc-ctl = { package = "tikv-jemalloc-ctl", version = "0.4.2" }
Expand Down
35 changes: 13 additions & 22 deletions crates/rpc-server/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
// Taken and adapted from https://github.com/smol-rs/smol/blob/ad0839e1b3700dd33abb9bf23c1efd3c83b5bb2d/examples/hyper-server.rs
use std::net::SocketAddr;
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Error, Result};
use hyper::service::{make_service_fn, service_fn};
use hyper::{body::HttpBody, server::conn::AddrIncoming, Body, Method, Request, Response, Server};
use tokio::net::TcpSocket;
use tokio::net::TcpListener;

use jsonrpc_v2::{RequestKind, ResponseObjects, Router, Server as JsonrpcServer};
use tokio::sync::{broadcast, mpsc};
Expand All @@ -24,30 +20,25 @@ pub async fn start_jsonrpc_server(
mut sub_shutdown: broadcast::Receiver<()>,
) -> Result<()> {
let rpc_server = registry.build_rpc_server()?;
let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?;
let keepalive = socket2::TcpKeepalive::new().with_time(Duration::from_secs(10)); // Default 10s
socket.set_tcp_keepalive(&keepalive)?;

let listener = {
socket.bind(&listen_addr.into())?;
socket.set_nonblocking(true)?;
let socket = unsafe {
#[cfg(unix)]
let socket = TcpSocket::from_raw_fd(socket.into_raw_fd());
#[cfg(windows)]
let socket = TcpSocket::from_raw_socket(socket.into_raw_socket());
socket
};
socket.listen(128)? // Linux default, see /proc/sys/net/core/somaxconn
};
let listener = TcpListener::bind(listen_addr).await?;
let listener_ref = socket2::SockRef::from(&listener);
// Set TCP keepalive options with socket2 because tokio/hyper does not support setting interval/retries (yet).
let keepalive = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(10))
.with_interval(Duration::from_secs(5))
.with_retries(3);
// TCP keepalive options set on listening sockets are inhereted by accepted sockets (at least on linux and FreeBSD).
listener_ref.set_tcp_keepalive(&keepalive)?;

// Format the full address.
let url = format!("http://{}", listener.local_addr()?);
log::info!("JSONRPC server listening on {}", url);

// Start a hyper server.
let server =
Server::builder(AddrIncoming::from_listener(listener)?).serve(make_service_fn(move |_| {
let server = Server::builder(AddrIncoming::from_listener(listener)?)
.tcp_nodelay(true)
.serve(make_service_fn(move |_| {
let rpc_server = Arc::clone(&rpc_server);
async { Ok::<_, Error>(service_fn(move |req| serve(Arc::clone(&rpc_server), req))) }
}));
Expand Down