Make sure OTel exporter can work properly with TLS
Sushisource committed Aug 22, 2024
1 parent e79eb11 commit 35326d6
Showing 8 changed files with 76 additions and 32 deletions.
1 change: 1 addition & 0 deletions core-api/src/telemetry.rs
@@ -54,6 +54,7 @@ pub struct OtelCollectorOptions {
     /// export to this same collector.
     pub url: Url,
     /// Optional set of HTTP headers to send to the Collector, e.g for authentication.
+    #[builder(default = "HashMap::new()")]
     pub headers: HashMap<String, String>,
     /// Optionally specify how frequently metrics should be exported. Defaults to 1 second.
     #[builder(default = "Duration::from_secs(1)")]
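With headers now defaulting to an empty map, OtelCollectorOptions can be built without specifying any headers at all. A minimal sketch of what that enables (the collector URL here is hypothetical, and the builder usage mirrors the new integration test further down):

use std::collections::HashMap;
use temporal_sdk_core_api::telemetry::OtelCollectorOptionsBuilder;
use url::Url;

fn main() -> Result<(), anyhow::Error> {
    // No .headers(...) call needed; the builder default supplies an empty map.
    let opts = OtelCollectorOptionsBuilder::default()
        .url(Url::parse("https://collector.example.com:4317")?) // hypothetical endpoint
        .build()?;
    assert_eq!(opts.headers, HashMap::new());
    Ok(())
}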
2 changes: 1 addition & 1 deletion core/Cargo.toml
@@ -44,7 +44,7 @@ mockall = "0.13"
 once_cell = { workspace = true }
 opentelemetry = { workspace = true, features = ["metrics"], optional = true }
 opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
-opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics"], optional = true }
+opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
 opentelemetry-prometheus = { version = "0.17", optional = true }
 parking_lot = { version = "0.12", features = ["send_guard"] }
 pid = "4.0"
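The added "tls" feature on opentelemetry-otlp is presumably what enables tonic's TLS support for the exporter channel; the ClientTlsConfig call added in core/src/telemetry/otel.rs below relies on it.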
8 changes: 0 additions & 8 deletions core/src/telemetry/mod.rs
@@ -160,15 +160,7 @@ impl CoreTelemetry for TelemetryInstance {
 ///
 /// See [TelemetryOptions] docs for more on configuration.
 pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyhow::Error> {
-    // This is a bit odd, but functional. It's desirable to create a separate tokio runtime for
-    // metrics handling, since tests typically use a single-threaded runtime and initializing
-    // pipeline requires us to know if the runtime is single or multithreaded, we will crash
-    // in one case or the other. There does not seem to be a way to tell from the current runtime
-    // handle if it is single or multithreaded. Additionally, we can isolate metrics work this
-    // way which is nice.
-    // Parts of telem dat ====
     let mut logs_out = None;
-    // =======================
 
     // Tracing subscriber layers =========
     let mut console_pretty_layer = None;
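The deleted comment described a dedicated Tokio runtime for metrics that the codebase no longer creates; the single-threaded-runtime caveat it mentioned is instead documented on the new integration test in tests/integ_tests/metrics_tests.rs below.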
10 changes: 7 additions & 3 deletions core/src/telemetry/otel.rs
@@ -35,7 +35,7 @@ use temporal_sdk_core_api::telemetry::{
     MetricTemporality, OtelCollectorOptions, PrometheusExporterOptions,
 };
 use tokio::task::AbortHandle;
-use tonic::metadata::MetadataMap;
+use tonic::{metadata::MetadataMap, transport::ClientTlsConfig};
 
 /// Chooses appropriate aggregators for our metrics
 #[derive(Debug, Clone)]
@@ -160,8 +160,12 @@ impl<U> MemoryGauge<U> {
 pub fn build_otlp_metric_exporter(
     opts: OtelCollectorOptions,
 ) -> Result<CoreOtelMeter, anyhow::Error> {
-    let exporter = opentelemetry_otlp::TonicExporterBuilder::default()
-        .with_endpoint(opts.url.to_string())
+    let mut exporter =
+        opentelemetry_otlp::TonicExporterBuilder::default().with_endpoint(opts.url.to_string());
+    if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" {
+        exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots());
+    }
+    let exporter = exporter
         .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?))
         .build_metrics_exporter(
             Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)),
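The exporter now opts into TLS purely from the URL scheme, using the system's native root certificates. A standalone sketch of that predicate (the helper name is made up for illustration):

use url::Url;

// Hypothetical helper mirroring the scheme check in build_otlp_metric_exporter.
fn scheme_wants_tls(url: &Url) -> bool {
    matches!(url.scheme(), "https" | "grpcs")
}

fn main() {
    assert!(scheme_wants_tls(&Url::parse("https://collector:4317").unwrap()));
    assert!(scheme_wants_tls(&Url::parse("grpcs://collector:4317").unwrap()));
    // Plain http/grpc endpoints continue to use an unencrypted channel.
    assert!(!scheme_wants_tls(&Url::parse("http://localhost:4317").unwrap()));
}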
8 changes: 4 additions & 4 deletions docker/docker-compose-telem.yaml
@@ -10,24 +10,24 @@ services:
 
   otel-collector:
     image: otel/opentelemetry-collector:latest
-    command: ['--config=/etc/otel-collector-config.yaml']
+    command: [ '--config=/etc/otel-collector-config.yaml' ]
     volumes:
-      - ../../etc/otel-collector-config.yaml:/etc/otel-collector-config.yaml
+      - ../etc/otel-collector-config.yaml:/etc/otel-collector-config.yaml
     ports:
       # - "1888:1888" # pprof extension
       # It's useful to be able to manually inspect metrics during dev
       - '8888:8888' # Prometheus metrics exposed by the collector
       - '8889:8889' # Prometheus exporter metrics
       # - "13133:13133" # health_check extension
       - '4317:4317' # OTLP gRPC receiver
-      # - "55670:55679" # zpages extension
+      # - "55679:55679" # zpages extension
     depends_on:
       - jaeger
 
   prometheus:
     container_name: prometheus
     image: prom/prometheus:latest
     volumes:
-      - ../../etc/prometheus.yaml:/etc/prometheus/prometheus.yml
+      - ../etc/prometheus.yaml:/etc/prometheus/prometheus.yml
     ports:
       - '9090:9090'
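Both volume fixes here correct paths that were one directory too deep: the compose file lives in docker/, so the configs in etc/ are at ../etc relative to it, not ../../etc.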
21 changes: 12 additions & 9 deletions etc/otel-collector-config.yaml
@@ -2,16 +2,19 @@ receivers:
   otlp:
     protocols:
       grpc:
+        endpoint: 0.0.0.0:4317
+      http:
+        endpoint: 0.0.0.0:4318
+
 
 exporters:
   prometheus:
     endpoint: '0.0.0.0:8889'
     namespace: temporal_sdk
 
   logging:
 
-  jaeger:
+  otlp/jaeger:
     endpoint: jaeger:14250
-    insecure: true
 
 processors:
   batch:
@@ -24,13 +27,13 @@ extensions:
     endpoint: :55679
 
 service:
-  extensions: [pprof, zpages, health_check]
+  extensions: [ pprof, zpages, health_check ]
   pipelines:
     traces:
-      receivers: [otlp]
-      processors: [batch]
-      exporters: [logging, jaeger]
+      receivers: [ otlp ]
+      processors: [ batch ]
+      exporters: [ logging, otlp/jaeger ]
     metrics:
-      receivers: [otlp]
-      processors: [batch]
-      exporters: [logging, prometheus]
+      receivers: [ otlp ]
+      processors: [ batch ]
+      exporters: [ logging, prometheus ]
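The standalone jaeger exporter this config previously used has been removed from recent OpenTelemetry Collector releases, and Jaeger itself now ingests OTLP directly, which is presumably what motivated the switch to an otlp/jaeger exporter for the traces pipeline.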
5 changes: 2 additions & 3 deletions test-utils/src/lib.rs
@@ -73,11 +73,10 @@ pub const INTEG_USE_TLS_ENV_VAR: &str = "TEMPORAL_USE_TLS";
 pub const INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR: &str = "INTEG_TEMPORAL_DEV_SERVER_ON";
 /// This env var is set (to any value) if the test server is in use
 pub const INTEG_TEST_SERVER_USED_ENV_VAR: &str = "INTEG_TEST_SERVER_ON";
-
 /// If set, turn export traces and metrics to the OTel collector at the given URL
-const OTEL_URL_ENV_VAR: &str = "TEMPORAL_INTEG_OTEL_URL";
+pub const OTEL_URL_ENV_VAR: &str = "TEMPORAL_INTEG_OTEL_URL";
 /// If set, enable direct scraping of prom metrics on the specified port
-const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT";
+pub const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT";
 #[macro_export]
 macro_rules! prost_dur {
     ($dur_call:ident $args:tt) => {
53 changes: 49 additions & 4 deletions tests/integ_tests/metrics_tests.rs
@@ -1,11 +1,16 @@
 use assert_matches::assert_matches;
-use std::{net::SocketAddr, sync::Arc, time::Duration};
+use std::{env, net::SocketAddr, sync::Arc, time::Duration};
 use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService};
-use temporal_sdk_core::{init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime};
+use temporal_sdk_core::{
+    init_worker,
+    telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
+    CoreRuntime,
+};
 use temporal_sdk_core_api::{
     telemetry::{
         metrics::{CoreMeter, MetricAttributes, MetricParameters},
-        PrometheusExporterOptionsBuilder, TelemetryOptions,
+        OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder, TelemetryOptions,
         TelemetryOptionsBuilder,
     },
     worker::WorkerConfigBuilder,
     Worker,
@@ -30,9 +35,10 @@ use temporal_sdk_core_protos::{
     },
 };
 use temporal_sdk_core_test_utils::{
-    get_integ_server_options, get_integ_telem_options, CoreWfStarter, NAMESPACE,
+    get_integ_server_options, get_integ_telem_options, CoreWfStarter, NAMESPACE, OTEL_URL_ENV_VAR,
 };
 use tokio::{join, sync::Barrier, task::AbortHandle};
+use url::Url;
 
 static ANY_PORT: &str = "127.0.0.1:0";
@@ -575,3 +581,42 @@ async fn request_fail_codes() {
     assert!(matching_line.contains("status_code=\"INVALID_ARGUMENT\""));
     assert!(matching_line.contains("} 1"));
 }
+
+// OTel collector shutdown hangs in a single-threaded Tokio environment. In the past we had a
+// dedicated runtime just for telemetry, which was meant to address problems like this. In
+// reality, users are unlikely to run a single-threaded runtime.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn request_fail_codes_otel() {
+    let exporter = if let Some(url) = env::var(OTEL_URL_ENV_VAR)
+        .ok()
+        .map(|x| x.parse::<Url>().unwrap())
+    {
+        let opts = OtelCollectorOptionsBuilder::default()
+            .url(url)
+            .build()
+            .unwrap();
+        build_otlp_metric_exporter(opts).unwrap()
+    } else {
+        // skip
+        return;
+    };
+    let mut telemopts = TelemetryOptionsBuilder::default();
+    let exporter = Arc::new(exporter);
+    telemopts.metrics(exporter as Arc<dyn CoreMeter>);
+
+    let rt = CoreRuntime::new_assume_tokio(telemopts.build().unwrap()).unwrap();
+    let opts = get_integ_server_options();
+    let mut client = opts
+        .connect(NAMESPACE, rt.telemetry().get_temporal_metric_meter())
+        .await
+        .unwrap();
+
+    for _ in 0..10 {
+        // Describe namespace w/ invalid argument (unset namespace field)
+        WorkflowService::describe_namespace(&mut client, DescribeNamespaceRequest::default())
+            .await
+            .unwrap_err();
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+}
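Since build_otlp_metric_exporter only runs when TEMPORAL_INTEG_OTEL_URL is set, pointing that variable at an https:// or grpcs:// collector endpoint is what exercises the new native-roots TLS path; when it is unset, the test returns immediately and is effectively skipped.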
