Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jsonrpc batch request limit #4529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions resource/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ enable_deprecated_rpc = false # {{
# integration => enable_deprecated_rpc = true
# }}

# By default, there is no limitation on the size of batch request size
# a huge batch request may cost a lot of memory or makes the RPC server slow,
# to avoid this, you may want to add a limit for the batch request size.
# rpc_batch_limit = 2000

[tx_pool]
max_tx_pool_size = 180_000_000 # 180mb
min_fee_rate = 1_000 # Here fee_rate are calculated directly using size in units of shannons/KB
Expand Down
1 change: 0 additions & 1 deletion rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ async-stream = "0.3.3"
ckb-async-runtime = { path = "../util/runtime", version = "= 0.117.0-pre" }
# issue tracking: https://github.com/GREsau/schemars/pull/251
schemars = { version = "0.8.19", package = "ckb_schemars" }

[dev-dependencies]
reqwest = { version = "=0.11.20", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
77 changes: 74 additions & 3 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,32 @@ use ckb_async_runtime::Handle;
use ckb_error::AnyError;
use ckb_logger::info;

use axum::http::StatusCode;
use axum::{body::Bytes, http::StatusCode, response::Response, Json};

use jsonrpc_core::{MetaIoHandler, Metadata, Request};

use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken};
use futures_util::future;
use futures_util::future::Either::{Left, Right};
use jsonrpc_core::types::error::ErrorCode;
use jsonrpc_core::types::Response as RpcResponse;
use jsonrpc_core::Error;

use futures_util::{SinkExt, TryStreamExt};
use jsonrpc_core::MetaIoHandler;
use jsonrpc_utils::axum_utils::{handle_jsonrpc, handle_jsonrpc_ws};
use jsonrpc_utils::axum_utils::handle_jsonrpc_ws;
use jsonrpc_utils::pub_sub::Session;
use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError};
use tower_http::cors::CorsLayer;
use tower_http::timeout::TimeoutLayer;

static JSONRPC_BATCH_LIMIT: OnceLock<usize> = OnceLock::new();

