Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): watermark on append only table #8207

Merged
merged 3 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ impl TestCase {
constraints,
if_not_exists,
source_schema,
source_watermarks,
..
} => {
create_table::handle_create_table(
Expand All @@ -371,6 +372,7 @@ impl TestCase {
constraints,
if_not_exists,
source_schema,
source_watermarks,
)
.await?;
}
Expand Down
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,23 @@
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
- name: watermark on append only table with source
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest', appendonly=true) ROW FORMAT JSON;
Copy link
Contributor

@st1page st1page Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, our usual usage is without the external connector, so maybe it would be better to remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep both behaviors undocumented, for testing purposes. I'll add a new planner test.

explain_output: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
- name: watermark on append only table without source
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (appendonly=true);
explain_output: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource
12 changes: 9 additions & 3 deletions src/frontend/src/handler/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,20 @@ pub async fn handle_add_column(
// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &definition, "")?;
let col_id_gen = ColumnIdGenerator::new_alter(&original_catalog);
let Statement::CreateTable { columns, constraints, .. } = definition else {
let Statement::CreateTable { columns, constraints, source_watermarks, .. } = definition else {
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table) = {
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, source, table) =
gen_create_table_plan(context, table_name, columns, constraints, col_id_gen)?;
let (plan, source, table) = gen_create_table_plan(
context,
table_name,
columns,
constraints,
col_id_gen,
source_watermarks,
)?;

// We should already have rejected the case where the table has a connector.
assert!(source.is_none());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ fn check_and_add_timestamp_column(
}
}

fn bind_source_watermark(
pub(super) fn bind_source_watermark(
session: &SessionImpl,
name: String,
source_watermarks: Vec<SourceWatermark>,
Expand Down
45 changes: 42 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@ use risingwave_common::catalog::{
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::{
ColumnIndex as ProstColumnIndex, Source as ProstSource, StreamSourceInfo, Table as ProstTable,
WatermarkDesc,
};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, TableConstraint,
ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark,
TableConstraint,
};

use super::create_source::resolve_source_schema;
use super::RwPgResponse;
use crate::binder::{bind_data_type, bind_struct_field};
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{check_valid_column_name, ColumnId};
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::handler::create_source::{bind_source_watermark, UPSTREAM_SOURCE_KEY};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::LogicalSource;
use crate::optimizer::property::{Order, RequiredDist};
Expand Down Expand Up @@ -285,6 +287,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
source_schema: SourceSchema,
source_watermarks: Vec<SourceWatermark>,
mut col_id_gen: ColumnIdGenerator,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
Expand All @@ -293,6 +296,15 @@ pub(crate) async fn gen_create_table_plan_with_source(
let (mut columns, mut pk_column_ids, mut row_id_index) =
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;

let watermark_descs = bind_source_watermark(
context.session_ctx(),
table_name.real_value(),
source_watermarks,
&columns,
)?;
// TODO(yuhao): allow multiple watermark on source.
assert!(watermark_descs.len() <= 1);

let definition = context.normalized_sql().to_owned();

let source_info = resolve_source_schema(
Expand All @@ -313,6 +325,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
row_id_index,
Some(source_info),
definition,
watermark_descs,
Some(col_id_gen.into_version()),
)
}
Expand All @@ -325,16 +338,19 @@ pub(crate) fn gen_create_table_plan(
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
mut col_id_gen: ColumnIdGenerator,
source_watermarks: Vec<SourceWatermark>,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let definition = context.normalized_sql().to_owned();
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;

gen_create_table_plan_without_bind(
context,
table_name,
column_descs,
pk_column_id_from_columns,
constraints,
definition,
source_watermarks,
Some(col_id_gen.into_version()),
)
}
Expand All @@ -347,11 +363,19 @@ pub(crate) fn gen_create_table_plan_without_bind(
pk_column_id_from_columns: Option<ColumnId>,
constraints: Vec<TableConstraint>,
definition: String,
source_watermarks: Vec<SourceWatermark>,
version: Option<TableVersion>,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let (columns, pk_column_ids, row_id_index) =
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;

let watermark_descs = bind_source_watermark(
context.session_ctx(),
table_name.real_value(),
source_watermarks,
&columns,
)?;

gen_table_plan_inner(
context.into(),
table_name,
Expand All @@ -360,6 +384,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
row_id_index,
None,
definition,
watermark_descs,
version,
)
}
Expand All @@ -373,6 +398,7 @@ fn gen_table_plan_inner(
row_id_index: Option<usize>,
source_info: Option<StreamSourceInfo>,
definition: String,
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>, /* TODO: this should always be `Some` if we support `ALTER
* TABLE` for `CREATE TABLE AS`. */
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
Expand All @@ -395,7 +421,7 @@ fn gen_table_plan_inner(
properties: context.with_options().inner().clone().into_iter().collect(),
info: Some(source_info),
owner: session.user_id(),
watermark_descs: vec![],
watermark_descs: watermark_descs.clone(),
});

let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
Expand All @@ -408,6 +434,7 @@ fn gen_table_plan_inner(
pk_column_ids,
row_id_index,
false,
true,
context.clone(),
)
.into();
Expand Down Expand Up @@ -438,12 +465,21 @@ fn gen_table_plan_inner(
.into());
}

if !append_only && !watermark_descs.is_empty() {
return Err(ErrorCode::NotSupported(
"Defining watermarks on table requires the table to be append only.".to_owned(),
"Set the option `appendonly=true`".to_owned(),
)
.into());
}

Comment on lines +468 to +475
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xx01cyx append only part

let materialize = plan_root.gen_table_plan(
name,
columns,
definition,
row_id_index,
append_only,
watermark_descs,
version,
)?;

Expand All @@ -460,6 +496,7 @@ pub async fn handle_create_table(
constraints: Vec<TableConstraint>,
if_not_exists: bool,
source_schema: Option<SourceSchema>,
source_watermarks: Vec<SourceWatermark>,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

Expand Down Expand Up @@ -487,6 +524,7 @@ pub async fn handle_create_table(
columns,
constraints,
source_schema,
source_watermarks,
col_id_gen,
)
.await?
Expand All @@ -497,6 +535,7 @@ pub async fn handle_create_table(
columns,
constraints,
col_id_gen,
source_watermarks,
)?,
};
let mut graph = build_graph(plan);
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub async fn handle_create_as(
None,
vec![],
"".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS`
vec![], // No watermark should be defined for `CREATE TABLE AS`
Some(col_id_gen.into_version()),
)?;
let mut graph = build_graph(plan);
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub async fn handle_explain(
columns,
constraints,
source_schema,
source_watermarks,
..
} => match check_create_table_with_source(&handler_args.with_options, source_schema)? {
Some(s) => {
Expand All @@ -77,6 +78,7 @@ pub async fn handle_explain(
columns,
constraints,
s,
source_watermarks,
ColumnIdGenerator::new_initial(),
)
.await?
Expand All @@ -89,6 +91,7 @@ pub async fn handle_explain(
columns,
constraints,
ColumnIdGenerator::new_initial(),
source_watermarks,
)?
.0
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub async fn handle(
temporary,
if_not_exists,
source_schema,
source_watermarks,
} => {
if or_replace {
return Err(ErrorCode::NotImplemented(
Expand Down Expand Up @@ -229,6 +230,7 @@ pub async fn handle(
constraints,
if_not_exists,
source_schema,
source_watermarks,
)
.await
}
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ use property::Order;
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::catalog::WatermarkDesc;

use self::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
use self::plan_node::{
BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject,
StreamRowIdGen, StreamSink,
StreamRowIdGen, StreamSink, StreamWatermarkFilter,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
Expand Down Expand Up @@ -700,6 +701,7 @@ impl PlanRoot {
definition: String,
row_id_index: Option<usize>,
append_only: bool,
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>,
) -> Result<StreamMaterialize> {
let mut stream_plan = self.gen_stream_plan()?;
Expand All @@ -711,6 +713,12 @@ impl PlanRoot {
columns.iter().map(|c| c.column_desc.clone()).collect(),
)
.into();

// Add WatermarkFilter node.
if !watermark_descs.is_empty() {
stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
}

// Add RowIDGen node if needed.
if let Some(row_id_index) = row_id_index {
stream_plan = StreamRowIdGen::new(stream_plan, row_id_index).into();
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct Source {
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
pub gen_row_id: bool,
/// True if it is a source created when creating table with a source.
pub for_table: bool,
}

impl GenericPlanNode for Source {
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl LogicalSource {
pk_col_ids: Vec<ColumnId>,
row_id_index: Option<usize>,
gen_row_id: bool,
for_table: bool,
ctx: OptimizerContextRef,
) -> Self {
let core = generic::Source {
Expand All @@ -70,6 +71,7 @@ impl LogicalSource {
pk_col_ids,
row_id_index,
gen_row_id,
for_table,
};

let schema = core.schema();
Expand Down Expand Up @@ -356,9 +358,11 @@ impl ToBatch for LogicalSource {
impl ToStream for LogicalSource {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
let mut plan: PlanRef = StreamSource::new(self.clone()).into();
if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty(){
if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table{
plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
}

assert!(!(self.core.gen_row_id && self.core.for_table));
Comment on lines +361 to +365
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add some comments here.

if let Some(row_id_index) = self.core.row_id_index && self.core.gen_row_id {
plan = StreamRowIdGen::new(plan, row_id_index).into();
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl Planner {
pk_col_ids,
row_id_index,
gen_row_id,
false,
self.ctx(),
)
.into())
Expand Down
Loading