diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs
index 730daa1c3ede..b50d7bb6b037 100644
--- a/src/common/meta/src/key/table_info.rs
+++ b/src/common/meta/src/key/table_info.rs
@@ -128,6 +128,7 @@ impl TableInfoValue {
 }
 
 pub type TableInfoManagerRef = Arc<TableInfoManager>;
+#[derive(Clone)]
 pub struct TableInfoManager {
     kv_backend: KvBackendRef,
 }
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 8179ca5807f9..33b05ddec12b 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -17,9 +17,16 @@
 pub(crate) mod error;
 pub(crate) mod node_context;
+mod table_source;
+mod util;
 
-pub(crate) use node_context::{FlowId, FlownodeContext, TableName};
+pub(crate) use node_context::FlownodeContext;
+pub(crate) use table_source::TableSource;
 mod worker;
 
 pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
+
+// TODO: refactor common types for flow to a separate module
+/// FlowId is a unique identifier for a flow task
+pub type FlowId = u32;
+pub type TableName = [String; 3];
diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/adapter/error.rs
index 2406dc5ea79d..47df3d9014aa 100644
--- a/src/flow/src/adapter/error.rs
+++ b/src/flow/src/adapter/error.rs
@@ -98,13 +98,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Invalid query plan: {source}"))]
-    InvalidQueryPlan {
-        source: query::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
     InvalidQueryProst {
         inner: api::DecodeError,
@@ -112,13 +105,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Invalid query, can't transform to substrait: {source}"))]
-    InvalidQuerySubstrait {
-        source: substrait::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid query: {reason}"))]
     InvalidQuery {
         reason: String,
@@ -193,9 +179,7 @@ impl ErrorExt for Error {
             Self::TableNotFound { .. }
             | Self::TableNotFoundMeta { .. }
             | Self::FlowNotFound { .. } => StatusCode::TableNotFound,
-            Self::InvalidQueryPlan { .. }
-            | Self::InvalidQuerySubstrait { .. }
-            | Self::InvalidQueryProst { .. }
+            Self::InvalidQueryProst { .. }
             | &Self::InvalidQuery { .. }
            | &Self::Plan { .. }
            | &Self::Datatypes { .. } => StatusCode::PlanQuery,
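Note on the two variants removed above: callers now box the query engine's error and route it through the generic `External` variant, as the `transform.rs` hunk at the end of this diff shows. Below is a minimal sketch of that boxing pattern using plain `std` errors (flow's real `BoxedError` comes from `common_error` and wraps `ErrorExt` types, and the `parse` helper here is a stand-in, but the shape is the same):

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for common_error::ext::BoxedError in this sketch.
type BoxedError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("External error"))]
    External { source: BoxedError },
}

// Stand-in for a query-engine call that has its own concrete error type.
fn parse(sql: &str) -> Result<String, std::num::ParseIntError> {
    sql.trim().parse::<i64>().map(|v| v.to_string())
}

fn plan_query(sql: &str) -> Result<String, Error> {
    parse(sql)
        .map_err(|e| Box::new(e) as BoxedError) // erase the concrete error type
        .context(ExternalSnafu)                 // route it through one generic variant
}
```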
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index 345414182222..82900aac3644 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -17,32 +17,18 @@
 use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
 use std::sync::Arc;
 
+use common_telemetry::debug;
 use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
 use tokio::sync::{broadcast, mpsc};
 
 use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
+use crate::adapter::{FlowId, TableName, TableSource};
 use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
 use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
 
-// TODO: refactor common types for flow to a separate module
-/// FlowId is a unique identifier for a flow task
-pub type FlowId = u64;
-pub type TableName = [String; 3];
-
-pub struct TableSource {}
-
-impl TableSource {
-    pub async fn get_table_name_schema(
-        &self,
-        _table_id: &TableId,
-    ) -> Result<(TableName, RelationType), Error> {
-        todo!()
-    }
-}
-
 /// A context that holds the information of the dataflow
 #[derive(Default)]
 pub struct FlownodeContext {
@@ -53,7 +39,7 @@ pub struct FlownodeContext {
     /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
     ///
     /// Note that we are getting insert requests with table id, so we should use table id as the key
-    pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
+    pub source_sender: BTreeMap<TableId, SourceSender>,
     /// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
     ///
     /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
@@ -74,38 +60,90 @@ pub struct FlownodeContext {
     pub query_context: Option<Arc<QueryContext>>,
 }
 
+/// a simple broadcast sender with backpressure and an unbounded send buffer
+///
+/// receivers still use the tokio broadcast channel, since only the sender side needs to know
+/// about backpressure and adjust the dataflow's running duration to avoid blocking
+pub struct SourceSender {
+    sender: broadcast::Sender<DiffRow>,
+    send_buf: VecDeque<DiffRow>,
+}
+
+impl Default for SourceSender {
+    fn default() -> Self {
+        Self {
+            sender: broadcast::Sender::new(BROADCAST_CAP),
+            send_buf: Default::default(),
+        }
+    }
+}
+
+impl SourceSender {
+    pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
+        self.sender.subscribe()
+    }
+
+    /// send as many rows as possible from the send buffer,
+    /// until the buffer is empty or the broadcast channel is full
+    pub fn try_send_all(&mut self) -> Result<usize, Error> {
+        let mut row_cnt = 0;
+        loop {
+            // if the broadcast channel is full or the send buffer is empty,
+            // there is nothing to do for now, just break
+            if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
+                break;
+            }
+            if let Some(row) = self.send_buf.pop_front() {
+                self.sender
+                    .send(row)
+                    .map_err(|err| {
+                        InternalSnafu {
+                            reason: format!("Failed to send row, error = {:?}", err),
+                        }
+                        .build()
+                    })
+                    .with_context(|_| EvalSnafu)?;
+                row_cnt += 1;
+            }
+        }
+        if row_cnt > 0 {
+            debug!("Send {} rows", row_cnt);
+        }
+
+        Ok(row_cnt)
+    }
+
+    /// return the number of rows actually sent (including what was already in the buffer)
+    pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
+        self.send_buf.extend(rows);
+
+        let row_cnt = self.try_send_all()?;
+
+        Ok(row_cnt)
+    }
+}
+
 impl FlownodeContext {
-    // return number of rows it actual send(including what's in the buffer)
+    /// return the number of rows actually sent (including what was already in the buffer)
+    ///
+    /// TODO(discord9): make this concurrent
     pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
         let sender = self
             .source_sender
-            .get(&table_id)
+            .get_mut(&table_id)
             .with_context(|| TableNotFoundSnafu {
                 name: table_id.to_string(),
             })?;
-        let send_buffer = self.send_buffer.entry(table_id).or_default();
-        send_buffer.extend(rows);
-        let mut row_cnt = 0;
-        while let Some(row) = send_buffer.pop_front() {
-            if sender.len() >= BROADCAST_CAP {
-                break;
-            }
-            row_cnt += 1;
-            sender
-                .send(row)
-                .map_err(|err| {
-                    InternalSnafu {
-                        reason: format!(
-                            "Failed to send row to table_id = {:?}, error = {:?}",
-                            table_id, err
-                        ),
-                    }
-                    .build()
-                })
-                .with_context(|_| EvalSnafu)?;
-        }
+        // debug!("FlownodeContext::send: trying to send {} rows", rows.len());
+        sender.send_rows(rows)
+    }
 
-        Ok(row_cnt)
+    /// flush all senders' buffers
+    pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
+        self.source_sender
+            .iter_mut()
+            .map(|(_table_id, src_sender)| src_sender.try_send_all())
+            .try_fold(0, |acc, x| x.map(|x| x + acc))
     }
 }
 
@@ -120,7 +158,7 @@ impl FlownodeContext {
         sink_table_name: TableName,
     ) {
         for source_table_id in source_table_ids {
-            self.add_source_sender(*source_table_id);
+            self.add_source_sender_if_not_exist(*source_table_id);
             self.source_to_tasks
                 .entry(*source_table_id)
                 .or_default()
@@ -131,10 +169,9 @@ impl FlownodeContext {
         self.flow_to_sink.insert(task_id, sink_table_name);
     }
 
-    pub fn add_source_sender(&mut self, table_id: TableId) {
-        self.source_sender
-            .entry(table_id)
-            .or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
+    /// try to add a source sender; if one already exists, do nothing
+    pub fn add_source_sender_if_not_exist(&mut self, table_id: TableId) {
+        let _sender = self.source_sender.entry(table_id).or_default();
     }
 
     pub fn add_sink_receiver(&mut self, table_name: TableName) {
@@ -143,10 +180,7 @@ impl FlownodeContext {
             .or_insert_with(mpsc::unbounded_channel::<DiffRow>);
     }
 
-    pub fn get_source_by_global_id(
-        &self,
-        id: &GlobalId,
-    ) -> Result<&broadcast::Sender<DiffRow>, Error> {
+    pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> {
         let table_id = self
             .table_repr
             .get_by_global_id(id)
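For reference, here is a minimal, self-contained sketch of the buffering pattern `SourceSender` introduces (the `Row` alias, `CAP`, and `BufferedSender` are stand-ins for the crate's `DiffRow`, `BROADCAST_CAP`, and real API): rows queue up in an unbounded `VecDeque` and are only drained into the bounded broadcast channel while it has room, so the sender never blocks and never drops rows.

```rust
use std::collections::VecDeque;

use tokio::sync::broadcast;

// Stand-ins for flow's DiffRow and BROADCAST_CAP.
type Row = u64;
const CAP: usize = 8192;

struct BufferedSender {
    sender: broadcast::Sender<Row>,
    send_buf: VecDeque<Row>,
}

impl BufferedSender {
    fn new() -> Self {
        Self {
            sender: broadcast::Sender::new(CAP),
            send_buf: VecDeque::new(),
        }
    }

    /// Drain the buffer while the channel has room; never blocks, never drops.
    fn try_send_all(&mut self) -> usize {
        let mut sent = 0;
        while self.sender.len() < CAP {
            let Some(row) = self.send_buf.pop_front() else { break };
            let _ = self.sender.send(row); // sketch: ignore "no receivers" errors
            sent += 1;
        }
        sent
    }

    fn send_rows(&mut self, rows: Vec<Row>) -> usize {
        self.send_buf.extend(rows);
        self.try_send_all()
    }
}

fn main() {
    let mut tx = BufferedSender::new();
    let mut rx = tx.sender.subscribe();
    assert_eq!(tx.send_rows(vec![1, 2, 3]), 3);
    assert_eq!(rx.try_recv().unwrap(), 1);
}
```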
diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs
new file mode 100644
index 000000000000..cfa41f785ac8
--- /dev/null
+++ b/src/flow/src/adapter/table_source.rs
@@ -0,0 +1,148 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! How to query table information from the database
+
+use common_error::ext::BoxedError;
+use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
+use common_meta::key::table_name::{TableNameKey, TableNameManager};
+use itertools::Itertools;
+use snafu::{OptionExt, ResultExt};
+use table::metadata::TableId;
+
+use crate::adapter::error::{
+    Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
+};
+use crate::adapter::TableName;
+use crate::repr::{self, ColumnType, RelationType};
+
+/// mapping of table name <-> table id should be queried from the table info manager
+pub struct TableSource {
+    /// for querying the `TableId -> TableName` mapping
+    table_info_manager: TableInfoManager,
+    table_name_manager: TableNameManager,
+}
+
+impl TableSource {
+    pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
+        TableSource {
+            table_info_manager,
+            table_name_manager,
+        }
+    }
+
+    pub async fn get_table_id_from_proto_name(
+        &self,
+        name: &greptime_proto::v1::TableName,
+    ) -> Result<TableId, Error> {
+        self.table_name_manager
+            .get(TableNameKey::new(
+                &name.catalog_name,
+                &name.schema_name,
+                &name.table_name,
+            ))
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("Table name = {:?}, couldn't find table id", name),
+            })?
+            .with_context(|| UnexpectedSnafu {
+                reason: format!("Table name = {:?}, couldn't find table id", name),
+            })
+            .map(|id| id.table_id())
+    }
+
+    /// If the table hasn't been created in the database, the returned table id would be `None`
+    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
+        let ret = self
+            .table_name_manager
+            .get(TableNameKey::new(&name[0], &name[1], &name[2]))
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("Table name = {:?}, couldn't find table id", name),
+            })?
+            .map(|id| id.table_id());
+        Ok(ret)
+    }
+
+    /// query metasrv for the table name by table id
+    pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
+        self.table_info_manager
+            .get(*table_id)
+            .await
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)?
+            .with_context(|| UnexpectedSnafu {
+                reason: format!("Table id = {:?}, couldn't find table name", table_id),
+            })
+            .map(|name| name.table_name())
+            .map(|name| [name.catalog_name, name.schema_name, name.table_name])
+    }
+
+    /// query metasrv for the `TableInfoValue` by table id
+    pub async fn get_table_info_value(
+        &self,
+        table_id: &TableId,
+    ) -> Result<Option<TableInfoValue>, Error> {
+        Ok(self
+            .table_info_manager
+            .get(*table_id)
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("TableId = {:?}, couldn't find table info", table_id),
+            })?
+            .map(|v| v.into_inner()))
+    }
+
+    pub async fn get_table_name_schema(
+        &self,
+        table_id: &TableId,
+    ) -> Result<(TableName, RelationType), Error> {
+        let table_info_value = self
+            .get_table_info_value(table_id)
+            .await?
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("TableId = {:?}, can't find table info", table_id),
+            })?;
+
+        let table_name = table_info_value.table_name();
+        let table_name = [
+            table_name.catalog_name,
+            table_name.schema_name,
+            table_name.table_name,
+        ];
+
+        let raw_schema = table_info_value.table_info.meta.schema;
+        let column_types = raw_schema
+            .column_schemas
+            .into_iter()
+            .map(|col| ColumnType {
+                nullable: col.is_nullable(),
+                scalar_type: col.data_type,
+            })
+            .collect_vec();
+
+        let key = table_info_value.table_info.meta.primary_key_indices;
+        let keys = vec![repr::Key::from(key)];
+
+        let time_index = raw_schema.timestamp_index;
+        Ok((
+            table_name,
+            RelationType {
+                column_types,
+                keys,
+                time_index,
+            },
+        ))
+    }
+}
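`TableSource` only needs the two managers. A hypothetical wiring sketch follows, assuming the usual `new(KvBackendRef)` constructors on both managers; this is presumably also why `TableInfoManager` gains `#[derive(Clone)]` at the top of this diff, since `KvBackendRef` is an `Arc` and cloning the manager just copies a pointer:

```rust
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::table_name::TableNameManager;
use common_meta::kv_backend::KvBackendRef;

use crate::adapter::TableSource;

/// Hypothetical constructor: both managers share one KV backend handle.
fn make_table_source(kv: KvBackendRef) -> TableSource {
    TableSource::new(TableInfoManager::new(kv.clone()), TableNameManager::new(kv))
}
```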
diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs
new file mode 100644
index 000000000000..1946d4265d3f
--- /dev/null
+++ b/src/flow/src/adapter/util.rs
@@ -0,0 +1,60 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::helper::ColumnDataTypeWrapper;
+use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
+use common_error::ext::BoxedError;
+use datatypes::schema::ColumnSchema;
+use itertools::Itertools;
+use snafu::ResultExt;
+
+use crate::adapter::error::{Error, ExternalSnafu};
+
+/// convert a list of `ColumnSchema` to its corresponding proto type
+pub fn column_schemas_to_proto(
+    column_schemas: Vec<ColumnSchema>,
+    primary_keys: &[String],
+) -> Result<Vec<api::v1::ColumnSchema>, Error> {
+    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
+        .iter()
+        .map(|c| {
+            ColumnDataTypeWrapper::try_from(c.data_type.clone())
+                .map(|w| w.to_parts())
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)
+        })
+        .try_collect()?;
+
+    let ret = column_schemas
+        .iter()
+        .zip(column_datatypes)
+        .map(|(schema, datatype)| {
+            let semantic_type = if schema.is_time_index() {
+                SemanticType::Timestamp
+            } else if primary_keys.contains(&schema.name) {
+                SemanticType::Tag
+            } else {
+                SemanticType::Field
+            } as i32;
+
+            api::v1::ColumnSchema {
+                column_name: schema.name.clone(),
+                datatype: datatype.0 as i32,
+                semantic_type,
+                datatype_extension: datatype.1,
+            }
+        })
+        .collect();
+    Ok(ret)
+}
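A usage sketch for the new helper (a hypothetical call site inside the crate; the column names and types are illustrative): the time-index column maps to `SemanticType::Timestamp`, columns listed in `primary_keys` map to `Tag`, and everything else maps to `Field`.

```rust
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;

use crate::adapter::error::Error;
use crate::adapter::util::column_schemas_to_proto;

fn example() -> Result<(), Error> {
    // "ts" is the time index; "host" is the only primary-key column.
    let schemas = vec![
        ColumnSchema::new("ts", ConcreteDataType::timestamp_millisecond_datatype(), false)
            .with_time_index(true),
        ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
    ];
    let proto = column_schemas_to_proto(schemas, &["host".to_string()])?;
    assert_eq!(proto[0].semantic_type, SemanticType::Timestamp as i32);
    assert_eq!(proto[1].semantic_type, SemanticType::Tag as i32);
    Ok(())
}
```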
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index 42da2e3d111d..3e58a4307c79 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -20,10 +20,10 @@ use std::sync::Arc;
 use enum_as_inner::EnumAsInner;
 use hydroflow::scheduled::graph::Hydroflow;
-use snafu::{ensure, OptionExt, ResultExt};
+use snafu::{ensure, OptionExt};
 use tokio::sync::{broadcast, mpsc, Mutex};
 
-use crate::adapter::error::{Error, EvalSnafu, FlowAlreadyExistSnafu, InternalSnafu};
+use crate::adapter::error::{Error, FlowAlreadyExistSnafu, InternalSnafu};
 use crate::adapter::FlowId;
 use crate::compute::{Context, DataflowState, ErrCollector};
 use crate::expr::GlobalId;
@@ -151,6 +151,8 @@ impl WorkerHandle {
     /// trigger running the worker, will not block, and will run the worker parallelly
     ///
     /// will set the current timestamp to `now` for all dataflows before running them
+    ///
+    /// the returned error is unrecoverable, and the worker should be shut down/rebooted
     pub async fn run_available(&self, now: repr::Timestamp) -> Result<(), Error> {
         self.itc_client
             .lock()
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index 85bdfa8e4abb..e918044c0d91 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -51,8 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
 /// Row with key-value pair, timestamp and diff
 pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 
-/// broadcast channel capacity
-pub const BROADCAST_CAP: usize = 1024;
+/// broadcast channel capacity; this matters for memory consumption, since it influences how many
+/// updates can be buffered in memory in the entire dataflow
+pub const BROADCAST_CAP: usize = 8192;
 
 /// Convert a value that is or can be converted to Datetime to internal timestamp
 ///
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index d8c514f92011..8eca0788e8a1 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -35,8 +35,8 @@
 use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
 use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
 
 use crate::adapter::error::{
-    Error, ExternalSnafu, InvalidQueryPlanSnafu, InvalidQueryProstSnafu,
-    InvalidQuerySubstraitSnafu, NotImplementedSnafu, TableNotFoundSnafu, UnexpectedSnafu,
+    Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu, TableNotFoundSnafu,
+    UnexpectedSnafu,
 };
 use crate::adapter::FlownodeContext;
 use crate::expr::GlobalId;
@@ -110,12 +110,15 @@ pub async fn sql_to_flow_plan(
         }
         .build()
     })?;
-    let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
+    let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?;
     let plan = engine
         .planner()
         .plan(stmt, query_ctx)
         .await
-        .context(InvalidQueryPlanSnafu)?;
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?;
     let LogicalPlan::DfPlan(plan) = plan;
     let sub_plan = DFLogicalSubstraitConvertor {}
         .to_sub_plan(&plan)