feat: impl literal only PromQL query (#1641)
* refactor EmptyMetric to accept expr

Signed-off-by: Ruihang Xia <[email protected]>

* impl literal only query

Signed-off-by: Ruihang Xia <[email protected]>

* add empty line

Signed-off-by: Ruihang Xia <[email protected]>

* clean up

Signed-off-by: Ruihang Xia <[email protected]>

* support literal on HTTP gateway

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy (again)

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
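A literal-only PromQL query is one whose expression contains no series selector at all, e.g. `1 + 1` or `time()`. There is no table to scan, so the planner needs a leaf node that manufactures its own rows: `EmptyMetric` generates one timestamp per step across the query range and evaluates a field expression over that time column. A minimal sketch of how such a query could map onto the refactored node; the range values and column names are illustrative, not taken from the commit:

    use datafusion::prelude::lit;

    // PromQL `1 + 1` evaluated as a range query from t=0ms to t=10000ms, step 5000ms.
    let plan = EmptyMetric::new(
        0,                   // start, in milliseconds
        10_000,              // end, in milliseconds
        5_000,               // interval (step), in milliseconds
        "time".to_string(),  // time index column name (hypothetical)
        "value".to_string(), // field column name (hypothetical)
        lit(1.0) + lit(1.0), // the literal expression
    )?;

Every output row then carries the same constant 2.0 in the value column, matching Prometheus' semantics for scalar literals in range queries.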
waynexia authored May 26, 2023
1 parent f0a519b commit 0da9493
Showing 7 changed files with 239 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan.rs
@@ -20,7 +20,7 @@ mod range_manipulate;
 mod series_divide;
 
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
-pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream};
+pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream};
 pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
 pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
 pub use planner::PromExtensionPlanner;
163 changes: 123 additions & 40 deletions src/promql/src/extension_plan/empty_metric.rs
@@ -14,35 +14,45 @@
 
 use std::any::Any;
 use std::collections::HashMap;
+use std::ops::Div;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use datafusion::arrow::array::Float64Array;
-use datafusion::arrow::datatypes::{DataType, TimeUnit};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema, TimeUnit};
 use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
 use datafusion::error::DataFusionError;
-use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::execution::context::{SessionState, TaskContext};
+use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
+use datafusion::physical_expr::{PhysicalExprRef, PhysicalSortExpr};
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream,
+    SendableRecordBatchStream,
 };
-use datafusion::prelude::Expr;
+use datafusion::prelude::{col, lit, Expr};
 use datatypes::arrow::array::TimestampMillisecondArray;
 use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
 use futures::Stream;
 
 use crate::extension_plan::Millisecond;
 
+/// Empty source plan that generates one record batch with two columns:
+/// - time index column, computed from start, end and interval
+/// - value column, generated by the input expr. The expr should not
+///   reference any column except the time index column.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct EmptyMetric {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
-    schema: DFSchemaRef,
+    expr: Expr,
+    /// Schema that only contains the time index column.
+    /// This is for intermediate results only.
+    time_index_schema: DFSchemaRef,
+    /// Schema of the output record batch
+    result_schema: DFSchemaRef,
 }
 
 impl EmptyMetric {
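For a concrete picture of the two columns described above: with start=0, end=10_000, interval=5_000, and the special time expr (added later in this file) as the field expr, the single emitted batch would look like the following. Column names are whatever the caller passes to EmptyMetric::new; these are illustrative:

    time                    | value
    ------------------------+-------
    1970-01-01T00:00:00.000 | 0.0
    1970-01-01T00:00:05.000 | 5.0
    1970-01-01T00:00:10.000 | 10.0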
@@ -52,16 +62,14 @@ impl EmptyMetric {
         interval: Millisecond,
         time_index_column_name: String,
         field_column_name: String,
+        field_expr: Expr,
     ) -> DataFusionResult<Self> {
+        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
+        let field_data_type = field_expr.get_type(&ts_only_schema)?;
         let schema = Arc::new(DFSchema::new_with_metadata(
             vec![
-                DFField::new(
-                    Some(""),
-                    &time_index_column_name,
-                    DataType::Timestamp(TimeUnit::Millisecond, None),
-                    false,
-                ),
-                DFField::new(Some(""), &field_column_name, DataType::Float64, true),
+                ts_only_schema.field(0).clone(),
+                DFField::new(Some(""), &field_column_name, field_data_type, true),
             ],
             HashMap::new(),
         )?);
@@ -70,18 +78,33 @@ impl EmptyMetric {
             start,
             end,
             interval,
-            schema,
+            time_index_schema: Arc::new(ts_only_schema),
+            result_schema: schema,
+            expr: field_expr,
         })
     }
 
-    pub fn to_execution_plan(&self) -> Arc<dyn ExecutionPlan> {
-        Arc::new(EmptyMetricExec {
+    pub fn to_execution_plan(
+        &self,
+        session_state: &SessionState,
+        physical_planner: &dyn PhysicalPlanner,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        let physical_expr = physical_planner.create_physical_expr(
+            &self.expr,
+            &self.result_schema,
+            &ArrowSchema::from(self.result_schema.as_ref()),
+            session_state,
+        )?;
+
+        Ok(Arc::new(EmptyMetricExec {
             start: self.start,
             end: self.end,
             interval: self.interval,
-            schema: Arc::new(self.schema.as_ref().into()),
+            time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
+            result_schema: Arc::new(self.result_schema.as_ref().into()),
+            expr: physical_expr,
             metric: ExecutionPlanMetricsSet::new(),
-        })
+        }))
     }
 }
 
@@ -95,7 +118,7 @@ impl UserDefinedLogicalNodeCore for EmptyMetric {
     }
 
     fn schema(&self) -> &DFSchemaRef {
-        &self.schema
+        &self.result_schema
     }
 
     fn expressions(&self) -> Vec<Expr> {
@@ -120,7 +143,12 @@ pub struct EmptyMetricExec {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
-    schema: SchemaRef,
+    /// Schema that only contains the time index column.
+    /// This is for intermediate results only.
+    time_index_schema: SchemaRef,
+    /// Schema of the output record batch
+    result_schema: SchemaRef,
+    expr: PhysicalExprRef,
 
     metric: ExecutionPlanMetricsSet,
 }
@@ -131,7 +159,7 @@ impl ExecutionPlan for EmptyMetricExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        self.result_schema.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -167,8 +195,10 @@ impl ExecutionPlan for EmptyMetricExec {
             start: self.start,
             end: self.end,
             interval: self.interval,
+            expr: self.expr.clone(),
             is_first_poll: true,
-            schema: self.schema.clone(),
+            time_index_schema: self.time_index_schema.clone(),
+            result_schema: self.result_schema.clone(),
             metric: baseline_metric,
         }))
     }
@@ -204,15 +234,20 @@ pub struct EmptyMetricStream {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
-    // only generate one record batch at the first poll
+    expr: PhysicalExprRef,
+    /// This stream only generates one record batch, at the first poll
     is_first_poll: bool,
-    schema: SchemaRef,
+    /// Schema that only contains the time index column.
+    /// This is for intermediate results only.
+    time_index_schema: SchemaRef,
+    /// Schema of the output record batch
+    result_schema: SchemaRef,
     metric: BaselineMetrics,
 }
 
 impl RecordBatchStream for EmptyMetricStream {
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        self.result_schema.clone()
     }
 }
 
@@ -223,17 +258,28 @@ impl Stream for EmptyMetricStream {
         let result = if self.is_first_poll {
             self.is_first_poll = false;
             let _timer = self.metric.elapsed_compute().timer();
-            let result_array = (self.start..=self.end)
+
+            // build the time index array, and a record batch that only
+            // contains that array, as the input of the field expr
+            let time_array = (self.start..=self.end)
                 .step_by(self.interval as _)
                 .collect::<Vec<_>>();
-            let float_array =
-                Float64Array::from_iter(result_array.iter().map(|v| *v as f64 / 1000.0));
-            let millisecond_array = TimestampMillisecondArray::from(result_array);
-            let batch = RecordBatch::try_new(
-                self.schema.clone(),
-                vec![Arc::new(millisecond_array), Arc::new(float_array)],
-            )
-            .map_err(DataFusionError::ArrowError);
+            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
+            let input_record_batch =
+                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
+                    .map_err(DataFusionError::ArrowError)?;
+
+            // evaluate the field expr and get the result
+            let field_array = self
+                .expr
+                .evaluate(&input_record_batch)?
+                .into_array(time_array.len());
+
+            // assemble the output record batch
+            let batch =
+                RecordBatch::try_new(self.result_schema.clone(), vec![time_array, field_array])
+                    .map_err(DataFusionError::ArrowError);
+
             Poll::Ready(Some(batch))
         } else {
             Poll::Ready(None)
@@ -242,8 +288,34 @@ impl Stream for EmptyMetricStream {
     }
 }
 
+/// Build a schema that only contains a **millisecond** timestamp column
+fn build_ts_only_schema(column_name: &str) -> DFSchema {
+    let ts_field = DFField::new(
+        Some(""),
+        column_name,
+        DataType::Timestamp(TimeUnit::Millisecond, None),
+        false,
+    );
+    // safety: should not fail (UT covers this)
+    DFSchema::new_with_metadata(vec![ts_field], HashMap::new()).unwrap()
+}
+
+// Convert the timestamp column to UNIX epoch seconds:
+// https://prometheus.io/docs/prometheus/latest/querying/functions/#time
+pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
+    let input_schema = build_ts_only_schema(time_index_column_name);
+    // safety: should not fail (UT covers this)
+    col(time_index_column_name)
+        .cast_to(&DataType::Int64, &input_schema)
+        .unwrap()
+        .cast_to(&DataType::Float64, &input_schema)
+        .unwrap()
+        .div(lit(1000.0)) // casting to second would lose precision, so we cast to float64 first and manually divide by 1000
+}
+
 #[cfg(test)]
 mod test {
+    use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
     use datafusion::prelude::SessionContext;
 
     use super::*;
@@ -256,11 +328,22 @@ mod test {
         field_column_name: String,
         expected: String,
     ) {
-        let empty_metric =
-            EmptyMetric::new(start, end, interval, time_column_name, field_column_name).unwrap();
-        let empty_metric_exec = empty_metric.to_execution_plan();
 
         let session_context = SessionContext::default();
+        let df_default_physical_planner = DefaultPhysicalPlanner::default();
+        let time_expr = build_special_time_expr(&time_column_name);
+        let empty_metric = EmptyMetric::new(
+            start,
+            end,
+            interval,
+            time_column_name,
+            field_column_name,
+            time_expr,
+        )
+        .unwrap();
+        let empty_metric_exec = empty_metric
+            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
+            .unwrap();
 
         let result =
             datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
                 .await
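For intuition on build_special_time_expr: it reproduces PromQL's time() function, which yields the evaluation timestamp in seconds. Casting the millisecond timestamp directly to a second-resolution type would truncate sub-second precision, hence the detour through Int64 and Float64 followed by a division. A scalar equivalent of the generated expr, with an illustrative function name that is not part of the commit:

    // Scalar equivalent of the expr built by build_special_time_expr.
    fn timestamp_to_epoch_seconds(ts_millis: i64) -> f64 {
        // Timestamp(ms) -> Int64 is a reinterpretation; Int64 -> Float64 keeps
        // the millisecond precision that a direct cast to seconds would drop.
        ts_millis as f64 / 1000.0
    }

    fn main() {
        assert_eq!(timestamp_to_epoch_seconds(1_685_000_000_123), 1_685_000_000.123);
    }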
6 changes: 3 additions & 3 deletions src/promql/src/extension_plan/planner.rs
@@ -31,11 +31,11 @@ pub struct PromExtensionPlanner;
 impl ExtensionPlanner for PromExtensionPlanner {
     async fn plan_extension(
         &self,
-        _planner: &dyn PhysicalPlanner,
+        planner: &dyn PhysicalPlanner,
         node: &dyn UserDefinedLogicalNode,
         _logical_inputs: &[&LogicalPlan],
         physical_inputs: &[Arc<dyn ExecutionPlan>],
-        _session_state: &SessionState,
+        session_state: &SessionState,
     ) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
         if let Some(node) = node.as_any().downcast_ref::<SeriesNormalize>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
@@ -46,7 +46,7 @@ impl ExtensionPlanner for PromExtensionPlanner {
         } else if let Some(node) = node.as_any().downcast_ref::<SeriesDivide>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
-            Ok(Some(node.to_execution_plan()))
+            Ok(Some(node.to_execution_plan(session_state, planner)?))
         } else {
             Ok(None)
         }
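With the extension planner now forwarding its SessionState and PhysicalPlanner, a literal-only query can be lowered and executed without touching any stored table. A hedged end-to-end sketch mirroring the unit test above; the expression and range values are illustrative:

    use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
    use datafusion::prelude::{lit, SessionContext};

    let session_context = SessionContext::default();
    let planner = DefaultPhysicalPlanner::default();
    let plan = EmptyMetric::new(
        0, 100, 10, // start, end, interval in milliseconds
        "time".to_string(), "value".to_string(),
        lit(1.0) + lit(1.0),
    )?;
    let exec = plan.to_execution_plan(&session_context.state(), &planner)?;
    let batches = datafusion::physical_plan::collect(exec, session_context.task_ctx()).await?;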
