Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new functionality to heartbeat_agg #749

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
#### Other notable changes
- [#743](https://github.com/timescale/timescaledb-toolkit/pull/743): Remove support for direct upgrades from toolkit versions more than 1 year old. Toolkit versions 1.4.x and 1.5.x will have to upgrade to an intermediate version before upgrading to 1.16.0.
- [#744](https://github.com/timescale/timescaledb-toolkit/pull/744): Fix nightly CI failures from building TimescaleDB on Enterprise Linux
- [#749](https://github.com/timescale/timescaledb-toolkit/pull/749): Added num_live_ranges, num_gaps, and trim_to accessors for heartbeat aggregates

#### Shout-outs

Expand Down
2 changes: 2 additions & 0 deletions extension/src/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ accessor! { into_values() }
accessor! { into_int_values() }
accessor! { state_timeline() }
accessor! { state_int_timeline() }
// Argument-less accessors for heartbeat aggregates, used as `agg -> num_live_ranges()`
// and `agg -> num_gaps()`.
// NOTE(review): presumably these expand to the `AccessorNumLiveRanges` / `AccessorNumGaps`
// types imported by heartbeat_agg.rs -- confirm against the `accessor!` macro definition.
accessor! { num_live_ranges() }
accessor! { num_gaps() }
// The rest are more complex, with String or other challenges. Leaving alone for now.

pg_type! {
Expand Down
11 changes: 9 additions & 2 deletions extension/src/datum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ pub(crate) unsafe fn free_datum(datum: Datum, typoid: Oid) {

// TODO: is there a better place for this?
// Note that this requires a reference time to deal with variable length intervals (days or months)
pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw::Interval) -> i64 {
pub fn ts_interval_sum_to_ms(
ref_time: &crate::raw::TimestampTz,
interval: &crate::raw::Interval,
) -> i64 {
extern "C" {
fn timestamptz_pl_interval(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum;
}
Expand All @@ -53,7 +56,11 @@ pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw:
interval.0,
)
};
bound.value() as i64 - ref_time.0.value() as i64
bound.value() as i64
}

// Length of `interval` when it is applied at `ref_time`.
//
// The interval is first added to `ref_time` (via `ts_interval_sum_to_ms`), then the raw
// value of `ref_time` is subtracted again, leaving only the span the interval covers at
// that point in time. The result is on the same scale as the raw timestamp values.
// NOTE(review): the name says "ms"; the actual unit of the raw values is not visible
// here -- confirm before relying on it.
pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw::Interval) -> i64 {
    let shifted = ts_interval_sum_to_ms(ref_time, interval);
    let origin = ref_time.0.value() as i64;
    shifted - origin
}

pub struct TextSerializableDatumWriter {
Expand Down
256 changes: 243 additions & 13 deletions extension/src/heartbeat_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use pgx::*;

use crate::{
accessors::{
AccessorDeadRanges, AccessorDowntime, AccessorLiveAt, AccessorLiveRanges, AccessorUptime,
AccessorDeadRanges, AccessorDowntime, AccessorLiveAt, AccessorLiveRanges, AccessorNumGaps,
AccessorNumLiveRanges, AccessorUptime,
},
aggregate_utils::in_aggregate_context,
datum_utils::interval_to_ms,
Expand All @@ -20,7 +21,7 @@ mod accessors;

use accessors::{
HeartbeatInterpolateAccessor, HeartbeatInterpolatedDowntimeAccessor,
HeartbeatInterpolatedUptimeAccessor,
HeartbeatInterpolatedUptimeAccessor, HeartbeatTrimToAccessor,
};

const BUFFER_SIZE: usize = 1000; // How many values to absorb before consolidating
Expand Down Expand Up @@ -205,6 +206,68 @@ pg_type! {
ron_inout_funcs!(HeartbeatAgg);

impl HeartbeatAgg<'_> {
fn trim_to(self, start: Option<i64>, end: Option<i64>) -> HeartbeatAgg<'static> {
if (start.is_some() && start.unwrap() < self.start_time)
|| (end.is_some() && end.unwrap() > self.end_time)
{
error!("Can not query beyond the original aggregate bounds");
}

let mut starts: Vec<i64> = vec![];
let mut ends: Vec<i64> = vec![];
for i in 0..self.num_intervals as usize {
starts.push(self.interval_starts.slice()[i]);
ends.push(self.interval_ends.slice()[i]);
}

let low_idx = if let Some(start) = start {
let mut idx = 0;
while idx < self.num_intervals as usize && ends[idx] < start {
idx += 1;
}
if starts[idx] < start {
starts[idx] = start;
}
idx
} else {
0
};

let mut new_last = None;
let high_idx = if let Some(end) = end {
if self.num_intervals > 0 {
let mut idx = self.num_intervals as usize - 1;
while idx > low_idx && starts[idx] > end {
idx -= 1;
}
new_last = Some(ends[idx] - self.interval_len);
if ends[idx] > end {
if end < new_last.unwrap() {
new_last = Some(end);
}
ends[idx] = end;
}
idx
} else {
self.num_intervals as usize - 1
}
} else {
self.num_intervals as usize - 1
};

unsafe {
flatten!(HeartbeatAgg {
start_time: start.unwrap_or(self.start_time),
end_time: end.unwrap_or(self.end_time),
last_seen: new_last.unwrap_or(self.last_seen),
interval_len: self.interval_len,
num_intervals: (high_idx - low_idx + 1) as u64,
interval_starts: starts[low_idx..=high_idx].into(),
interval_ends: ends[low_idx..=high_idx].into(),
})
}
}

fn sum_live_intervals(self) -> i64 {
let starts = self.interval_starts.as_slice();
let ends = self.interval_ends.as_slice();
Expand Down Expand Up @@ -391,11 +454,16 @@ pub fn arrow_heartbeat_agg_interpolated_downtime(

#[pg_extern]
pub fn live_at(agg: HeartbeatAgg<'static>, test: TimestampTz) -> bool {
let test = i64::from(test);

if test < agg.start_time || test > agg.end_time {
error!("unable to test for liveness outside of a heartbeat_agg's covered range")
}

if agg.num_intervals == 0 {
return false;
}

let test = i64::from(test);
let mut start_iter = agg.interval_starts.iter().enumerate().peekable();
while let Some((idx, val)) = start_iter.next() {
if test < val {
Expand Down Expand Up @@ -443,6 +511,75 @@ pub fn arrow_heartbeat_agg_interpolate(
interpolate_heartbeat_agg(sketch, accessor.pred())
}

// Number of live ranges stored in the aggregate (its `num_intervals` field).
#[pg_extern]
pub fn num_live_ranges(agg: HeartbeatAgg<'static>) -> i64 {
    let live_ranges = agg.num_intervals;
    live_ranges as i64
}

// `->` operator form of `num_live_ranges`, i.e. `agg -> num_live_ranges()`.
// The accessor value carries no data and is not read; the call is forwarded
// unchanged to `num_live_ranges`.
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_heartbeat_agg_num_live_ranges(
agg: HeartbeatAgg<'static>,
_accessor: AccessorNumLiveRanges<'static>,
) -> i64 {
num_live_ranges(agg)
}

// Number of gaps (stretches with no liveness) inside the aggregate's covered range.
//
// An aggregate without any live range counts as a single gap. Otherwise there is
// one gap between each pair of consecutive live ranges, plus one more if the first
// range does not begin at `start_time`, plus one more if the last range does not
// finish at `end_time`.
#[pg_extern]
pub fn num_gaps(agg: HeartbeatAgg<'static>) -> i64 {
    let live_ranges = agg.num_intervals as usize;
    if live_ranges == 0 {
        return 1;
    }

    let leading_gap = agg.interval_starts.slice()[0] != agg.start_time;
    let trailing_gap = agg.interval_ends.slice()[live_ranges - 1] != agg.end_time;
    let interior_gaps = agg.num_intervals - 1;

    (interior_gaps + u64::from(leading_gap) + u64::from(trailing_gap)) as i64
}

// `->` operator form of `num_gaps`, i.e. `agg -> num_gaps()`.
// The accessor value carries no data and is not read; the call is forwarded
// unchanged to `num_gaps`.
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_heartbeat_agg_num_gaps(
agg: HeartbeatAgg<'static>,
_accessor: AccessorNumGaps<'static>,
) -> i64 {
num_gaps(agg)
}

// SQL-facing `trim_to(agg, start, duration)`; both `start` and `duration` default to NULL.
//
// The upper bound is only computed when `duration` is given: it is `duration` added to
// `start` if a start was supplied, otherwise `duration` added to the aggregate's own
// `start_time`. A missing `start` / `duration` leaves the corresponding side of the
// aggregate untouched. The actual trimming is done by `HeartbeatAgg::trim_to`.
#[pg_extern]
pub fn trim_to(
    agg: HeartbeatAgg<'static>,
    start: default!(Option<crate::raw::TimestampTz>, "NULL"),
    duration: default!(Option<crate::raw::Interval>, "NULL"),
) -> HeartbeatAgg<'static> {
    match start {
        Some(window_start) => {
            let window_end = duration
                .map(|span| crate::datum_utils::ts_interval_sum_to_ms(&window_start, &span));
            agg.trim_to(Some(i64::from(window_start)), window_end)
        }
        None => {
            // No explicit start: measure the duration from the aggregate's start.
            let window_end = duration.map(|span| {
                let anchor = TimestampTz::from(agg.start_time);
                crate::datum_utils::ts_interval_sum_to_ms(&anchor, &span)
            });
            agg.trim_to(None, window_end)
        }
    }
}

// `->` operator form of `trim_to`, i.e. `agg -> trim_to(start[, duration])`.
//
// The accessor always supplies a start; an `end` of exactly 0 is treated as
// "no upper bound given".
// NOTE(review): presumably the accessor constructor stores 0 when no duration was
// passed -- confirm in the accessor definition.
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_heartbeat_agg_trim_to(
    agg: HeartbeatAgg<'static>,
    accessor: HeartbeatTrimToAccessor<'static>,
) -> HeartbeatAgg<'static> {
    let upper = match accessor.end {
        0 => None,
        bound => Some(bound),
    };
    agg.trim_to(Some(accessor.start), upper)
}

impl From<HeartbeatAgg<'static>> for HeartbeatTransState {
fn from(agg: HeartbeatAgg<'static>) -> Self {
HeartbeatTransState {
Expand Down Expand Up @@ -883,19 +1020,17 @@ mod tests {
.unwrap().first()
.get_three::<String, String, String>().unwrap();

let (result4, result5) =
let result4 =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT live_at(agg, '01-01-2020 01:38:00 UTC')::TEXT,
live_at(agg, '01-01-2020 02:01:00 UTC')::TEXT FROM agg", None, None)
SELECT live_at(agg, '01-01-2020 01:38:00 UTC')::TEXT FROM agg", None, None)
.unwrap().first()
.get_two::<String, String>().unwrap();
.get_one::<String>().unwrap();

assert_eq!(result1.unwrap(), "false"); // outside ranges
assert_eq!(result2.unwrap(), "true"); // inside ranges
assert_eq!(result3.unwrap(), "true"); // first point of range
assert_eq!(result4.unwrap(), "false"); // last point of range
assert_eq!(result5.unwrap(), "false"); // outside aggregate

let (result1, result2, result3) =
client.update(
Expand All @@ -906,19 +1041,37 @@ mod tests {
.unwrap().first()
.get_three::<String, String, String>().unwrap();

let (result4, result5) =
let result4 =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT (agg -> live_at('01-01-2020 01:38:00 UTC'))::TEXT,
(agg -> live_at('01-01-2020 02:01:00 UTC'))::TEXT FROM agg", None, None)
SELECT (agg -> live_at('01-01-2020 01:38:00 UTC'))::TEXT FROM agg", None, None)
.unwrap().first()
.get_two::<String, String>().unwrap();
.get_one::<String>().unwrap();

assert_eq!(result1.unwrap(), "false"); // outside ranges
assert_eq!(result2.unwrap(), "true"); // inside ranges
assert_eq!(result3.unwrap(), "true"); // first point of range
assert_eq!(result4.unwrap(), "false"); // last point of range
assert_eq!(result5.unwrap(), "false"); // outside aggregate

let (result1, result2) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT num_live_ranges(agg), num_gaps(agg) FROM agg", None, None)
.unwrap().first()
.get_two::<i64, i64>().unwrap();

assert_eq!(result1.unwrap(), 4);
assert_eq!(result2.unwrap(), 4);

let (result1, result2) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT agg->num_live_ranges(), agg->num_gaps() FROM agg", None, None)
.unwrap().first()
.get_two::<i64, i64>().unwrap();

assert_eq!(result1.unwrap(), 4);
assert_eq!(result2.unwrap(), 4);
})
}

Expand Down Expand Up @@ -1072,6 +1225,83 @@ mod tests {
});
}

// Exercises `trim_to` on a heartbeat aggregate, both through the plain SQL function
// and through the `->` accessor form, and checks the trimmed aggregate with
// `uptime`, `num_gaps`, `live_at` and `num_live_ranges`.
#[pg_test]
pub fn test_heartbeat_trim_to() {
Spi::connect(|mut client| {
client.update("SET TIMEZONE to UTC", None, None).unwrap();

client
.update("CREATE TABLE liveness(heartbeat TIMESTAMPTZ)", None, None)
.unwrap();

// Heartbeats between 00:02:20 and 01:59:50 on 2020-01-01; the 0:35 and 0:40 rows
// are repeated and not inserted in timestamp order.
client
.update(
"INSERT INTO liveness VALUES
('01-01-2020 0:2:20 UTC'),
('01-01-2020 0:10 UTC'),
('01-01-2020 0:17 UTC'),
('01-01-2020 0:30 UTC'),
('01-01-2020 0:35 UTC'),
('01-01-2020 0:40 UTC'),
('01-01-2020 0:35 UTC'),
('01-01-2020 0:40 UTC'),
('01-01-2020 0:40 UTC'),
('01-01-2020 0:50:30 UTC'),
('01-01-2020 1:00 UTC'),
('01-01-2020 1:08 UTC'),
('01-01-2020 1:18 UTC'),
('01-01-2020 1:28 UTC'),
('01-01-2020 1:38:01 UTC'),
('01-01-2020 1:40 UTC'),
('01-01-2020 1:40:01 UTC'),
('01-01-2020 1:50:01 UTC'),
('01-01-2020 1:57 UTC'),
('01-01-2020 1:59:50 UTC')
",
None,
None,
)
.unwrap();

// Case 1: explicit start plus duration -> window 00:30..01:30.
// Expected: 59m30s of uptime and a single gap, with 00:50:25 falling inside that gap
// (assuming the '10m' argument of heartbeat_agg is the liveness interval, the gap is
// the 30s between 00:50:00 and the 00:50:30 heartbeat -- TODO confirm).
let (result1, result2, result3) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness),
trimmed AS (SELECT trim_to(agg, '01-01-2020 0:30 UTC', '1h') AS agg FROM agg)
SELECT uptime(agg)::TEXT, num_gaps(agg), live_at(agg, '01-01-2020 0:50:25 UTC')::TEXT FROM trimmed", None, None)
.unwrap().first()
.get_three::<String, i64, String>().unwrap();

assert_eq!(result1.unwrap(), "00:59:30");
assert_eq!(result2.unwrap(), 1);
assert_eq!(result3.unwrap(), "false");

// Case 2: duration only -> the window is measured from the aggregate's own start,
// i.e. the first 30 minutes.
let (result1, result2, result3) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness),
trimmed AS (SELECT trim_to(agg, duration=>'30m') AS agg FROM agg)
SELECT uptime(agg)::TEXT, num_gaps(agg), live_at(agg, '01-01-2020 0:20:25 UTC')::TEXT FROM trimmed", None, None)
.unwrap().first()
.get_three::<String, i64, String>().unwrap();

assert_eq!(result1.unwrap(), "00:24:40");
assert_eq!(result2.unwrap(), 2);
assert_eq!(result3.unwrap(), "true");

// Case 3: accessor (`->`) form, chained with further accessors:
//   - start only (01:40:00 onwards): no gaps expected;
//   - a 30s window starting at 00:50:00: zero uptime expected;
//   - a 22m15s window starting at 00:28:00: exactly one live range expected.
let (result1, result2, result3) =
client.update(
"WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness)
SELECT agg -> trim_to('01-01-2020 1:40:00 UTC'::timestamptz) -> num_gaps(),
(agg -> trim_to('01-01-2020 00:50:00 UTC'::timestamptz, '30s') -> uptime())::TEXT,
agg -> trim_to('01-01-2020 00:28:00 UTC'::timestamptz, '22m15s') -> num_live_ranges() FROM agg", None, None)
.unwrap().first()
.get_three::<i64, String, i64>().unwrap();

assert_eq!(result1.unwrap(), 0);
assert_eq!(result2.unwrap(), "00:00:00");
assert_eq!(result3.unwrap(), 1);
});
}

#[pg_test]
pub fn test_heartbeat_agg_interpolation() {
Spi::connect(|mut client| {
Expand Down
Loading