fix(subscriber): use monotonic Instants for all timestamps #288

Merged
merged 3 commits · Feb 17, 2022
Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions console-subscriber/Cargo.toml
@@ -45,6 +45,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
humantime = "2.1.0"
prost-types = "0.9.0"

# Required for recording:
serde = { version = "1", features = ["derive"] }
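The new `prost-types` dependency supplies the protobuf `Timestamp` type that anchored timestamps are converted into before being sent over the wire. A minimal sketch of that conversion, assuming prost-types 0.9 (the `to_wire_timestamp` helper here is illustrative only; prost-types also provides a `From<SystemTime>` impl that does the same thing):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative helper: build a wire timestamp from a wall-clock time.
fn to_wire_timestamp(t: SystemTime) -> prost_types::Timestamp {
    let since_epoch = t.duration_since(UNIX_EPOCH).unwrap_or_default();
    prost_types::Timestamp {
        seconds: since_epoch.as_secs() as i64,
        nanos: since_epoch.subsec_nanos() as i32,
    }
}
```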
18 changes: 11 additions & 7 deletions console-subscriber/src/aggregator/id_data.rs
@@ -1,7 +1,7 @@
use super::{shrink::ShrinkMap, Id, ToProto};
use crate::stats::{DroppedAt, Unsent};
use crate::stats::{DroppedAt, TimeAnchor, Unsent};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant};

pub(crate) struct IdData<T> {
data: ShrinkMap<Id, T>,
@@ -45,26 +45,30 @@ impl<T: Unsent> IdData<T> {
self.data.get(id)
}

pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
pub(crate) fn as_proto(
&mut self,
include: Include,
base_time: &TimeAnchor,
) -> HashMap<u64, T::Output>
where
T: ToProto,
{
match include {
Include::UpdatedOnly => self
.since_last_update()
.map(|(id, d)| (id.into_u64(), d.to_proto()))
.map(|(id, d)| (id.into_u64(), d.to_proto(&base_time)))
.collect(),
Include::All => self
.all()
.map(|(id, d)| (id.into_u64(), d.to_proto()))
.map(|(id, d)| (id.into_u64(), d.to_proto(&base_time)))
.collect(),
}
}

pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
&mut self,
stats: &mut IdData<R>,
now: SystemTime,
now: Instant,
retention: Duration,
has_watchers: bool,
) {
@@ -80,7 +84,7 @@ impl<T: Unsent> IdData<T> {

stats.data.retain_and_shrink(|id, stats| {
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
let dirty = stats.is_unsent();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
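With this change, `as_proto` threads a `&TimeAnchor` down into every `to_proto` call, and `drop_closed` compares two `Instant`s with `checked_duration_since` (which returns an `Option`, saturated to zero by `unwrap_or_default`) instead of `SystemTime::duration_since` (which returns a `Result`). A rough sketch of what the updated `ToProto` trait presumably looks like after this PR (the trait itself is defined elsewhere in the crate and is not part of this hunk):

```rust
// Sketch only; the real trait lives in console-subscriber's aggregator module.
pub(crate) trait ToProto {
    type Output;

    /// Convert to the wire representation, using `base_time` to translate
    /// monotonic `Instant`s into wall-clock protobuf timestamps.
    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
}
```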
56 changes: 34 additions & 22 deletions console-subscriber/src/aggregator/mod.rs
@@ -13,7 +13,7 @@ use std::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};
use tracing_core::{span::Id, Metadata};

@@ -86,6 +86,10 @@ pub(crate) struct Aggregator {

/// The time "state" of the aggregator, such as paused or live.
temporality: Temporality,

/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
/// timestamp that can be sent over the wire.
base_time: stats::TimeAnchor,
}

#[derive(Debug, Default)]
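The new `base_time` field is a `stats::TimeAnchor`: a monotonic `Instant` captured together with the `SystemTime` at the same moment, so that later `Instant`s can be projected back onto the wall clock when they are serialized. The anchor type itself is not part of this diff; a minimal sketch of the idea (field and method names are assumptions):

```rust
use std::time::{Instant, SystemTime};

// Illustrative sketch of an anchor pairing a monotonic clock with a wall clock.
pub(crate) struct TimeAnchor {
    mono: Instant,
    sys: SystemTime,
}

impl TimeAnchor {
    pub(crate) fn new() -> Self {
        Self {
            mono: Instant::now(),
            sys: SystemTime::now(),
        }
    }

    /// Translate a monotonic `Instant` into a protobuf `Timestamp` by
    /// offsetting the anchored wall-clock time by the elapsed monotonic time.
    pub(crate) fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp {
        let elapsed = t.checked_duration_since(self.mono).unwrap_or_default();
        // Relies on prost-types' `From<SystemTime>` conversion.
        (self.sys + elapsed).into()
    }
}
```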
@@ -135,6 +139,7 @@ impl Aggregator {
rpcs: mpsc::Receiver<Command>,
builder: &crate::Builder,
shared: Arc<crate::Shared>,
base_time: stats::TimeAnchor,
) -> Self {
Self {
shared,
@@ -155,6 +160,7 @@
all_poll_ops: Default::default(),
new_poll_ops: Default::default(),
temporality: Temporality::Live,
base_time,
}
}

@@ -241,7 +247,7 @@ impl Aggregator {
fn cleanup_closed(&mut self) {
// drop all closed entries that have completed *and* whose final data has already
// been sent off.
let now = SystemTime::now();
let now = Instant::now();
let has_watchers = !self.watchers.is_empty();
self.tasks
.drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
@@ -254,38 +260,38 @@
/// Add the task subscription to the watchers after sending the first update
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
tracing::debug!("new instrument subscription");
let now = SystemTime::now();
let now = Instant::now();
// Send the initial state --- if this fails, the subscription is already dead
let update = &proto::instrument::Update {
task_update: Some(proto::tasks::TaskUpdate {
new_tasks: self
.tasks
.all()
.map(|(_, value)| value.to_proto())
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.task_stats.as_proto(Include::All),
stats_update: self.task_stats.as_proto(Include::All, &self.base_time),
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
.resources
.all()
.map(|(_, value)| value.to_proto())
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.resource_stats.as_proto(Include::All),
stats_update: self.resource_stats.as_proto(Include::All, &self.base_time),
new_poll_ops: (*self.all_poll_ops).clone(),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
.async_ops
.all()
.map(|(_, value)| value.to_proto())
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.async_op_stats.as_proto(Include::All),
stats_update: self.async_op_stats.as_proto(Include::All, &self.base_time),
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
now: Some(now.into()),
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
@@ -345,27 +351,31 @@ impl Aggregator {

let new_poll_ops = std::mem::take(&mut self.new_poll_ops);

let now = SystemTime::now();
let now = self.base_time.to_timestamp(Instant::now());
let update = proto::instrument::Update {
now: Some(now.into()),
now: Some(now.clone()),
new_metadata,
task_update: Some(proto::tasks::TaskUpdate {
new_tasks: self
.tasks
.since_last_update()
.map(|(_, value)| value.to_proto())
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.task_stats.as_proto(Include::UpdatedOnly),
stats_update: self
.task_stats
.as_proto(Include::UpdatedOnly, &self.base_time),

dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
.resources
.since_last_update()
.map(|(_, value)| value.to_proto())
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.resource_stats.as_proto(Include::UpdatedOnly),
stats_update: self
.resource_stats
.as_proto(Include::UpdatedOnly, &self.base_time),
new_poll_ops,

dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
@@ -374,9 +384,11 @@
new_async_ops: self
.async_ops
.since_last_update()
.map(|(_, value)| value.to_proto())
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly),
stats_update: self
.async_op_stats
.as_proto(Include::UpdatedOnly, &self.base_time),

dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
@@ -392,7 +404,7 @@
if let Some(task_stats) = stats.get(id) {
let details = proto::tasks::TaskDetails {
task_id: Some(id.clone().into()),
now: Some(now.into()),
now: Some(now.clone()),
poll_times_histogram: task_stats.serialize_histogram(),
};
watchers.retain(|watch| watch.update(&details));
@@ -545,7 +557,7 @@ impl<T: Clone> Watch<T> {
impl ToProto for Task {
type Output = proto::tasks::Task;

fn to_proto(&self) -> Self::Output {
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
proto::tasks::Task {
id: Some(self.id.clone().into()),
// TODO: more kinds of tasks...
@@ -571,7 +583,7 @@ impl Unsent for Task {
impl ToProto for Resource {
type Output = proto::resources::Resource;

fn to_proto(&self) -> Self::Output {
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
proto::resources::Resource {
id: Some(self.id.clone().into()),
parent_resource_id: self.parent_id.clone().map(Into::into),
@@ -597,7 +609,7 @@ impl Unsent for Resource {
impl ToProto for AsyncOp {
type Output = proto::async_ops::AsyncOp;

fn to_proto(&self) -> Self::Output {
fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
proto::async_ops::AsyncOp {
id: Some(self.id.clone().into()),
metadata: Some(self.metadata.into()),
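The `Task`, `Resource`, and `AsyncOp` conversions above ignore the anchor (they carry no `Instant` fields of their own), but the stats types that do record `Instant`s use it to place those readings on the wall clock. A hypothetical example of a `ToProto` impl that actually consumes the anchor:

```rust
use std::time::Instant;

// Hypothetical stats type, for illustration only.
struct ExampleStats {
    created_at: Instant,
}

impl ToProto for ExampleStats {
    type Output = prost_types::Timestamp;

    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output {
        // The anchor converts the monotonic creation time into a wall-clock
        // timestamp that can be embedded in the update message.
        base_time.to_timestamp(self.created_at)
    }
}
```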