fix(subscriber): use monotonic Instants for all timestamps
## Motivation

Currently, the `console-subscriber` crate records all timestamps as
`SystemTime`s. This is because they are eventually sent over the wire as
protobuf `Timestamp`s, which can be constructed from a `SystemTime`.
They cannot be constructed from `Instant`s, because `Instant` is opaque,
and does not expose access to the underlying OS time.

However, using `SystemTime` is not really correct for our use case. We
only use timestamps to calculate durations, and we only have to
serialize them because some durations are calculated in the console UI
rather than in-process. We *don't* need timestamps that are globally
consistent with a shared timebase, but we *do* need monotonicity. Using
`SystemTime` leaves us vulnerable to clock skew: if, for example, an NTP
adjustment steps the system clock backwards far enough, a poll can
appear to end "before" it started (as in issue #286). With monotonic
`Instant`s, every poll duration is guaranteed to be non-negative; with
`SystemTime`s, this isn't necessarily the case.
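A minimal sketch of the failure mode: `SystemTime` comparisons are fallible precisely because the wall clock may be stepped backwards. Real clock skew can't be triggered portably in an example, so the backwards step is simulated here with explicit offsets from `UNIX_EPOCH`:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn main() {
    // Two wall-clock readings; the second is "earlier" because the system
    // clock was stepped backwards in between (simulated with fixed offsets).
    let poll_start = UNIX_EPOCH + Duration::from_secs(100);
    let poll_end = UNIX_EPOCH + Duration::from_secs(99);

    // `SystemTime::duration_since` returns a `Result` for exactly this
    // reason: the poll appears to end "before" it started.
    assert!(poll_end.duration_since(poll_start).is_err());

    // Falling back to a zero duration hides the error, but the recorded
    // poll time is still bogus.
    let dur = poll_end.duration_since(poll_start).unwrap_or_default();
    assert_eq!(dur, Duration::ZERO);
}
```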

Furthermore, `Instant::now()` may have less performance overhead than
`SystemTime::now()`, at least on some platforms.

## Solution

This branch changes `console-subscriber` to always take timestamps using
`Instant::now()` rather than using `SystemTime::now()`, and store all
timestamps as `Instant`s. In order to convert these `Instant`s into
`SystemTime`s that can be sent over the wire, we construct a reference
`TimeAnchor`, consisting of a paired `Instant` and `SystemTime` recorded
at the same time when the `ConsoleLayer` is constructed. We can then
construct "system times" that are monotonic, by calculating the duration
between a given `Instant` and the anchor `Instant`, and adding that
duration to the anchor `SystemTime`. These are not *real* system
timestamps, as they will never run backwards if the system clock is
adjusted; they are relative only to the base process start time as
recorded by the anchor. However, they *are* monotonic, and all durations
calculated from them will be reasonable.
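The anchor scheme can be sketched as follows. This is a simplified model, not the crate's actual implementation: the real `TimeAnchor`'s `to_timestamp` method produces a protobuf `Timestamp` via `prost-types` rather than a `SystemTime`, and the field and method names here are illustrative:

```rust
use std::time::{Instant, SystemTime};

/// A paired `Instant` and `SystemTime`, recorded at (approximately) the
/// same moment when the layer is constructed.
pub struct TimeAnchor {
    mono: Instant,
    sys: SystemTime,
}

impl TimeAnchor {
    pub fn new() -> Self {
        Self {
            mono: Instant::now(),
            sys: SystemTime::now(),
        }
    }

    /// Turn a monotonic `Instant` into a wire-friendly wall-clock time by
    /// adding the monotonic elapsed-since-anchor duration to the anchored
    /// `SystemTime`. Later `Instant`s always map to later timestamps, even
    /// if the real system clock is stepped backwards in the meantime.
    pub fn to_system_time(&self, t: Instant) -> SystemTime {
        let elapsed = t.checked_duration_since(self.mono).unwrap_or_default();
        self.sys + elapsed
    }
}

fn main() {
    let anchor = TimeAnchor::new();
    let a = anchor.to_system_time(Instant::now());
    let b = anchor.to_system_time(Instant::now());
    // Monotonic: the second conversion can never be earlier than the first.
    assert!(b >= a);
}
```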

This is part of the change I proposed in #254. I'm not going to close
that issue yet, though, as it also describes potentially switching from
`std::time::Instant` to the `quanta` crate to further reduce the
overhead of recording monotonic timestamps.

Fixes #286
hawkw committed Feb 17, 2022
1 parent 24db8c6 commit 1d9a348
Showing 6 changed files with 167 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions console-subscriber/Cargo.toml
```diff
@@ -45,6 +45,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria
 # feature to also enable `tracing-subscriber`'s parking_lot feature flag.
 parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
 humantime = "2.1.0"
+prost-types = "0.9.0"
 
 # Required for recording:
 serde = { version = "1", features = ["derive"] }
```
18 changes: 11 additions & 7 deletions console-subscriber/src/aggregator/id_data.rs
```diff
@@ -1,7 +1,7 @@
 use super::{shrink::ShrinkMap, Id, ToProto};
-use crate::stats::{DroppedAt, Unsent};
+use crate::stats::{DroppedAt, TimeAnchor, Unsent};
 use std::collections::HashMap;
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant};
 
 pub(crate) struct IdData<T> {
     data: ShrinkMap<Id, T>,
```
```diff
@@ -45,26 +45,30 @@ impl<T: Unsent> IdData<T> {
         self.data.get(id)
     }
 
-    pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
+    pub(crate) fn as_proto(
+        &mut self,
+        include: Include,
+        base_time: &TimeAnchor,
+    ) -> HashMap<u64, T::Output>
     where
         T: ToProto,
     {
         match include {
             Include::UpdatedOnly => self
                 .since_last_update()
-                .map(|(id, d)| (id.into_u64(), d.to_proto()))
+                .map(|(id, d)| (id.into_u64(), d.to_proto(&base_time)))
                 .collect(),
             Include::All => self
                 .all()
-                .map(|(id, d)| (id.into_u64(), d.to_proto()))
+                .map(|(id, d)| (id.into_u64(), d.to_proto(&base_time)))
                 .collect(),
         }
     }
 
     pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
         &mut self,
         stats: &mut IdData<R>,
-        now: SystemTime,
+        now: Instant,
         retention: Duration,
         has_watchers: bool,
     ) {
```
```diff
@@ -80,7 +84,7 @@ impl<T: Unsent> IdData<T> {
 
         stats.data.retain_and_shrink(|id, stats| {
             if let Some(dropped_at) = stats.dropped_at() {
-                let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
+                let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
                 let dirty = stats.is_unsent();
                 let should_drop =
                     // if there are any clients watching, retain all dirty tasks regardless of age
```
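The switch from `duration_since` to `checked_duration_since` in this hunk is necessary because the two `now` types differ: `SystemTime::duration_since` returns a `Result` (so `unwrap_or_default()` applied directly), while `Instant::duration_since` returns a bare `Duration` and historically panicked when the ordering was violated. The `Option`-returning `checked_duration_since` keeps the defensive zero-duration fallback:

```rust
use std::time::{Duration, Instant};

fn main() {
    let earlier = Instant::now();
    let later = earlier + Duration::from_millis(5);

    // Normal case: `checked_duration_since` yields the elapsed time.
    assert_eq!(
        later.checked_duration_since(earlier),
        Some(Duration::from_millis(5))
    );

    // Reversed case: instead of panicking or silently saturating, the
    // checked variant reports `None`...
    assert_eq!(earlier.checked_duration_since(later), None);

    // ...which `unwrap_or_default()` maps to a zero duration, matching the
    // old `SystemTime` behavior of defaulting on `Err`.
    let dropped_for = earlier.checked_duration_since(later).unwrap_or_default();
    assert_eq!(dropped_for, Duration::ZERO);
}
```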
56 changes: 34 additions & 22 deletions console-subscriber/src/aggregator/mod.rs
```diff
@@ -13,7 +13,7 @@ use std::{
         atomic::{AtomicBool, Ordering::*},
         Arc,
     },
-    time::{Duration, SystemTime},
+    time::{Duration, Instant, SystemTime},
 };
 use tracing_core::{span::Id, Metadata};
 
```
```diff
@@ -86,6 +86,10 @@ pub(crate) struct Aggregator {
 
     /// The time "state" of the aggregator, such as paused or live.
    temporality: Temporality,
+
+    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
+    /// timestamp that can be sent over the wire.
+    base_time: stats::TimeAnchor,
 }
 
 #[derive(Debug, Default)]
```
```diff
@@ -135,6 +139,7 @@ impl Aggregator {
         rpcs: mpsc::Receiver<Command>,
         builder: &crate::Builder,
         shared: Arc<crate::Shared>,
+        base_time: stats::TimeAnchor,
     ) -> Self {
         Self {
             shared,
```
```diff
@@ -155,6 +160,7 @@
             all_poll_ops: Default::default(),
             new_poll_ops: Default::default(),
             temporality: Temporality::Live,
+            base_time,
         }
     }
 
```
```diff
@@ -241,7 +247,7 @@ impl Aggregator {
     fn cleanup_closed(&mut self) {
         // drop all closed entries that have completed *and* whose final data
         // has already been sent off.
-        let now = SystemTime::now();
+        let now = Instant::now();
         let has_watchers = !self.watchers.is_empty();
         self.tasks
             .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
```
```diff
@@ -254,38 +260,38 @@ impl Aggregator {
     /// Add the task subscription to the watchers after sending the first update
     fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
         tracing::debug!("new instrument subscription");
-        let now = SystemTime::now();
+        let now = Instant::now();
         // Send the initial state --- if this fails, the subscription is already dead
         let update = &proto::instrument::Update {
             task_update: Some(proto::tasks::TaskUpdate {
                 new_tasks: self
                     .tasks
                     .all()
-                    .map(|(_, value)| value.to_proto())
+                    .map(|(_, value)| value.to_proto(&self.base_time))
                     .collect(),
-                stats_update: self.task_stats.as_proto(Include::All),
+                stats_update: self.task_stats.as_proto(Include::All, &self.base_time),
                 dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
             }),
             resource_update: Some(proto::resources::ResourceUpdate {
                 new_resources: self
                     .resources
                     .all()
-                    .map(|(_, value)| value.to_proto())
+                    .map(|(_, value)| value.to_proto(&self.base_time))
                     .collect(),
-                stats_update: self.resource_stats.as_proto(Include::All),
+                stats_update: self.resource_stats.as_proto(Include::All, &self.base_time),
                 new_poll_ops: (*self.all_poll_ops).clone(),
                 dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
             }),
             async_op_update: Some(proto::async_ops::AsyncOpUpdate {
                 new_async_ops: self
                     .async_ops
                     .all()
-                    .map(|(_, value)| value.to_proto())
+                    .map(|(_, value)| value.to_proto(&self.base_time))
                     .collect(),
-                stats_update: self.async_op_stats.as_proto(Include::All),
+                stats_update: self.async_op_stats.as_proto(Include::All, &self.base_time),
                 dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
             }),
-            now: Some(now.into()),
+            now: Some(self.base_time.to_timestamp(now)),
             new_metadata: Some(proto::RegisterMetadata {
                 metadata: (*self.all_metadata).clone(),
             }),
```
```diff
@@ -345,27 +351,31 @@ impl Aggregator {
 
         let new_poll_ops = std::mem::take(&mut self.new_poll_ops);
 
-        let now = SystemTime::now();
+        let now = self.base_time.to_timestamp(Instant::now());
         let update = proto::instrument::Update {
-            now: Some(now.into()),
+            now: Some(now.clone()),
             new_metadata,
             task_update: Some(proto::tasks::TaskUpdate {
                 new_tasks: self
                     .tasks
                     .since_last_update()
-                    .map(|(_, value)| value.to_proto())
+                    .map(|(_, value)| value.to_proto(&self.base_time))
                     .collect(),
-                stats_update: self.task_stats.as_proto(Include::UpdatedOnly),
+                stats_update: self
+                    .task_stats
+                    .as_proto(Include::UpdatedOnly, &self.base_time),
 
                 dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
             }),
             resource_update: Some(proto::resources::ResourceUpdate {
                 new_resources: self
                     .resources
                     .since_last_update()
-                    .map(|(_, value)| value.to_proto())
+                    .map(|(_, value)| value.to_proto(&self.base_time))
                     .collect(),
-                stats_update: self.resource_stats.as_proto(Include::UpdatedOnly),
+                stats_update: self
+                    .resource_stats
+                    .as_proto(Include::UpdatedOnly, &self.base_time),
                 new_poll_ops,
 
                 dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
```
```diff
@@ -374,9 +384,11 @@ impl Aggregator {
                 new_async_ops: self
                     .async_ops
                     .since_last_update()
-                    .map(|(_, value)| value.to_proto())
+                    .map(|(_, value)| value.to_proto(&self.base_time))
                     .collect(),
-                stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly),
+                stats_update: self
+                    .async_op_stats
+                    .as_proto(Include::UpdatedOnly, &self.base_time),
 
                 dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
             }),
```
```diff
@@ -392,7 +404,7 @@ impl Aggregator {
             if let Some(task_stats) = stats.get(id) {
                 let details = proto::tasks::TaskDetails {
                     task_id: Some(id.clone().into()),
-                    now: Some(now.into()),
+                    now: Some(now.clone()),
                     poll_times_histogram: task_stats.serialize_histogram(),
                 };
                 watchers.retain(|watch| watch.update(&details));
```
```diff
@@ -545,7 +557,7 @@ impl<T: Clone> Watch<T> {
 impl ToProto for Task {
     type Output = proto::tasks::Task;
 
-    fn to_proto(&self) -> Self::Output {
+    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
         proto::tasks::Task {
             id: Some(self.id.clone().into()),
             // TODO: more kinds of tasks...
```
```diff
@@ -571,7 +583,7 @@ impl Unsent for Task {
 impl ToProto for Resource {
     type Output = proto::resources::Resource;
 
-    fn to_proto(&self) -> Self::Output {
+    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
         proto::resources::Resource {
             id: Some(self.id.clone().into()),
             parent_resource_id: self.parent_id.clone().map(Into::into),
```
```diff
@@ -597,7 +609,7 @@ impl Unsent for Resource {
 impl ToProto for AsyncOp {
     type Output = proto::async_ops::AsyncOp;
 
-    fn to_proto(&self) -> Self::Output {
+    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
         proto::async_ops::AsyncOp {
             id: Some(self.id.clone().into()),
             metadata: Some(self.metadata.into()),
```