diff --git a/Changelog.md b/Changelog.md index 53a5b1dd..5168d039 100644 --- a/Changelog.md +++ b/Changelog.md @@ -19,6 +19,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo #### Other notable changes - [#743](https://github.com/timescale/timescaledb-toolkit/pull/743): Remove support for direct upgrades from toolkit versions more than 1 year old. Toolkit versions 1.4.x and 1.5.x will have to upgrade to an intermediate version before upgrading to 1.16.0. - [#744](https://github.com/timescale/timescaledb-toolkit/pull/744): Fix nightly CI failures from building TimescaleDB on Enterprise Linux +- [#749](https://github.com/timescale/timescaledb-toolkit/pull/749): Added num_live_ranges, num_gaps, and trim_to accessors for heartbeat aggregates #### Shout-outs diff --git a/extension/src/accessors.rs b/extension/src/accessors.rs index 8703240e..cfbf0417 100644 --- a/extension/src/accessors.rs +++ b/extension/src/accessors.rs @@ -105,6 +105,8 @@ accessor! { into_values() } accessor! { into_int_values() } accessor! { state_timeline() } accessor! { state_int_timeline() } +accessor! { num_live_ranges() } +accessor! { num_gaps() } // The rest are more complex, with String or other challenges. Leaving alone for now. pg_type! { diff --git a/extension/src/datum_utils.rs b/extension/src/datum_utils.rs index ea0f328e..65a147f0 100644 --- a/extension/src/datum_utils.rs +++ b/extension/src/datum_utils.rs @@ -41,7 +41,10 @@ pub(crate) unsafe fn free_datum(datum: Datum, typoid: Oid) { // TODO: is there a better place for this? // Note that this requires an reference time to deal with variable length intervals (days or months) -pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw::Interval) -> i64 { +pub fn ts_interval_sum_to_ms( + ref_time: &crate::raw::TimestampTz, + interval: &crate::raw::Interval, +) -> i64 { extern "C" { fn timestamptz_pl_interval(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum; } @@ -53,7 +56,11 @@ pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw: interval.0, ) }; - bound.value() as i64 - ref_time.0.value() as i64 + bound.value() as i64 +} + +pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw::Interval) -> i64 { + ts_interval_sum_to_ms(ref_time, interval) - ref_time.0.value() as i64 } pub struct TextSerializableDatumWriter { diff --git a/extension/src/heartbeat_agg.rs b/extension/src/heartbeat_agg.rs index 790efd79..4690b9b8 100644 --- a/extension/src/heartbeat_agg.rs +++ b/extension/src/heartbeat_agg.rs @@ -3,7 +3,8 @@ use pgx::*; use crate::{ accessors::{ - AccessorDeadRanges, AccessorDowntime, AccessorLiveAt, AccessorLiveRanges, AccessorUptime, + AccessorDeadRanges, AccessorDowntime, AccessorLiveAt, AccessorLiveRanges, AccessorNumGaps, + AccessorNumLiveRanges, AccessorUptime, }, aggregate_utils::in_aggregate_context, datum_utils::interval_to_ms, @@ -20,7 +21,7 @@ mod accessors; use accessors::{ HeartbeatInterpolateAccessor, HeartbeatInterpolatedDowntimeAccessor, - HeartbeatInterpolatedUptimeAccessor, + HeartbeatInterpolatedUptimeAccessor, HeartbeatTrimToAccessor, }; const BUFFER_SIZE: usize = 1000; // How many values to absorb before consolidating @@ -205,6 +206,68 @@ pg_type! { ron_inout_funcs!(HeartbeatAgg); impl HeartbeatAgg<'_> { + fn trim_to(self, start: Option, end: Option) -> HeartbeatAgg<'static> { + if (start.is_some() && start.unwrap() < self.start_time) + || (end.is_some() && end.unwrap() > self.end_time) + { + error!("Can not query beyond the original aggregate bounds"); + } + + let mut starts: Vec = vec![]; + let mut ends: Vec = vec![]; + for i in 0..self.num_intervals as usize { + starts.push(self.interval_starts.slice()[i]); + ends.push(self.interval_ends.slice()[i]); + } + + let low_idx = if let Some(start) = start { + let mut idx = 0; + while idx < self.num_intervals as usize && ends[idx] < start { + idx += 1; + } + if starts[idx] < start { + starts[idx] = start; + } + idx + } else { + 0 + }; + + let mut new_last = None; + let high_idx = if let Some(end) = end { + if self.num_intervals > 0 { + let mut idx = self.num_intervals as usize - 1; + while idx > low_idx && starts[idx] > end { + idx -= 1; + } + new_last = Some(ends[idx] - self.interval_len); + if ends[idx] > end { + if end < new_last.unwrap() { + new_last = Some(end); + } + ends[idx] = end; + } + idx + } else { + self.num_intervals as usize - 1 + } + } else { + self.num_intervals as usize - 1 + }; + + unsafe { + flatten!(HeartbeatAgg { + start_time: start.unwrap_or(self.start_time), + end_time: end.unwrap_or(self.end_time), + last_seen: new_last.unwrap_or(self.last_seen), + interval_len: self.interval_len, + num_intervals: (high_idx - low_idx + 1) as u64, + interval_starts: starts[low_idx..=high_idx].into(), + interval_ends: ends[low_idx..=high_idx].into(), + }) + } + } + fn sum_live_intervals(self) -> i64 { let starts = self.interval_starts.as_slice(); let ends = self.interval_ends.as_slice(); @@ -391,11 +454,16 @@ pub fn arrow_heartbeat_agg_interpolated_downtime( #[pg_extern] pub fn live_at(agg: HeartbeatAgg<'static>, test: TimestampTz) -> bool { + let test = i64::from(test); + + if test < agg.start_time || test > agg.end_time { + error!("unable to test for liveness outside of a heartbeat_agg's covered range") + } + if agg.num_intervals == 0 { return false; } - let test = i64::from(test); let mut start_iter = agg.interval_starts.iter().enumerate().peekable(); while let Some((idx, val)) = start_iter.next() { if test < val { @@ -443,6 +511,75 @@ pub fn arrow_heartbeat_agg_interpolate( interpolate_heartbeat_agg(sketch, accessor.pred()) } +#[pg_extern] +pub fn num_live_ranges(agg: HeartbeatAgg<'static>) -> i64 { + agg.num_intervals as i64 +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_num_live_ranges( + agg: HeartbeatAgg<'static>, + _accessor: AccessorNumLiveRanges<'static>, +) -> i64 { + num_live_ranges(agg) +} + +#[pg_extern] +pub fn num_gaps(agg: HeartbeatAgg<'static>) -> i64 { + if agg.num_intervals == 0 { + return 1; + } + let mut count = agg.num_intervals - 1; + if agg.interval_starts.slice()[0] != agg.start_time { + count += 1; + } + if agg.interval_ends.slice()[agg.num_intervals as usize - 1] != agg.end_time { + count += 1; + } + count as i64 +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_num_gaps( + agg: HeartbeatAgg<'static>, + _accessor: AccessorNumGaps<'static>, +) -> i64 { + num_gaps(agg) +} + +#[pg_extern] +pub fn trim_to( + agg: HeartbeatAgg<'static>, + start: default!(Option, "NULL"), + duration: default!(Option, "NULL"), +) -> HeartbeatAgg<'static> { + if let Some(start) = start { + let end = duration.map(|intv| crate::datum_utils::ts_interval_sum_to_ms(&start, &intv)); + agg.trim_to(Some(i64::from(start)), end) + } else { + let end = duration.map(|intv| { + crate::datum_utils::ts_interval_sum_to_ms(&TimestampTz::from(agg.start_time), &intv) + }); + agg.trim_to(None, end) + } +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_trim_to( + agg: HeartbeatAgg<'static>, + accessor: HeartbeatTrimToAccessor<'static>, +) -> HeartbeatAgg<'static> { + let end = if accessor.end == 0 { + None + } else { + Some(accessor.end) + }; + agg.trim_to(Some(accessor.start), end) +} + impl From> for HeartbeatTransState { fn from(agg: HeartbeatAgg<'static>) -> Self { HeartbeatTransState { @@ -883,19 +1020,17 @@ mod tests { .unwrap().first() .get_three::().unwrap(); - let (result4, result5) = + let result4 = client.update( "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) - SELECT live_at(agg, '01-01-2020 01:38:00 UTC')::TEXT, - live_at(agg, '01-01-2020 02:01:00 UTC')::TEXT FROM agg", None, None) + SELECT live_at(agg, '01-01-2020 01:38:00 UTC')::TEXT FROM agg", None, None) .unwrap().first() - .get_two::().unwrap(); + .get_one::().unwrap(); assert_eq!(result1.unwrap(), "false"); // outside ranges assert_eq!(result2.unwrap(), "true"); // inside ranges assert_eq!(result3.unwrap(), "true"); // first point of range assert_eq!(result4.unwrap(), "false"); // last point of range - assert_eq!(result5.unwrap(), "false"); // outside aggregate let (result1, result2, result3) = client.update( @@ -906,19 +1041,37 @@ mod tests { .unwrap().first() .get_three::().unwrap(); - let (result4, result5) = + let result4 = client.update( "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) - SELECT (agg -> live_at('01-01-2020 01:38:00 UTC'))::TEXT, - (agg -> live_at('01-01-2020 02:01:00 UTC'))::TEXT FROM agg", None, None) + SELECT (agg -> live_at('01-01-2020 01:38:00 UTC'))::TEXT FROM agg", None, None) .unwrap().first() - .get_two::().unwrap(); + .get_one::().unwrap(); assert_eq!(result1.unwrap(), "false"); // outside ranges assert_eq!(result2.unwrap(), "true"); // inside ranges assert_eq!(result3.unwrap(), "true"); // first point of range assert_eq!(result4.unwrap(), "false"); // last point of range - assert_eq!(result5.unwrap(), "false"); // outside aggregate + + let (result1, result2) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT num_live_ranges(agg), num_gaps(agg) FROM agg", None, None) + .unwrap().first() + .get_two::().unwrap(); + + assert_eq!(result1.unwrap(), 4); + assert_eq!(result2.unwrap(), 4); + + let (result1, result2) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT agg->num_live_ranges(), agg->num_gaps() FROM agg", None, None) + .unwrap().first() + .get_two::().unwrap(); + + assert_eq!(result1.unwrap(), 4); + assert_eq!(result2.unwrap(), 4); }) } @@ -1072,6 +1225,83 @@ mod tests { }); } + #[pg_test] + pub fn test_heartbeat_trim_to() { + Spi::connect(|mut client| { + client.update("SET TIMEZONE to UTC", None, None).unwrap(); + + client + .update("CREATE TABLE liveness(heartbeat TIMESTAMPTZ)", None, None) + .unwrap(); + + client + .update( + "INSERT INTO liveness VALUES + ('01-01-2020 0:2:20 UTC'), + ('01-01-2020 0:10 UTC'), + ('01-01-2020 0:17 UTC'), + ('01-01-2020 0:30 UTC'), + ('01-01-2020 0:35 UTC'), + ('01-01-2020 0:40 UTC'), + ('01-01-2020 0:35 UTC'), + ('01-01-2020 0:40 UTC'), + ('01-01-2020 0:40 UTC'), + ('01-01-2020 0:50:30 UTC'), + ('01-01-2020 1:00 UTC'), + ('01-01-2020 1:08 UTC'), + ('01-01-2020 1:18 UTC'), + ('01-01-2020 1:28 UTC'), + ('01-01-2020 1:38:01 UTC'), + ('01-01-2020 1:40 UTC'), + ('01-01-2020 1:40:01 UTC'), + ('01-01-2020 1:50:01 UTC'), + ('01-01-2020 1:57 UTC'), + ('01-01-2020 1:59:50 UTC') + ", + None, + None, + ) + .unwrap(); + + let (result1, result2, result3) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness), + trimmed AS (SELECT trim_to(agg, '01-01-2020 0:30 UTC', '1h') AS agg FROM agg) + SELECT uptime(agg)::TEXT, num_gaps(agg), live_at(agg, '01-01-2020 0:50:25 UTC')::TEXT FROM trimmed", None, None) + .unwrap().first() + .get_three::().unwrap(); + + assert_eq!(result1.unwrap(), "00:59:30"); + assert_eq!(result2.unwrap(), 1); + assert_eq!(result3.unwrap(), "false"); + + let (result1, result2, result3) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness), + trimmed AS (SELECT trim_to(agg, duration=>'30m') AS agg FROM agg) + SELECT uptime(agg)::TEXT, num_gaps(agg), live_at(agg, '01-01-2020 0:20:25 UTC')::TEXT FROM trimmed", None, None) + .unwrap().first() + .get_three::().unwrap(); + + assert_eq!(result1.unwrap(), "00:24:40"); + assert_eq!(result2.unwrap(), 2); + assert_eq!(result3.unwrap(), "true"); + + let (result1, result2, result3) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT agg -> trim_to('01-01-2020 1:40:00 UTC'::timestamptz) -> num_gaps(), + (agg -> trim_to('01-01-2020 00:50:00 UTC'::timestamptz, '30s') -> uptime())::TEXT, + agg -> trim_to('01-01-2020 00:28:00 UTC'::timestamptz, '22m15s') -> num_live_ranges() FROM agg", None, None) + .unwrap().first() + .get_three::().unwrap(); + + assert_eq!(result1.unwrap(), 0); + assert_eq!(result2.unwrap(), "00:00:00"); + assert_eq!(result3.unwrap(), 1); + }); + } + #[pg_test] pub fn test_heartbeat_agg_interpolation() { Spi::connect(|mut client| { diff --git a/extension/src/heartbeat_agg/accessors.rs b/extension/src/heartbeat_agg/accessors.rs index e60b7c95..ea063bf9 100644 --- a/extension/src/heartbeat_agg/accessors.rs +++ b/extension/src/heartbeat_agg/accessors.rs @@ -121,3 +121,33 @@ impl<'a> HeartbeatInterpolateAccessor<'a> { } } } + +pg_type! { + struct HeartbeatTrimToAccessor { + start : i64, + end : i64, + } +} + +ron_inout_funcs!(HeartbeatTrimToAccessor); + +// Note that this is unable to take only a duration, as we don't have the functionality to store +// an interval in PG format and are unable to convert it to an int without a reference time. +// This is a difference from the inline function. +#[pg_extern(immutable, parallel_safe, name = "trim_to")] +fn heartbeat_agg_trim_to_accessor( + start: crate::raw::TimestampTz, + duration: default!(Option, "NULL"), +) -> HeartbeatTrimToAccessor<'static> { + let end = duration + .map(|intv| crate::datum_utils::ts_interval_sum_to_ms(&start, &intv)) + .unwrap_or(0); + let start = i64::from(start); + + crate::build! { + HeartbeatTrimToAccessor { + start, + end, + } + } +} diff --git a/extension/src/stabilization_info.rs b/extension/src/stabilization_info.rs index c2f9311f..5e6d6cea 100644 --- a/extension/src/stabilization_info.rs +++ b/extension/src/stabilization_info.rs @@ -14,6 +14,21 @@ crate::functions_stabilized_at! { "1.16.0" => { approx_count_distinct(anyelement), approx_count_distinct_trans(internal,anyelement), + accessornumgaps_in(cstring), + accessornumgaps_out(accessornumgaps), + accessornumliveranges_in(cstring), + accessornumliveranges_out(accessornumliveranges), + arrow_heartbeat_agg_num_gaps(heartbeatagg,accessornumgaps), + arrow_heartbeat_agg_num_live_ranges(heartbeatagg,accessornumliveranges), + arrow_heartbeat_agg_trim_to(heartbeatagg,heartbeattrimtoaccessor), + heartbeattrimtoaccessor_in(cstring), + heartbeattrimtoaccessor_out(heartbeattrimtoaccessor), + num_gaps(), + num_gaps(heartbeatagg), + num_live_ranges(), + num_live_ranges(heartbeatagg), + trim_to(heartbeatagg,timestamp with time zone,interval), + trim_to(timestamp with time zone,interval), } "1.15.0" => { arrow_counter_interpolated_delta(countersummary,counterinterpolateddeltaaccessor), @@ -673,6 +688,11 @@ crate::functions_stabilized_at! { crate::types_stabilized_at! { STABLE_TYPES + "1.16.0" => { + accessornumgaps, + accessornumliveranges, + heartbeattrimtoaccessor, + } "1.15.0" => { counterinterpolateddeltaaccessor, counterinterpolatedrateaccessor, @@ -797,6 +817,11 @@ crate::types_stabilized_at! { crate::operators_stabilized_at! { STABLE_OPERATORS + "1.16.0" => { + "->"(heartbeatagg,accessornumgaps), + "->"(heartbeatagg,accessornumliveranges), + "->"(heartbeatagg,heartbeattrimtoaccessor), + } "1.15.0" => { "->"(countersummary,counterinterpolateddeltaaccessor), "->"(countersummary,counterinterpolatedrateaccessor),