Update opentelemetry (#19665)
- Upgrade opentelemetry
- consensus: properly configure http2 server

---------

Co-authored-by: Brandon Williams <[email protected]>
mystenmark and bmwill authored Oct 2, 2024
1 parent 337e6a5 commit 928b981
Showing 8 changed files with 180 additions and 210 deletions.
265 changes: 111 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -386,8 +386,8 @@ moka = { version = "0.12", default-features = false, features = [
"atomic64",
] }
more-asserts = "0.3.1"
-msim = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "b320996d8dfb99b273fe31c0222c659332283c99", package = "msim" }
-msim-macros = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "b320996d8dfb99b273fe31c0222c659332283c99", package = "msim-macros" }
+msim = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "9c6636c399d5c60a1759f1670b1c07b3d408799a", package = "msim" }
+msim-macros = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "9c6636c399d5c60a1759f1670b1c07b3d408799a", package = "msim-macros" }
multiaddr = "0.17.0"
nexlint = { git = "https://github.com/nextest-rs/nexlint.git", rev = "7ce56bd591242a57660ed05f14ca2483c37d895b" }
nexlint-lints = { git = "https://github.com/nextest-rs/nexlint.git", rev = "7ce56bd591242a57660ed05f14ca2483c37d895b" }
52 changes: 28 additions & 24 deletions consensus/core/src/network/tonic_network.rs
@@ -14,7 +14,7 @@ use bytes::Bytes;
use cfg_if::cfg_if;
use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
use futures::{stream, Stream, StreamExt as _};
-use hyper_util::rt::tokio::TokioIo;
+use hyper_util::rt::{tokio::TokioIo, TokioTimer};
use hyper_util::service::TowerToHyperService;
use mysten_common::sync::notify_once::NotifyOnce;
use mysten_metrics::monitored_future;
@@ -31,7 +31,7 @@ use tokio::{
};
use tokio_rustls::TlsAcceptor;
use tokio_stream::{iter, Iter};
-use tonic::{transport::Server, Request, Response, Streaming};
+use tonic::{Request, Response, Streaming};
use tower_http::{
trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
ServiceBuilderExt,
@@ -710,31 +710,30 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
let service = TonicServiceProxy::new(self.context.clone(), service);
let config = &self.context.parameters.tonic;

-let consensus_service = Server::builder()
-.layer(
-TraceLayer::new_for_grpc()
-.make_span_with(DefaultMakeSpan::new().level(tracing::Level::TRACE))
-.on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
-)
-.initial_connection_window_size(64 << 20)
-.initial_stream_window_size(32 << 20)
-.http2_keepalive_interval(Some(config.keepalive_interval))
-.http2_keepalive_timeout(Some(config.keepalive_interval))
-// tcp keepalive is unsupported by msim
-.add_service(
-ConsensusServiceServer::new(service)
-.max_encoding_message_size(config.message_size_limit)
-.max_decoding_message_size(config.message_size_limit),
-)
-.into_router();
+let consensus_service = tonic::service::Routes::new(
+ConsensusServiceServer::new(service)
+.max_encoding_message_size(config.message_size_limit)
+.max_decoding_message_size(config.message_size_limit),
+)
+.into_axum_router();

let inbound_metrics = self.context.metrics.network_metrics.inbound.clone();
let excessive_message_size = self.context.parameters.tonic.excessive_message_size;

-let http =
-hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
-.http2_only();
-let http = Arc::new(http);
+let http = {
+let mut builder =
+hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
+.http2_only();
+builder
+.http2()
+.timer(TokioTimer::new())
+.initial_connection_window_size(64 << 20)
+.initial_stream_window_size(32 << 20)
+.keep_alive_interval(Some(config.keepalive_interval))
+.keep_alive_timeout(config.keepalive_interval);
+
+Arc::new(builder)
+};

let tls_server_config =
create_rustls_server_config(&self.context, self.network_keypair.clone());
@@ -907,7 +906,12 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
inbound_metrics,
excessive_message_size,
)))
-.service(consensus_service.clone());
+.layer(
+TraceLayer::new_for_grpc()
+.make_span_with(DefaultMakeSpan::new().level(tracing::Level::TRACE))
+.on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
+)
+.service(consensus_service);

pin! {
let connection = http.serve_connection(TokioIo::new(tls_stream), TowerToHyperService::new(svc));
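The second commit-message bullet is visible here: connection window sizes, keepalive, and a timer now have to be set on hyper-util's HTTP/2 connection builder, since tonic's `Server::builder()` is no longer in the serving path. Below is a minimal standalone sketch of the same pattern, assuming `axum`, `hyper-util`, and `tokio` as dependencies; the `/health` route, bind address, and 5-second durations are placeholders, not values from this change.

```rust
use std::{sync::Arc, time::Duration};

use axum::{routing::get, Router};
use hyper_util::{
    rt::{TokioExecutor, TokioIo, TokioTimer},
    server::conn::auto::Builder,
    service::TowerToHyperService,
};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in for the router produced by
    // `tonic::service::Routes::new(..).into_axum_router()` in the real change.
    let app: Router = Router::new().route("/health", get(|| async { "ok" }));

    // Build one HTTP/2-only connection builder and share it across connections.
    // The keepalive options only take effect when a timer is supplied, which is
    // presumably why the change adds `TokioTimer`.
    let http = {
        let mut builder = Builder::new(TokioExecutor::new()).http2_only();
        builder
            .http2()
            .timer(TokioTimer::new())
            .initial_connection_window_size(64 << 20)
            .initial_stream_window_size(32 << 20)
            .keep_alive_interval(Some(Duration::from_secs(5)))
            .keep_alive_timeout(Duration::from_secs(5));
        Arc::new(builder)
    };

    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    loop {
        let (stream, _peer) = listener.accept().await?;
        let http = http.clone();
        let svc = TowerToHyperService::new(app.clone());
        tokio::spawn(async move {
            // Drive each accepted connection with the configured builder,
            // mirroring the `serve_connection` call in tonic_network.rs.
            let _ = http.serve_connection(TokioIo::new(stream), svc).await;
        });
    }
}
```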
16 changes: 9 additions & 7 deletions crates/telemetry-subscribers/Cargo.toml
@@ -17,11 +17,12 @@ prometheus.workspace = true
tracing.workspace = true
tracing-appender.workspace = true
tracing-subscriber.workspace = true
opentelemetry = { version = "0.20.0", features = ["rt-tokio"], optional = true }
opentelemetry = { version = "0.25.0", optional = true }
opentelemetry_api = { version = "0.20.0", optional = true }
opentelemetry-otlp = { version = "0.13.0", features = ["grpc-tonic"], optional = true }
tracing-opentelemetry = { version = "0.21.0", optional = true }
opentelemetry-proto = { version = "0.3", optional = true }
opentelemetry_sdk = { version = "0.25.0", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.25.0", features = ["grpc-tonic"], optional = true }
tracing-opentelemetry = { version = "0.26.0", optional = true }
opentelemetry-proto = { version = "0.25", optional = true }
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
clap.workspace = true
@@ -30,8 +31,8 @@ bytes-varint = { version = "1" }

# must use same version as opentelemetry for tonic and prost, so we can't use from
# workspace
tonic = { version = "0.9" }
prost = "0.11.9"
tonic = { version = "0.12.3" }
prost = "0.13"

[features]
default = ["otlp"]
@@ -41,7 +42,8 @@ otlp = [
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-proto",
"opentelemetry_api"
"opentelemetry_api",
"opentelemetry_sdk"
]

[dev-dependencies]
17 changes: 12 additions & 5 deletions crates/telemetry-subscribers/src/file_exporter.rs
@@ -2,9 +2,15 @@
// SPDX-License-Identifier: Apache-2.0

use futures::{future::BoxFuture, FutureExt};
-use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use opentelemetry::trace::TraceError;
-use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use opentelemetry_proto::{
+tonic::collector::trace::v1::ExportTraceServiceRequest,
+transform::{
+common::tonic::ResourceAttributesWithSchema,
+trace::tonic::group_spans_by_resource_and_scope,
+},
+};
+use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use prost::Message;
use std::fs::OpenOptions;
use std::io::Write;
@@ -70,26 +76,27 @@ impl CachedOpenFile {
#[derive(Debug)]
pub(crate) struct FileExporter {
pub cached_open_file: CachedOpenFile,
+resource: ResourceAttributesWithSchema,
}

impl FileExporter {
pub fn new(file_path: Option<PathBuf>) -> std::io::Result<Self> {
Ok(Self {
cached_open_file: CachedOpenFile::new(file_path)?,
+resource: ResourceAttributesWithSchema::default(),
})
}
}

impl SpanExporter for FileExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let cached_open_file = self.cached_open_file.clone();
+let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
async move {
cached_open_file
.with_file(|maybe_file| {
if let Some(file) = maybe_file {
-let request = ExportTraceServiceRequest {
-resource_spans: batch.into_iter().map(Into::into).collect(),
-};
+let request = ExportTraceServiceRequest { resource_spans };

let buf = request.encode_length_delimited_to_vec();

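With the 0.25 transform helpers, `export` groups the batch by resource and scope before writing, but the on-disk format is unchanged: a sequence of length-delimited `ExportTraceServiceRequest` protobufs. A rough reader for such a file, assuming `opentelemetry-proto` (with its tonic-generated types enabled), `prost`, and `bytes` as dependencies; the `trace_file.pb` path is a placeholder for wherever the exporter was pointed.

```rust
use bytes::{Buf, Bytes};
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path; the exporter writes wherever its file path was set.
    let mut buf = Bytes::from(std::fs::read("trace_file.pb")?);

    // The file is a concatenation of length-delimited ExportTraceServiceRequest
    // messages, so decode until the buffer is exhausted.
    while buf.has_remaining() {
        let request = ExportTraceServiceRequest::decode_length_delimited(&mut buf)?;
        for resource_spans in &request.resource_spans {
            for scope_spans in &resource_spans.scope_spans {
                for span in &scope_spans.spans {
                    println!("span: {} ({} events)", span.name, span.events.len());
                }
            }
        }
    }
    Ok(())
}
```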
28 changes: 14 additions & 14 deletions crates/telemetry-subscribers/src/lib.rs
@@ -4,20 +4,18 @@
use atomic_float::AtomicF64;
use crossterm::tty::IsTty;
use once_cell::sync::Lazy;
-use opentelemetry::sdk::trace::Sampler;
-use opentelemetry::sdk::{
+use opentelemetry::{
+trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
+Context, KeyValue,
+};
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::trace::Sampler;
+use opentelemetry_sdk::{
self, runtime,
trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
Resource,
};
-use opentelemetry::trace::TracerProvider as _;
-use opentelemetry_api::{
-trace::{Link, SamplingResult, SpanKind, TraceId},
-Context, Key, OrderMap, Value,
-};
-use opentelemetry_otlp::WithExportConfig;
use span_latency_prom::PrometheusSpanLatencyLayer;
-use std::collections::hash_map::RandomState;
use std::path::PathBuf;
use std::time::Duration;
use std::{
@@ -385,7 +383,7 @@ impl TelemetryConfig {
if config.enable_otlp_tracing {
let trace_file = env::var("TRACE_FILE").ok();

-let config = sdk::trace::config()
+let config = opentelemetry_sdk::trace::Config::default()
.with_resource(Resource::new(vec![opentelemetry::KeyValue::new(
"service.name",
service_name.clone(),
@@ -413,23 +411,25 @@
let endpoint = env::var("OTLP_ENDPOINT")
.unwrap_or_else(|_| "http://localhost:4317".to_string());

-let tracer = opentelemetry_otlp::new_pipeline()
+let p = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
)
.with_trace_config(config)
-.install_batch(sdk::runtime::Tokio)
+.install_batch(runtime::Tokio)
.expect("Could not create async Tracer");

+let tracer = p.tracer(service_name);

tracing_opentelemetry::layer().with_tracer(tracer)
};

// Enable Trace Contexts for tying spans together
opentelemetry::global::set_text_map_propagator(
-opentelemetry::sdk::propagation::TraceContextPropagator::new(),
+opentelemetry_sdk::propagation::TraceContextPropagator::new(),
);

let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
Expand Down Expand Up @@ -517,7 +517,7 @@ impl ShouldSample for SamplingFilter {
trace_id: TraceId,
name: &str,
span_kind: &SpanKind,
-attributes: &OrderMap<Key, Value, RandomState>,
+attributes: &[KeyValue],
links: &[Link],
) -> SamplingResult {
let sample_rate = self.sample_rate.load(Ordering::Relaxed);
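The last hunk shows the signature change that forced `SamplingFilter` to be touched: in the 0.25 line, `ShouldSample::should_sample` receives span attributes as a plain `&[KeyValue]` slice rather than an `OrderMap<Key, Value, RandomState>`. Below is a hypothetical sampler written against that trait to show the new shape; `PrefixSampler` and its name-prefix rule are illustrative only and not part of this commit.

```rust
use opentelemetry::{
    trace::{Link, SamplingDecision, SamplingResult, SpanKind, TraceId, TraceState},
    Context, KeyValue,
};
use opentelemetry_sdk::trace::ShouldSample;

// Hypothetical sampler: record only spans whose name starts with a prefix.
#[derive(Clone, Debug)]
struct PrefixSampler {
    prefix: &'static str,
}

impl ShouldSample for PrefixSampler {
    fn should_sample(
        &self,
        _parent_context: Option<&Context>,
        _trace_id: TraceId,
        name: &str,
        _span_kind: &SpanKind,
        // 0.25-era signature: a plain slice of KeyValue, no OrderMap.
        _attributes: &[KeyValue],
        _links: &[Link],
    ) -> SamplingResult {
        let decision = if name.starts_with(self.prefix) {
            SamplingDecision::RecordAndSample
        } else {
            SamplingDecision::Drop
        };
        SamplingResult {
            decision,
            attributes: Vec::new(),
            trace_state: TraceState::default(),
        }
    }
}

fn main() {
    // The sampler plugs into the SDK trace config, which the commit builds with
    // `opentelemetry_sdk::trace::Config::default()` and hands to the OTLP
    // pipeline via `.with_trace_config(..)`.
    let config = opentelemetry_sdk::trace::Config::default()
        .with_sampler(PrefixSampler { prefix: "consensus" });
    let _ = config;
}
```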
4 changes: 2 additions & 2 deletions scripts/simtest/cargo-simtest
@@ -54,9 +54,9 @@ if [ -n "$LOCAL_MSIM_PATH" ]; then
else
cargo_patch_args=(
--config 'patch.crates-io.tokio.git = "https://github.com/MystenLabs/mysten-sim.git"'
- --config 'patch.crates-io.tokio.rev = "b320996d8dfb99b273fe31c0222c659332283c99"'
+ --config 'patch.crates-io.tokio.rev = "9c6636c399d5c60a1759f1670b1c07b3d408799a"'
--config 'patch.crates-io.futures-timer.git = "https://github.com/MystenLabs/mysten-sim.git"'
- --config 'patch.crates-io.futures-timer.rev = "b320996d8dfb99b273fe31c0222c659332283c99"'
+ --config 'patch.crates-io.futures-timer.rev = "9c6636c399d5c60a1759f1670b1c07b3d408799a"'
)
fi

4 changes: 2 additions & 2 deletions scripts/simtest/config-patch
@@ -18,5 +18,5 @@ index c0829bc1b6..4007f97d66 100644
include_dir = "0.7.3"

[patch.crates-io]
- +tokio = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "b320996d8dfb99b273fe31c0222c659332283c99" }
- +futures-timer = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "b320996d8dfb99b273fe31c0222c659332283c99" }
+ +tokio = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "9c6636c399d5c60a1759f1670b1c07b3d408799a" }
+ +futures-timer = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "9c6636c399d5c60a1759f1670b1c07b3d408799a" }
