Skip to content

Commit

Permalink
Integrate OpenTelemetry into the proxy
Browse files Browse the repository at this point in the history
OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon.

This wires up the proxy to optionally use OpenTelmetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later.

[#10111](linkerd/linkerd2#10111)

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Sep 25, 2024
1 parent a1d3c79 commit 92221b2
Show file tree
Hide file tree
Showing 24 changed files with 623 additions and 272 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ dependencies = [
"linkerd-app-outbound",
"linkerd-error",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-tonic-stream",
"rangemap",
"regex",
Expand Down Expand Up @@ -1184,6 +1185,7 @@ dependencies = [
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-proxy-api-resolve",
"linkerd-proxy-balance",
"linkerd-proxy-client-policy",
Expand Down Expand Up @@ -1743,6 +1745,7 @@ dependencies = [
"http-body",
"linkerd-error",
"linkerd-metrics",
"linkerd-trace-context",
"opencensus-proto",
"tokio",
"tokio-stream",
Expand All @@ -1759,6 +1762,7 @@ dependencies = [
"http-body",
"linkerd-error",
"linkerd-metrics",
"linkerd-trace-context",
"opentelemetry",
"opentelemetry-proto",
"opentelemetry_sdk",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" }
linkerd-app-outbound = { path = "./outbound" }
linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-opentelemetry = { path = "../opentelemetry" }
linkerd-tonic-stream = { path = "../tonic-stream" }
rangemap = "1"
regex = "1"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" }
linkerd-meshtls = { path = "../../meshtls", default-features = false }
linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] }
linkerd-opencensus = { path = "../../opencensus" }
linkerd-opentelemetry = { path = "../../opentelemetry" }
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd-proxy-balance = { path = "../../proxy/balance" }
linkerd-proxy-core = { path = "../../proxy/core" }
Expand Down
154 changes: 45 additions & 109 deletions linkerd/app/core/src/http_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,75 @@
use linkerd_error::Error;
use linkerd_opencensus::proto::trace::v1 as oc;
use linkerd_stack::layer;
use linkerd_trace_context::{self as trace_context, TraceContext};
use linkerd_trace_context as trace_context;
use linkerd_trace_context::export::{ExportSpan, SpanKind};
use linkerd_trace_context::{Span, TraceContext};
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tokio::sync::mpsc;

pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>;
pub type Labels = Arc<HashMap<String, String>>;

/// SpanConverter converts trace_context::Span objects into OpenCensus agent
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
/// implmenting the SpanSink trait. For each span that it receives, it converts
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
#[derive(Clone)]
pub struct SpanConverter {
kind: Kind,
sink: mpsc::Sender<oc::Span>,
labels: Labels,
#[derive(Debug, Copy, Clone, Default)]
pub enum CollectorProtocol {
#[default]
OpenCensus,
OpenTelemetry,
}

#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
impl FromStr for CollectorProtocol {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"opencensus" => Ok(CollectorProtocol::OpenCensus),
"opentelemetry" => Ok(CollectorProtocol::OpenTelemetry),
_ => Err(()),
}
}
}

pub type SpanSink = mpsc::Sender<ExportSpan>;

pub fn server<S>(
sink: OpenCensusSink,
sink: Option<SpanSink>,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Server, sink, labels)
TraceContext::layer(sink.map(move |sink| SpanConverter {
kind: SpanKind::Server,
sink,
labels: labels.into(),
}))
}

pub fn client<S>(
sink: OpenCensusSink,
sink: Option<SpanSink>,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Client, sink, labels)
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
TraceContext::layer(sink.map(move |sink| SpanConverter {
kind: SpanKind::Client,
sink,
labels: labels.into(),
}))
}

impl SpanConverter {
fn layer<S>(
kind: Kind,
sink: OpenCensusSink,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone {
TraceContext::layer(sink.map(move |sink| Self {
kind,
sink,
labels: labels.into(),
}))
}

fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k.to_string(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
},
);
}
Ok(oc::Span {
trace_id: into_bytes(span.trace_id, 16)?,
span_id: into_bytes(span.span_id, 8)?,
tracestate: None,
parent_span_id: into_bytes(span.parent_id, 8)?,
name: Some(truncatable(span.span_name)),
kind: self.kind as i32,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: Some(oc::span::Attributes {
attribute_map: attributes,
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == Kind::Client),
child_span_count: None,
})
}
#[derive(Clone)]
pub struct SpanConverter {
kind: SpanKind,
sink: SpanSink,
labels: Labels,
}

