From 8dd5c1de57d3fed075c0d63dbffec2f437169ca1 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 17 Jul 2024 18:56:08 +0800 Subject: [PATCH 1/6] add jsonrpc batch limit --- Cargo.lock | 1 + rpc/Cargo.toml | 1 + rpc/src/server.rs | 73 ++++++++++++++++++++++++++++-- rpc/src/tests/setup.rs | 1 + test/template/ckb.toml | 3 ++ util/app-config/src/configs/rpc.rs | 2 + 6 files changed, 78 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cd0da914c..f18b8750b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1453,6 +1453,7 @@ dependencies = [ "itertools 0.11.0", "jsonrpc-core", "jsonrpc-utils", + "once_cell", "pretty_assertions", "reqwest", "serde", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 461dc7a4fb..80c7268126 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -52,6 +52,7 @@ async-stream = "0.3.3" ckb-async-runtime = { path = "../util/runtime", version = "= 0.117.0-pre" } # issue tracking: https://github.com/GREsau/schemars/pull/251 schemars = { version = "0.8.19", package = "ckb_schemars" } +once_cell = "1.19.0" [dev-dependencies] reqwest = { version = "=0.11.20", features = ["blocking", "json"] } diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 13fd55e37c..7fa65bd4a5 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -7,11 +7,19 @@ use ckb_async_runtime::Handle; use ckb_error::AnyError; use ckb_logger::info; -use axum::http::StatusCode; +use axum::{body::Bytes, http::StatusCode, response::Response, Json}; + +use jsonrpc_core::{MetaIoHandler, Metadata, Request}; + use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken}; +use futures_util::future; +use futures_util::future::Either::{Left, Right}; +use jsonrpc_core::types::error::ErrorCode; +use jsonrpc_core::types::Response as RpcResponse; +use jsonrpc_core::Error; + use futures_util::{SinkExt, TryStreamExt}; -use jsonrpc_core::MetaIoHandler; -use jsonrpc_utils::axum_utils::{handle_jsonrpc, handle_jsonrpc_ws}; +use jsonrpc_utils::axum_utils::handle_jsonrpc_ws; use jsonrpc_utils::pub_sub::Session; use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig}; use std::net::{SocketAddr, ToSocketAddrs}; @@ -22,6 +30,10 @@ use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; use tower_http::cors::CorsLayer; use tower_http::timeout::TimeoutLayer; +const DEFAULT_BATCH_LIMIT: usize = 2000; + +static JSONRPC_BATCH_LIMIT: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); + #[doc(hidden)] #[derive(Debug)] pub struct RpcServer { @@ -39,6 +51,9 @@ impl RpcServer { /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). /// * `handler` - Tokio runtime handle. pub fn new(config: RpcConfig, io_handler: IoHandler, handler: Handle) -> Self { + let _ = JSONRPC_BATCH_LIMIT + .get_or_init(|| config.rpc_batch_limit.unwrap_or(DEFAULT_BATCH_LIMIT)); + let rpc = Arc::new(io_handler); let http_address = Self::start_server( @@ -195,3 +210,55 @@ async fn get_error_handler() -> impl IntoResponse { "Used HTTP Method is not allowed. POST or OPTIONS is required", ) } + +async fn handle_jsonrpc( + Extension(io): Extension>>, + req_body: Bytes, +) -> Response { + let make_error_response = |error| { + Json(jsonrpc_core::Failure { + jsonrpc: Some(jsonrpc_core::Version::V2), + id: jsonrpc_core::Id::Null, + error, + }) + .into_response() + }; + + let req = match std::str::from_utf8(req_body.as_ref()) { + Ok(req) => req, + Err(_) => { + return make_error_response(jsonrpc_core::Error::parse_error()); + } + }; + + let req = serde_json::from_str::(req); + let result = match req { + Err(_error) => Left(future::ready(Some(RpcResponse::from( + Error::new(ErrorCode::ParseError), + Some(jsonrpc_core::Version::V2), + )))), + Ok(request) => { + if let Request::Batch(ref arr) = request { + if let Some(batch_size) = JSONRPC_BATCH_LIMIT.get() { + if arr.len() > *batch_size { + return make_error_response(jsonrpc_core::Error::invalid_params(format!( + "batch size is too large, expect it less than: {}", + batch_size + ))); + } + } + } + Right(io.handle_rpc_request(request, T::default())) + } + }; + + if let Some(response) = result.await { + ( + [(axum::http::header::CONTENT_TYPE, "application/json")], + serde_json::to_string(&response).unwrap(), + ) + .into_response() + } else { + StatusCode::NO_CONTENT.into_response() + } +} diff --git a/rpc/src/tests/setup.rs b/rpc/src/tests/setup.rs index 5dc0dab2b5..59e162161e 100644 --- a/rpc/src/tests/setup.rs +++ b/rpc/src/tests/setup.rs @@ -178,6 +178,7 @@ pub(crate) fn setup_rpc_test_suite(height: u64, consensus: Option) -> ws_listen_address: None, max_request_body_size: 20_000_000, threads: None, + rpc_batch_limit: Some(1000), // enable all rpc modules in unit test modules: vec![ RpcModule::Net, diff --git a/test/template/ckb.toml b/test/template/ckb.toml index f713d47abc..935ba92d18 100644 --- a/test/template/ckb.toml +++ b/test/template/ckb.toml @@ -79,6 +79,9 @@ enable_deprecated_rpc = true # threads number for RPC service, default value is the number of CPU cores # threads = 4 +# the largest rpc batch request size, default is 2000 +# rpc_batch_limit = 2000 + [tx_pool] max_tx_pool_size = 180_000_000 # 180mb min_fee_rate = 0 # Here fee_rate are calculated directly using size in units of shannons/KB diff --git a/util/app-config/src/configs/rpc.rs b/util/app-config/src/configs/rpc.rs index abd357cbf6..1605e48e5b 100644 --- a/util/app-config/src/configs/rpc.rs +++ b/util/app-config/src/configs/rpc.rs @@ -39,6 +39,8 @@ pub struct Config { pub max_request_body_size: usize, /// Number of RPC worker threads. pub threads: Option, + /// Number of RPC batch limit. + pub rpc_batch_limit: Option, /// Enabled RPC modules. pub modules: Vec, /// Rejects txs with scripts that might trigger known bugs From 529ff7edf57c1841411959662d32e16f0871d61a Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 18 Jul 2024 09:11:46 +0800 Subject: [PATCH 2/6] add batch request limit test --- rpc/src/tests/mod.rs | 22 +++++++++++++--------- rpc/src/tests/module/test.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/rpc/src/tests/mod.rs b/rpc/src/tests/mod.rs index b59897bd22..45f996dd94 100644 --- a/rpc/src/tests/mod.rs +++ b/rpc/src/tests/mod.rs @@ -55,12 +55,6 @@ impl fmt::Display for RpcTestRequest { } } -impl RpcTestRequest { - fn json(&self) -> String { - serde_json::to_string(self).unwrap() - } -} - #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Default)] struct RpcTestResponse { pub id: usize, @@ -89,6 +83,18 @@ pub(crate) struct RpcTestSuite { impl RpcTestSuite { fn rpc(&self, request: &RpcTestRequest) -> RpcTestResponse { + self.send_request(request) + .json::() + .expect("Deserialize RpcTestRequest") + } + + fn rpc_batch(&self, request: &[RpcTestRequest]) -> Result, String> { + let res = self.send_request(request); + res.json::>() + .map_err(|res| format!("batch request failed : {:?}", res)) + } + + fn send_request(&self, request: &T) -> reqwest::blocking::Response { self.rpc_client .post(&self.rpc_uri) .json(&request) @@ -97,11 +103,9 @@ impl RpcTestSuite { panic!( "Failed to call RPC request: {:?}\n\nrequest = {:?}", e, - request.json(), + serde_json::to_string(request).unwrap(), ) }) - .json::() - .expect("Deserialize RpcTestRequest") } async fn tcp(&self, request: &RpcTestRequest) -> Result> { diff --git a/rpc/src/tests/module/test.rs b/rpc/src/tests/module/test.rs index 0c9ef54718..202139a349 100644 --- a/rpc/src/tests/module/test.rs +++ b/rpc/src/tests/module/test.rs @@ -105,6 +105,32 @@ fn test_rpc_tcp() { assert_eq!(res.unwrap().result, "0x1e0014000000"); } +#[test] +fn test_rpc_batch_request_limit() { + let suite = setup_rpc(); + let single_request = RpcTestRequest { + id: 42, + jsonrpc: "2.0".to_string(), + method: "generate_epochs".to_string(), + params: vec!["0x20000000000".into()], + }; + + let mut batch_request = vec![]; + for _i in 0..1001 { + batch_request.push(single_request.clone()); + } + + // exceed limit with 1001 + let res = suite.rpc_batch(&batch_request); + assert!(res.is_err()); + eprintln!("res: {:?}", res); + + // batch request will success with 1000 + batch_request.remove(0); + let res = suite.rpc_batch(&batch_request); + assert!(res.is_ok()); +} + // setup a chain for integration test rpc fn setup_rpc() -> RpcTestSuite { const INITIAL_PRIMARY_EPOCH_REWARD: Capacity = Capacity::shannons(1_917_808_21917808); From 9fff0365028e18b09fff324c84c73053483b5567 Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 18 Jul 2024 09:33:26 +0800 Subject: [PATCH 3/6] handle error better for rpc --- rpc/src/server.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 7fa65bd4a5..f6cacce9fa 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -253,11 +253,15 @@ async fn handle_jsonrpc( }; if let Some(response) = result.await { - ( - [(axum::http::header::CONTENT_TYPE, "application/json")], - serde_json::to_string(&response).unwrap(), - ) - .into_response() + serde_json::to_string(&response) + .map(|json| { + ( + [(axum::http::header::CONTENT_TYPE, "application/json")], + json, + ) + .into_response() + }) + .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()) } else { StatusCode::NO_CONTENT.into_response() } From a0fac83ff0f3571c6a4a5ac3e9eedd1e490c089e Mon Sep 17 00:00:00 2001 From: yukang Date: Fri, 19 Jul 2024 16:02:01 +0800 Subject: [PATCH 4/6] remove default batch request size limit --- rpc/src/server.rs | 7 +++---- test/template/ckb.toml | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/rpc/src/server.rs b/rpc/src/server.rs index f6cacce9fa..5ec8726eaa 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -30,8 +30,6 @@ use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; use tower_http::cors::CorsLayer; use tower_http::timeout::TimeoutLayer; -const DEFAULT_BATCH_LIMIT: usize = 2000; - static JSONRPC_BATCH_LIMIT: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); #[doc(hidden)] @@ -51,8 +49,9 @@ impl RpcServer { /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). /// * `handler` - Tokio runtime handle. pub fn new(config: RpcConfig, io_handler: IoHandler, handler: Handle) -> Self { - let _ = JSONRPC_BATCH_LIMIT - .get_or_init(|| config.rpc_batch_limit.unwrap_or(DEFAULT_BATCH_LIMIT)); + if let Some(jsonrpc_batch_limit) = config.rpc_batch_limit { + let _ = JSONRPC_BATCH_LIMIT.get_or_init(|| jsonrpc_batch_limit); + } let rpc = Arc::new(io_handler); diff --git a/test/template/ckb.toml b/test/template/ckb.toml index 935ba92d18..4cf431ec87 100644 --- a/test/template/ckb.toml +++ b/test/template/ckb.toml @@ -79,7 +79,7 @@ enable_deprecated_rpc = true # threads number for RPC service, default value is the number of CPU cores # threads = 4 -# the largest rpc batch request size, default is 2000 +# the largest rpc batch request size, default is disabled # rpc_batch_limit = 2000 [tx_pool] From 1e2a596e75f6a4ca1a420513d82599cf44de883f Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 24 Jul 2024 14:46:13 +0800 Subject: [PATCH 5/6] use OnceLock instead --- Cargo.lock | 1 - rpc/Cargo.toml | 2 -- rpc/src/server.rs | 3 ++- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f18b8750b1..2cd0da914c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1453,7 +1453,6 @@ dependencies = [ "itertools 0.11.0", "jsonrpc-core", "jsonrpc-utils", - "once_cell", "pretty_assertions", "reqwest", "serde", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 80c7268126..6873ce99d9 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -52,8 +52,6 @@ async-stream = "0.3.3" ckb-async-runtime = { path = "../util/runtime", version = "= 0.117.0-pre" } # issue tracking: https://github.com/GREsau/schemars/pull/251 schemars = { version = "0.8.19", package = "ckb_schemars" } -once_cell = "1.19.0" - [dev-dependencies] reqwest = { version = "=0.11.20", features = ["blocking", "json"] } serde = { version = "1.0", features = ["derive"] } diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 5ec8726eaa..4835cd6765 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -24,13 +24,14 @@ use jsonrpc_utils::pub_sub::Session; use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig}; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::Arc; +use std::sync::OnceLock; use std::time::Duration; use tokio::net::TcpListener; use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; use tower_http::cors::CorsLayer; use tower_http::timeout::TimeoutLayer; -static JSONRPC_BATCH_LIMIT: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); +static JSONRPC_BATCH_LIMIT: OnceLock = OnceLock::new(); #[doc(hidden)] #[derive(Debug)] From 9dd4ac234a519f3a7e6744e9c97b09c999ef7264 Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 25 Jul 2024 16:10:04 +0800 Subject: [PATCH 6/6] add notes --- resource/ckb.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/resource/ckb.toml b/resource/ckb.toml index 20d96b9ac9..e47a37cff4 100644 --- a/resource/ckb.toml +++ b/resource/ckb.toml @@ -131,6 +131,11 @@ enable_deprecated_rpc = false # {{ # integration => enable_deprecated_rpc = true # }} +# By default, there is no limitation on the size of batch request size +# a huge batch request may cost a lot of memory or makes the RPC server slow, +# to avoid this, you may want to add a limit for the batch request size. +# rpc_batch_limit = 2000 + [tx_pool] max_tx_pool_size = 180_000_000 # 180mb min_fee_rate = 1_000 # Here fee_rate are calculated directly using size in units of shannons/KB