Skip to content

Commit

Permalink
feat: PromQL handler in query engine (#861)
Browse files Browse the repository at this point in the history
* example promql test

Signed-off-by: Ruihang Xia <[email protected]>

* make the mock test works

Signed-off-by: Ruihang Xia <[email protected]>

* update planner test

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippys

Signed-off-by: Ruihang Xia <[email protected]>

* add license header

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Jan 11, 2023
1 parent 9428e70 commit a9b42b4
Show file tree
Hide file tree
Showing 17 changed files with 324 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ impl Instance {
let stmt = QueryLanguageParser::parse_sql(sql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}

pub async fn execute_promql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = QueryLanguageParser::parse_promql(sql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
}

// TODO(LFC): Refactor consideration: move this function to some helper mod,
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
// limitations under the License.

mod instance_test;
mod promql_test;
pub(crate) mod test_util;
26 changes: 2 additions & 24 deletions src/datanode/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use session::context::QueryContext;

use crate::tests::test_util::{self, MockInstance};
use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance};

#[tokio::test(flavor = "multi_thread")]
async fn test_create_database_and_insert_query() {
Expand Down Expand Up @@ -69,6 +69,7 @@ async fn test_create_database_and_insert_query() {
_ => unreachable!(),
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_issue477_same_table_name_in_different_databases() {
let instance = MockInstance::new("test_issue477_same_table_name_in_different_databases").await;
Expand Down Expand Up @@ -158,19 +159,6 @@ async fn assert_query_result(instance: &MockInstance, sql: &str, ts: i64, host:
}
}

async fn setup_test_instance(test_name: &str) -> MockInstance {
let instance = MockInstance::new(test_name).await;

test_util::create_test_table(
instance.inner(),
ConcreteDataType::timestamp_millisecond_datatype(),
)
.await
.unwrap();

instance
}

#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = setup_test_instance("test_execute_insert").await;
Expand Down Expand Up @@ -354,16 +342,6 @@ pub async fn test_execute_create() {
assert!(matches!(output, Output::AffectedRows(0)));
}

async fn check_output_stream(output: Output, expected: String) {
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table() {
let instance = setup_test_instance("test_alter_table").await;
Expand Down
73 changes: 73 additions & 0 deletions src/datanode/src/tests/promql_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use session::context::QueryContext;

use crate::tests::test_util::{check_output_stream, setup_test_instance};

#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_promql_query_ceil() {
let instance = setup_test_instance("test_execute_insert").await;
let query_ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME.to_owned(),
DEFAULT_SCHEMA_NAME.to_owned(),
));
let put_output = instance
.inner()
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 0),
('host1', 66.6, 2048, 2000),
('host1', 66.6, 4096, 5000),
('host1', 43.1, 8192, 7000),
('host1', 19.1, 10240, 9000),
('host1', 99.1, 20480, 10000),
('host1', 999.9, 40960, 21000),
('host1', 31.9, 8192, 22000),
('host1', 95.4, 333.3, 32000),
('host1', 12423.1, 1333.3, 49000),
('host1', 0, 2333.3, 80000),
('host1', 49, 3333.3, 99000);
"#,
query_ctx.clone(),
)
.await
.unwrap();
assert!(matches!(put_output, Output::AffectedRows(12)));

let promql = "ceil(demo{host=\"host1\"})";
let query_output = instance
.inner()
.execute_promql(promql, query_ctx)
.await
.unwrap();
let expected = String::from(
"\
+---------------------+----------------+-------------------+
| ts | ceil(demo.cpu) | ceil(demo.memory) |
+---------------------+----------------+-------------------+
| 1970-01-01T00:00:00 | 67 | 1024 |
| 1970-01-01T00:00:05 | 67 | 4096 |
| 1970-01-01T00:00:10 | 100 | 20480 |
| 1970-01-01T00:00:50 | 12424 | 1334 |
| 1970-01-01T00:01:20 | 0 | 2334 |
| 1970-01-01T00:01:40 | 49 | 3334 |
+---------------------+----------------+-------------------+",
);
check_output_stream(query_output, expected).await;
}
27 changes: 26 additions & 1 deletion src/datanode/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;
use common_recordbatch::util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use mito::config::EngineConfig;
Expand Down Expand Up @@ -85,7 +87,7 @@ pub(crate) async fn create_test_table(
ts_type: ConcreteDataType,
) -> Result<()> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
Expand Down Expand Up @@ -146,3 +148,26 @@ pub async fn create_mock_sql_handler() -> SqlHandler {

SqlHandler::new(mock_engine, catalog_manager, factory.query_engine())
}

pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance {
let instance = MockInstance::new(test_name).await;

create_test_table(
instance.inner(),
ConcreteDataType::timestamp_millisecond_datatype(),
)
.await
.unwrap();

instance
}

pub async fn check_output_stream(output: Output, expected: String) {
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected);
}
3 changes: 2 additions & 1 deletion src/promql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
async-trait.workspace = true
bytemuck = "1.12"
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
Expand All @@ -13,10 +14,10 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" }
query = { path = "../query" }
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }

[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
query = { path = "../query" }
5 changes: 5 additions & 0 deletions src/promql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum Error {
#[snafu(display("Internal error during build DataFusion plan, error: {}", source))]
DataFusionPlanning {
source: datafusion::error::DataFusionError,
backtrace: Backtrace,
},

#[snafu(display("Unexpected plan or expression: {}", desc))]
Expand All @@ -37,6 +38,9 @@ pub enum Error {
#[snafu(display("Cannot find time index column in table {}", table))]
TimeIndexNotFound { table: String, backtrace: Backtrace },

#[snafu(display("Cannot find value columns in table {}", table))]
ValueNotFound { table: String, backtrace: Backtrace },

#[snafu(display("Cannot find the table {}", table))]
TableNotFound {
table: String,
Expand Down Expand Up @@ -84,6 +88,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
TimeIndexNotFound { .. }
| ValueNotFound { .. }
| UnsupportedExpr { .. }
| MultipleVector { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,
Expand Down
2 changes: 2 additions & 0 deletions src/promql/src/extension_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

mod instant_manipulate;
mod normalize;
mod planner;
mod range_manipulate;

use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
pub use planner::PromExtensionPlanner;
pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};

pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
49 changes: 49 additions & 0 deletions src/promql/src/extension_plan/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};

use super::{InstantManipulate, RangeManipulate};
use crate::extension_plan::SeriesNormalize;

pub struct PromExtensionPlanner {}

#[async_trait]
impl ExtensionPlanner for PromExtensionPlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
if let Some(node) = node.as_any().downcast_ref::<SeriesNormalize>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<InstantManipulate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<RangeManipulate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else {
Ok(None)
}
}
}
Loading

0 comments on commit a9b42b4

Please sign in to comment.