Skip to content

Commit

Permalink
Integrate OpenTelemetry into the proxy
Browse files Browse the repository at this point in the history
This wires up the proxy to optionally use OpenTelmetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter. Changing the default and eventually removing the OpenCensus exporter will come later.

[#10111](linkerd/linkerd2#10111)

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Sep 19, 2024
1 parent 197943e commit cfc8d3f
Show file tree
Hide file tree
Showing 21 changed files with 636 additions and 268 deletions.
41 changes: 32 additions & 9 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,9 @@ dependencies = [
"linkerd-app-outbound",
"linkerd-error",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-tonic-stream",
"opentelemetry-proto",
"rangemap",
"regex",
"thiserror",
Expand Down Expand Up @@ -1184,6 +1186,7 @@ dependencies = [
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-proxy-api-resolve",
"linkerd-proxy-balance",
"linkerd-proxy-client-policy",
Expand All @@ -1209,6 +1212,8 @@ dependencies = [
"linkerd-tracing",
"linkerd-transport-header",
"linkerd-transport-metrics",
"opentelemetry",
"opentelemetry_sdk",
"parking_lot",
"pin-project",
"prometheus-client",
Expand Down Expand Up @@ -1750,6 +1755,24 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-opentelemetry"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"linkerd-metrics",
"opentelemetry",
"opentelemetry-proto",
"opentelemetry_sdk",
"tokio",
"tokio-stream",
"tonic",
"tracing",
]

[[package]]
name = "linkerd-pool"
version = "0.1.0"
Expand Down Expand Up @@ -2466,14 +2489,13 @@ dependencies = [

[[package]]
name = "mio"
version = "1.0.2"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"hermit-abi",
"libc",
"wasi",
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]

[[package]]
Expand Down Expand Up @@ -3423,20 +3445,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"

[[package]]
name = "tokio"
version = "1.40.0"
version = "1.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]

[[package]]
Expand All @@ -3462,9 +3485,9 @@ dependencies = [

[[package]]
name = "tokio-macros"
version = "2.4.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
dependencies = [
"proc-macro2",
"quote",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ members = [
"linkerd/meshtls/verifier",
"linkerd/metrics",
"linkerd/opencensus",
"linkerd/opentelemetry",
"linkerd/pool",
"linkerd/pool/mock",
"linkerd/pool/p2c",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ linkerd-app-inbound = { path = "./inbound" }
linkerd-app-outbound = { path = "./outbound" }
linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-opentelemetry = { path = "../opentelemetry" }
linkerd-tonic-stream = { path = "../tonic-stream" }
opentelemetry-proto = { path = "../../opentelemetry-proto" }
rangemap = "1"
regex = "1"
thiserror = "1"
Expand Down
3 changes: 3 additions & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" }
linkerd-meshtls = { path = "../../meshtls", default-features = false }
linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] }
linkerd-opencensus = { path = "../../opencensus" }
linkerd-opentelemetry = { path = "../../opentelemetry" }
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd-proxy-balance = { path = "../../proxy/balance" }
linkerd-proxy-core = { path = "../../proxy/core" }
Expand All @@ -71,6 +72,8 @@ linkerd-transport-header = { path = "../../transport-header" }
linkerd-transport-metrics = { path = "../../transport-metrics" }
linkerd-tls = { path = "../../tls" }
linkerd-trace-context = { path = "../../trace-context" }
opentelemetry = { version = "0.23", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace"] }

[dependencies.tower]
version = "0.4"
Expand Down
159 changes: 54 additions & 105 deletions linkerd/app/core/src/http_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,88 @@
use crate::http_tracing::opencensus::{OcSpanConverter, OpenCensusSink};
use crate::http_tracing::opentelemetry::{OpenTelemetrySink, OtelSpanConverter};
use linkerd_error::Error;
use linkerd_opencensus::proto::trace::v1 as oc;
use linkerd_stack::layer;
use linkerd_trace_context::{self as trace_context, TraceContext};
use linkerd_trace_context as trace_context;
use linkerd_trace_context::{Span, TraceContext};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tokio::sync::mpsc;

pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>;
mod opencensus;
mod opentelemetry;

pub type Labels = Arc<HashMap<String, String>>;

/// SpanConverter converts trace_context::Span objects into OpenCensus agent
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
/// implmenting the SpanSink trait. For each span that it receives, it converts
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
#[derive(Clone)]
pub struct SpanConverter {
kind: Kind,
sink: mpsc::Sender<oc::Span>,
labels: Labels,
#[derive(Clone, Debug)]
pub enum SpanSink {
Oc(OpenCensusSink),
Otel(OpenTelemetrySink),
}

#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
impl SpanSink {
fn into_converter(self, kind: Kind, labels: impl Into<Labels>) -> SpanConverter {
match self {
SpanSink::Oc(oc) => SpanConverter::Oc(OcSpanConverter::from_sink(oc, kind, labels)),
SpanSink::Otel(otel) => {
SpanConverter::Otel(OtelSpanConverter::from_sink(otel, kind, labels))
}
}
}
}

pub fn server<S>(
sink: OpenCensusSink,
sink: Option<SpanSink>,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Server, sink, labels)
TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Server, labels)))
}

pub fn client<S>(
sink: OpenCensusSink,
sink: Option<SpanSink>,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Client, sink, labels)
TraceContext::layer(sink.map(move |sink| sink.into_converter(Kind::Client, labels)))
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
}

impl SpanConverter {
fn layer<S>(
kind: Kind,
sink: OpenCensusSink,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone {
TraceContext::layer(sink.map(move |sink| Self {
kind,
sink,
labels: labels.into(),
}))
}

fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k.to_string(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
},
);
}
Ok(oc::Span {
trace_id: into_bytes(span.trace_id, 16)?,
span_id: into_bytes(span.span_id, 8)?,
tracestate: None,
parent_span_id: into_bytes(span.parent_id, 8)?,
name: Some(truncatable(span.span_name)),
kind: self.kind as i32,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: Some(oc::span::Attributes {
attribute_map: attributes,
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == Kind::Client),
child_span_count: None,
})
}
#[derive(Clone)]
pub enum SpanConverter {
Oc(OcSpanConverter),
Otel(OtelSpanConverter),
}

impl trace_context::SpanSink for SpanConverter {
#[inline]
fn is_enabled(&self) -> bool {
true
}

fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
let span = self.mk_span(span)?;
self.sink.try_send(span).map_err(Into::into)
fn try_send(&mut self, span: Span) -> Result<(), Error> {
match self {
SpanConverter::Oc(oc) => oc.try_send(span),
SpanConverter::Otel(otel) => otel.try_send(span),
}
}
}

fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> {
let bytes: Vec<u8> = id.into();
if bytes.len() == size {
Ok(bytes)
} else {
let actual_size = bytes.len();
Err(IdLengthError {
id: bytes,
expected_size: size,
actual_size,
})
}
#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
}

fn truncatable(value: String) -> oc::TruncatableString {
oc::TruncatableString {
value,
truncated_byte_count: 0,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
}

fn into_bytes<const N: usize>(id: trace_context::Id) -> Result<[u8; N], IdLengthError> {
id.as_ref().try_into().map_err(|_| {
let bytes: Vec<u8> = id.into();
IdLengthError {
expected_size: N,
actual_size: bytes.len(),
id: bytes,
}
})
}
Loading

0 comments on commit cfc8d3f

Please sign in to comment.