diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index d619297dbdf9..a5159654e846 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -362,6 +362,7 @@ impl TestCase { constraints, if_not_exists, source_schema, + source_watermarks, .. } => { create_table::handle_create_table( @@ -371,6 +372,7 @@ impl TestCase { constraints, if_not_exists, source_schema, + source_watermarks, ) .await?; } diff --git a/src/frontend/planner_test/tests/testdata/watermark.yaml b/src/frontend/planner_test/tests/testdata/watermark.yaml index b18dfa545f18..409bd42d6639 100644 --- a/src/frontend/planner_test/tests/testdata/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/watermark.yaml @@ -13,3 +13,23 @@ └─StreamRowIdGen { row_id_index: 1 } └─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] } └─StreamSource { source: "t", columns: ["v1", "_row_id"] } +- name: watermark on append only table with source + sql: | + explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest', appendonly=true) ROW FORMAT JSON; + explain_output: | + StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" } + └─StreamExchange { dist: HashShard(_row_id) } + └─StreamRowIdGen { row_id_index: 1 } + └─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] } + └─StreamDml { columns: [v1, _row_id] } + └─StreamSource { source: "t", columns: ["v1", "_row_id"] } +- name: watermark on append only table without source + sql: | + explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (appendonly=true); + explain_output: | + StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" } + └─StreamExchange { dist: HashShard(_row_id) } + └─StreamRowIdGen { row_id_index: 1 } + └─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] } + └─StreamDml { columns: [v1, _row_id] } + └─StreamSource diff --git a/src/frontend/src/handler/alter_table.rs b/src/frontend/src/handler/alter_table.rs index 2b38a6e61af9..0c9be1f66bc1 100644 --- a/src/frontend/src/handler/alter_table.rs +++ b/src/frontend/src/handler/alter_table.rs @@ -91,14 +91,20 @@ pub async fn handle_add_column( // Create handler args as if we're creating a new table with the altered definition. let handler_args = HandlerArgs::new(session.clone(), &definition, "")?; let col_id_gen = ColumnIdGenerator::new_alter(&original_catalog); - let Statement::CreateTable { columns, constraints, .. } = definition else { + let Statement::CreateTable { columns, constraints, source_watermarks, .. } = definition else { panic!("unexpected statement type: {:?}", definition); }; let (graph, table) = { let context = OptimizerContext::from_handler_args(handler_args); - let (plan, source, table) = - gen_create_table_plan(context, table_name, columns, constraints, col_id_gen)?; + let (plan, source, table) = gen_create_table_plan( + context, + table_name, + columns, + constraints, + col_id_gen, + source_watermarks, + )?; // We should already have rejected the case where the table has a connector. assert!(source.is_none()); diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 280495b2021b..91267873b4a4 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -470,7 +470,7 @@ fn check_and_add_timestamp_column( } } -fn bind_source_watermark( +pub(super) fn bind_source_watermark( session: &SessionImpl, name: String, source_watermarks: Vec, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 089017e53335..1c7db1b7fa8e 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -25,10 +25,12 @@ use risingwave_common::catalog::{ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::catalog::{ ColumnIndex as ProstColumnIndex, Source as ProstSource, StreamSourceInfo, Table as ProstTable, + WatermarkDesc, }; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ - ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, TableConstraint, + ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark, + TableConstraint, }; use super::create_source::resolve_source_schema; @@ -36,7 +38,7 @@ use super::RwPgResponse; use crate::binder::{bind_data_type, bind_struct_field}; use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, ColumnId}; -use crate::handler::create_source::UPSTREAM_SOURCE_KEY; +use crate::handler::create_source::{bind_source_watermark, UPSTREAM_SOURCE_KEY}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::LogicalSource; use crate::optimizer::property::{Order, RequiredDist}; @@ -285,6 +287,7 @@ pub(crate) async fn gen_create_table_plan_with_source( columns: Vec, constraints: Vec, source_schema: SourceSchema, + source_watermarks: Vec, mut col_id_gen: ColumnIdGenerator, ) -> Result<(PlanRef, Option, ProstTable)> { let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; @@ -293,6 +296,15 @@ pub(crate) async fn gen_create_table_plan_with_source( let (mut columns, mut pk_column_ids, mut row_id_index) = bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; + let watermark_descs = bind_source_watermark( + context.session_ctx(), + table_name.real_value(), + source_watermarks, + &columns, + )?; + // TODO(yuhao): allow multiple watermark on source. + assert!(watermark_descs.len() <= 1); + let definition = context.normalized_sql().to_owned(); let source_info = resolve_source_schema( @@ -313,6 +325,7 @@ pub(crate) async fn gen_create_table_plan_with_source( row_id_index, Some(source_info), definition, + watermark_descs, Some(col_id_gen.into_version()), ) } @@ -325,9 +338,11 @@ pub(crate) fn gen_create_table_plan( columns: Vec, constraints: Vec, mut col_id_gen: ColumnIdGenerator, + source_watermarks: Vec, ) -> Result<(PlanRef, Option, ProstTable)> { let definition = context.normalized_sql().to_owned(); let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?; + gen_create_table_plan_without_bind( context, table_name, @@ -335,6 +350,7 @@ pub(crate) fn gen_create_table_plan( pk_column_id_from_columns, constraints, definition, + source_watermarks, Some(col_id_gen.into_version()), ) } @@ -347,11 +363,19 @@ pub(crate) fn gen_create_table_plan_without_bind( pk_column_id_from_columns: Option, constraints: Vec, definition: String, + source_watermarks: Vec, version: Option, ) -> Result<(PlanRef, Option, ProstTable)> { let (columns, pk_column_ids, row_id_index) = bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?; + let watermark_descs = bind_source_watermark( + context.session_ctx(), + table_name.real_value(), + source_watermarks, + &columns, + )?; + gen_table_plan_inner( context.into(), table_name, @@ -360,6 +384,7 @@ pub(crate) fn gen_create_table_plan_without_bind( row_id_index, None, definition, + watermark_descs, version, ) } @@ -373,6 +398,7 @@ fn gen_table_plan_inner( row_id_index: Option, source_info: Option, definition: String, + watermark_descs: Vec, version: Option, /* TODO: this should always be `Some` if we support `ALTER * TABLE` for `CREATE TABLE AS`. */ ) -> Result<(PlanRef, Option, ProstTable)> { @@ -395,7 +421,7 @@ fn gen_table_plan_inner( properties: context.with_options().inner().clone().into_iter().collect(), info: Some(source_info), owner: session.user_id(), - watermark_descs: vec![], + watermark_descs: watermark_descs.clone(), }); let source_catalog = source.as_ref().map(|source| Rc::new((source).into())); @@ -408,6 +434,7 @@ fn gen_table_plan_inner( pk_column_ids, row_id_index, false, + true, context.clone(), ) .into(); @@ -438,12 +465,21 @@ fn gen_table_plan_inner( .into()); } + if !append_only && !watermark_descs.is_empty() { + return Err(ErrorCode::NotSupported( + "Defining watermarks on table requires the table to be append only.".to_owned(), + "Set the option `appendonly=true`".to_owned(), + ) + .into()); + } + let materialize = plan_root.gen_table_plan( name, columns, definition, row_id_index, append_only, + watermark_descs, version, )?; @@ -460,6 +496,7 @@ pub async fn handle_create_table( constraints: Vec, if_not_exists: bool, source_schema: Option, + source_watermarks: Vec, ) -> Result { let session = handler_args.session.clone(); @@ -487,6 +524,7 @@ pub async fn handle_create_table( columns, constraints, source_schema, + source_watermarks, col_id_gen, ) .await? @@ -497,6 +535,7 @@ pub async fn handle_create_table( columns, constraints, col_id_gen, + source_watermarks, )?, }; let mut graph = build_graph(plan); diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 4ab605c78884..9ba23aaeaf6d 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -93,6 +93,7 @@ pub async fn handle_create_as( None, vec![], "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS` + vec![], // No watermark should be defined in for `CREATE TABLE AS` Some(col_id_gen.into_version()), )?; let mut graph = build_graph(plan); diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index f52ff5c37552..248d0fdf94d7 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -68,6 +68,7 @@ pub async fn handle_explain( columns, constraints, source_schema, + source_watermarks, .. } => match check_create_table_with_source(&handler_args.with_options, source_schema)? { Some(s) => { @@ -77,6 +78,7 @@ pub async fn handle_explain( columns, constraints, s, + source_watermarks, ColumnIdGenerator::new_initial(), ) .await? @@ -89,6 +91,7 @@ pub async fn handle_explain( columns, constraints, ColumnIdGenerator::new_initial(), + source_watermarks, )? .0 } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 5a95c796e447..320de98e0233 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -197,6 +197,7 @@ pub async fn handle( temporary, if_not_exists, source_schema, + source_watermarks, } => { if or_replace { return Err(ErrorCode::NotImplemented( @@ -229,6 +230,7 @@ pub async fn handle( constraints, if_not_exists, source_schema, + source_watermarks, ) .await } diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 3775ad59ae8e..6fb45fb15449 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -35,11 +35,12 @@ use property::Order; use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::iter_util::ZipEqDebug; +use risingwave_pb::catalog::WatermarkDesc; use self::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer}; use self::plan_node::{ BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject, - StreamRowIdGen, StreamSink, + StreamRowIdGen, StreamSink, StreamWatermarkFilter, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -700,6 +701,7 @@ impl PlanRoot { definition: String, row_id_index: Option, append_only: bool, + watermark_descs: Vec, version: Option, ) -> Result { let mut stream_plan = self.gen_stream_plan()?; @@ -711,6 +713,12 @@ impl PlanRoot { columns.iter().map(|c| c.column_desc.clone()).collect(), ) .into(); + + // Add WatermarkFilter node. + if !watermark_descs.is_empty() { + stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into(); + } + // Add RowIDGen node if needed. if let Some(row_id_index) = row_id_index { stream_plan = StreamRowIdGen::new(stream_plan, row_id_index).into(); diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 0e617b67199d..50c120daf710 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -38,6 +38,8 @@ pub struct Source { pub row_id_index: Option, /// Whether the "SourceNode" should generate the row id column for append only source pub gen_row_id: bool, + /// True if it is a source created when creating table with a source. + pub for_table: bool, } impl GenericPlanNode for Source { diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 958d231fe6a9..24f8ddec34cb 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -62,6 +62,7 @@ impl LogicalSource { pk_col_ids: Vec, row_id_index: Option, gen_row_id: bool, + for_table: bool, ctx: OptimizerContextRef, ) -> Self { let core = generic::Source { @@ -70,6 +71,7 @@ impl LogicalSource { pk_col_ids, row_id_index, gen_row_id, + for_table, }; let schema = core.schema(); @@ -356,9 +358,11 @@ impl ToBatch for LogicalSource { impl ToStream for LogicalSource { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { let mut plan: PlanRef = StreamSource::new(self.clone()).into(); - if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty(){ + if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table{ plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); } + + assert!(!(self.core.gen_row_id && self.core.for_table)); if let Some(row_id_index) = self.core.row_id_index && self.core.gen_row_id { plan = StreamRowIdGen::new(plan, row_id_index).into(); } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 3146fd20faba..8d9c3e9ee41a 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -90,6 +90,7 @@ impl Planner { pk_col_ids, row_id_index, gen_row_id, + false, self.ctx(), ) .into()) diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 67ad3a974d8d..6cd784257879 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -951,6 +951,8 @@ pub enum Statement { with_options: Vec, /// Optional schema of the external source with which the table is created source_schema: Option, + /// The watermark defined on source. + source_watermarks: Vec, /// `AS ( query )` query: Option>, }, @@ -1300,6 +1302,7 @@ impl fmt::Display for Statement { if_not_exists, temporary, source_schema, + source_watermarks, query, } => { // We want to allow the following options @@ -1318,11 +1321,7 @@ impl fmt::Display for Statement { name = name, )?; if !columns.is_empty() || !constraints.is_empty() { - write!(f, " ({}", display_comma_separated(columns))?; - if !columns.is_empty() && !constraints.is_empty() { - write!(f, ", ")?; - } - write!(f, "{})", display_comma_separated(constraints))?; + write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks)?)?; } else if query.is_none() { // PostgreSQL allows `CREATE TABLE t ();`, but requires empty parens write!(f, " ()")?; diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 6842c5e53e23..830727495b50 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -375,37 +375,34 @@ impl ParseTo for CreateSourceStatement { } } +pub(super) fn fmt_create_items( + columns: &[ColumnDef], + constraints: &[TableConstraint], + watermarks: &[SourceWatermark], +) -> std::result::Result { + let mut items = String::new(); + let has_items = !columns.is_empty() || !constraints.is_empty() || !watermarks.is_empty(); + has_items.then(|| write!(&mut items, "(")); + write!(&mut items, "{}", display_comma_separated(columns))?; + if !columns.is_empty() && (!constraints.is_empty() || !watermarks.is_empty()) { + write!(&mut items, ", ")?; + } + write!(&mut items, "{}", display_comma_separated(constraints))?; + if !columns.is_empty() && !constraints.is_empty() && !watermarks.is_empty() { + write!(&mut items, ", ")?; + } + write!(&mut items, "{}", display_comma_separated(watermarks))?; + has_items.then(|| write!(&mut items, ")")); + Ok(items) +} + impl fmt::Display for CreateSourceStatement { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut v: Vec = vec![]; impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self); impl_fmt_display!(source_name, v, self); - // Items - let mut items = String::new(); - let has_items = !self.columns.is_empty() - || !self.constraints.is_empty() - || !self.source_watermarks.is_empty(); - has_items.then(|| write!(&mut items, "(")); - write!(&mut items, "{}", display_comma_separated(&self.columns))?; - if !self.columns.is_empty() - && (!self.constraints.is_empty() || !self.source_watermarks.is_empty()) - { - write!(&mut items, ", ")?; - } - write!(&mut items, "{}", display_comma_separated(&self.constraints))?; - if !self.columns.is_empty() - && !self.constraints.is_empty() - && !self.source_watermarks.is_empty() - { - write!(&mut items, ", ")?; - } - write!( - &mut items, - "{}", - display_comma_separated(&self.source_watermarks) - )?; - has_items.then(|| write!(&mut items, ")")); + let items = fmt_create_items(&self.columns, &self.constraints, &self.source_watermarks)?; if !items.is_empty() { v.push(items); } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d793f6e5092a..49d9eea3faa9 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -1950,8 +1950,8 @@ impl Parser { ) -> Result { let if_not_exists = self.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); let table_name = self.parse_object_name()?; - // parse optional column list (schema) - let (columns, constraints) = self.parse_columns()?; + // parse optional column list (schema) and watermarks on source. + let (columns, constraints, source_watermarks) = self.parse_columns_with_watermark()?; // PostgreSQL supports `WITH ( options )`, before `AS` let with_options = self.parse_with_properties()?; @@ -1999,6 +1999,11 @@ impl Parser { // Parse optional `AS ( query )` let query = if self.parse_keyword(Keyword::AS) { + if !source_watermarks.is_empty() { + return Err(ParserError::ParserError( + "Watermarks can't be defined on table created by CREATE TABLE AS".to_string(), + )); + } Some(Box::new(self.parse_query()?)) } else { None @@ -2013,23 +2018,12 @@ impl Parser { or_replace, if_not_exists, source_schema, + source_watermarks, query, }) } - pub fn parse_columns(&mut self) -> Result<(Vec, Vec), ParserError> { - let (column_refs, table_constraints, _) = self.parse_columns_inner(true)?; - Ok((column_refs, table_constraints)) - } - pub fn parse_columns_with_watermark(&mut self) -> Result { - self.parse_columns_inner(true) - } - - fn parse_columns_inner( - &mut self, - with_watermark: bool, - ) -> Result { let mut columns = vec![]; let mut constraints = vec![]; let mut watermarks = vec![]; @@ -2040,7 +2034,7 @@ impl Parser { loop { if let Some(constraint) = self.parse_optional_table_constraint()? { constraints.push(constraint); - } else if with_watermark && let Some(watermark) = self.parse_optional_watermark()? { + } else if let Some(watermark) = self.parse_optional_watermark()? { watermarks.push(watermark); if watermarks.len() > 1 { // TODO(yuhao): allow multiple watermark on source.