diff --git a/Cargo.lock b/Cargo.lock index 65990f265ebc..7559787902a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5140,6 +5140,7 @@ dependencies = [ name = "promql" version = "0.1.0" dependencies = [ + "async-trait", "bytemuck", "catalog", "common-catalog", @@ -5389,6 +5390,7 @@ dependencies = [ "num-traits", "once_cell", "paste", + "promql", "promql-parser", "rand 0.8.5", "serde", diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 2653acb9eaf1..57f58666fc02 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -159,6 +159,11 @@ impl Instance { let stmt = QueryLanguageParser::parse_sql(sql).context(ExecuteSqlSnafu)?; self.execute_stmt(stmt, query_ctx).await } + + pub async fn execute_promql(&self, sql: &str, query_ctx: QueryContextRef) -> Result { + let stmt = QueryLanguageParser::parse_promql(sql).context(ExecuteSqlSnafu)?; + self.execute_stmt(stmt, query_ctx).await + } } // TODO(LFC): Refactor consideration: move this function to some helper mod, diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 030ce4dbaadd..1841b00baa86 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -13,4 +13,5 @@ // limitations under the License. mod instance_test; +mod promql_test; pub(crate) mod test_util; diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index a69f21e59019..215cf75a6d87 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -21,7 +21,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef}; use session::context::QueryContext; -use crate::tests::test_util::{self, MockInstance}; +use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance}; #[tokio::test(flavor = "multi_thread")] async fn test_create_database_and_insert_query() { @@ -69,6 +69,7 @@ async fn test_create_database_and_insert_query() { _ => unreachable!(), } } + #[tokio::test(flavor = "multi_thread")] async fn test_issue477_same_table_name_in_different_databases() { let instance = MockInstance::new("test_issue477_same_table_name_in_different_databases").await; @@ -158,19 +159,6 @@ async fn assert_query_result(instance: &MockInstance, sql: &str, ts: i64, host: } } -async fn setup_test_instance(test_name: &str) -> MockInstance { - let instance = MockInstance::new(test_name).await; - - test_util::create_test_table( - instance.inner(), - ConcreteDataType::timestamp_millisecond_datatype(), - ) - .await - .unwrap(); - - instance -} - #[tokio::test(flavor = "multi_thread")] async fn test_execute_insert() { let instance = setup_test_instance("test_execute_insert").await; @@ -354,16 +342,6 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } -async fn check_output_stream(output: Output, expected: String) { - let recordbatches = match output { - Output::Stream(stream) => util::collect_batches(stream).await.unwrap(), - Output::RecordBatches(recordbatches) => recordbatches, - _ => unreachable!(), - }; - let pretty_print = recordbatches.pretty_print().unwrap(); - assert_eq!(pretty_print, expected); -} - #[tokio::test(flavor = "multi_thread")] async fn test_alter_table() { let instance = setup_test_instance("test_alter_table").await; diff --git a/src/datanode/src/tests/promql_test.rs b/src/datanode/src/tests/promql_test.rs new file mode 100644 index 000000000000..9ae533fef7b0 --- /dev/null +++ b/src/datanode/src/tests/promql_test.rs @@ -0,0 +1,73 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::Output; +use session::context::QueryContext; + +use crate::tests::test_util::{check_output_stream, setup_test_instance}; + +#[tokio::test(flavor = "multi_thread")] +async fn sql_insert_promql_query_ceil() { + let instance = setup_test_instance("test_execute_insert").await; + let query_ctx = Arc::new(QueryContext::with( + DEFAULT_CATALOG_NAME.to_owned(), + DEFAULT_SCHEMA_NAME.to_owned(), + )); + let put_output = instance + .inner() + .execute_sql( + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 0), + ('host1', 66.6, 2048, 2000), + ('host1', 66.6, 4096, 5000), + ('host1', 43.1, 8192, 7000), + ('host1', 19.1, 10240, 9000), + ('host1', 99.1, 20480, 10000), + ('host1', 999.9, 40960, 21000), + ('host1', 31.9, 8192, 22000), + ('host1', 95.4, 333.3, 32000), + ('host1', 12423.1, 1333.3, 49000), + ('host1', 0, 2333.3, 80000), + ('host1', 49, 3333.3, 99000); + "#, + query_ctx.clone(), + ) + .await + .unwrap(); + assert!(matches!(put_output, Output::AffectedRows(12))); + + let promql = "ceil(demo{host=\"host1\"})"; + let query_output = instance + .inner() + .execute_promql(promql, query_ctx) + .await + .unwrap(); + let expected = String::from( + "\ ++---------------------+----------------+-------------------+ +| ts | ceil(demo.cpu) | ceil(demo.memory) | ++---------------------+----------------+-------------------+ +| 1970-01-01T00:00:00 | 67 | 1024 | +| 1970-01-01T00:00:05 | 67 | 4096 | +| 1970-01-01T00:00:10 | 100 | 20480 | +| 1970-01-01T00:00:50 | 12424 | 1334 | +| 1970-01-01T00:01:20 | 0 | 2334 | +| 1970-01-01T00:01:40 | 49 | 3334 | ++---------------------+----------------+-------------------+", + ); + check_output_stream(query_output, expected).await; +} diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index e1911cc110a2..d21603d0e0c3 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -16,6 +16,8 @@ use std::collections::HashMap; use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_query::Output; +use common_recordbatch::util; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use mito::config::EngineConfig; @@ -85,7 +87,7 @@ pub(crate) async fn create_test_table( ts_type: ConcreteDataType, ) -> Result<()> { let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("host", ConcreteDataType::string_datatype(), true), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("ts", ts_type, true).with_time_index(true), @@ -146,3 +148,26 @@ pub async fn create_mock_sql_handler() -> SqlHandler { SqlHandler::new(mock_engine, catalog_manager, factory.query_engine()) } + +pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance { + let instance = MockInstance::new(test_name).await; + + create_test_table( + instance.inner(), + ConcreteDataType::timestamp_millisecond_datatype(), + ) + .await + .unwrap(); + + instance +} + +pub async fn check_output_stream(output: Output, expected: String) { + let recordbatches = match output { + Output::Stream(stream) => util::collect_batches(stream).await.unwrap(), + Output::RecordBatches(recordbatches) => recordbatches, + _ => unreachable!(), + }; + let pretty_print = recordbatches.pretty_print().unwrap(); + assert_eq!(pretty_print, expected); +} diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 06dd29a7f7e1..027f25e5820e 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +async-trait.workspace = true bytemuck = "1.12" catalog = { path = "../catalog" } common-error = { path = "../common/error" } @@ -13,10 +14,10 @@ datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" } -query = { path = "../query" } session = { path = "../session" } snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } [dev-dependencies] tokio = { version = "1.23", features = ["full"] } +query = { path = "../query" } diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 6a072f2bf896..4a055729f0d2 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -26,6 +26,7 @@ pub enum Error { #[snafu(display("Internal error during build DataFusion plan, error: {}", source))] DataFusionPlanning { source: datafusion::error::DataFusionError, + backtrace: Backtrace, }, #[snafu(display("Unexpected plan or expression: {}", desc))] @@ -37,6 +38,9 @@ pub enum Error { #[snafu(display("Cannot find time index column in table {}", table))] TimeIndexNotFound { table: String, backtrace: Backtrace }, + #[snafu(display("Cannot find value columns in table {}", table))] + ValueNotFound { table: String, backtrace: Backtrace }, + #[snafu(display("Cannot find the table {}", table))] TableNotFound { table: String, @@ -84,6 +88,7 @@ impl ErrorExt for Error { use Error::*; match self { TimeIndexNotFound { .. } + | ValueNotFound { .. } | UnsupportedExpr { .. } | MultipleVector { .. } | ExpectExpr { .. } => StatusCode::InvalidArguments, diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 820f29a9941a..9363ff73d45e 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -14,11 +14,13 @@ mod instant_manipulate; mod normalize; +mod planner; mod range_manipulate; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream}; pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream}; +pub use planner::PromExtensionPlanner; pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream}; pub(crate) type Millisecond = ::Native; diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs new file mode 100644 index 000000000000..5ad9bacf2df6 --- /dev/null +++ b/src/promql/src/extension_plan/planner.rs @@ -0,0 +1,49 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::error::Result as DfResult; +use datafusion::execution::context::SessionState; +use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion::physical_plan::planner::ExtensionPlanner; +use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner}; + +use super::{InstantManipulate, RangeManipulate}; +use crate::extension_plan::SeriesNormalize; + +pub struct PromExtensionPlanner {} + +#[async_trait] +impl ExtensionPlanner for PromExtensionPlanner { + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + _logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> DfResult>> { + if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else { + Ok(None) + } + } +} diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 81febf2b6fee..2799c6d2bacc 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -18,8 +18,9 @@ use std::time::{Duration, UNIX_EPOCH}; use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::{ - BinaryExpr, BuiltinScalarFunction, Extension, LogicalPlan, LogicalPlanBuilder, Operator, + BinaryExpr, BuiltinScalarFunction, Extension, Filter, LogicalPlan, LogicalPlanBuilder, Operator, }; +use datafusion::optimizer::utils; use datafusion::prelude::{Column, Expr as DfExpr}; use datafusion::scalar::ScalarValue; use datafusion::sql::planner::ContextProvider; @@ -32,6 +33,7 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnknownTableSnafu, UnsupportedExprSnafu, + ValueNotFoundSnafu, }; use crate::extension_plan::{InstantManipulate, Millisecond, SeriesNormalize}; @@ -149,6 +151,8 @@ impl PromPlanner { LogicalPlanBuilder::from(input) .project(func_exprs) .context(DataFusionPlanningSnafu)? + .filter(self.create_empty_values_filter_expr()?) + .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)? } @@ -178,18 +182,38 @@ impl PromPlanner { label_matchers: Matchers, ) -> Result { let table_name = self.ctx.table_name.clone().unwrap(); - // TODO(ruihang): add time range filter - let filter = self.matchers_to_expr(label_matchers)?; - let table_scan = self.create_table_scan_plan(&table_name, filter)?; - let offset = offset.unwrap_or_default(); + // make filter exprs + let mut filters = self.matchers_to_expr(label_matchers)?; + filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond(Some(self.ctx.start), None), + ))); + filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond(Some(self.ctx.end), None), + ))); + + // make table scan with filter exprs + let table_scan = self.create_table_scan_plan(&table_name, filters.clone())?; + + // make filter plan + let filter_plan = LogicalPlan::Filter( + Filter::try_new( + // safety: at least there are two exprs that filter timestamp column. + utils::conjunction(filters.into_iter()).unwrap(), + Arc::new(table_scan), + ) + .context(DataFusionPlanningSnafu)?, + ); + + // make series_normalize plan + let offset = offset.unwrap_or_default(); let series_normalize = SeriesNormalize::new( offset, self.ctx .time_index_column .clone() .with_context(|| TimeIndexNotFoundSnafu { table: table_name })?, - table_scan, + filter_plan, ); let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(series_normalize), @@ -272,6 +296,7 @@ impl PromPlanner { .value_column_names() .cloned() .collect(); + self.ctx.value_columns = values; Ok(()) @@ -350,6 +375,18 @@ impl PromPlanner { .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?, ))) } + + fn create_empty_values_filter_expr(&self) -> Result { + let mut exprs = Vec::with_capacity(self.ctx.value_columns.len()); + for value in &self.ctx.value_columns { + let expr = DfExpr::Column(Column::from_name(value)).is_not_null(); + exprs.push(expr); + } + + utils::conjunction(exprs.into_iter()).context(ValueNotFoundSnafu { + table: self.ctx.table_name.clone().unwrap(), + }) + } } #[derive(Default, Debug)] @@ -493,10 +530,12 @@ mod test { let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap(); let expected = String::from( - "Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ - \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ - \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\")] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]", + "Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ + \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\ + \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ + \n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]", ).replace("TEMPLATE", plan_name); assert_eq!(plan.display_indent_schema().to_string(), expected); diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 5890f192b7b9..f5ffa0bd196f 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -26,6 +26,7 @@ futures = "0.3" futures-util.workspace = true metrics = "0.20" once_cell = "1.10" +promql = { path = "../promql" } promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" } serde.workspace = true serde_json = "1.0" diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 32531263c828..f101a801dd9f 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -34,6 +34,8 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::timer; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::ExecutionPlan; +use promql::planner::PromPlanner; +use promql_parser::parser::EvalStmt; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; @@ -41,7 +43,7 @@ use sql::statements::statement::Statement; pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::datafusion::planner::DfPlanner; -use crate::error::{QueryExecutionSnafu, Result}; +use crate::error::{QueryExecutionSnafu, QueryPlanSnafu, Result}; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; use crate::parser::QueryStatement; @@ -66,8 +68,19 @@ impl DatafusionQueryEngine { fn plan_sql_stmt(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { let context_provider = DfContextProviderAdapter::new(self.state.clone(), query_ctx); let planner = DfPlanner::new(&context_provider); + planner + .statement_to_plan(stmt) + .map_err(BoxedError::new) + .context(QueryPlanSnafu) + } - planner.statement_to_plan(stmt) + // TODO(ruihang): test this method once parser is ready. + fn plan_promql_stmt(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result { + let context_provider = DfContextProviderAdapter::new(self.state.clone(), query_ctx); + PromPlanner::stmt_to_plan(stmt, context_provider) + .map(LogicalPlan::DfPlan) + .map_err(BoxedError::new) + .context(QueryPlanSnafu) } } @@ -85,7 +98,7 @@ impl QueryEngine for DatafusionQueryEngine { ) -> Result { match stmt { QueryStatement::Sql(stmt) => self.plan_sql_stmt(stmt, query_ctx), - QueryStatement::Promql(_) => unimplemented!(), + QueryStatement::Promql(stmt) => self.plan_promql_stmt(stmt, query_ctx), } } diff --git a/src/query/src/metric.rs b/src/query/src/metric.rs index d2427ff95dfd..ae306ff768c4 100644 --- a/src/query/src/metric.rs +++ b/src/query/src/metric.rs @@ -15,6 +15,7 @@ //! query engine metrics pub static METRIC_PARSE_SQL_ELAPSED: &str = "query.parse_sql_elapsed"; +pub static METRIC_PARSE_PROMQL_ELAPSED: &str = "query.parse_promql_elapsed"; pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_elapsed"; pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed"; pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed"; diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index f6ee6834153e..ca0403b95c6b 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use common_error::prelude::BoxedError; use common_telemetry::timer; -use promql_parser::parser::EvalStmt; +use promql_parser::label::{MatchOp, Matcher, Matchers}; +use promql_parser::parser::{EvalStmt, Expr as PromExpr, Function, ValueType}; use snafu::ResultExt; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; use crate::error::{MultipleStatementsSnafu, QueryParseSnafu, Result}; -use crate::metric::METRIC_PARSE_SQL_ELAPSED; +use crate::metric::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; #[derive(Debug, Clone)] pub enum QueryStatement { @@ -48,6 +51,50 @@ impl QueryLanguageParser { Ok(QueryStatement::Sql(statement.pop().unwrap())) } } + + // TODO(ruihang): implement this method when parser is ready. + pub fn parse_promql(_promql: &str) -> Result { + let _timer = timer!(METRIC_PARSE_PROMQL_ELAPSED); + + let prom_expr = PromExpr::Call { + func: Function { + name: "ceil", + arg_types: vec![ValueType::Vector], + variadic: false, + return_type: ValueType::Vector, + }, + args: vec![Box::new(PromExpr::VectorSelector { + name: Some("demo".to_owned()), + offset: None, + start_or_end: None, + label_matchers: Matchers { + matchers: vec![ + Matcher { + op: MatchOp::Equal, + name: "host".to_string(), + value: "host1".to_string(), + }, + Matcher { + op: MatchOp::Equal, + name: promql_parser::label::METRIC_NAME.to_string(), + value: "demo".to_string(), + }, + ], + }, + })], + }; + let eval_stmt = EvalStmt { + expr: prom_expr, + start: std::time::UNIX_EPOCH, + end: std::time::UNIX_EPOCH + .checked_add(Duration::from_secs(100)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + Ok(QueryStatement::Promql(eval_stmt)) + } } #[cfg(test)] diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 4ab0edc0dc85..e2f38543c9ed 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, RwLock}; +use async_trait::async_trait; use catalog::CatalogListRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_function::scalars::aggregate::AggregateFunctionMetaRef; @@ -23,15 +24,17 @@ use common_query::physical_plan::{SessionContext, TaskContext}; use common_query::prelude::ScalarUdf; use datafusion::catalog::TableReference; use datafusion::error::Result as DfResult; -use datafusion::execution::context::{SessionConfig, SessionState}; +use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::planner::DefaultPhysicalPlanner; use datafusion::physical_plan::udf::ScalarUDF; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner}; use datafusion_common::ScalarValue; use datafusion_expr::{LogicalPlan as DfLogicalPlan, TableSource}; use datafusion_optimizer::optimizer::Optimizer; use datafusion_sql::planner::ContextProvider; use datatypes::arrow::datatypes::DataType; +use promql::extension_plan::PromExtensionPlanner; use crate::datafusion::DfCatalogListAdapter; use crate::optimizer::TypeConversionRule; @@ -66,6 +69,7 @@ impl QueryEngineState { let mut session_state = SessionState::with_config_rt(session_config, runtime_env); session_state.optimizer = optimizer; session_state.catalog_list = Arc::new(DfCatalogListAdapter::new(catalog_list.clone())); + session_state.query_planner = Arc::new(DfQueryPlanner::new()); let df_context = SessionContext::with_state(session_state); @@ -158,3 +162,30 @@ impl QueryEngineState { Ok(plan) } } + +struct DfQueryPlanner { + physical_planner: DefaultPhysicalPlanner, +} + +#[async_trait] +impl QueryPlanner for DfQueryPlanner { + async fn create_physical_plan( + &self, + logical_plan: &DfLogicalPlan, + session_state: &SessionState, + ) -> DfResult> { + self.physical_planner + .create_physical_plan(logical_plan, session_state) + .await + } +} + +impl DfQueryPlanner { + fn new() -> Self { + Self { + physical_planner: DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( + PromExtensionPlanner {}, + )]), + } + } +} diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index e8ef7f63c552..268a5dd7773c 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -116,9 +116,14 @@ impl TableMeta { pub fn value_column_names(&self) -> impl Iterator { let columns_schemas = &self.schema.column_schemas(); - self.value_indices - .iter() - .map(|idx| &columns_schemas[*idx].name) + self.value_indices.iter().filter_map(|idx| { + let column = &columns_schemas[*idx]; + if column.is_time_index() { + None + } else { + Some(&column.name) + } + }) } /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.