Skip to content

Commit

Permalink
Merge #683
Browse files Browse the repository at this point in the history
683: Fixing bugs in heartbeat combine code r=WireBaron a=WireBaron

Prior to this the combine code was only combining the intervals, not updating the interval start and end times or the last point.  As part of this change we're now extending any interval that was truncated by the end of an aggregate that now falls in the midst of the combined area.

Fixes #679 
Fixes #660

Co-authored-by: Brian Rowe <[email protected]>
  • Loading branch information
bors[bot] and Brian Rowe authored Jan 31, 2023
2 parents 45cba4f + d1914ff commit 77fed05
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions extension/src/heartbeat_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::{
ron_inout_funcs,
};

use std::cmp::{max, min};

use toolkit_experimental::HeartbeatAggData;

const BUFFER_SIZE: usize = 1000; // How many values to absorb before consolidating
Expand Down Expand Up @@ -80,6 +82,22 @@ impl HeartbeatTransState {
}
}

// In general we shouldn't need to change these creation time parameters, but if
// we're combining with another interval this may be necessary.
fn extend_covered_interval(&mut self, new_start: i64, new_end: i64) {
assert!(new_start <= self.start && new_end >= self.end); // this is guaranteed by the combine function
self.start = new_start;

// extend last range if able
if self.end < new_end && self.last + self.interval_len > self.end {
assert!(!self.liveness.is_empty()); // above condition should be impossible without liveness data

let mut last_interval = self.liveness.last_mut().unwrap();
last_interval.1 = min(self.last + self.interval_len, new_end);
}
self.end = new_end;
}

fn combine_intervals(&mut self, new_intervals: Vec<(i64, i64)>) {
// Optimized path for ordered inputs
if self.liveness.last().unwrap().0 < new_intervals.first().unwrap().0 {
Expand Down Expand Up @@ -141,7 +159,14 @@ impl HeartbeatTransState {
assert!(self.interval_len == other.interval_len); // Nicer error would be nice here
self.process_batch();
other.process_batch();

let min_start = min(self.start, other.start);
let max_end = max(self.end, other.end);
self.extend_covered_interval(min_start, max_end);
other.extend_covered_interval(min_start, max_end);

self.combine_intervals(other.liveness);
self.last = max(self.last, other.last);
}
}

Expand Down Expand Up @@ -670,6 +695,52 @@ mod tests {

#[pg_test]
pub fn test_heartbeat_rollup() {
Spi::execute(|client| {
client.select("SET TIMEZONE to UTC", None, None);

client.select(
"CREATE TABLE heartbeats(time timestamptz, batch timestamptz)",
None,
None,
);

client.select(
"INSERT INTO heartbeats VALUES
('01-01-2020 3:02:20 UTC'::timestamptz, '01-01-2020 3:00:00 UTC'::timestamptz),
('01-01-2020 3:03:10 UTC'::timestamptz, '01-01-2020 3:00:00 UTC'::timestamptz),
('01-01-2020 3:04:07 UTC'::timestamptz, '01-01-2020 3:00:00 UTC'::timestamptz),
('01-01-2020 7:19:20 UTC'::timestamptz, '01-01-2020 7:00:00 UTC'::timestamptz),
('01-01-2020 7:39:20 UTC'::timestamptz, '01-01-2020 7:00:00 UTC'::timestamptz),
('01-01-2020 7:59:20 UTC'::timestamptz, '01-01-2020 7:00:00 UTC'::timestamptz),
('01-01-2020 8:00:10 UTC'::timestamptz, '01-01-2020 8:00:00 UTC'::timestamptz),
('01-01-2020 8:59:10 UTC'::timestamptz, '01-01-2020 8:00:00 UTC'::timestamptz),
('01-01-2020 23:34:20 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz),
('01-01-2020 23:37:20 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz),
('01-01-2020 23:38:05 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz),
('01-01-2020 23:39:00 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz)",
None,
None,
);

let result = client
.select(
"WITH aggs AS (
SELECT toolkit_experimental.heartbeat_agg(time, batch, '1h', '1m')
FROM heartbeats
GROUP BY batch
) SELECT toolkit_experimental.rollup(heartbeat_agg)::TEXT FROM aggs",
None,
None,
)
.first()
.get_one::<String>()
.unwrap();
assert_eq!("(version:1,start_time:631162800000000,end_time:631238400000000,last_seen:631237140000000,interval_len:60000000,num_intervals:7,interval_starts:[631162940000000,631178360000000,631179560000000,631180760000000,631184350000000,631236860000000,631237040000000],interval_ends:[631163107000000,631178420000000,631179620000000,631180870000000,631184410000000,631236920000000,631237200000000])", result);
})
}

#[pg_test]
pub fn test_heartbeat_combining_rollup() {
Spi::execute(|client| {
client.select("SET TIMEZONE to UTC", None, None);

Expand Down

0 comments on commit 77fed05

Please sign in to comment.