From 4a3c8d8d60f80984f295cc79b68fd3ba76046d86 Mon Sep 17 00:00:00 2001 From: htiennv Date: Mon, 24 Jun 2024 14:53:51 +0700 Subject: [PATCH] feat(rpc): remove ipc future and now using ServerHandle and StopHandle from jsonrpsee --- crates/rpc/ipc/src/server/future.rs | 60 ----------------------------- crates/rpc/ipc/src/server/mod.rs | 48 ++++------------------- crates/rpc/rpc-builder/src/auth.rs | 6 +-- crates/rpc/rpc-builder/src/lib.rs | 2 +- 4 files changed, 11 insertions(+), 105 deletions(-) delete mode 100644 crates/rpc/ipc/src/server/future.rs diff --git a/crates/rpc/ipc/src/server/future.rs b/crates/rpc/ipc/src/server/future.rs deleted file mode 100644 index d6aa675c98f9..000000000000 --- a/crates/rpc/ipc/src/server/future.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any -// person obtaining a copy of this software and associated -// documentation files (the "Software"), to deal in the -// Software without restriction, including without -// limitation the rights to use, copy, modify, merge, -// publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software -// is furnished to do so, subject to the following -// conditions: -// -// The above copyright notice and this permission notice -// shall be included in all copies or substantial portions -// of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Utilities for handling async code. - -use std::sync::Arc; -use tokio::sync::watch; - -#[derive(Debug, Clone)] -pub(crate) struct StopHandle(watch::Receiver<()>); - -impl StopHandle { - pub(crate) const fn new(rx: watch::Receiver<()>) -> Self { - Self(rx) - } - - pub(crate) async fn shutdown(mut self) { - // Err(_) implies that the `sender` has been dropped. - // Ok(_) implies that `stop` has been called. - let _ = self.0.changed().await; - } -} - -/// Server handle. -/// -/// When all [`StopHandle`]'s have been `dropped` or `stop` has been called -/// the server will be stopped. -#[derive(Debug, Clone)] -pub(crate) struct ServerHandle(Arc>); - -impl ServerHandle { - /// Wait for the server to stop. - #[allow(dead_code)] - pub(crate) async fn stopped(self) { - self.0.closed().await - } -} diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index c38b1629e667..6dff8a8afae0 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -1,9 +1,6 @@ //! JSON-RPC IPC server implementation -use crate::server::{ - connection::{IpcConn, JsonRpcStream}, - future::StopHandle, -}; +use crate::server::connection::{IpcConn, JsonRpcStream}; use futures::StreamExt; use futures_util::future::Either; use interprocess::local_socket::{ @@ -15,8 +12,8 @@ use jsonrpsee::{ core::TEN_MB_SIZE_BYTES, server::{ middleware::rpc::{RpcLoggerLayer, RpcServiceT}, - AlreadyStoppedError, ConnectionGuard, ConnectionPermit, IdProvider, - RandomIntegerIdProvider, + stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider, + ServerHandle, StopHandle, }, BoundedSubscriptions, MethodSink, Methods, }; @@ -29,7 +26,7 @@ use std::{ }; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, - sync::{oneshot, watch}, + sync::oneshot, }; use tower::{layer::util::Identity, Layer, Service}; use tracing::{debug, instrument, trace, warn, Instrument}; @@ -46,7 +43,6 @@ use tokio_stream::wrappers::ReceiverStream; use tower::layer::{util::Stack, LayerFn}; mod connection; -mod future; mod ipc; mod rpc_service; @@ -109,9 +105,8 @@ where methods: impl Into, ) -> Result { let methods = methods.into(); - let (stop_tx, stop_rx) = watch::channel(()); - let stop_handle = StopHandle::new(stop_rx); + let (stop_handle, server_handle) = stop_channel(); // use a signal channel to wait until we're ready to accept connections let (tx, rx) = oneshot::channel(); @@ -122,7 +117,7 @@ where }; rx.await.expect("channel is open")?; - Ok(ServerHandle::new(stop_tx)) + Ok(server_handle) } async fn start_inner( @@ -795,35 +790,6 @@ impl Builder { } } -/// Server handle. -/// -/// When all [`jsonrpsee::server::StopHandle`]'s have been `dropped` or `stop` has been called -/// the server will be stopped. -#[derive(Debug, Clone)] -pub struct ServerHandle(Arc>); - -impl ServerHandle { - /// Create a new server handle. - pub(crate) fn new(tx: watch::Sender<()>) -> Self { - Self(Arc::new(tx)) - } - - /// Tell the server to stop without waiting for the server to stop. - pub fn stop(&self) -> Result<(), AlreadyStoppedError> { - self.0.send(()).map_err(|_| AlreadyStoppedError) - } - - /// Wait for the server to stop. - pub async fn stopped(self) { - self.0.closed().await - } - - /// Check if the server has been stopped. - pub fn is_stopped(&self) -> bool { - self.0.is_closed() - } -} - #[cfg(test)] pub fn dummy_name() -> String { let num: u64 = rand::Rng::gen(&mut rand::thread_rng()); @@ -877,7 +843,7 @@ mod tests { // and you might want to do something smarter if it's // critical that "the most recent item" must be sent when it is produced. if sink.send(notif).await.is_err() { - break Ok(()) + break Ok(()); } closed = c; diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 3f016c330e78..1e8ef8f56a18 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -68,7 +68,7 @@ impl AuthServerConfig { .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?; let handle = server.start(module.inner.clone()); - let mut ipc_handle: Option = None; + let mut ipc_handle: Option = None; if let Some(ipc_server_config) = ipc_server_config { let ipc_endpoint_str = ipc_endpoint @@ -241,7 +241,7 @@ pub struct AuthServerHandle { handle: jsonrpsee::server::ServerHandle, secret: JwtSecret, ipc_endpoint: Option, - ipc_handle: Option, + ipc_handle: Option, } // === impl AuthServerHandle === @@ -310,7 +310,7 @@ impl AuthServerHandle { } /// Returns an ipc handle - pub fn ipc_handle(&self) -> Option { + pub fn ipc_handle(&self) -> Option { self.ipc_handle.clone() } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 93d4e6b264ee..06415ba8dc8d 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1879,7 +1879,7 @@ pub struct RpcServerHandle { http: Option, ws: Option, ipc_endpoint: Option, - ipc: Option, + ipc: Option, jwt_secret: Option, }