diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs
index af72e97ca..7d9a5b2e4 100644
--- a/core-api/src/telemetry.rs
+++ b/core-api/src/telemetry.rs
@@ -54,6 +54,7 @@ pub struct OtelCollectorOptions {
     /// export to this same collector.
     pub url: Url,
     /// Optional set of HTTP headers to send to the Collector, e.g for authentication.
+    #[builder(default = "HashMap::new()")]
     pub headers: HashMap<String, String>,
     /// Optionally specify how frequently metrics should be exported. Defaults to 1 second.
     #[builder(default = "Duration::from_secs(1)")]
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 256753d4b..b85a16b17 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -44,7 +44,7 @@ mockall = "0.13"
 once_cell = { workspace = true }
 opentelemetry = { workspace = true, features = ["metrics"], optional = true }
 opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
-opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics"], optional = true }
+opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
 opentelemetry-prometheus = { version = "0.17", optional = true }
 parking_lot = { version = "0.12", features = ["send_guard"] }
 pid = "4.0"
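The #[builder(default = "HashMap::new()")] attribute added above makes .headers(...) optional when constructing OtelCollectorOptions. A minimal sketch of what that allows, using only the builder calls already exercised by the new integration test further down (the endpoint URL is a placeholder):

use temporal_sdk_core_api::telemetry::OtelCollectorOptionsBuilder;
use url::Url;

fn build_options_without_headers() {
    // `.headers(...)` can now be omitted; the field defaults to an empty HashMap.
    let opts = OtelCollectorOptionsBuilder::default()
        .url(Url::parse("https://otel.example.com:4317").unwrap())
        .build()
        .unwrap();
    assert!(opts.headers.is_empty());
}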
diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs
index 669e0ed1e..e6886618c 100644
--- a/core/src/telemetry/mod.rs
+++ b/core/src/telemetry/mod.rs
@@ -160,15 +160,7 @@ impl CoreTelemetry for TelemetryInstance {
 ///
 /// See [TelemetryOptions] docs for more on configuration.
 pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyhow::Error> {
-    // This is a bit odd, but functional. It's desirable to create a separate tokio runtime for
-    // metrics handling, since tests typically use a single-threaded runtime and initializing
-    // pipeline requires us to know if the runtime is single or multithreaded, we will crash
-    // in one case or the other. There does not seem to be a way to tell from the current runtime
-    // handle if it is single or multithreaded. Additionally, we can isolate metrics work this
-    // way which is nice.
-    // Parts of telem dat ====
     let mut logs_out = None;
-    // =======================
 
     // Tracing subscriber layers =========
     let mut console_pretty_layer = None;
diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs
index f50f9affe..486866d1f 100644
--- a/core/src/telemetry/otel.rs
+++ b/core/src/telemetry/otel.rs
@@ -35,7 +35,7 @@ use temporal_sdk_core_api::telemetry::{
     MetricTemporality, OtelCollectorOptions, PrometheusExporterOptions,
 };
 use tokio::task::AbortHandle;
-use tonic::metadata::MetadataMap;
+use tonic::{metadata::MetadataMap, transport::ClientTlsConfig};
 
 /// Chooses appropriate aggregators for our metrics
 #[derive(Debug, Clone)]
@@ -160,8 +160,12 @@ impl MemoryGauge {
 pub fn build_otlp_metric_exporter(
     opts: OtelCollectorOptions,
 ) -> Result<CoreOtelMeter, anyhow::Error> {
-    let exporter = opentelemetry_otlp::TonicExporterBuilder::default()
-        .with_endpoint(opts.url.to_string())
+    let mut exporter =
+        opentelemetry_otlp::TonicExporterBuilder::default().with_endpoint(opts.url.to_string());
+    if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" {
+        exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots());
+    }
+    let exporter = exporter
         .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?))
         .build_metrics_exporter(
             Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)),
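With the new "tls" cargo feature and the scheme check above, build_otlp_metric_exporter applies ClientTlsConfig::new().with_native_roots() whenever the collector URL uses https or grpcs, and leaves plain http/grpc endpoints untouched. A rough usage sketch with a placeholder endpoint; like the new integration test below, it assumes it is called from within a Tokio runtime:

use temporal_sdk_core::telemetry::build_otlp_metric_exporter;
use temporal_sdk_core_api::telemetry::OtelCollectorOptionsBuilder;
use url::Url;

fn tls_otlp_exporter_sketch() {
    let opts = OtelCollectorOptionsBuilder::default()
        // The https scheme alone is what opts this endpoint into TLS with native roots.
        .url(Url::parse("https://otel-collector.example.com:4317").unwrap())
        .build()
        .unwrap();
    let _meter = build_otlp_metric_exporter(opts).expect("exporter should build");
}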
diff --git a/docker/docker-compose-telem.yaml b/docker/docker-compose-telem.yaml
index 59da5ca9a..0667bf811 100644
--- a/docker/docker-compose-telem.yaml
+++ b/docker/docker-compose-telem.yaml
@@ -10,9 +10,9 @@ services:
   otel-collector:
     image: otel/opentelemetry-collector:latest
-    command: ['--config=/etc/otel-collector-config.yaml']
+    command: [ '--config=/etc/otel-collector-config.yaml' ]
     volumes:
-      - ../../etc/otel-collector-config.yaml:/etc/otel-collector-config.yaml
+      - ../etc/otel-collector-config.yaml:/etc/otel-collector-config.yaml
     ports:
       # - "1888:1888" # pprof extension
       # It's useful to be able to manually inspect metrics during dev
@@ -20,7 +20,7 @@
       - '8889:8889' # Prometheus exporter metrics
       # - "13133:13133" # health_check extension
       - '4317:4317' # OTLP gRPC receiver
-      # - "55670:55679" # zpages extension
+      # - "55679:55679" # zpages extension
     depends_on:
       - jaeger
@@ -28,6 +28,6 @@ prometheus:
     container_name: prometheus
     image: prom/prometheus:latest
     volumes:
-      - ../../etc/prometheus.yaml:/etc/prometheus/prometheus.yml
+      - ../etc/prometheus.yaml:/etc/prometheus/prometheus.yml
     ports:
       - '9090:9090'
diff --git a/etc/otel-collector-config.yaml b/etc/otel-collector-config.yaml
index 6b4b151e1..644536eb3 100644
--- a/etc/otel-collector-config.yaml
+++ b/etc/otel-collector-config.yaml
@@ -2,16 +2,19 @@ receivers:
   otlp:
     protocols:
       grpc:
+        endpoint: 0.0.0.0:4317
+      http:
+        endpoint: 0.0.0.0:4318
 
 exporters:
   prometheus:
     endpoint: '0.0.0.0:8889'
     namespace: temporal_sdk
+  logging:
 
-  jaeger:
+  otlp/jaeger:
     endpoint: jaeger:14250
-    insecure: true
 
 processors:
   batch:
@@ -24,13 +27,13 @@ extensions:
     endpoint: :55679
 
 service:
-  extensions: [pprof, zpages, health_check]
+  extensions: [ pprof, zpages, health_check ]
   pipelines:
     traces:
-      receivers: [otlp]
-      processors: [batch]
-      exporters: [logging, jaeger]
+      receivers: [ otlp ]
+      processors: [ batch ]
+      exporters: [ logging, otlp/jaeger ]
     metrics:
-      receivers: [otlp]
-      processors: [batch]
-      exporters: [logging, prometheus]
+      receivers: [ otlp ]
+      processors: [ batch ]
+      exporters: [ logging, prometheus ]
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index 576f790b4..3c2d88da8 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -73,11 +73,10 @@ pub const INTEG_USE_TLS_ENV_VAR: &str = "TEMPORAL_USE_TLS";
 pub const INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR: &str = "INTEG_TEMPORAL_DEV_SERVER_ON";
 /// This env var is set (to any value) if the test server is in use
 pub const INTEG_TEST_SERVER_USED_ENV_VAR: &str = "INTEG_TEST_SERVER_ON";
-
 /// If set, turn export traces and metrics to the OTel collector at the given URL
-const OTEL_URL_ENV_VAR: &str = "TEMPORAL_INTEG_OTEL_URL";
+pub const OTEL_URL_ENV_VAR: &str = "TEMPORAL_INTEG_OTEL_URL";
 /// If set, enable direct scraping of prom metrics on the specified port
-const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT";
+pub const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT";
 #[macro_export]
 macro_rules! prost_dur {
     ($dur_call:ident $args:tt) => {
diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs
index 259488197..45e9044ad 100644
--- a/tests/integ_tests/metrics_tests.rs
+++ b/tests/integ_tests/metrics_tests.rs
@@ -1,11 +1,16 @@
 use assert_matches::assert_matches;
-use std::{net::SocketAddr, sync::Arc, time::Duration};
+use std::{env, net::SocketAddr, sync::Arc, time::Duration};
 use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService};
-use temporal_sdk_core::{init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime};
+use temporal_sdk_core::{
+    init_worker,
+    telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
+    CoreRuntime,
+};
 use temporal_sdk_core_api::{
     telemetry::{
         metrics::{CoreMeter, MetricAttributes, MetricParameters},
-        PrometheusExporterOptionsBuilder, TelemetryOptions,
+        OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder, TelemetryOptions,
+        TelemetryOptionsBuilder,
     },
     worker::WorkerConfigBuilder,
     Worker,
@@ -30,9 +35,10 @@ use temporal_sdk_core_protos::{
     },
 };
 use temporal_sdk_core_test_utils::{
-    get_integ_server_options, get_integ_telem_options, CoreWfStarter, NAMESPACE,
+    get_integ_server_options, get_integ_telem_options, CoreWfStarter, NAMESPACE, OTEL_URL_ENV_VAR,
 };
 use tokio::{join, sync::Barrier, task::AbortHandle};
+use url::Url;
 
 static ANY_PORT: &str = "127.0.0.1:0";
@@ -575,3 +581,42 @@ async fn request_fail_codes() {
     assert!(matching_line.contains("status_code=\"INVALID_ARGUMENT\""));
     assert!(matching_line.contains("} 1"));
 }
+
+// OTel collector shutdown hangs in a single-threaded Tokio environment. We used to have a
+// dedicated runtime just for telemetry, which was meant to address problems like this, but in
+// reality users are unlikely to run a single-threaded runtime.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn request_fail_codes_otel() {
+    let exporter = if let Some(url) = env::var(OTEL_URL_ENV_VAR)
+        .ok()
+        .map(|x| x.parse::<Url>().unwrap())
+    {
+        let opts = OtelCollectorOptionsBuilder::default()
+            .url(url)
+            .build()
+            .unwrap();
+        build_otlp_metric_exporter(opts).unwrap()
+    } else {
+        // skip
+        return;
+    };
+    let mut telemopts = TelemetryOptionsBuilder::default();
+    let exporter = Arc::new(exporter);
+    telemopts.metrics(exporter as Arc<dyn CoreMeter>);
+
+    let rt = CoreRuntime::new_assume_tokio(telemopts.build().unwrap()).unwrap();
+    let opts = get_integ_server_options();
+    let mut client = opts
+        .connect(NAMESPACE, rt.telemetry().get_temporal_metric_meter())
+        .await
+        .unwrap();
+
+    for _ in 0..10 {
+        // Describe namespace w/ invalid argument (unset namespace field)
+        WorkflowService::describe_namespace(&mut client, DescribeNamespaceRequest::default())
+            .await
+            .unwrap_err();
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+}
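Putting the pieces together outside of test code: a sketch of wiring an OTLP meter with an auth header and a TLS endpoint into a CoreRuntime. The endpoint and header value are hypothetical placeholders; the calls mirror those in request_fail_codes_otel above and assume the function is invoked from inside an existing Tokio runtime:

use std::{collections::HashMap, sync::Arc};
use temporal_sdk_core::{telemetry::build_otlp_metric_exporter, CoreRuntime};
use temporal_sdk_core_api::telemetry::{
    metrics::CoreMeter, OtelCollectorOptionsBuilder, TelemetryOptionsBuilder,
};
use url::Url;

fn otlp_metrics_runtime_sketch() -> CoreRuntime {
    let opts = OtelCollectorOptionsBuilder::default()
        // An https URL is enough to get TLS with native roots after this change.
        .url(Url::parse("https://otel.example.com:4317").unwrap())
        // Hypothetical auth header; any name/value pairs the collector expects work here.
        .headers(HashMap::from([(
            "authorization".to_string(),
            "Bearer <token>".to_string(),
        )]))
        .build()
        .unwrap();
    let exporter = Arc::new(build_otlp_metric_exporter(opts).unwrap());
    let mut telemopts = TelemetryOptionsBuilder::default();
    telemopts.metrics(exporter as Arc<dyn CoreMeter>);
    // new_assume_tokio expects to be called from within a running Tokio runtime.
    CoreRuntime::new_assume_tokio(telemopts.build().unwrap()).unwrap()
}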