Skip to content

Commit

Permalink
Directly publish start times from heavy_hitters.
Browse files Browse the repository at this point in the history
[ci skip-build-wheels]
  • Loading branch information
stuhood committed Nov 3, 2021
1 parent ff9ca61 commit 7e17fac
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 35 deletions.
35 changes: 28 additions & 7 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions src/rust/engine/ui/src/console_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ impl ConsoleUI {
}

// Start any new items.
let now = SystemTime::now();
for (span_id, (description, duration)) in heavy_hitters {
for (span_id, (description, start_time)) in heavy_hitters {
if tasks_to_display.contains_key(&span_id) {
// We're already rendering this item, and our dynamic `unit` instance will continue to
// handle rendering elapsed time for it.
Expand All @@ -188,7 +187,7 @@ impl ConsoleUI {
None,
Some(prodash::unit::dynamic(MillisAsFloatingPointSecs)),
);
item.set(MillisAsFloatingPointSecs::duration_to_step(&now, duration));
item.set(MillisAsFloatingPointSecs::start_time_to_step(&start_time));
tasks_to_display.insert(span_id, item);
}
}
Expand All @@ -213,12 +212,14 @@ impl ConsoleUI {
/// If the ConsoleUI is running, completes it.
///
pub async fn teardown(&mut self) {
if let Some(instance) = self.instance.take() {
if let Some(mut instance) = self.instance.take() {
// Drop all tasks to clear the Tree. The call to shutdown will render a final "Tick" with the
// empty Tree, which will clear the screen.
instance.tasks_to_display.clear();
instance
.executor
.clone()
.spawn_blocking(move || {
// TODO: Necessary to do this on Drop as well? Or do we guarantee that teardown is called?
instance.handle.shutdown_and_wait();
})
.await
Expand All @@ -239,10 +240,8 @@ struct Instance {
struct MillisAsFloatingPointSecs;

impl MillisAsFloatingPointSecs {
/// Computes a static Step value from the given Duration by converting it to "millis-since-epoch".
fn duration_to_step(now: &SystemTime, duration: Option<Duration>) -> Step {
// TODO: Use workunit start SystemTimes directly rather than calculating them.
let start_time = duration.and_then(|d| now.checked_sub(d)).unwrap_or(*now);
/// Computes a static Step from the given start time by converting it to "millis-since-epoch".
fn start_time_to_step(start_time: &SystemTime) -> Step {
start_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as usize
}
}
Expand Down
49 changes: 30 additions & 19 deletions src/rust/engine/workunit_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#![allow(clippy::mutex_atomic)]

use std::cell::RefCell;
use std::cmp::Reverse;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::future::Future;
Expand Down Expand Up @@ -421,23 +422,24 @@ impl HeavyHittersData {
}
}

fn heavy_hitters(&self, k: usize) -> HashMap<SpanId, (String, Option<Duration>)> {
fn heavy_hitters(&self, k: usize) -> HashMap<SpanId, (String, SystemTime)> {
self.refresh_store();

let now = SystemTime::now();
let inner = self.inner.lock();

// Initialize the heap with the leaves of the running workunit graph.
let mut queue: BinaryHeap<(Duration, SpanId)> = inner
// Initialize the heap with the leaves of the running workunit graph, sorted oldest first.
let mut queue: BinaryHeap<(Reverse<SystemTime>, SpanId)> = inner
.running_graph
.externals(petgraph::Direction::Outgoing)
.map(|entry| inner.running_graph[entry])
.flat_map(|span_id: SpanId| {
let workunit: Option<&Workunit> = inner.workunit_records.get(&span_id);
match workunit {
Some(workunit) if !workunit.state.blocked() => {
Self::duration_for(now, workunit).map(|d| (d, span_id))
}
let workunit: &Workunit = inner.workunit_records.get(&span_id)?;
match workunit.state {
WorkunitState::Started {
ref blocked,
start_time,
..
} if !blocked.load(atomic::Ordering::Relaxed) => Some((Reverse(start_time), span_id)),
_ => None,
}
})
Expand All @@ -447,17 +449,21 @@ impl HeavyHittersData {
let mut res = HashMap::new();
while let Some((_dur, span_id)) = queue.pop() {
// If the leaf is visible or has a visible parent, emit it.
if let Some(span_id) = first_matched_parent(
let parent_span_id = if let Some(span_id) = first_matched_parent(
&inner.workunit_records,
Some(span_id),
|wu| wu.state.completed(),
Self::is_visible,
) {
let workunit = inner.workunit_records.get(&span_id).unwrap();
if let Some(effective_name) = workunit.metadata.desc.as_ref() {
let maybe_duration = Self::duration_for(now, workunit);

res.insert(span_id, (effective_name.to_string(), maybe_duration));
span_id
} else {
continue;
};

let workunit = inner.workunit_records.get(&parent_span_id).unwrap();
if let Some(effective_name) = workunit.metadata.desc.as_ref() {
if let Some(start_time) = Self::start_time_for(workunit) {
res.insert(parent_span_id, (effective_name.to_string(), start_time));
if res.len() >= k {
break;
}
Expand Down Expand Up @@ -512,12 +518,16 @@ impl HeavyHittersData {
&& matches!(workunit.state, WorkunitState::Started { .. })
}

fn duration_for(now: SystemTime, workunit: &Workunit) -> Option<Duration> {
fn start_time_for(workunit: &Workunit) -> Option<SystemTime> {
match workunit.state {
WorkunitState::Started { ref start_time, .. } => now.duration_since(*start_time).ok(),
WorkunitState::Started { start_time, .. } => Some(start_time),
_ => None,
}
}

fn duration_for(now: SystemTime, workunit: &Workunit) -> Option<Duration> {
now.duration_since(Self::start_time_for(workunit)?).ok()
}
}

#[derive(Default)]
Expand Down Expand Up @@ -582,9 +592,10 @@ impl WorkunitStore {
}

///
/// Find the longest running leaf workunits, and return their first visible parents.
/// Find the longest running leaf workunits, and return the description and start time of their
/// first visible parents.
///
pub fn heavy_hitters(&self, k: usize) -> HashMap<SpanId, (String, Option<Duration>)> {
pub fn heavy_hitters(&self, k: usize) -> HashMap<SpanId, (String, SystemTime)> {
self.heavy_hitters_data.heavy_hitters(k)
}

Expand Down

0 comments on commit 7e17fac

Please sign in to comment.