feat: physical plan wrapper (GreptimeTeam#1837)
* test: add physical plan wrapper trait

* test: add plugins to datanode initialization

* test: add plugins to datanode initialization

* chore: add metrics method

* chore: update meter-core version

* chore: remove unused code

* chore: impl metrics method on df execution plan adapter

* chore: minor comment fix

* chore: add retry in create table

* chore: shrink keep lease handler buffer

* chore: add etcd batch size warn

* chore: try shrink

* Revert "chore: try shrink"

This reverts commit 0361b51.

* chore: add create table backup time

* add metrics in some interfaces

Signed-off-by: Ruihang Xia <[email protected]>

* calc elapsed time and rows

Signed-off-by: Ruihang Xia <[email protected]>

* chore: remove timer in scan batch

* chore: add back stream metrics wrapper

* chore: add timer to ready poll

* chore: minor update

* chore: try using df_plan.metrics()

* chore: remove table scan timer

* chore: remove scan timer

* chore: add debug log

* Revert "chore: add debug log"

This reverts commit 672a013.

* chore: use batch size as row count

* chore: use batch size as row count

* chore: tune code for pr

* chore: rename to physical plan wrapper

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
2 people authored and paomian committed Oct 19, 2023
1 parent f301cb7 commit baa3bc9
Showing 14 changed files with 88 additions and 23 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -88,11 +88,11 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
 tonic = { version = "0.9", features = ["tls"] }
 uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
 metrics = "0.20"
-meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "f0798c4c648d89f51abe63e870919c75dd463199" }
+meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }

 [workspace.dependencies.meter-macros]
 git = "https://github.com/GreptimeTeam/greptime-meter.git"
-rev = "f0798c4c648d89f51abe63e870919c75dd463199"
+rev = "abbd357c1e193cd270ea65ee7652334a150b628f"

 [profile.release]
 debug = true
6 changes: 4 additions & 2 deletions src/cmd/src/cli/repl.rs
@@ -19,6 +19,7 @@ use std::time::Instant;
 use catalog::remote::CachedMetaKvBackend;
 use client::client_manager::DatanodeClients;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_base::Plugins;
 use common_error::prelude::ErrorExt;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
@@ -266,13 +267,14 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
         partition_manager,
         datanode_clients,
     ));
+    let plugins: Arc<Plugins> = Default::default();
     let state = Arc::new(QueryEngineState::new(
         catalog_list,
         false,
         None,
         None,
-        Default::default(),
+        plugins.clone(),
     ));

-    Ok(DatafusionQueryEngine::new(state))
+    Ok(DatafusionQueryEngine::new(state, plugins))
 }
4 changes: 3 additions & 1 deletion src/cmd/src/datanode.rs
@@ -170,7 +170,9 @@ impl StartCommand {
         logging::info!("Datanode start command: {:#?}", self);
         logging::info!("Datanode options: {:#?}", opts);

-        let datanode = Datanode::new(opts).await.context(StartDatanodeSnafu)?;
+        let datanode = Datanode::new(opts, Default::default())
+            .await
+            .context(StartDatanodeSnafu)?;

         Ok(Instance { datanode })
     }
2 changes: 1 addition & 1 deletion src/cmd/src/standalone.rs
@@ -308,7 +308,7 @@ impl StartCommand {
             fe_opts, dn_opts
         );

-        let datanode = Datanode::new(dn_opts.clone())
+        let datanode = Datanode::new(dn_opts.clone(), Default::default())
             .await
             .context(StartDatanodeSnafu)?;
2 changes: 1 addition & 1 deletion src/common/query/src/physical_plan.rs
@@ -150,7 +150,7 @@ impl PhysicalPlan for PhysicalPlanAdapter {
     }

     fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metric.clone_inner())
+        self.df_plan.metrics()
     }
 }
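
With this change, `PhysicalPlanAdapter::metrics()` surfaces the metrics collected by the underlying DataFusion plan instead of the adapter's own set. A minimal sketch of reading them after execution; the `plan` variable is hypothetical, and `output_rows`/`elapsed_compute` are DataFusion's standard `MetricsSet` accessors:

// Hedged sketch: read the adapter's metrics once its stream has been polled.
// Both accessors return None until the plan has actually executed.
if let Some(metrics) = plan.metrics() {
    println!("output rows: {:?}", metrics.output_rows());
    println!("cpu time:    {:?}", metrics.elapsed_compute());
}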
6 changes: 4 additions & 2 deletions src/datanode/src/datanode.rs
@@ -14,9 +14,11 @@

 //! Datanode configurations

+use std::sync::Arc;
 use std::time::Duration;

 use common_base::readable_size::ReadableSize;
+use common_base::Plugins;
 use common_error::prelude::BoxedError;
 use common_telemetry::info;
 use common_telemetry::logging::LoggingOptions;
@@ -386,8 +388,8 @@ pub struct Datanode {
 }

 impl Datanode {
-    pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
-        let (instance, heartbeat_task) = Instance::with_opts(&opts).await?;
+    pub async fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Result<Datanode> {
+        let (instance, heartbeat_task) = Instance::with_opts(&opts, plugins).await?;
         let services = match opts.mode {
             Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
             Mode::Standalone => None,
17 changes: 14 additions & 3 deletions src/datanode/src/instance.rs
@@ -22,6 +22,7 @@ use catalog::remote::region_alive_keeper::RegionAliveKeepers;
 use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
 use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
 use common_base::paths::{CLUSTER_DIR, WAL_DIR};
+use common_base::Plugins;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_error::prelude::BoxedError;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -84,7 +85,10 @@ pub struct Instance {
 pub type InstanceRef = Arc<Instance>;

 impl Instance {
-    pub async fn with_opts(opts: &DatanodeOptions) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
+    pub async fn with_opts(
+        opts: &DatanodeOptions,
+        plugins: Arc<Plugins>,
+    ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
         let meta_client = match opts.mode {
             Mode::Standalone => None,
             Mode::Distributed => {
@@ -101,7 +105,7 @@

         let compaction_scheduler = create_compaction_scheduler(opts);

-        Self::new(opts, meta_client, compaction_scheduler).await
+        Self::new(opts, meta_client, compaction_scheduler, plugins).await
     }

     fn build_heartbeat_task(
@@ -154,6 +158,7 @@
         opts: &DatanodeOptions,
         meta_client: Option<Arc<MetaClient>>,
         compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
+        plugins: Arc<Plugins>,
     ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
         let object_store = store::new_object_store(&opts.storage.store).await?;
         let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);
@@ -261,7 +266,13 @@
         };

         catalog_manager.start().await.context(CatalogSnafu)?;
-        let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
+        let factory = QueryEngineFactory::new_with_plugins(
+            catalog_manager.clone(),
+            false,
+            None,
+            None,
+            plugins,
+        );
         let query_engine = factory.query_engine();

         let procedure_manager =
8 changes: 7 additions & 1 deletion src/datanode/src/mock.rs
@@ -38,7 +38,13 @@ impl Instance {
     ) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
         let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
         let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
-        Instance::new(opts, Some(meta_client), compaction_scheduler).await
+        Instance::new(
+            opts,
+            Some(meta_client),
+            compaction_scheduler,
+            Default::default(),
+        )
+        .await
     }
 }
23 changes: 18 additions & 5 deletions src/query/src/datafusion.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use async_trait::async_trait;
+use common_base::Plugins;
 use common_error::prelude::BoxedError;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::scalars::udf::create_udf;
@@ -53,27 +54,39 @@ use crate::executor::QueryExecutor;
 use crate::logical_optimizer::LogicalOptimizer;
 use crate::physical_optimizer::PhysicalOptimizer;
 use crate::physical_planner::PhysicalPlanner;
+use crate::physical_wrapper::PhysicalPlanWrapperRef;
 use crate::plan::LogicalPlan;
 use crate::planner::{DfLogicalPlanner, LogicalPlanner};
 use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
 use crate::{metrics, QueryEngine};

 pub struct DatafusionQueryEngine {
     state: Arc<QueryEngineState>,
+    plugins: Arc<Plugins>,
 }

 impl DatafusionQueryEngine {
-    pub fn new(state: Arc<QueryEngineState>) -> Self {
-        Self { state }
+    pub fn new(state: Arc<QueryEngineState>, plugins: Arc<Plugins>) -> Self {
+        Self { state, plugins }
     }

-    async fn exec_query_plan(&self, plan: LogicalPlan) -> Result<Output> {
+    async fn exec_query_plan(
+        &self,
+        plan: LogicalPlan,
+        query_ctx: QueryContextRef,
+    ) -> Result<Output> {
         let mut ctx = QueryEngineContext::new(self.state.session_state());

         // `create_physical_plan` will optimize logical plan internally
         let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
         let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

+        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
+            wrapper.wrap(physical_plan, query_ctx)
+        } else {
+            physical_plan
+        };
+
         Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
     }
@@ -95,7 +108,7 @@
         let table = self.find_table(&table_name).await?;

         let output = self
-            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()))
+            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx)
             .await?;
         let mut stream = match output {
             Output::RecordBatches(batches) => batches.as_stream(),
@@ -216,7 +229,7 @@
             LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
                 self.exec_dml_statement(dml, query_ctx).await
             }
-            _ => self.exec_query_plan(plan).await,
+            _ => self.exec_query_plan(plan, query_ctx).await,
         }
     }
1 change: 1 addition & 0 deletions src/query/src/lib.rs
@@ -26,6 +26,7 @@ mod optimizer;
 pub mod parser;
 pub mod physical_optimizer;
 pub mod physical_planner;
+pub mod physical_wrapper;
 pub mod plan;
 pub mod planner;
 pub mod query_engine;
26 changes: 26 additions & 0 deletions src/query/src/physical_wrapper.rs
@@ -0,0 +1,26 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_query::physical_plan::PhysicalPlan;
+use session::context::QueryContextRef;
+
+/// Wraps a physical plan with an additional layer,
+/// e.g. a metrics-retrieving layer on top of the physical plan.
+pub trait PhysicalPlanWrapper: Send + Sync + 'static {
+    fn wrap(&self, origin: Arc<dyn PhysicalPlan>, ctx: QueryContextRef) -> Arc<dyn PhysicalPlan>;
+}
+
+pub type PhysicalPlanWrapperRef = Arc<dyn PhysicalPlanWrapper>;
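
The new trait is deliberately small: a wrapper receives the optimized physical plan plus the query context and returns a (possibly decorated) plan. A minimal implementation sketch; the `LoggingWrapper` type is hypothetical and not part of this commit, and returning `origin` unchanged is assumed to be a valid no-op:

use std::sync::Arc;

use common_query::physical_plan::PhysicalPlan;
use query::physical_wrapper::PhysicalPlanWrapper;
use session::context::QueryContextRef;

/// Hypothetical wrapper; a real one would return a plan that decorates
/// `origin`, e.g. recording elapsed time and row counts per poll.
struct LoggingWrapper;

impl PhysicalPlanWrapper for LoggingWrapper {
    fn wrap(&self, origin: Arc<dyn PhysicalPlan>, _ctx: QueryContextRef) -> Arc<dyn PhysicalPlan> {
        // Only logs here; the plan is handed back unchanged.
        common_telemetry::info!("wrapping physical plan");
        origin
    }
}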
4 changes: 2 additions & 2 deletions src/query/src/query_engine.rs
@@ -108,9 +108,9 @@ impl QueryEngineFactory {
             with_dist_planner,
             partition_manager,
             clients,
-            plugins,
+            plugins.clone(),
         ));
-        let query_engine = Arc::new(DatafusionQueryEngine::new(state));
+        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
         register_functions(&query_engine);
         Self { query_engine }
     }
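
For the engine to pick a wrapper up, the host process has to place a `PhysicalPlanWrapperRef` into the `Plugins` instance it threads through `Datanode::new` (and from there into `QueryEngineFactory::new_with_plugins`). A hedged wiring sketch, assuming `Plugins` exposes a type-keyed `insert` mirroring the `get::<PhysicalPlanWrapperRef>()` lookup in datafusion.rs; `LoggingWrapper` is the hypothetical type sketched above, and the module paths are assumptions based on the file layout in this diff:

use std::sync::Arc;

use common_base::Plugins;
use datanode::datanode::{Datanode, DatanodeOptions};
use datanode::error::Result;
use query::physical_wrapper::PhysicalPlanWrapperRef;

async fn start_with_wrapper(opts: DatanodeOptions) -> Result<Datanode> {
    // Register the wrapper under its Arc'd trait-object type so the
    // engine's `plugins.get::<PhysicalPlanWrapperRef>()` finds it.
    let plugins: Arc<Plugins> = Default::default();
    plugins.insert::<PhysicalPlanWrapperRef>(Arc::new(LoggingWrapper));
    Datanode::new(opts, plugins).await
}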
4 changes: 3 additions & 1 deletion tests-integration/src/tests.rs
@@ -65,7 +65,9 @@ impl MockStandaloneInstance {

 pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
     let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
-    let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts).await.unwrap();
+    let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts, Default::default())
+        .await
+        .unwrap();

     let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
         .await
