diff --git a/Cargo.lock b/Cargo.lock index 59e5d37..b186e32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,6 +237,28 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-insights" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d4671ab9126b6d109c06fe9698485d4602c718e19d7c647e32347c1a7112943" +dependencies = [ + "axum", + "futures", + "http", + "hyper", + "opentelemetry", + "opentelemetry-application-insights", + "reqwest", + "serde", + "serde_json", + "tokio", + "tower", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "axum-macros" version = "0.3.8" @@ -408,7 +430,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45" dependencies = [ "owning_ref", - "parking_lot", + "parking_lot 0.4.8", ] [[package]] @@ -1653,7 +1675,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" dependencies = [ "owning_ref", - "parking_lot_core", + "parking_lot_core 0.2.14", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.8", ] [[package]] @@ -1668,6 +1700,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot_core" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.3.5", + "smallvec 1.11.0", + "windows-targets", +] + [[package]] name = "parse-zoneinfo" version = "0.3.0" @@ -2109,6 +2154,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "axum-insights", "bincode", "chashmap", "chrono", @@ -2123,8 +2169,6 @@ dependencies = [ "hyper", "js-sys", "log", - "opentelemetry", - "opentelemetry-application-insights", "pretty_assertions", "rand 0.8.5", "rayon", @@ -2139,8 +2183,6 @@ dependencies = [ "tower", "tower-http", "tracing", - "tracing-opentelemetry", - "tracing-subscriber", "utoipa", "utoipa-rapidoc", "utoipa-redoc", @@ -2679,6 +2721,7 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2851,12 +2894,14 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc09e402904a5261e42cf27aea09ccb7d5318c6717a9eec3d8e2e65c56b18f19" +checksum = "75327c6b667828ddc28f5e3f169036cb793c3f588d83bf0f262a7f062ffed3c8" dependencies = [ "once_cell", "opentelemetry", + "opentelemetry_sdk", + "smallvec 1.11.0", "tracing", "tracing-core", "tracing-log", diff --git a/rtz/Cargo.toml b/rtz/Cargo.toml index 6a6564c..ae16482 100644 --- a/rtz/Cargo.toml +++ b/rtz/Cargo.toml @@ -39,7 +39,7 @@ tz-osm = ["rtz-core/tz-osm", "rtz-build/tz-osm", "chrono-tz", "chrono"] admin-osm = ["rtz-core/admin-osm", "rtz-build/admin-osm"] cli = ["clap"] -web = ["full", "tokio", "config", "log", "simple_logger", "schemars", "chrono", "axum", "hyper", "tower", "tower-http", "utoipa", "utoipa-swagger-ui", "utoipa-redoc", "utoipa-rapidoc", "tracing", "opentelemetry", "opentelemetry-application-insights", "tracing-opentelemetry", "tracing-subscriber"] +web = ["full", "tokio", "config", "log", "simple_logger", "schemars", "chrono", "axum", "hyper", "tower", "tower-http", "utoipa", "utoipa-swagger-ui", "utoipa-redoc", "utoipa-rapidoc", "axum-insights", "tracing"] wasm = ["wasm-bindgen", "wasm-bindgen-futures", "wee_alloc", "js-sys"] [dependencies] @@ -68,7 +68,7 @@ chrono-tz ={ version = "0.8.3", features = ["serde"], optional = true } chrono = { version = "0.4.19", features = ["serde"], optional = true } # web -tokio = { version = "1.29.1", features = ["rt", "macros", "signal"], optional = true } +tokio = { version = "1.29.1", features = ["rt", "macros", "signal", "parking_lot"], optional = true } config = { version = "0.13.3", optional = true } log = { version = "0.4.8", features = ["release_max_level_info"], optional = true } simple_logger = { version = "4.2.0", optional = true } @@ -83,11 +83,8 @@ utoipa-redoc = { version = "0.1.0", features = ["axum"], optional = true } utoipa-rapidoc = { version = "0.1.0", features = ["axum"], optional = true } # telemetry +axum-insights = { version = "0.2.0", optional = true } tracing = { version = "0.1.37", optional = true } -opentelemetry = { version = "0.20", features = ["rt-tokio"], optional = true } -opentelemetry-application-insights = { version = "0.27.0", features = ["reqwest-client"], optional = true } -tracing-opentelemetry = { version = "0.20.0", optional = true} -tracing-subscriber = { version = "0.3.17", optional = true } # wasm wasm-bindgen = { version = "0.2.84", optional = true } diff --git a/rtz/src/web/mod.rs b/rtz/src/web/mod.rs index da5bc19..52bde5f 100644 --- a/rtz/src/web/mod.rs +++ b/rtz/src/web/mod.rs @@ -6,7 +6,6 @@ pub(crate) mod config; pub(crate) mod response_types; pub(crate) mod server; pub(crate) mod types; -pub(crate) mod telemetry; pub(crate) mod utilities; // Imports. @@ -52,13 +51,13 @@ pub fn server_start(config_path: String, bind_address: Option, port: Opt mod tests { use super::*; use axum::Router; - use hyper::{Request, Body, StatusCode}; + use hyper::{Body, Request, StatusCode}; use pretty_assertions::assert_eq; - use tower::{ServiceExt, Service}; + use tower::{Service, ServiceExt}; fn get_client() -> Router { let config = Config::new("", None, None, Some(false)).unwrap(); - + server::create_axum_app(&config) } @@ -148,14 +147,14 @@ mod bench { use super::*; use axum::Router; - use hyper::{Request, Body, StatusCode}; + use hyper::{Body, Request, StatusCode}; use pretty_assertions::assert_eq; use test::Bencher; - use tower::{ServiceExt, Service}; + use tower::{Service, ServiceExt}; fn get_client() -> Router { let config = Config::new("", None, None, Some(false)).unwrap(); - + server::create_axum_app(&config) } @@ -193,7 +192,7 @@ mod bench { futures::executor::block_on(async { let request = Request::get(format!("/api/ned/tz/{}/{}", x, y)).body(Body::empty()).unwrap(); let response = client.call(request).await.unwrap(); - + assert_eq!(response.status(), StatusCode::OK); }); }); @@ -213,7 +212,7 @@ mod bench { for y in ys.clone() { let request = Request::get(format!("/api/osm/tz/{}/{}", x, y)).body(Body::empty()).unwrap(); let response = client.call(request).await.unwrap(); - + assert_eq!(response.status(), StatusCode::OK); } } @@ -233,7 +232,7 @@ mod bench { futures::executor::block_on(async { let request = Request::get(format!("/api/osm/tz/{}/{}", x, y)).body(Body::empty()).unwrap(); let response = client.call(request).await.unwrap(); - + assert_eq!(response.status(), StatusCode::OK); }); }); @@ -253,7 +252,7 @@ mod bench { for y in ys.clone() { let request = Request::get(format!("/api/osm/admin/{}/{}", x, y)).body(Body::empty()).unwrap(); let response = client.call(request).await.unwrap(); - + assert_eq!(response.status(), StatusCode::OK); } } @@ -274,7 +273,7 @@ mod bench { futures::executor::block_on(async { let request = Request::get(format!("/api/osm/admin/{}/{}", x, y)).body(Body::empty()).unwrap(); let response = client.call(request).await.unwrap(); - + assert_eq!(response.status(), StatusCode::OK); }); }); diff --git a/rtz/src/web/response_types.rs b/rtz/src/web/response_types.rs index 23e8eec..50c8542 100644 --- a/rtz/src/web/response_types.rs +++ b/rtz/src/web/response_types.rs @@ -1,6 +1,9 @@ //! Them for supporting the response types of the APIs, and their versions. -use axum::{response::{IntoResponse, Response}, http::HeaderValue}; +use axum::{ + http::HeaderValue, + response::{IntoResponse, Response}, +}; use hyper::StatusCode; use serde::Serialize; diff --git a/rtz/src/web/server.rs b/rtz/src/web/server.rs index dc4ce73..db2f366 100644 --- a/rtz/src/web/server.rs +++ b/rtz/src/web/server.rs @@ -1,6 +1,11 @@ -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{Arc, OnceLock}, +}; -use axum::{extract::Path, routing::get, Json, Router, middleware::from_fn}; +use axum::{extract::Path, routing::get, Json, Router}; +use axum_insights::AppInsights; +use hyper::Method; use rtz_core::{ base::types::{Float, Void}, geo::{ @@ -8,7 +13,8 @@ use rtz_core::{ tz::{ned::NedTimezone, osm::OsmTimezone}, }, }; -use tracing::{instrument, Instrument}; +use tower_http::cors::{Any, CorsLayer}; +use tracing::instrument; use utoipa::OpenApi; use utoipa_rapidoc::RapiDoc; use utoipa_redoc::{Redoc, Servable}; @@ -22,19 +28,22 @@ use crate::{ use super::{ config::Config, response_types::LookupResponse, - types::{get_last_modified_time, AppState, IfModifiedSince, WebResult}, utilities::shutdown_signal, telemetry::{telemetry_fn, self}, + types::{get_last_modified_time, AppState, IfModifiedSince, WebResult, WebError}, + utilities::shutdown_signal, }; +// Statics. + +static FLY_REGION: OnceLock = OnceLock::new(); +static FLY_ALLOC_ID: OnceLock = OnceLock::new(); +static FLY_PUBLIC_IP: OnceLock = OnceLock::new(); + /// Starts the web server. pub async fn start(config: &Config) -> Void { let app = create_axum_app(config); - telemetry::init(config.analytics_api_key.as_deref()); - let bind_address = format!("{}:{}", config.bind_address, config.port); - axum::Server::bind(&bind_address - .parse() - .unwrap()) + axum::Server::bind(&bind_address.parse().unwrap()) .serve(app.into_make_service()) .with_graceful_shutdown(shutdown_signal()) .await @@ -46,7 +55,41 @@ pub async fn start(config: &Config) -> Void { pub fn create_axum_app(config: &Config) -> Router { let state = AppState { config: Arc::new(config.clone()) }; - let telemetry_layer = from_fn(telemetry_fn); + let cors_layer = CorsLayer::new().allow_methods([Method::GET]).allow_origin(Any); + + let name = std::env::var("FLY_REGION").unwrap_or_else(|_| "server".to_string()); + let _ = FLY_REGION.set(name.clone()); + let _ = FLY_ALLOC_ID.set(std::env::var("FLY_ALLOC_ID").unwrap_or_else(|_| "unknown".to_string())); + let _ = FLY_PUBLIC_IP.set(std::env::var("FLY_PUBLIC_IP").unwrap_or_else(|_| "unknown".to_string())); + + let telemetry_layer = AppInsights::default() + .with_connection_string(config.analytics_api_key.clone()) + .with_service_config("rtz", name) + .with_catch_panic(true) + .with_field_mapper(|p| { + let fly_alloc_id = FLY_ALLOC_ID.get().unwrap().to_owned(); + let fly_public_ip = FLY_PUBLIC_IP.get().unwrap().to_owned(); + let fly_region = FLY_REGION.get().unwrap().to_owned(); + let fly_accept_region = p.headers.get("Fly-Region").map(|v| v.to_str().unwrap_or("unknown").to_owned()).unwrap_or("unknown".to_owned()); + + HashMap::from([ + ("fly.alloc_id".to_string(), fly_alloc_id), + ("fly.public_ip".to_string(), fly_public_ip), + ("fly.server_region".to_string(), fly_region), + ("fly.accept_region".to_string(), fly_accept_region), + ]) + }) + .with_panic_mapper(|e| { + (500, WebError { + status: 500, + message: format!("A panic occurred: {:?}", e), + backtrace: None, + }) + }) + .with_error_type::() + .build_and_set_global_default() + .unwrap() + .layer(); let api_router = Router::new() .route("/ned/tz/:lng/:lat", get(timezone_ned)) @@ -61,6 +104,7 @@ pub fn create_axum_app(config: &Config) -> Router { .merge(Redoc::with_url("/redoc", ApiDoc::openapi())) .merge(RapiDoc::new("/api-docs/openapi.json").path("/rapidoc")) .nest("/api", api_router) + .layer(cors_layer) .layer(telemetry_layer) .with_state(state); diff --git a/rtz/src/web/telemetry.rs b/rtz/src/web/telemetry.rs deleted file mode 100644 index b0043a0..0000000 --- a/rtz/src/web/telemetry.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::panic; - -use axum::{middleware::Next, response::Response, RequestPartsExt, extract::MatchedPath}; -use hyper::{Request, body::{Bytes, HttpBody}, Body}; -use opentelemetry::{sdk::{trace::Config, self}, KeyValue}; -use reqwest::Client; -use tracing::{Span, Instrument, Level, level_filters}; -use tracing_subscriber::{Registry, prelude::__tracing_subscriber_SubscriberExt}; - -use super::types::WebError; - -// Methods. - -/// Initializes the application insights [`ConcreteTracer`]. -fn init_tracer(key: &str) { - let config = Config::default().with_resource(sdk::Resource::new(vec![KeyValue::new("service.namespace", "rtz"), KeyValue::new("service.name", "server")])); - - let tracer = opentelemetry_application_insights::new_pipeline_from_connection_string(key) - .unwrap() - .with_client(Client::new()) - .with_trace_config(config) - .install_batch(opentelemetry::runtime::Tokio); - - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let subscriber = Registry::default().with(telemetry).with(level_filters::LevelFilter::INFO); - tracing::subscriber::set_global_default(subscriber).unwrap(); -} - -/// Initializes the global telemetry tracer with the given application insights key. -pub fn init(key: Option<&str>) { - let Some(key) = key else { - return; - }; - - init_tracer(key); - - let default_panic = panic::take_hook(); - - panic::set_hook(Box::new(move |p| { - let payload_string = format!("{:?}", p.payload().downcast_ref::<&str>()); - let location_string = p.location().map(|l| format!("{}", l)).unwrap_or_else(|| "unknown".to_owned()); - - // This doesn't work because this macro prescribes the name without allowing it to be overriden. - tracing::event!(Level::ERROR, event.name = "exception", "exception.type" = "PANIC", exception.message = payload_string, exception.stacktrace = location_string); - - default_panic(p); - })); -} - -/// The axum layer for request telemetry. -pub async fn telemetry_fn( - request: Request, - next: Next, -) -> Response { - let method = request.method().to_string(); - let uri = request.uri().to_string(); - let client_ip = request - .headers() - .get("x-forwarded-for") - .and_then(|v| v.to_str().ok()) - .unwrap_or("unknown") - .to_string(); - let client_ip = client_ip - .split(',') - .next() - .unwrap_or("unknown"); - - let (mut parts, body) = request.into_parts(); - let route = parts.extract::().await.map(|m| m.as_str().to_owned()).unwrap_or_else(|_| "unknown".to_owned()); - let request = Request::from_parts(parts, body); - - let span = tracing::info_span!( - "request", - otel.kind = "server", - http.method = method.as_str(), - http.url = uri.as_str(), - http.client_ip = client_ip, - http.route = route.as_str(), - otel.status_code = tracing::field::Empty, - otel.status_description = tracing::field::Empty, - http.response.status_code = tracing::field::Empty, - ); - - async move { - ///////////////////////////////// - let response = next.run(request).await; - ///////////////////////////////// - - let status = response.status(); - let (response, otel_status, otel_status_description) = if status.is_success() { - (response, "OK", format!(r#"{{ "status": {} }}"#, status.as_u16())) - } else { - // Breakup the response into parts. - let (parts, body) = response.into_parts(); - - // Get the body bytes. - let body_bytes = hyper::body::to_bytes(body).await.unwrap_or(Bytes::new()); - - // Deserialize the error. - let error: WebError = serde_json::from_slice(&body_bytes).unwrap_or_else(|_| WebError { - status: status.as_u16(), - message: "UNKNOWN".to_string(), - backtrace: None, - }); - - // Get the stringified error. - let error_string = serde_json::to_string_pretty(&error).unwrap(); - - // Recreate the body. - let body = Body::from(body_bytes).boxed_unsync().map_err(axum::Error::new).boxed_unsync(); - - let response = Response::from_parts(parts, body); - - (response, "ERROR", error_string) - }; - - let span = Span::current().entered(); - - span.record("otel.status_code", otel_status); - span.record("otel.status_description", otel_status_description); - span.record("http.response.status_code", status.as_u16()); - - response - }.instrument(span).await -} \ No newline at end of file diff --git a/rtz/src/web/types.rs b/rtz/src/web/types.rs index c0c3c20..d8427d2 100644 --- a/rtz/src/web/types.rs +++ b/rtz/src/web/types.rs @@ -9,6 +9,7 @@ use axum::{ http::{request::Parts, HeaderValue}, response::{IntoResponse, Response}, }; +use axum_insights::AppInsightsError; use chrono::{DateTime, Utc}; use hyper::StatusCode; use rtz_core::base::types::Err; @@ -86,6 +87,26 @@ pub struct WebError { pub backtrace: Option, } +impl Default for WebError { + fn default() -> Self { + WebError { + status: 0, + message: "An unknown error occurred.".to_string(), + backtrace: None, + } + } +} + +impl AppInsightsError for WebError { + fn message(&self) -> Option { + Some(self.message.clone()) + } + + fn backtrace(&self) -> Option { + self.backtrace.clone() + } +} + impl std::error::Error for WebError {} impl Display for WebError { diff --git a/rtz/src/web/utilities.rs b/rtz/src/web/utilities.rs index 0a14f19..98b3333 100644 --- a/rtz/src/web/utilities.rs +++ b/rtz/src/web/utilities.rs @@ -1,10 +1,6 @@ - - pub async fn shutdown_signal() { let ctrl_c = async { - tokio::signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); + tokio::signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); }; #[cfg(unix)] @@ -22,8 +18,8 @@ pub async fn shutdown_signal() { _ = ctrl_c => {}, _ = terminate => {}, } - + // Graceful shutdown logic goes here. - opentelemetry::global::shutdown_tracer_provider(); -} \ No newline at end of file + axum_insights::exports::opentelemetry::global::shutdown_tracer_provider(); +}