Skip to content

Commit

Permalink
Merge pull request #315 from NAICNO/larstha-314-faster-sonalyze
Browse files Browse the repository at this point in the history
For #314 (sonarlog) - Micro-optimize merge_streams
  • Loading branch information
lars-t-hansen authored Dec 18, 2023
2 parents 72a0788 + d066111 commit a1518a9
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 38 deletions.
3 changes: 3 additions & 0 deletions sonalyze/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ json = "0.12.4"
cfg-if = "1.0.0"
subprocess = "0.2.9"
urlencoding = "2.1.3"

[profile.release]
debug = 1
8 changes: 8 additions & 0 deletions sonarlog/src/dates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ pub fn now() -> Timestamp {
Utc::now()
}

/// far_future: some timestamp not in any sample data.
///
/// Note the returned timestamp may contain non-zero subsecond data.

pub fn far_future() -> Timestamp {
now() + Duration::days(365)
}

/// Parse the date, which may contain a non-zero TZO, into a UTC timestamp.
///
/// Note the returned timestamp may contain non-zero subsecond data, if the input had subsecond
Expand Down
4 changes: 4 additions & 0 deletions sonarlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub use dates::epoch;

pub use dates::now;

// A time that should not be in any sample record.

pub use dates::far_future;

// Parse a &str into a Timestamp.

pub use dates::parse_timestamp;
Expand Down
226 changes: 188 additions & 38 deletions sonarlog/src/synthesize.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Helpers for merging sample streams.
use crate::{
combine_hosts, empty_gpuset, epoch, hosts, merge_gpu_status, now, union_gpuset, GpuStatus, InputStreamSet,
LogEntry, Timebound, Timebounds, Timestamp,
combine_hosts, empty_gpuset, epoch, far_future, hosts, merge_gpu_status, now, union_gpuset,
GpuStatus, InputStreamSet, LogEntry, Timebound, Timebounds, Timestamp,
};

