Skip to content

Commit

Permalink
feat: implement predict_linear function in promql (GreptimeTeam#1362)
Browse files Browse the repository at this point in the history
* feat: implement predict_linear function in promql

* feat: initialize predict_linear's planner

* fix(bug): fix a bug in linear regression and add some unit test for linear regression

* chore: format code

* feat: deal with NULL value in linear_regression

* feat: add test for all value is None
  • Loading branch information
haohuaijin authored and paomian committed Oct 19, 2023
1 parent b54c0e4 commit b84892e
Show file tree
Hide file tree
Showing 4 changed files with 447 additions and 60 deletions.
171 changes: 170 additions & 1 deletion src/promql/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod deriv;
mod extrapolate_rate;
mod holt_winters;
mod idelta;
mod predict_linear;
mod quantile;
mod resets;
#[cfg(test)]
Expand All @@ -28,13 +29,14 @@ pub use aggr_over_time::{
PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
};
pub use changes::Changes;
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
pub use deriv::Deriv;
pub use extrapolate_rate::{Delta, Increase, Rate};
pub use holt_winters::HoltWinters;
pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;

Expand Down Expand Up @@ -63,3 +65,170 @@ pub(crate) fn compensated_sum_inc(inc: f64, sum: f64, mut compensation: f64) ->
}
(new_sum, compensation)
}

/// linear_regression performs a least-square linear regression analysis on the
/// times and values. It return the slope and intercept based on times and values.
/// Prometheus's implementation: https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L793-L837
pub(crate) fn linear_regression(
times: &TimestampMillisecondArray,
values: &Float64Array,
intercept_time: i64,
) -> (Option<f64>, Option<f64>) {
let mut count: f64 = 0.0;
let mut sum_x: f64 = 0.0;
let mut sum_y: f64 = 0.0;
let mut sum_xy: f64 = 0.0;
let mut sum_x2: f64 = 0.0;
let mut comp_x: f64 = 0.0;
let mut comp_y: f64 = 0.0;
let mut comp_xy: f64 = 0.0;
let mut comp_x2: f64 = 0.0;

let mut const_y = true;
let init_y: f64 = values.value(0);

for (i, value) in values.iter().enumerate() {
let time = times.value(i) as f64;
if value.is_none() {
continue;
}
let value = value.unwrap();
if const_y && i > 0 && value != init_y {
const_y = false;
}
count += 1.0;
let x = time - intercept_time as f64 / 1e3;
(sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
(sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
(sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
(sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2);
}

if count < 2.0 {
return (None, None);
}

if const_y {
if !init_y.is_finite() {
return (None, None);
}
return (Some(0.0), Some(init_y));
}

sum_x += comp_x;
sum_y += comp_y;
sum_xy += comp_xy;
sum_x2 += comp_x2;

let cov_xy = sum_xy - sum_x * sum_y / count;
let var_x = sum_x2 - sum_x * sum_x / count;

let slope = cov_xy / var_x;
let intercept = sum_y / count - slope * sum_x / count;

(Some(slope), Some(intercept))
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn calculate_linear_regression_none() {
let ts_array = TimestampMillisecondArray::from_iter(
[
0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
]
.into_iter()
.map(Some),
);
let values_array = Float64Array::from_iter([
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
1.0 / 0.0,
]);
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, None);
assert_eq!(intercept, None);
}

#[test]
fn calculate_linear_regression_value_is_const() {
let ts_array = TimestampMillisecondArray::from_iter(
[
0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
]
.into_iter()
.map(Some),
);
let values_array =
Float64Array::from_iter([10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]);
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, Some(0.0));
assert_eq!(intercept, Some(10.0));
}

#[test]
fn calculate_linear_regression() {
let ts_array = TimestampMillisecondArray::from_iter(
[
0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
]
.into_iter()
.map(Some),
);
let values_array = Float64Array::from_iter([
0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0,
]);
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, Some(0.010606060606060607));
assert_eq!(intercept, Some(6.818181818181818));
}

#[test]
fn calculate_linear_regression_value_have_none() {
let ts_array = TimestampMillisecondArray::from_iter(
[
0i64, 300, 600, 900, 1200, 1350, 1500, 1800, 2100, 2400, 2550, 2700, 3000,
]
.into_iter()
.map(Some),
);
let values_array: Float64Array = [
Some(0.0),
Some(10.0),
Some(20.0),
Some(30.0),
Some(40.0),
None,
Some(0.0),
Some(10.0),
Some(20.0),
Some(30.0),
None,
Some(40.0),
Some(50.0),
]
.into_iter()
.collect();
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, Some(0.010606060606060607));
assert_eq!(intercept, Some(6.818181818181818));
}

#[test]
fn calculate_linear_regression_value_all_none() {
let ts_array = TimestampMillisecondArray::from_iter([0i64, 300, 600].into_iter().map(Some));
let values_array: Float64Array = [None, None, None].into_iter().collect();
let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
assert_eq!(slope, None);
assert_eq!(intercept, None);
}
}
58 changes: 1 addition & 57 deletions src/promql/src/functions/deriv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;

use crate::functions::{compensated_sum_inc, extract_array};
use crate::functions::{extract_array, linear_regression};
use crate::range_array::RangeArray;

#[range_fn(name = "Deriv", ret = "Float64Array", display_name = "prom_drive")]
Expand All @@ -40,62 +40,6 @@ pub fn drive(times: &TimestampMillisecondArray, values: &Float64Array) -> Option
}
}

/// linear_regression performs a least-square linear regression analysis on the
/// times and values. It return the slope and intercept based on times and values.
/// Prometheus's implementation: https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L793-L837
fn linear_regression(
times: &TimestampMillisecondArray,
values: &Float64Array,
intercept_time: i64,
) -> (Option<f64>, Option<f64>) {
let mut count: f64 = 0.0;
let mut sum_x: f64 = 0.0;
let mut sum_y: f64 = 0.0;
let mut sum_xy: f64 = 0.0;
let mut sum_x2: f64 = 0.0;
let mut comp_x: f64 = 0.0;
let mut comp_y: f64 = 0.0;
let mut comp_xy: f64 = 0.0;
let mut comp_x2: f64 = 0.0;

let mut const_y = true;
let init_y: f64 = values.value(0);

for (i, value) in values.iter().enumerate() {
let time = times.value(i) as f64;
let value = value.unwrap();
if const_y && i > 0 && value != init_y {
const_y = false;
}
count += 1.0;
let x = time - intercept_time as f64 / 1e3;
(sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
(sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
(sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
(sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2);
}

if const_y {
if init_y.is_finite() {
return (None, None);
}
return (Some(0.0), Some(init_y));
}

sum_x += comp_x;
sum_y += comp_y;
sum_xy += comp_xy;
sum_x2 += comp_x2;

let cov_xy = sum_xy - sum_x * sum_y / count;
let var_x = sum_x2 - sum_x * sum_x / count;

let slope = cov_xy / var_x;
let intercept = sum_y / count - slope * sum_x / count;

(Some(slope), Some(intercept))
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit b84892e

Please sign in to comment.