Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add sqlness tests for some promql function #1838

Merged
merged 8 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl ExecutionPlan for RangeManipulateExec {
DisplayFormatType::Default => {
write!(
f,
"PromInstantManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
"PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
self.start, self.end, self.interval, self.range, self.time_index_column
)
}
Expand Down
18 changes: 14 additions & 4 deletions src/promql/src/extension_plan/series_divide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics,
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datatypes::arrow::compute;
use futures::{ready, Stream, StreamExt};
Expand Down Expand Up @@ -129,9 +129,15 @@ impl ExecutionPlan for SeriesDivideExec {
}

fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
Partitioning::UnknownPartitioning(1)
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}

// TODO(ruihang): specify required input ordering

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()
}
Expand Down Expand Up @@ -229,7 +235,7 @@ impl Stream for SeriesDivideStream {
loop {
if let Some(batch) = self.buffer.clone() {
let same_length = self.find_first_diff_row(&batch) + 1;
if same_length == batch.num_rows() {
if same_length >= batch.num_rows() {
let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Some(Ok(batch)) => batch,
None => {
Expand Down Expand Up @@ -277,6 +283,10 @@ impl SeriesDivideStream {
}

fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
if self.tag_indices.is_empty() {
return batch.num_rows();
}

let num_rows = batch.num_rows();
let mut result = num_rows;

Expand Down
3 changes: 1 addition & 2 deletions src/promql/src/functions/quantile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ impl QuantileOverTime {

fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
// construct matrix from input.
// The third one is quantile param, which is included in fields.
assert_eq!(input.len(), 3);
assert_eq!(input.len(), 2);
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;

Expand Down
27 changes: 14 additions & 13 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::collections::{BTreeSet, HashSet, VecDeque};
use std::str::FromStr;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -833,9 +833,10 @@ impl PromPlanner {
fn create_function_expr(
&mut self,
func: &Function,
mut other_input_exprs: Vec<DfExpr>,
other_input_exprs: Vec<DfExpr>,
) -> Result<Vec<DfExpr>> {
// TODO(ruihang): check function args list
let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

// TODO(ruihang): set this according to in-param list
let field_column_pos = 0;
Expand Down Expand Up @@ -865,8 +866,8 @@ impl PromPlanner {
"stddev_over_time" => ScalarFunc::Udf(StddevOverTime::scalar_udf()),
"stdvar_over_time" => ScalarFunc::Udf(StdvarOverTime::scalar_udf()),
"quantile_over_time" => {
let quantile_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => *quantile,
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as quantile, but found {:?}", other),
}
Expand All @@ -875,8 +876,8 @@ impl PromPlanner {
ScalarFunc::Udf(QuantileOverTime::scalar_udf(quantile_expr))
}
"predict_linear" => {
let t_expr = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => *t,
let t_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expect i64 literal as t, but found {:?}", other),
}
Expand All @@ -885,8 +886,8 @@ impl PromPlanner {
ScalarFunc::Udf(PredictLinear::scalar_udf(t_expr))
}
"holt_winters" => {
let sf_exp = match other_input_exprs.get(0) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => *sf,
let sf_exp = match other_input_exprs.pop_front() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for improving it. Looks much better than .get(0) 👍

Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expect f64 literal as smoothing factor, but found {:?}",
Expand All @@ -895,8 +896,8 @@ impl PromPlanner {
}
.fail()?,
};
let tf_exp = match other_input_exprs.get(1) {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => *tf,
let tf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as trend factor, but found {:?}", other),
}
Expand Down Expand Up @@ -924,7 +925,7 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos, col_expr);
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
fun,
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
other_input_exprs.remove(field_column_pos);
Expand All @@ -939,7 +940,7 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos + 1, col_expr);
let fn_expr = DfExpr::ScalarUDF(ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
other_input_exprs.remove(field_column_pos + 1);
Expand All @@ -957,7 +958,7 @@ impl PromPlanner {
.insert(field_column_pos + 2, self.create_time_index_column_expr()?);
let fn_expr = DfExpr::ScalarUDF(ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
other_input_exprs.remove(field_column_pos + 2);
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/distributed/tql-explain-analyze/analyze.result
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ TQL ANALYZE (0, 10, '5s') test;
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+
Expand Down
8 changes: 4 additions & 4 deletions tests/cases/distributed/tql-explain-analyze/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
Expand Down Expand Up @@ -58,8 +58,8 @@ TQL EXPLAIN host_load1{__field__="val"};
| | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | RepartitionExec: partitioning=REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] |
| | MergeScanExec: peers=[REDACTED
| | |
Expand Down
105 changes: 105 additions & 0 deletions tests/cases/standalone/common/tql/aggr_over_time.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
create table metric (ts timestamp(3) time index, val double);

Affected Rows: 0

insert into metric values
(0,0),
(10000,8),
(20000,8),
(30000,2),
(40000,3);

Affected Rows: 5

select * from metric;

+---------------------+-----+
| ts | val |
+---------------------+-----+
| 1970-01-01T00:00:00 | 0.0 |
| 1970-01-01T00:00:10 | 8.0 |
| 1970-01-01T00:00:20 | 8.0 |
| 1970-01-01T00:00:30 | 2.0 |
| 1970-01-01T00:00:40 | 3.0 |
+---------------------+-----+

tql eval (60, 61, '10s') stdvar_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stdvar_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 10.559999999999999 |
+---------------------+-------------------------------------+

tql eval (60, 60, '1s') stddev_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 3.249615361854384 |
+---------------------+-------------------------------------+

tql eval (60, 60, '1s') stddev_over_time((metric[1m]));

+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 3.249615361854384 |
+---------------------+-------------------------------------+

drop table metric;

Affected Rows: 1

create table metric (ts timestamp(3) time index, val double);

Affected Rows: 0

insert into metric values
(0,0),
(10000,1.5990505637277868),
(20000,1.5990505637277868),
(30000,1.5990505637277868);

Affected Rows: 4

tql eval (60, 60, '1s') stdvar_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stdvar_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 0.47943050725465364 |
+---------------------+-------------------------------------+

tql eval (60, 60, '1s') stddev_over_time(metric[1m]);

+---------------------+-------------------------------------+
| ts | prom_stddev_over_time(ts_range,val) |
+---------------------+-------------------------------------+
| 1970-01-01T00:01:00 | 0.6924092050620454 |
+---------------------+-------------------------------------+

drop table metric;

Affected Rows: 1

create table data (ts timestamp(3) time index, val double, test string primary key);

Affected Rows: 0

insert into data values
(0, 0, "two samples"),
(10000, 1, "two samples"),
(0, 0, "three samples"),
(10000, 1, "three samples"),
(20000, 2, "three samples"),
(0, 0, "uneven samples"),
(10000, 1, "uneven samples"),
(20000, 4, "uneven samples");

Affected Rows: 8

drop table data;

Affected Rows: 1

Loading