diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 6b0c1a75e..7b6daecf2 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../README.md")] use console_api as proto; -use proto::resources::resource; +use proto::{instrument::instrument_server::InstrumentServer, resources::resource}; use serde::Serialize; use std::{ cell::RefCell, @@ -15,7 +15,10 @@ use std::{ use thread_local::ThreadLocal; #[cfg(unix)] use tokio::net::UnixListener; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; #[cfg(unix)] use tokio_stream::wrappers::UnixListenerStream; use tracing_core::{ @@ -933,18 +936,15 @@ impl Server { /// /// [`tonic`]: https://docs.rs/tonic/ pub async fn serve_with( - mut self, + self, mut builder: tonic::transport::Server, ) -> Result<(), Box> { - let aggregate = self - .aggregator - .take() - .expect("cannot start server multiple times"); - let aggregate = spawn_named(aggregate.run(), "console::aggregate"); let addr = self.addr.clone(); - let router = builder.add_service( - proto::instrument::instrument_server::InstrumentServer::new(self), - ); + let ServerParts { + instrument_server: service, + aggregator_handle: aggregate, + } = self.into_parts(); + let router = builder.add_service(service); let res = match addr { ServerAddr::Tcp(addr) => { let serve = router.serve(addr); @@ -957,9 +957,110 @@ impl Server { spawn_named(serve, "console::serve").await } }; - aggregate.abort(); + drop(aggregate); res?.map_err(Into::into) } + + /// Returns the parts needed to spawn a gRPC server and keep the aggregation + /// worker running. + /// + /// Note that a server spawned in this way will overwrite any value set by + /// [`Builder::server_addr`] as the user becomes responsible for defining + /// the address when calling [`Router::serve`]. + /// + /// # Examples + /// + /// The parts can be used to serve the instrument server together with + /// other endpoints from the same gRPC server. + /// + /// ``` + /// use console_subscriber::{ConsoleLayer, ServerParts}; + /// + /// # let runtime = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .unwrap(); + /// # runtime.block_on(async { + /// let (console_layer, server) = ConsoleLayer::builder().build(); + /// let ServerParts { + /// instrument_server, + /// aggregator_handle, + /// .. + /// } = server.into_parts(); + /// + /// let router = tonic::transport::Server::builder() + /// //.add_service(some_other_service) + /// .add_service(instrument_server); + /// let serve = router.serve(std::net::SocketAddr::new( + /// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), + /// 6669, + /// )); + /// + /// // Finally, spawn the server. + /// tokio::spawn(serve); + /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused. + /// # drop(console_layer); + /// # drop(aggregator_handle); + /// # }); + /// ``` + /// + /// [`Router::serve`]: fn@tonic::transport::server::Router::serve + pub fn into_parts(mut self) -> ServerParts { + let aggregate = self + .aggregator + .take() + .expect("cannot start server multiple times"); + let aggregate = spawn_named(aggregate.run(), "console::aggregate"); + + let service = proto::instrument::instrument_server::InstrumentServer::new(self); + + ServerParts { + instrument_server: service, + aggregator_handle: AggregatorHandle { + join_handle: aggregate, + }, + } + } +} + +/// Server Parts +/// +/// This struct contains the parts returned by [`Server::into_parts`]. It may contain +/// further parts in the future, an as such is marked as `non_exhaustive`. +/// +/// The `InstrumentServer` can be used to construct a router which +/// can be added to a [`tonic`] gRPC server. +/// +/// The [`AggregatorHandle`] must be kept until after the server has been +/// shut down. +/// +/// See the [`Server::into_parts`] documentation for usage. +#[non_exhaustive] +pub struct ServerParts { + /// The instrument server. + /// + /// See the documentation for [`InstrumentServer`] for details. + pub instrument_server: InstrumentServer, + + /// The aggregate handle. + /// + /// See the documentation for [`AggregatorHandle`] for details. + pub aggregator_handle: AggregatorHandle, +} + +/// Aggregator handle. +/// +/// This object is returned from [`Server::into_parts`] and must be +/// kept as long as the `InstrumentServer` - which is also +/// returned - is in use. +pub struct AggregatorHandle { + join_handle: JoinHandle<()>, +} + +impl Drop for AggregatorHandle { + fn drop(&mut self) { + self.join_handle.abort(); + } } #[tonic::async_trait]