From baa3bc9bef1f98d65784c05249538ed5417e17e3 Mon Sep 17 00:00:00 2001
From: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Date: Tue, 27 Jun 2023 14:04:04 +0800
Subject: [PATCH] feat: physical plan wrapper (#1837)

* test: add physical plan wrapper trait

* test: add plugins to datanode initialization

* test: add plugins to datanode initialization

* chore: add metrics method

* chore: update meter-core version

* chore: remove unused code

* chore: impl metrics method on df execution plan adapter

* chore: minor comment fix

* chore: add retry in create table

* chore: shrink keep lease handler buffer

* chore: add etcd batch size warn

* chore: try shrink

* Revert "chore: try shrink"

This reverts commit 0361b51670278575594ab6748971ed5ad4cceeb3.

* chore: add create table backup time

* add metrics in some interfaces

Signed-off-by: Ruihang Xia

* calc elapsed time and rows

Signed-off-by: Ruihang Xia

* chore: remove timer in scan batch

* chore: add back stream metrics wrapper

* chore: add timer to ready poll

* chore: minor update

* chore: try using df_plan.metrics()

* chore: remove table scan timer

* chore: remove scan timer

* chore: add debug log

* Revert "chore: add debug log"

This reverts commit 672a0138fd229b08b1e84fa4a66599d06f47c6bd.

* chore: use batch size as row count

* chore: use batch size as row count

* chore: tune code for pr

* chore: rename to physical plan wrapper

---------

Signed-off-by: Ruihang Xia
Co-authored-by: Ruihang Xia
---
 Cargo.lock                            |  4 ++--
 Cargo.toml                            |  4 ++--
 src/cmd/src/cli/repl.rs               |  6 ++++--
 src/cmd/src/datanode.rs               |  4 +++-
 src/cmd/src/standalone.rs             |  2 +-
 src/common/query/src/physical_plan.rs |  2 +-
 src/datanode/src/datanode.rs          |  6 ++++--
 src/datanode/src/instance.rs          | 17 ++++++++++++++---
 src/datanode/src/mock.rs              |  8 +++++++-
 src/query/src/datafusion.rs           | 23 ++++++++++++++++++-----
 src/query/src/lib.rs                  |  1 +
 src/query/src/physical_wrapper.rs     | 26 ++++++++++++++++++++++++++
 src/query/src/query_engine.rs         |  4 ++--
 tests-integration/src/tests.rs        |  4 +++-
 14 files changed, 88 insertions(+), 23 deletions(-)
 create mode 100644 src/query/src/physical_wrapper.rs

diff --git a/Cargo.lock b/Cargo.lock
index c75aabea7190..fe703d366e21 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5209,7 +5209,7 @@ dependencies = [
 [[package]]
 name = "meter-core"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=f0798c4c648d89f51abe63e870919c75dd463199#f0798c4c648d89f51abe63e870919c75dd463199"
+source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
 dependencies = [
  "anymap",
  "once_cell",
@@ -5219,7 +5219,7 @@ dependencies = [
 [[package]]
 name = "meter-macros"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=f0798c4c648d89f51abe63e870919c75dd463199#f0798c4c648d89f51abe63e870919c75dd463199"
+source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
 dependencies = [
  "meter-core",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 4a5d98453814..0b09ee555e5b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,11 +88,11 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
 tonic = { version = "0.9", features = ["tls"] }
 uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
 metrics = "0.20"
-meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "f0798c4c648d89f51abe63e870919c75dd463199" }
+meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
 
 [workspace.dependencies.meter-macros]
 git = "https://github.com/GreptimeTeam/greptime-meter.git"
-rev = "f0798c4c648d89f51abe63e870919c75dd463199"
+rev = "abbd357c1e193cd270ea65ee7652334a150b628f"
 
 [profile.release]
 debug = true
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 6e4d5c645c39..56cea0dd64e5 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -19,6 +19,7 @@ use std::time::Instant;
 use catalog::remote::CachedMetaKvBackend;
 use client::client_manager::DatanodeClients;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_base::Plugins;
 use common_error::prelude::ErrorExt;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
@@ -266,13 +267,14 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
         partition_manager,
         datanode_clients,
     ));
+    let plugins: Arc<Plugins> = Default::default();
 
     let state = Arc::new(QueryEngineState::new(
         catalog_list,
         false,
         None,
         None,
-        Default::default(),
+        plugins.clone(),
     ));
-    Ok(DatafusionQueryEngine::new(state))
+    Ok(DatafusionQueryEngine::new(state, plugins))
 }
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 5e35cb21fbac..b3518d9627c8 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -170,7 +170,9 @@ impl StartCommand {
         logging::info!("Datanode start command: {:#?}", self);
         logging::info!("Datanode options: {:#?}", opts);
 
-        let datanode = Datanode::new(opts).await.context(StartDatanodeSnafu)?;
+        let datanode = Datanode::new(opts, Default::default())
+            .await
+            .context(StartDatanodeSnafu)?;
 
         Ok(Instance { datanode })
     }
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 6e307aaad9d8..85a4beebec39 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -308,7 +308,7 @@ impl StartCommand {
             fe_opts,
             dn_opts
         );
 
-        let datanode = Datanode::new(dn_opts.clone())
+        let datanode = Datanode::new(dn_opts.clone(), Default::default())
             .await
             .context(StartDatanodeSnafu)?;
diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs
index 1c148020ac9b..a3e30e1ccf3f 100644
--- a/src/common/query/src/physical_plan.rs
+++ b/src/common/query/src/physical_plan.rs
@@ -150,7 +150,7 @@ impl PhysicalPlan for PhysicalPlanAdapter {
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metric.clone_inner())
+        self.df_plan.metrics()
     }
 }
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 59bf68557a58..6fa7eb4e7a90 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -14,9 +14,11 @@
 
 //! Datanode configurations
 
+use std::sync::Arc;
 use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
+use common_base::Plugins;
 use common_error::prelude::BoxedError;
 use common_telemetry::info;
 use common_telemetry::logging::LoggingOptions;
@@ -386,8 +388,8 @@ pub struct Datanode {
 }
 
 impl Datanode {
-    pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
-        let (instance, heartbeat_task) = Instance::with_opts(&opts).await?;
+    pub async fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Result<Datanode> {
+        let (instance, heartbeat_task) = Instance::with_opts(&opts, plugins).await?;
         let services = match opts.mode {
             Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
             Mode::Standalone => None,
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 50d9cce35c3f..1b23e13b4462 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -22,6 +22,7 @@ use catalog::remote::region_alive_keeper::RegionAliveKeepers;
 use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
 use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
 use common_base::paths::{CLUSTER_DIR, WAL_DIR};
+use common_base::Plugins;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_error::prelude::BoxedError;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -84,7 +85,10 @@ pub struct Instance {
 pub type InstanceRef = Arc<Instance>;
 
 impl Instance {
-    pub async fn with_opts(opts: &DatanodeOptions) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
+    pub async fn with_opts(
+        opts: &DatanodeOptions,
+        plugins: Arc<Plugins>,
+    ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
         let meta_client = match opts.mode {
             Mode::Standalone => None,
             Mode::Distributed => {
@@ -101,7 +105,7 @@ impl Instance {
 
         let compaction_scheduler = create_compaction_scheduler(opts);
 
-        Self::new(opts, meta_client, compaction_scheduler).await
+        Self::new(opts, meta_client, compaction_scheduler, plugins).await
     }
 
     fn build_heartbeat_task(
@@ -154,6 +158,7 @@ impl Instance {
         opts: &DatanodeOptions,
         meta_client: Option<Arc<MetaClient>>,
         compaction_scheduler: CompactionSchedulerRef,
+        plugins: Arc<Plugins>,
     ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
         let object_store = store::new_object_store(&opts.storage.store).await?;
         let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);
@@ -261,7 +266,13 @@ impl Instance {
         };
         catalog_manager.start().await.context(CatalogSnafu)?;
 
-        let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
+        let factory = QueryEngineFactory::new_with_plugins(
+            catalog_manager.clone(),
+            false,
+            None,
+            None,
+            plugins,
+        );
         let query_engine = factory.query_engine();
 
         let procedure_manager =
diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs
index 36ebac5187a6..4114ae0e688c 100644
--- a/src/datanode/src/mock.rs
+++ b/src/datanode/src/mock.rs
@@ -38,7 +38,13 @@ impl Instance {
     ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
         let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
         let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
-        Instance::new(opts, Some(meta_client), compaction_scheduler).await
+        Instance::new(
+            opts,
+            Some(meta_client),
+            compaction_scheduler,
+            Default::default(),
+        )
+        .await
     }
 }
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 10066d411e46..4dd7c1f21bef 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_base::Plugins;
 use common_error::prelude::BoxedError;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::scalars::udf::create_udf;
@@ -53,6 +54,7 @@ use crate::executor::QueryExecutor;
 use crate::logical_optimizer::LogicalOptimizer;
 use crate::physical_optimizer::PhysicalOptimizer;
 use crate::physical_planner::PhysicalPlanner;
+use crate::physical_wrapper::PhysicalPlanWrapperRef;
 use crate::plan::LogicalPlan;
 use crate::planner::{DfLogicalPlanner, LogicalPlanner};
 use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
@@ -60,20 +62,31 @@ use crate::{metrics, QueryEngine};
 
 pub struct DatafusionQueryEngine {
     state: Arc<QueryEngineState>,
+    plugins: Arc<Plugins>,
 }
 
 impl DatafusionQueryEngine {
-    pub fn new(state: Arc<QueryEngineState>) -> Self {
-        Self { state }
+    pub fn new(state: Arc<QueryEngineState>, plugins: Arc<Plugins>) -> Self {
+        Self { state, plugins }
     }
 
-    async fn exec_query_plan(&self, plan: LogicalPlan) -> Result<Output> {
+    async fn exec_query_plan(
+        &self,
+        plan: LogicalPlan,
+        query_ctx: QueryContextRef,
+    ) -> Result<Output> {
         let mut ctx = QueryEngineContext::new(self.state.session_state());
 
         // `create_physical_plan` will optimize logical plan internally
         let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
         let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
+        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
+            wrapper.wrap(physical_plan, query_ctx)
+        } else {
+            physical_plan
+        };
+
         Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
     }
@@ -95,7 +108,7 @@ impl DatafusionQueryEngine {
         let table = self.find_table(&table_name).await?;
 
         let output = self
-            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()))
+            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx)
             .await?;
         let mut stream = match output {
             Output::RecordBatches(batches) => batches.as_stream(),
@@ -216,7 +229,7 @@ impl QueryEngine for DatafusionQueryEngine {
             LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
                 self.exec_dml_statement(dml, query_ctx).await
             }
-            _ => self.exec_query_plan(plan).await,
+            _ => self.exec_query_plan(plan, query_ctx).await,
         }
     }
 
diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs
index 4e18a86c0054..a71d6b88d7f4 100644
--- a/src/query/src/lib.rs
+++ b/src/query/src/lib.rs
@@ -26,6 +26,7 @@ mod optimizer;
 pub mod parser;
 pub mod physical_optimizer;
 pub mod physical_planner;
+pub mod physical_wrapper;
 pub mod plan;
 pub mod planner;
 pub mod query_engine;
diff --git a/src/query/src/physical_wrapper.rs b/src/query/src/physical_wrapper.rs
new file mode 100644
index 000000000000..8fb3e8c15739
--- /dev/null
+++ b/src/query/src/physical_wrapper.rs
@@ -0,0 +1,26 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_query::physical_plan::PhysicalPlan;
+use session::context::QueryContextRef;
+
+/// wrap physical plan with additional layer
+/// e.g: metrics retrieving layer upon physical plan
+pub trait PhysicalPlanWrapper: Send + Sync + 'static {
+    fn wrap(&self, origin: Arc<dyn PhysicalPlan>, ctx: QueryContextRef) -> Arc<dyn PhysicalPlan>;
+}
+
+pub type PhysicalPlanWrapperRef = Arc<dyn PhysicalPlanWrapper>;
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index ed7f75e700a6..3cfbf70ff74c 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -108,9 +108,9 @@ impl QueryEngineFactory {
             with_dist_planner,
             partition_manager,
             clients,
-            plugins,
+            plugins.clone(),
         ));
-        let query_engine = Arc::new(DatafusionQueryEngine::new(state));
+        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
         register_functions(&query_engine);
         Self { query_engine }
     }
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index 2557dc8201af..ca6bfaf60d31 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -65,7 +65,9 @@ impl MockStandaloneInstance {
 pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
     let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
 
-    let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts).await.unwrap();
+    let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts, Default::default())
+        .await
+        .unwrap();
 
     let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
         .await
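
Note (illustrative, not part of the patch): `PhysicalPlanWrapper` is a plugin
point, so no implementation ships in this diff. Below is a minimal sketch of
an implementor, assuming only the trait and context types introduced above;
the `LoggingWrapper` type and the `current_schema()` call are illustrative,
not taken from this PR:

    use std::sync::Arc;

    use common_query::physical_plan::PhysicalPlan;
    use common_telemetry::info;
    use query::physical_wrapper::PhysicalPlanWrapper;
    use session::context::QueryContextRef;

    /// An identity wrapper that only logs which plan it was offered. A real
    /// implementation would return a decorated plan, e.g. one whose `execute`
    /// wraps the record-batch stream to collect elapsed time and row counts
    /// (the motivation visible in the commit log above).
    struct LoggingWrapper;

    impl PhysicalPlanWrapper for LoggingWrapper {
        fn wrap(
            &self,
            origin: Arc<dyn PhysicalPlan>,
            ctx: QueryContextRef,
        ) -> Arc<dyn PhysicalPlan> {
            info!("wrapping physical plan for schema {}", ctx.current_schema());
            // Returning the plan unchanged is valid: `exec_query_plan` simply
            // executes whatever this method returns.
            origin
        }
    }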
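
For the wrapper to take effect it must be registered in the `Plugins` map that
now threads through `Datanode::new`, since `exec_query_plan` looks it up via
`plugins.get::<PhysicalPlanWrapperRef>()`. A sketch of that registration,
reusing the `LoggingWrapper` above and assuming `Plugins::insert` exists as
the counterpart of that `get` (its exact signature and receiver may differ):

    use std::sync::Arc;

    use common_base::Plugins;
    use datanode::datanode::{Datanode, DatanodeOptions};
    use datanode::error::Result;
    use query::physical_wrapper::PhysicalPlanWrapperRef;

    /// Hypothetical helper: start a datanode with the wrapper installed.
    async fn start_with_wrapper(opts: DatanodeOptions) -> Result<Datanode> {
        let plugins = Plugins::default();
        // Store under the exact type the engine queries for; `insert` is
        // assumed to take `&self` via interior locking, mirroring `get`.
        let wrapper: PhysicalPlanWrapperRef = Arc::new(LoggingWrapper);
        plugins.insert(wrapper);
        Datanode::new(opts, Arc::new(plugins)).await
    }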