Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PromQL handler in query engine #861

Merged
merged 6 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ impl Instance {
let stmt = QueryLanguageParser::parse_sql(sql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}

/// Parses `sql` as a PromQL query and executes the resulting statement.
///
/// Despite its name, `sql` carries PromQL text here: it is handed to
/// `QueryLanguageParser::parse_promql`. A parse failure is reported with the
/// `ExecuteSqlSnafu` context; the parsed statement is then run through
/// `execute_stmt` with the given `query_ctx`.
pub async fn execute_promql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
    let parsed = QueryLanguageParser::parse_promql(sql);
    let statement = parsed.context(ExecuteSqlSnafu)?;
    self.execute_stmt(statement, query_ctx).await
}
}

// TODO(LFC): Refactor consideration: move this function to some helper mod,
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
// limitations under the License.

mod instance_test;
mod promql_test;
pub(crate) mod test_util;
26 changes: 2 additions & 24 deletions src/datanode/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use session::context::QueryContext;

use crate::tests::test_util::{self, MockInstance};
use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance};

#[tokio::test(flavor = "multi_thread")]
async fn test_create_database_and_insert_query() {
Expand Down Expand Up @@ -69,6 +69,7 @@ async fn test_create_database_and_insert_query() {
_ => unreachable!(),
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_issue477_same_table_name_in_different_databases() {
let instance = MockInstance::new("test_issue477_same_table_name_in_different_databases").await;
Expand Down Expand Up @@ -158,19 +159,6 @@ async fn assert_query_result(instance: &MockInstance, sql: &str, ts: i64, host:
}
}

/// Builds a [`MockInstance`] named `test_name` and runs
/// `test_util::create_test_table` on it with a millisecond timestamp type.
///
/// Panics if the table creation returns an error.
async fn setup_test_instance(test_name: &str) -> MockInstance {
    let mock = MockInstance::new(test_name).await;

    let created = test_util::create_test_table(
        mock.inner(),
        ConcreteDataType::timestamp_millisecond_datatype(),
    )
    .await;
    created.unwrap();

    mock
}

#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = setup_test_instance("test_execute_insert").await;
Expand Down Expand Up @@ -354,16 +342,6 @@ pub async fn test_execute_create() {
assert!(matches!(output, Output::AffectedRows(0)));
}

/// Asserts that the pretty-printed record batches of `output` equal `expected`.
///
/// A `Output::Stream` is first collected with `util::collect_batches`; an
/// `Output::RecordBatches` is used directly. Any other variant hits
/// `unreachable!()`, and collection or formatting errors panic via `unwrap`.
async fn check_output_stream(output: Output, expected: String) {
    let batches = match output {
        Output::RecordBatches(batches) => batches,
        Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
        _ => unreachable!(),
    };
    assert_eq!(batches.pretty_print().unwrap(), expected);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table() {
let instance = setup_test_instance("test_alter_table").await;
Expand Down
73 changes: 73 additions & 0 deletions src/datanode/src/tests/promql_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use session::context::QueryContext;

use crate::tests::test_util::{check_output_stream, setup_test_instance};

/// Inserts twelve rows into `demo` through SQL, then evaluates the PromQL
/// expression `ceil(demo{host="host1"})` and compares the pretty-printed
/// result with the expected table.
///
/// The instance is named after this test (as the other tests in this module
/// tree do) instead of reusing the name of `test_execute_insert`.
// NOTE(review): only six timestamps appear in the expected output; presumably
// this follows from the evaluation range/step chosen by `parse_promql` —
// confirm against the parser defaults.
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_promql_query_ceil() {
    let instance = setup_test_instance("sql_insert_promql_query_ceil").await;
    let query_ctx = Arc::new(QueryContext::with(
        DEFAULT_CATALOG_NAME.to_owned(),
        DEFAULT_SCHEMA_NAME.to_owned(),
    ));
    let put_output = instance
        .inner()
        .execute_sql(
            r#"insert into demo(host, cpu, memory, ts) values
            ('host1', 66.6, 1024, 0),
            ('host1', 66.6, 2048, 2000),
            ('host1', 66.6, 4096, 5000),
            ('host1', 43.1, 8192, 7000),
            ('host1', 19.1, 10240, 9000),
            ('host1', 99.1, 20480, 10000),
            ('host1', 999.9, 40960, 21000),
            ('host1', 31.9, 8192, 22000),
            ('host1', 95.4, 333.3, 32000),
            ('host1', 12423.1, 1333.3, 49000),
            ('host1', 0, 2333.3, 80000),
            ('host1', 49, 3333.3, 99000);
            "#,
            query_ctx.clone(),
        )
        .await
        .unwrap();
    assert!(matches!(put_output, Output::AffectedRows(12)));

    let promql = "ceil(demo{host=\"host1\"})";
    let query_output = instance
        .inner()
        .execute_promql(promql, query_ctx)
        .await
        .unwrap();
    // Cells are padded to the column widths fixed by the border lines, which
    // is the layout the pretty printer emits.
    let expected = String::from(
        "\
+---------------------+----------------+-------------------+
| ts                  | ceil(demo.cpu) | ceil(demo.memory) |
+---------------------+----------------+-------------------+
| 1970-01-01T00:00:00 | 67             | 1024              |
| 1970-01-01T00:00:05 | 67             | 4096              |
| 1970-01-01T00:00:10 | 100            | 20480             |
| 1970-01-01T00:00:50 | 12424          | 1334              |
| 1970-01-01T00:01:20 | 0              | 2334              |
| 1970-01-01T00:01:40 | 49             | 3334              |
+---------------------+----------------+-------------------+",
    );
    check_output_stream(query_output, expected).await;
}
27 changes: 26 additions & 1 deletion src/datanode/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;
use common_recordbatch::util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use mito::config::EngineConfig;
Expand Down Expand Up @@ -85,7 +87,7 @@ pub(crate) async fn create_test_table(
ts_type: ConcreteDataType,
) -> Result<()> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
Expand Down Expand Up @@ -146,3 +148,26 @@ pub async fn create_mock_sql_handler() -> SqlHandler {

SqlHandler::new(mock_engine, catalog_manager, factory.query_engine())
}

/// Builds a [`MockInstance`] named `test_name` and runs `create_test_table`
/// on it with a millisecond timestamp type.
///
/// Panics if the table creation returns an error.
pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance {
    let mock = MockInstance::new(test_name).await;

    let created = create_test_table(
        mock.inner(),
        ConcreteDataType::timestamp_millisecond_datatype(),
    )
    .await;
    created.unwrap();

    mock
}

/// Asserts that the pretty-printed record batches of `output` equal `expected`.
///
/// A `Output::Stream` is first collected with `util::collect_batches`; an
/// `Output::RecordBatches` is used directly. Any other variant hits
/// `unreachable!()`, and collection or formatting errors panic via `unwrap`.
pub async fn check_output_stream(output: Output, expected: String) {
    let batches = match output {
        Output::RecordBatches(batches) => batches,
        Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
        _ => unreachable!(),
    };
    assert_eq!(batches.pretty_print().unwrap(), expected);
}
3 changes: 2 additions & 1 deletion src/promql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
async-trait.workspace = true
bytemuck = "1.12"
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
Expand All @@ -13,10 +14,10 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" }
query = { path = "../query" }
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }

[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
query = { path = "../query" }
5 changes: 5 additions & 0 deletions src/promql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum Error {
#[snafu(display("Internal error during build DataFusion plan, error: {}", source))]
DataFusionPlanning {
source: datafusion::error::DataFusionError,
backtrace: Backtrace,
},

#[snafu(display("Unexpected plan or expression: {}", desc))]
Expand All @@ -37,6 +38,9 @@ pub enum Error {
#[snafu(display("Cannot find time index column in table {}", table))]
TimeIndexNotFound { table: String, backtrace: Backtrace },

#[snafu(display("Cannot find value columns in table {}", table))]
ValueNotFound { table: String, backtrace: Backtrace },

#[snafu(display("Cannot find the table {}", table))]
TableNotFound {
table: String,
Expand Down Expand Up @@ -84,6 +88,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
TimeIndexNotFound { .. }
| ValueNotFound { .. }
| UnsupportedExpr { .. }
| MultipleVector { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,
Expand Down
2 changes: 2 additions & 0 deletions src/promql/src/extension_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

mod instant_manipulate;
mod normalize;
mod planner;
mod range_manipulate;

use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
pub use planner::PromExtensionPlanner;
pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};

/// Crate-internal alias for the native value type of Arrow's
/// `TimestampMillisecondType` (its `ArrowPrimitiveType::Native`).
pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
49 changes: 49 additions & 0 deletions src/promql/src/extension_plan/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};

use super::{InstantManipulate, RangeManipulate};
use crate::extension_plan::SeriesNormalize;

/// Stateless DataFusion [`ExtensionPlanner`] that turns the PromQL logical
/// extension nodes (`SeriesNormalize`, `InstantManipulate`, `RangeManipulate`)
/// into their execution plans.
pub struct PromExtensionPlanner {}

#[async_trait]
impl ExtensionPlanner for PromExtensionPlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
if let Some(node) = node.as_any().downcast_ref::<SeriesNormalize>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<InstantManipulate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<RangeManipulate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else {
Ok(None)
}
}
}
Loading