Add OpenTelemetry exporter implementation
This mirrors and duplicates most of the logic from the existing OpenCensus exporter to keep the functionality as similar as possible. Setting up the proxy to use this exporter will come in a follow-up.

linkerd/linkerd2#10111

Signed-off-by: Scott Fleener <[email protected]>
sfleen committed Sep 19, 2024
1 parent e4a1822 commit dae8222
Showing 3 changed files with 294 additions and 0 deletions.
24 changes: 24 additions & 0 deletions linkerd/opentelemetry/Cargo.toml
@@ -0,0 +1,24 @@
[package]
name = "linkerd-opentelemetry"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
linkerd-error = { path = "../error" }
linkerd-metrics = { path = "../metrics" }
opentelemetry = { version = "0.23", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace"] }
opentelemetry-proto = { path = "../../opentelemetry-proto" }
tonic = { version = "0.10", default-features = false, features = [
"prost",
"codegen",
] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
212 changes: 212 additions & 0 deletions linkerd/opentelemetry/src/lib.rs
@@ -0,0 +1,212 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]

pub mod metrics;

use futures::stream::{Stream, StreamExt};
use http_body::Body as HttpBody;
use linkerd_error::Error;
use metrics::Registry;
pub use opentelemetry_proto as proto;
use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
use opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::proto::trace::v1::ResourceSpans;
use opentelemetry_proto::transform::common::ResourceAttributesWithSchema;
use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope;
pub use opentelemetry_sdk::export::trace::SpanData;
use tokio::{sync::mpsc, time};
use tonic::{self as grpc, body::BoxBody, client::GrpcService};
use tracing::{debug, trace};

/// Streams spans from `spans` to the OTLP `TraceService` exposed by `client`.
///
/// Spans are batched (see `SpanExporter`) and exported until the span stream
/// closes; the export request is re-established if it fails.
pub async fn export_spans<T, S>(
client: T,
spans: S,
resource: ResourceAttributesWithSchema,
metrics: Registry,
) where
T: GrpcService<BoxBody> + Clone,
T::Error: Into<Error>,
T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S: Stream<Item = SpanData> + Unpin,
{
debug!("Span exporter running");
SpanExporter::new(client, spans, resource, metrics)
.run()
.await
}

/// SpanExporter sends a Stream of spans to the given TraceService gRPC service.
struct SpanExporter<T, S> {
client: T,
spans: S,
resource: ResourceAttributesWithSchema,
metrics: Registry,
}

#[derive(Debug)]
struct SpanRxClosed;

// === impl SpanExporter ===

impl<T, S> SpanExporter<T, S>
where
T: GrpcService<BoxBody> + Clone,
T::Error: Into<Error>,
T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S: Stream<Item = SpanData> + Unpin,
{
const MAX_BATCH_SIZE: usize = 1000;
const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10);

fn new(client: T, spans: S, resource: ResourceAttributesWithSchema, metrics: Registry) -> Self {
Self {
client,
spans,
resource,
metrics,
}
}

async fn run(self) {
let Self {
client,
mut spans,
resource,
mut metrics,
} = self;

// Holds the batch of pending spans. Cleared as the spans are flushed.
// Contains no more than MAX_BATCH_SIZE spans.
let mut accum = Vec::new();

let mut svc = TraceServiceClient::new(client);
loop {
trace!("Establishing new TraceService::export request");
metrics.start_stream();
let (tx, mut rx) = mpsc::channel(1);

let recv_future = async {
while let Some(req) = rx.recv().await {
match svc.export(grpc::Request::new(req)).await {
Ok(rsp) => {
let Some(partial_success) = rsp.into_inner().partial_success else {
continue;
};

if !partial_success.error_message.is_empty() {
debug!(
%partial_success.error_message,
rejected_spans = partial_success.rejected_spans,
"Response partially successful",
);
}
}
Err(error) => {
debug!(%error, "Response future failed; restarting");
}
}
}
};

// Drive both the response future and the export stream
// simultaneously.
tokio::select! {
_ = recv_future => {}
res = Self::export(&tx, &mut spans, &resource, &mut accum) => match res {
// The export stream closed; reconnect.
Ok(()) => {},
// No more spans.
Err(SpanRxClosed) => return,
},
}
}
}

/// Accumulate spans and send them on the export stream.
///
/// Returns an error when the proxy has closed the span stream.
async fn export(
tx: &mpsc::Sender<ExportTraceServiceRequest>,
spans: &mut S,
resource: &ResourceAttributesWithSchema,
accum: &mut Vec<ResourceSpans>,
) -> Result<(), SpanRxClosed> {
loop {
// Collect spans into a batch.
let collect = Self::collect_batch(spans, resource, accum).await;

// If we collected spans, flush them.
if !accum.is_empty() {
// Once a batch has been accumulated, ensure that the
// request stream is ready to accept the batch.
match tx.reserve().await {
Ok(tx) => {
let msg = ExportTraceServiceRequest {
resource_spans: std::mem::take(accum),
};
trace!(spans = msg.resource_spans.len(), "Sending batch");
tx.send(msg);
}
Err(error) => {
// If the channel isn't open, start a new stream
// and retry sending the batch.
debug!(%error, "Request stream lost; restarting");
return Ok(());
}
}
}

// If the span source was closed, end the task.
if let Err(closed) = collect {
debug!("Span channel lost");
return Err(closed);
}
}
}

/// Collects spans from the proxy into `accum`.
///
/// Returns an error when the span stream has completed. An error may be
/// returned after accumulating spans.
async fn collect_batch(
span_stream: &mut S,
resource: &ResourceAttributesWithSchema,
output_accum: &mut Vec<ResourceSpans>,
) -> Result<(), SpanRxClosed> {
let mut input_accum = vec![];

let res = loop {
if input_accum.len() == Self::MAX_BATCH_SIZE {
trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached");
break Ok(());
}

tokio::select! {
biased;

res = span_stream.next() => match res {
Some(span) => {
trace!(?span, "Adding to batch");
input_accum.push(span);
}
None => break Err(SpanRxClosed),
},

// Don't hold spans indefinitely. Return if we hit an idle
// timeout and spans have been collected.
_ = time::sleep(Self::MAX_BATCH_IDLE) => {
if !input_accum.is_empty() {
trace!(spans = input_accum.len(), "Flushing spans due to inactivity");
break Ok(());
}
}
}
};

*output_accum = group_spans_by_resource_and_scope(input_accum, resource);

res
}
}
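
The proxy-side wiring is deferred to a follow-up, but the exporter can already be driven end to end. The sketch below is illustrative only and is not part of this change: the `run_exporter` function, the mpsc span source, and the choice of channel are assumptions, and the generic bounds simply restate those required by `export_spans`.

use linkerd_opentelemetry::proto::transform::common::ResourceAttributesWithSchema;
use linkerd_opentelemetry::{export_spans, metrics, SpanData};
use tokio_stream::wrappers::ReceiverStream;

// Hypothetical task that feeds spans from an mpsc channel into the exporter.
async fn run_exporter<T>(
    client: T,                                     // any gRPC client satisfying the bounds below
    spans: tokio::sync::mpsc::Receiver<SpanData>,  // span source, e.g. filled by a tracing layer
    resource: ResourceAttributesWithSchema,        // process-level resource attributes
) where
    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
    T::Error: Into<linkerd_error::Error>,
    T::ResponseBody: Default + http_body::Body<Data = tonic::codegen::Bytes> + Send + 'static,
    <T::ResponseBody as http_body::Body>::Error: Into<linkerd_error::Error> + Send,
{
    // `Registry` is owned by the exporter; `Report` would be registered with the
    // process's metrics endpoint so the exporter's counters are visible there.
    let (registry, _report) = metrics::new();
    export_spans(client, ReceiverStream::new(spans), resource, registry).await;
}
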
58 changes: 58 additions & 0 deletions linkerd/opentelemetry/src/metrics.rs
@@ -0,0 +1,58 @@
use linkerd_metrics::{metrics, Counter, FmtMetrics};
use std::fmt;
use std::sync::Arc;

metrics! {
opentelemetry_span_export_streams: Counter { "Total count of opened span export streams" },
opentelemetry_span_export_requests: Counter { "Total count of span export request messages" },
opentelemetry_span_exports: Counter { "Total count of spans exported" }
}

#[derive(Debug)]
struct Metrics {
streams: Counter,
requests: Counter,
spans: Counter,
}

#[derive(Clone, Debug)]
pub struct Registry(Arc<Metrics>);

#[derive(Clone, Debug)]
pub struct Report(Arc<Metrics>);

pub fn new() -> (Registry, Report) {
let metrics = Metrics {
streams: Counter::default(),
requests: Counter::default(),
spans: Counter::default(),
};
let shared = Arc::new(metrics);
(Registry(shared.clone()), Report(shared))
}

impl Registry {
pub fn start_stream(&mut self) {
self.0.streams.incr()
}

pub fn send(&mut self, spans: u64) {
self.0.requests.incr();
self.0.spans.add(spans);
}
}

impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
opentelemetry_span_export_streams.fmt_help(f)?;
opentelemetry_span_export_streams.fmt_metric(f, &self.0.streams)?;

opentelemetry_span_export_requests.fmt_help(f)?;
opentelemetry_span_export_requests.fmt_metric(f, &self.0.requests)?;

opentelemetry_span_exports.fmt_help(f)?;
opentelemetry_span_exports.fmt_metric(f, &self.0.spans)?;

Ok(())
}
}
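
For illustration, the counters can be exercised and rendered through the `FmtMetrics` implementation roughly as follows. This sketch is not part of the change: the `PrometheusText` adapter is a hypothetical helper, and in the proxy the `Report` would presumably be registered with the admin server's metrics endpoint instead.

use linkerd_metrics::FmtMetrics;
use linkerd_opentelemetry::metrics;
use std::fmt;

// Hypothetical adapter that renders any `FmtMetrics` value as Prometheus text.
struct PrometheusText<R>(R);

impl<R: FmtMetrics> fmt::Display for PrometheusText<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt_metrics(f)
    }
}

fn main() {
    let (mut registry, report) = metrics::new();
    registry.start_stream(); // a new export stream was opened
    registry.send(42);       // one export request carrying 42 spans

    // Prints the three opentelemetry_span_export_* counters with their help text.
    print!("{}", PrometheusText(report));
}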