impl trace_context::SpanSink for SpanConverter {
#[inline]
fn is_enabled(&self) -> bool {
true
}

fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
let span = self.mk_span(span)?;
self.sink.try_send(span).map_err(Into::into)
}
}

fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> {
let bytes: Vec<u8> = id.into();
if bytes.len() == size {
Ok(bytes)
} else {
let actual_size = bytes.len();
Err(IdLengthError {
id: bytes,
expected_size: size,
actual_size,
})
}
}

fn truncatable(value: String) -> oc::TruncatableString {
oc::TruncatableString {
value,
truncated_byte_count: 0,
fn try_send(&mut self, span: Span) -> Result<(), Error> {
self.sink.try_send(ExportSpan {
span,
kind: self.kind,
labels: Arc::clone(&self.labels),
})?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics;
pub use linkerd_idle_cache as idle_cache;
pub use linkerd_io as io;
pub use linkerd_opencensus as opencensus;
pub use linkerd_opentelemetry as opentelemetry;
pub use linkerd_service_profiles as profiles;
pub use linkerd_stack_metrics as stack_metrics;
pub use linkerd_stack_tracing as stack_tracing;
Expand All @@ -65,7 +66,7 @@ pub struct ProxyRuntime {
pub identity: identity::creds::Receiver,
pub metrics: metrics::Proxy,
pub tap: proxy::tap::Registry,
pub span_sink: http_tracing::OpenCensusSink,
pub span_sink: Option<http_tracing::SpanSink>,
pub drain: drain::Watch,
}

Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
pub use crate::transport::labels::{TargetAddr, TlsAccept};
use crate::{
classify::Class,
control, http_metrics, opencensus, profiles, stack_metrics,
control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics,
svc::Param,
tls,
transport::{self, labels::TlsConnect},
Expand Down Expand Up @@ -39,6 +39,7 @@ pub struct Metrics {
pub proxy: Proxy,
pub control: ControlHttp,
pub opencensus: opencensus::metrics::Registry,
pub opentelemetry: opentelemetry::metrics::Registry,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -191,11 +192,13 @@ impl Metrics {
};

let (opencensus, opencensus_report) = opencensus::metrics::new();
let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new();

let metrics = Metrics {
proxy,
control,
opencensus,
opentelemetry,
};

let report = endpoint_report
Expand All @@ -205,6 +208,7 @@ impl Metrics {
.and_report(control_report)
.and_report(transport_report)
.and_report(opencensus_report)
.and_report(opentelemetry_report)
.and_report(stack);

(metrics, report)
Expand Down
12 changes: 5 additions & 7 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ mod server;
#[cfg(any(test, feature = "test-util", fuzzing))]
pub mod test_util;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
use linkerd_app_core::http_tracing::SpanSink;
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig, QueueConfig},
drain,
http_tracing::OpenCensusSink,
identity, io,
drain, identity, io,
proxy::{tap, tcp},
svc,
transport::{self, Remote, ServerAddr},
Expand All @@ -33,9 +34,6 @@ use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use tracing::debug_span;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;

#[derive(Clone, Debug)]
pub struct Config {
pub allow_discovery: NameMatch,
Expand Down Expand Up @@ -67,7 +65,7 @@ struct Runtime {
metrics: InboundMetrics,
identity: identity::creds::Receiver,
tap: tap::Registry,
span_sink: OpenCensusSink,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
.check_new_service::<T, http::Request<_>>()
.push(ServerRescue::layer(config.emit_headers))
.check_new_service::<T, http::Request<_>>()
// Initiates OpenCensus tracing.
// Initiates OpenTelemetry tracing.
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels()))
.push_on_service(http::BoxResponse::layer())
// Convert origin form HTTP/1 URIs to absolute form for Hyper's
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
#![allow(opaque_hidden_inferred_bound)]
#![forbid(unsafe_code)]

use linkerd_app_core::http_tracing::SpanSink;
use linkerd_app_core::{
config::{ProxyConfig, QueueConfig},
drain,
exp_backoff::ExponentialBackoff,
http_tracing::OpenCensusSink,
identity, io,
metrics::prom,
profiles,
Expand Down Expand Up @@ -95,7 +95,7 @@ struct Runtime {
metrics: OutboundMetrics,
identity: identity::NewClient,
tap: tap::Registry,
span_sink: OpenCensusSink,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down
Loading

0 comments on commit 92221b2

Please sign in to comment.