use std::boxed::Box;
Expand Down Expand Up @@ -283,66 +283,215 @@ fn merge_streams(
// Generated records
let mut records = vec![];

// Some further observations about the input:
//
// Sonar uses the same timestamp for all the jobs seen during the same invocation (this is by
// design) and even with multi-node jobs the runs of cron will tend to be highly correlated.
// With records only having 1s resolution for the timestamp, even streams from different nodes
// will tend to have the same timestamp. Thus many streams may be the "earliest" stream at each
// time step, indeed, during normal operation it will be common that all the active streams have
// the same timestamp in their oldest unconsumed sample.
//
// At each time the number of active streams will be O(1) - basically proportional to the number
// of nodes in the cluster, which is constant. But the number of streams in a stream set will
// tend to be O(t) - proportional to the number of time steps covered by the set.
//
// As we move forward through time, we start in a situation where most streams are not started
// yet, then streams become live as we reach their starting point, and then become inactive
// again as we move past their end point and even the point where they are considered residually
// live.

// indices[i] has the index of the next element of stream[i]
let mut indices = [0].repeat(streams.len());

// Streams that have moved into the past have their indices[i] value set to STREAM_ENDED, this
// enables some fast filtering, described later.
const STREAM_ENDED: usize = 0xFFFFFFFF;

// selected holds the records selected by the second inner loop, we allocate it once.
let mut selected: Vec<&Box<LogEntry>> = vec![];
selected.reserve(streams.len());

// The following loop nest is O(t^2) and very performance-sensitive. The number of streams can
// be very large when running analyses over longer time ranges (month or longer). The common
// case is that the outer loop makes one iteration per time step and each inner loop loops over
// all the streams. The number of streams will tend to grow with the length of the time window
// (because new jobs are started and there is at least one stream per job), hence the total
// amount of work will tend to grow quadratically with time.
//
// Conditions have been ordered carefully and some have been added to reduce the number of tests
// and ensure quick exits. Computations have been hoisted or sunk to take them off hot paths.
// Conditions have been combined or avoided by introducing sentinel values (sentinel_time and
// STREAM_ENDED).
//
// There are additional tweaks here:
//
// First, the hot inner loops have tests that quickly skip expired streams (note the tests are
// expressed differently), and in addition, the variable `live` keeps track of the first stream
// that is definitely not expired, and loops start from this value. The reason we have both
// `live` and the fast test is that there may be expired streams following non-expired streams
// in the array of streams.
//
// Second, the second loop also very quickly skips unstarted streams.
//
// TODO: The inner loop counts could be reduced significantly if we could partition the streams
// array precisely into streams that are expired, current, and in the future. However, the
// current tests are very quick, and any scheme to introduce that partitioning must be very,
// very cheap, and benefits may not show until the number of inputs is exceptionally large
// (perhaps 90 or more days of data instead of the 30 days of data I've been testing with). In
// addition, attempts at implementing this partitioning have so far resulted in major slowdowns,
// possibly because the resulting code confuses bounds checking optimizations in the Rust
// compiler. This needs to be investigated further.

// The first stream that is known not to be expired.
let mut live = 0;

let sentinel_time = far_future();
loop {
// You'd think that it'd be better to have this loop down below where values are set to
// STREAM_ENDED, but empirically it's better to have it here. The difference is fairly
// pronounced.
while live < streams.len() && indices[live] == STREAM_ENDED {
live += 1;
}

// Loop across streams to find smallest head.
// smallest_stream is -1 or the index of the stream with the smallest head
let mut smallest_stream = 0;
let mut have_smallest = false;
for i in 0..streams.len() {
let mut min_time = sentinel_time;
for i in live..streams.len() {
if indices[i] >= streams[i].len() {
continue;
}
// stream[i] has a value, select this stream if we have no stream or if the value is
// smaller than the one at the head of the smallest stream.
if !have_smallest
|| streams[smallest_stream][indices[smallest_stream]].timestamp
> streams[i][indices[i]].timestamp
{
smallest_stream = i;
have_smallest = true;
if min_time > streams[i][indices[i]].timestamp {
min_time = streams[i][indices[i]].timestamp;
}
}

// Exit if no values in any stream
if !have_smallest {
if min_time == sentinel_time {
break;
}

let min_time = streams[smallest_stream][indices[smallest_stream]].timestamp;
let lim_time = min_time + chrono::Duration::seconds(10);
let near_past = min_time - chrono::Duration::seconds(30);
let deep_past = min_time - chrono::Duration::seconds(60);

// Now select values from all streams (either a value in the time window or the most
// recent value before the time window) and advance the stream pointers for the ones in
// the window.
let mut selected: Vec<&Box<LogEntry>> = vec![];
for i in 0..streams.len() {
// Now select values from all streams (either a value in the time window or the most recent
// value before the time window) and advance the stream pointers for the ones in the window.
//
// The cases marked "highly likely" get most of the hits in long runs, then the case marked
// "fairly likely" gets one hit per record, usually, and then the case that retires a stream
// gets one hit per stream.

for i in live..streams.len() {
let s = &streams[i];
let ix = indices[i];
let lim = s.len();
if ix < lim && s[ix].timestamp >= min_time && s[ix].timestamp < lim_time {
// Current exists and is in in the time window, pick it up and advance index
selected.push(&s[ix]);
indices[i] += 1;
} else if ix > 0 && ix < lim && s[ix - 1].timestamp >= near_past {
// Previous exists and is not last and is in the near past, pick it up. The
// condition is tricky. ix > 0 guarantees that there is a past record at ix - 1,
// while ix < lim says that there is also a future record at ix.
//
// This is hard to make reliable. The guard on the time is necessary to avoid
// picking up records from a lot of dead processes. Intra-host it is OK.
// Cross-host it depends on sonar runs being more or less synchronized.
selected.push(&s[ix - 1]);
} else if ix > 0 && s[ix - 1].timestamp < min_time && s[ix - 1].timestamp >= deep_past {
// Previous exists (and is last) and is not in the deep past, pick it up
selected.push(&s[ix - 1]);

// lim > 0 because no stream is empty

if ix < lim {
// Live or future stream.

// ix < lim

if s[ix].timestamp >= lim_time {
// Highly likely - the stream starts in the future.
continue;
}

// ix < lim
// s[ix].timestamp < lim_time

if s[ix].timestamp == min_time {
// Fairly likely in normal input - sample time is equal to the min_time. This
// would be subsumed by the following test using >= for > but the equality test
// is faster.
selected.push(&s[ix]);
indices[i] += 1;
continue;
}

// ix < lim
// s[ix].timestamp < lim_time
// s[ix].timestamp != min_time

if s[ix].timestamp > min_time {
// Unlikely in normal input - sample time is in in the time window but not equal
// to min_time.
selected.push(&s[ix]);
indices[i] += 1;
continue;
}

// ix < lim
// s[ix].timestamp < min_time

if ix > 0 && s[ix - 1].timestamp >= near_past {
// Unlikely in normal input - Previous exists and is not last and is in the near
// past (redundant test for t < lim_time removed). The condition is tricky.
// ix>0 guarantees that there is a past record at ix - 1, while ix<lim says that
// there is also a future record at ix.
//
// This is hard to make reliable. The guard on the time is necessary to avoid
// picking up records from a lot of dead processes. Intra-host it is OK.
// Cross-host it depends on sonar runs being more or less synchronized.
selected.push(&s[ix - 1]);
continue;
}

// ix < lim
// s[ix].timestamp < min_time
// s[ix-1].timestamp < near_past

// This is duplicated within the ix==lim nest below, in a different form.
if ix > 0 && s[ix - 1].timestamp >= deep_past {
// Previous exists (and is last) and is not in the deep past, pick it up
selected.push(&s[ix - 1]);
continue;
}

// ix < lim
// s[ix].timestamp < min_time
// s[ix-1].timestamp < deep_past

// This is an old record and we can ignore it.
continue;

} else if ix == STREAM_ENDED {
// Highly likely - stream already marked as exhausted.
continue;
} else {
// Various cases where we don't pick up any data:
// - we're at the first position and the record is in the future
// - we're at the last position and the record is in the deep past
// About-to-be exhausted stream.

// ix == lim
// ix > 0 because lim > 0

if s[ix - 1].timestamp < deep_past {
// Previous is in the deep past and no current - stream is done.
indices[i] = STREAM_ENDED;
continue;
}

// ix == lim
// ix > 0
// s[ix-1].timestamp >= deep_past

// This case is a duplicate from the ix<lim nest above, in a different form.
if s[ix - 1].timestamp < min_time {
// Previous exists (and is last) and is not in the deep past, pick it up
selected.push(&s[ix - 1]);
continue;
}

// ix == lim
// ix > 0
// s[ix-1].timestamp >= min_time

// This is a contradiction probably and it seems we should not come this far. Don't
// worry about it.
continue;
}
}

Expand All @@ -355,6 +504,7 @@ fn merge_streams(
command.clone(),
&selected,
));
selected.clear();
}

records
Expand Down

0 comments on commit a1518a9

Please sign in to comment.