#[doc(hidden)]
#[derive(Debug)]
pub struct RpcServer {
Expand All @@ -39,6 +50,10 @@ impl RpcServer {
/// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html).
/// * `handler` - Tokio runtime handle.
pub fn new(config: RpcConfig, io_handler: IoHandler, handler: Handle) -> Self {
if let Some(jsonrpc_batch_limit) = config.rpc_batch_limit {
let _ = JSONRPC_BATCH_LIMIT.get_or_init(|| jsonrpc_batch_limit);
}

let rpc = Arc::new(io_handler);

let http_address = Self::start_server(
Expand Down Expand Up @@ -195,3 +210,59 @@ async fn get_error_handler() -> impl IntoResponse {
"Used HTTP Method is not allowed. POST or OPTIONS is required",
)
}

async fn handle_jsonrpc<T: Default + Metadata>(
Extension(io): Extension<Arc<MetaIoHandler<T>>>,
req_body: Bytes,
) -> Response {
let make_error_response = |error| {
Json(jsonrpc_core::Failure {
jsonrpc: Some(jsonrpc_core::Version::V2),
id: jsonrpc_core::Id::Null,
error,
})
.into_response()
};

let req = match std::str::from_utf8(req_body.as_ref()) {
Ok(req) => req,
Err(_) => {
return make_error_response(jsonrpc_core::Error::parse_error());
}
};

let req = serde_json::from_str::<Request>(req);
let result = match req {
Err(_error) => Left(future::ready(Some(RpcResponse::from(
Error::new(ErrorCode::ParseError),
Some(jsonrpc_core::Version::V2),
)))),
Ok(request) => {
if let Request::Batch(ref arr) = request {
if let Some(batch_size) = JSONRPC_BATCH_LIMIT.get() {
if arr.len() > *batch_size {
return make_error_response(jsonrpc_core::Error::invalid_params(format!(
"batch size is too large, expect it less than: {}",
batch_size
)));
}
}
}
Right(io.handle_rpc_request(request, T::default()))
}
};

if let Some(response) = result.await {
serde_json::to_string(&response)
.map(|json| {
(
[(axum::http::header::CONTENT_TYPE, "application/json")],
json,
)
.into_response()
})
.unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())
} else {
StatusCode::NO_CONTENT.into_response()
}
}
22 changes: 13 additions & 9 deletions rpc/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ impl fmt::Display for RpcTestRequest {
}
}

impl RpcTestRequest {
fn json(&self) -> String {
serde_json::to_string(self).unwrap()
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Default)]
struct RpcTestResponse {
pub id: usize,
Expand Down Expand Up @@ -89,6 +83,18 @@ pub(crate) struct RpcTestSuite {

impl RpcTestSuite {
fn rpc(&self, request: &RpcTestRequest) -> RpcTestResponse {
self.send_request(request)
.json::<RpcTestResponse>()
.expect("Deserialize RpcTestRequest")
}

fn rpc_batch(&self, request: &[RpcTestRequest]) -> Result<Vec<RpcTestResponse>, String> {
let res = self.send_request(request);
res.json::<Vec<RpcTestResponse>>()
.map_err(|res| format!("batch request failed : {:?}", res))
}

fn send_request<T: Serialize + ?Sized>(&self, request: &T) -> reqwest::blocking::Response {
self.rpc_client
.post(&self.rpc_uri)
.json(&request)
Expand All @@ -97,11 +103,9 @@ impl RpcTestSuite {
panic!(
"Failed to call RPC request: {:?}\n\nrequest = {:?}",
e,
request.json(),
serde_json::to_string(request).unwrap(),
)
})
.json::<RpcTestResponse>()
.expect("Deserialize RpcTestRequest")
}

async fn tcp(&self, request: &RpcTestRequest) -> Result<RpcTestResponse, Box<dyn Error>> {
Expand Down
26 changes: 26 additions & 0 deletions rpc/src/tests/module/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,32 @@ fn test_rpc_tcp() {
assert_eq!(res.unwrap().result, "0x1e0014000000");
}

#[test]
fn test_rpc_batch_request_limit() {
let suite = setup_rpc();
let single_request = RpcTestRequest {
id: 42,
jsonrpc: "2.0".to_string(),
method: "generate_epochs".to_string(),
params: vec!["0x20000000000".into()],
};

let mut batch_request = vec![];
for _i in 0..1001 {
batch_request.push(single_request.clone());
}

// exceed limit with 1001
let res = suite.rpc_batch(&batch_request);
assert!(res.is_err());
eprintln!("res: {:?}", res);

// batch request will success with 1000
batch_request.remove(0);
let res = suite.rpc_batch(&batch_request);
assert!(res.is_ok());
}

// setup a chain for integration test rpc
fn setup_rpc() -> RpcTestSuite {
const INITIAL_PRIMARY_EPOCH_REWARD: Capacity = Capacity::shannons(1_917_808_21917808);
Expand Down
1 change: 1 addition & 0 deletions rpc/src/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub(crate) fn setup_rpc_test_suite(height: u64, consensus: Option<Consensus>) ->
ws_listen_address: None,
max_request_body_size: 20_000_000,
threads: None,
rpc_batch_limit: Some(1000),
// enable all rpc modules in unit test
modules: vec![
RpcModule::Net,
Expand Down
3 changes: 3 additions & 0 deletions test/template/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ enable_deprecated_rpc = true
# threads number for RPC service, default value is the number of CPU cores
# threads = 4

# the largest rpc batch request size, default is disabled
# rpc_batch_limit = 2000

[tx_pool]
max_tx_pool_size = 180_000_000 # 180mb
min_fee_rate = 0 # Here fee_rate are calculated directly using size in units of shannons/KB
Expand Down
2 changes: 2 additions & 0 deletions util/app-config/src/configs/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct Config {
pub max_request_body_size: usize,
/// Number of RPC worker threads.
pub threads: Option<usize>,
/// Number of RPC batch limit.
pub rpc_batch_limit: Option<usize>,
/// Enabled RPC modules.
pub modules: Vec<Module>,
/// Rejects txs with scripts that might trigger known bugs
Expand Down
Loading