diff --git a/extension/src/heartbeat_agg.rs b/extension/src/heartbeat_agg.rs index 3656ae54..8d46fcd5 100644 --- a/extension/src/heartbeat_agg.rs +++ b/extension/src/heartbeat_agg.rs @@ -11,6 +11,8 @@ use crate::{ ron_inout_funcs, }; +use std::cmp::{max, min}; + use toolkit_experimental::HeartbeatAggData; const BUFFER_SIZE: usize = 1000; // How many values to absorb before consolidating @@ -80,6 +82,22 @@ impl HeartbeatTransState { } } + // In general we shouldn't need to change these creation time parameters, but if + // we're combining with another interval this may be necessary. + fn extend_covered_interval(&mut self, new_start: i64, new_end: i64) { + assert!(new_start <= self.start && new_end >= self.end); // this is guaranteed by the combine function + self.start = new_start; + + // extend last range if able + if self.end < new_end && self.last + self.interval_len > self.end { + assert!(!self.liveness.is_empty()); // above condition should be impossible without liveness data + + let mut last_interval = self.liveness.last_mut().unwrap(); + last_interval.1 = min(self.last + self.interval_len, new_end); + } + self.end = new_end; + } + fn combine_intervals(&mut self, new_intervals: Vec<(i64, i64)>) { // Optimized path for ordered inputs if self.liveness.last().unwrap().0 < new_intervals.first().unwrap().0 { @@ -141,7 +159,14 @@ impl HeartbeatTransState { assert!(self.interval_len == other.interval_len); // Nicer error would be nice here self.process_batch(); other.process_batch(); + + let min_start = min(self.start, other.start); + let max_end = max(self.end, other.end); + self.extend_covered_interval(min_start, max_end); + other.extend_covered_interval(min_start, max_end); + self.combine_intervals(other.liveness); + self.last = max(self.last, other.last); } } @@ -670,6 +695,52 @@ mod tests { #[pg_test] pub fn test_heartbeat_rollup() { + Spi::execute(|client| { + client.select("SET TIMEZONE to UTC", None, None); + + client.select( + "CREATE TABLE heartbeats(time timestamptz, batch timestamptz)", + None, + None, + ); + + client.select( + "INSERT INTO heartbeats VALUES + ('01-01-2020 3:02:20 UTC'::timestamptz, '01-01-2020 3:00:00 UTC'::timestamptz), + ('01-01-2020 3:03:10 UTC'::timestamptz, '01-01-2020 3:00:00 UTC'::timestamptz), + ('01-01-2020 3:04:07 UTC'::timestamptz, '01-01-2020 3:00:00 UTC'::timestamptz), + ('01-01-2020 7:19:20 UTC'::timestamptz, '01-01-2020 7:00:00 UTC'::timestamptz), + ('01-01-2020 7:39:20 UTC'::timestamptz, '01-01-2020 7:00:00 UTC'::timestamptz), + ('01-01-2020 7:59:20 UTC'::timestamptz, '01-01-2020 7:00:00 UTC'::timestamptz), + ('01-01-2020 8:00:10 UTC'::timestamptz, '01-01-2020 8:00:00 UTC'::timestamptz), + ('01-01-2020 8:59:10 UTC'::timestamptz, '01-01-2020 8:00:00 UTC'::timestamptz), + ('01-01-2020 23:34:20 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz), + ('01-01-2020 23:37:20 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz), + ('01-01-2020 23:38:05 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz), + ('01-01-2020 23:39:00 UTC'::timestamptz, '01-01-2020 23:00:00 UTC'::timestamptz)", + None, + None, + ); + + let result = client + .select( + "WITH aggs AS ( + SELECT toolkit_experimental.heartbeat_agg(time, batch, '1h', '1m') + FROM heartbeats + GROUP BY batch + ) SELECT toolkit_experimental.rollup(heartbeat_agg)::TEXT FROM aggs", + None, + None, + ) + .first() + .get_one::() + .unwrap(); + assert_eq!("(version:1,start_time:631162800000000,end_time:631238400000000,last_seen:631237140000000,interval_len:60000000,num_intervals:7,interval_starts:[631162940000000,631178360000000,631179560000000,631180760000000,631184350000000,631236860000000,631237040000000],interval_ends:[631163107000000,631178420000000,631179620000000,631180870000000,631184410000000,631236920000000,631237200000000])", result); + }) + } + + #[pg_test] + pub fn test_heartbeat_combining_rollup() { Spi::execute(|client| { client.select("SET TIMEZONE to UTC", None, None);