Skip to content

Commit

Permalink
fix(subscriber): record timestamps for updates last
Browse files Browse the repository at this point in the history
## Motivation

Currently, when constructing an update message to send over the wire, a
timestamp is taken first, and then the protobuf data is constructed.
This can lead to issues where the "now" timestamp is actually _before_
timestamps present in the stats sent in that update, since the stats for
a particular task/resource/async op might be updated on another thread
after taking the update's "now" timestamp. This results in issues like
#266.

## Solution

There's no actual reason to take those timestamps *before* we assemble
the update. This branch changes the aggregator to build all the various
data updates in an update message, and *then* record the update's "now"
timestamp. Any timestamps for tasks/resources/async ops that are
recorded after the update's "now" timestamp will now be included in the
*next* update.

While I was making this change, I also did a little refactoring. I
factored out the code that's shared between constructing the initial
update and subsequent updates into methods that are called with an
`Include` enum, so we can reuse them when building the first update for
a subscription and when building subsequent updates in `publish`. IMO
this makes the code much more readable.

Fixes #266
Depends on #288
  • Loading branch information
hawkw committed Feb 17, 2022
1 parent 09f9ec5 commit f47ace1
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 71 deletions.
18 changes: 18 additions & 0 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub(crate) struct IdData<T> {
data: ShrinkMap<Id, T>,
}

#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum Include {
All,
UpdatedOnly,
Expand Down Expand Up @@ -45,6 +46,23 @@ impl<T: Unsent> IdData<T> {
self.data.get(id)
}

/// Serializes the entries in this map into a list of protobuf messages.
///
/// Depending on `include`, this serializes either every entry (used when
/// building the initial update for a new subscription) or only the entries
/// updated since the last update was generated.
///
/// `base_time` is the anchor used to convert `Instant`s into wire-format
/// timestamps.
pub(crate) fn as_proto_list(
    &mut self,
    include: Include,
    base_time: &TimeAnchor,
) -> Vec<T::Output>
where
    T: ToProto,
{
    match include {
        Include::UpdatedOnly => self
            .since_last_update()
            // `base_time` is already a reference; don't re-borrow it.
            .map(|(_, d)| d.to_proto(base_time))
            .collect(),
        Include::All => self.all().map(|(_, d)| d.to_proto(base_time)).collect(),
    }
}

pub(crate) fn as_proto(
&mut self,
include: Include,
Expand Down
117 changes: 46 additions & 71 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,48 +260,57 @@ impl Aggregator {
/// Add the task subscription to the watchers after sending the first update
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
tracing::debug!("new instrument subscription");

let task_update = Some(self.task_update(Include::All));
let resource_update = Some(self.resource_update(Include::All));
let async_op_update = Some(self.async_op_update(Include::All));
let now = Instant::now();
// Send the initial state --- if this fails, the subscription is already dead

let update = &proto::instrument::Update {
task_update: Some(proto::tasks::TaskUpdate {
new_tasks: self
.tasks
.all()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.task_stats.as_proto(Include::All, &self.base_time),
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
.resources
.all()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.resource_stats.as_proto(Include::All, &self.base_time),
new_poll_ops: (*self.all_poll_ops).clone(),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
.async_ops
.all()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self.async_op_stats.as_proto(Include::All, &self.base_time),
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
task_update,
resource_update,
async_op_update,
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
};

// Send the initial state --- if this fails, the subscription is already dead
if subscription.update(update) {
self.watchers.push(subscription)
}
}

/// Builds a `TaskUpdate` message from the current task data and stats.
///
/// `include` selects between serializing all tasks (initial update for a
/// new subscription) and only those updated since the previous update.
/// Reading the dropped-tasks counter swaps it back to zero.
fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
    let new_tasks = self.tasks.as_proto_list(include, &self.base_time);
    let stats_update = self.task_stats.as_proto(include, &self.base_time);
    let dropped_events = self.shared.dropped_tasks.swap(0, AcqRel) as u64;
    proto::tasks::TaskUpdate {
        new_tasks,
        stats_update,
        dropped_events,
    }
}

/// Builds a `ResourceUpdate` message from the current resource data,
/// stats, and poll ops.
///
/// For `Include::All` (initial update for a new subscription), the full
/// accumulated poll-op list is cloned; for `Include::UpdatedOnly`, the
/// poll ops recorded since the last update are moved out, leaving that
/// buffer empty for the next update. Reading the dropped-resources
/// counter swaps it back to zero.
fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
    let new_poll_ops = match include {
        Include::UpdatedOnly => std::mem::take(&mut self.new_poll_ops),
        Include::All => (*self.all_poll_ops).clone(),
    };
    let new_resources = self.resources.as_proto_list(include, &self.base_time);
    let stats_update = self.resource_stats.as_proto(include, &self.base_time);
    proto::resources::ResourceUpdate {
        new_resources,
        stats_update,
        new_poll_ops,
        dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
    }
}

/// Builds an `AsyncOpUpdate` message from the current async-op data and
/// stats.
///
/// `include` selects between serializing all async ops (initial update
/// for a new subscription) and only those updated since the previous
/// update. Reading the dropped-async-ops counter swaps it back to zero.
fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
    let new_async_ops = self.async_ops.as_proto_list(include, &self.base_time);
    let stats_update = self.async_op_stats.as_proto(include, &self.base_time);
    let dropped_events = self.shared.dropped_async_ops.swap(0, AcqRel) as u64;
    proto::async_ops::AsyncOpUpdate {
        new_async_ops,
        stats_update,
        dropped_events,
    }
}

/// Add the task details subscription to the watchers after sending the first update,
/// if the task is found.
fn add_task_detail_subscription(
Expand Down Expand Up @@ -348,50 +357,16 @@ impl Aggregator {
} else {
None
};
let task_update = Some(self.task_update(Include::UpdatedOnly));
let resource_update = Some(self.resource_update(Include::UpdatedOnly));
let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

let new_poll_ops = std::mem::take(&mut self.new_poll_ops);

let now = self.base_time.to_timestamp(Instant::now());
let update = proto::instrument::Update {
now: Some(now.clone()),
now: Some(self.base_time.to_timestamp(Instant::now())),
new_metadata,
task_update: Some(proto::tasks::TaskUpdate {
new_tasks: self
.tasks
.since_last_update()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self
.task_stats
.as_proto(Include::UpdatedOnly, &self.base_time),

dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
.resources
.since_last_update()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self
.resource_stats
.as_proto(Include::UpdatedOnly, &self.base_time),
new_poll_ops,

dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
.async_ops
.since_last_update()
.map(|(_, value)| value.to_proto(&self.base_time))
.collect(),
stats_update: self
.async_op_stats
.as_proto(Include::UpdatedOnly, &self.base_time),

dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
task_update,
resource_update,
async_op_update,
};

self.watchers
Expand All @@ -404,7 +379,7 @@ impl Aggregator {
if let Some(task_stats) = stats.get(id) {
let details = proto::tasks::TaskDetails {
task_id: Some(id.clone().into()),
now: Some(now.clone()),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: task_stats.serialize_histogram(),
};
watchers.retain(|watch| watch.update(&details));
Expand Down

0 comments on commit f47ace1

Please sign in to comment.