Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): watermark on append only table #8207

Merged
merged 3 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ impl TestCase {
constraints,
if_not_exists,
source_schema,
source_watermarks,
..
} => {
create_table::handle_create_table(
Expand All @@ -371,6 +372,7 @@ impl TestCase {
constraints,
if_not_exists,
source_schema,
source_watermarks,
)
.await?;
}
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
- name: watermark on append only table
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest', appendonly=true) ROW FORMAT JSON;
Copy link
Contributor

@st1page st1page Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, our usual usage is without an external connector, so maybe it would be better to remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep both behaviors undocumented for test. I'll add a new planner test.

explain_output: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ fn check_and_add_timestamp_column(
}
}

fn bind_source_watermark(
pub(super) fn bind_source_watermark(
session: &SessionImpl,
name: String,
source_watermarks: Vec<SourceWatermark>,
Expand Down
33 changes: 30 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@ use risingwave_common::catalog::{
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::{
ColumnIndex as ProstColumnIndex, Source as ProstSource, StreamSourceInfo, Table as ProstTable,
WatermarkDesc,
};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_sqlparser::ast::{
ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, TableConstraint,
ColumnDef, ColumnOption, DataType as AstDataType, ObjectName, SourceSchema, SourceWatermark,
TableConstraint,
};

use super::create_source::resolve_source_schema;
use super::RwPgResponse;
use crate::binder::{bind_data_type, bind_struct_field};
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{check_valid_column_name, ColumnId};
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::handler::create_source::{bind_source_watermark, UPSTREAM_SOURCE_KEY};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::LogicalSource;
use crate::optimizer::property::{Order, RequiredDist};
Expand Down Expand Up @@ -285,6 +287,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
source_schema: SourceSchema,
source_watermarks: Vec<SourceWatermark>,
mut col_id_gen: ColumnIdGenerator,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
Expand All @@ -293,6 +296,15 @@ pub(crate) async fn gen_create_table_plan_with_source(
let (mut columns, mut pk_column_ids, mut row_id_index) =
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;

let watermark_descs = bind_source_watermark(
context.session_ctx(),
table_name.real_value(),
source_watermarks,
&columns,
)?;
// TODO(yuhao): allow multiple watermark on source.
assert!(watermark_descs.len() <= 1);

let definition = context.normalized_sql().to_owned();

let source_info = resolve_source_schema(
Expand All @@ -313,6 +325,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
row_id_index,
Some(source_info),
definition,
watermark_descs,
Some(col_id_gen.into_version()),
)
}
Expand Down Expand Up @@ -360,6 +373,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
row_id_index,
None,
definition,
vec![],
version,
)
}
Expand All @@ -373,6 +387,7 @@ fn gen_table_plan_inner(
row_id_index: Option<usize>,
source_info: Option<StreamSourceInfo>,
definition: String,
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>, /* TODO: this should always be `Some` if we support `ALTER
* TABLE` for `CREATE TABLE AS`. */
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
Expand All @@ -395,7 +410,7 @@ fn gen_table_plan_inner(
properties: context.with_options().inner().clone().into_iter().collect(),
info: Some(source_info),
owner: session.user_id(),
watermark_descs: vec![],
watermark_descs: watermark_descs.clone(),
});

let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
Expand All @@ -408,6 +423,7 @@ fn gen_table_plan_inner(
pk_column_ids,
row_id_index,
false,
true,
context.clone(),
)
.into();
Expand Down Expand Up @@ -438,12 +454,21 @@ fn gen_table_plan_inner(
.into());
}

if !append_only && !watermark_descs.is_empty() {
return Err(ErrorCode::NotSupported(
"Defining watermarks on table requires the table to be append only.".to_owned(),
"Set the option `appendonly=true`".to_owned(),
)
.into());
}

Comment on lines +468 to +475
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xx01cyx append only part

let materialize = plan_root.gen_table_plan(
name,
columns,
definition,
row_id_index,
append_only,
watermark_descs,
version,
)?;

Expand All @@ -460,6 +485,7 @@ pub async fn handle_create_table(
constraints: Vec<TableConstraint>,
if_not_exists: bool,
source_schema: Option<SourceSchema>,
source_watermarks: Vec<SourceWatermark>,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

Expand Down Expand Up @@ -487,6 +513,7 @@ pub async fn handle_create_table(
columns,
constraints,
source_schema,
source_watermarks,
col_id_gen,
)
.await?
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub async fn handle_explain(
columns,
constraints,
source_schema,
source_watermarks,
..
} => match check_create_table_with_source(&handler_args.with_options, source_schema)? {
Some(s) => {
Expand All @@ -77,6 +78,7 @@ pub async fn handle_explain(
columns,
constraints,
s,
source_watermarks,
ColumnIdGenerator::new_initial(),
)
.await?
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub async fn handle(
temporary,
if_not_exists,
source_schema,
source_watermarks,
} => {
if or_replace {
return Err(ErrorCode::NotImplemented(
Expand Down Expand Up @@ -229,6 +230,7 @@ pub async fn handle(
constraints,
if_not_exists,
source_schema,
source_watermarks,
)
.await
}
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ use property::Order;
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::catalog::WatermarkDesc;

use self::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
use self::plan_node::{
BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject,
StreamRowIdGen, StreamSink,
StreamRowIdGen, StreamSink, StreamWatermarkFilter,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
Expand Down Expand Up @@ -700,6 +701,7 @@ impl PlanRoot {
definition: String,
row_id_index: Option<usize>,
append_only: bool,
watermark_descs: Vec<WatermarkDesc>,
version: Option<TableVersion>,
) -> Result<StreamMaterialize> {
let mut stream_plan = self.gen_stream_plan()?;
Expand All @@ -711,6 +713,12 @@ impl PlanRoot {
columns.iter().map(|c| c.column_desc.clone()).collect(),
)
.into();

// Add WatermarkFilter node.
if !watermark_descs.is_empty() {
stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
}

// Add RowIDGen node if needed.
if let Some(row_id_index) = row_id_index {
stream_plan = StreamRowIdGen::new(stream_plan, row_id_index).into();
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct Source {
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
pub gen_row_id: bool,
/// True if it is a source created when creating table with a source.
pub for_table: bool,
}

impl GenericPlanNode for Source {
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl LogicalSource {
pk_col_ids: Vec<ColumnId>,
row_id_index: Option<usize>,
gen_row_id: bool,
for_table: bool,
ctx: OptimizerContextRef,
) -> Self {
let core = generic::Source {
Expand All @@ -70,6 +71,7 @@ impl LogicalSource {
pk_col_ids,
row_id_index,
gen_row_id,
for_table,
};

let schema = core.schema();
Expand Down Expand Up @@ -356,9 +358,11 @@ impl ToBatch for LogicalSource {
impl ToStream for LogicalSource {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
let mut plan: PlanRef = StreamSource::new(self.clone()).into();
if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty(){
if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table{
plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
}

assert!(!(self.core.gen_row_id && self.core.for_table));
Comment on lines +361 to +365
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add some comments here.

if let Some(row_id_index) = self.core.row_id_index && self.core.gen_row_id {
plan = StreamRowIdGen::new(plan, row_id_index).into();
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl Planner {
pk_col_ids,
row_id_index,
gen_row_id,
false,
self.ctx(),
)
.into())
Expand Down
9 changes: 4 additions & 5 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,8 @@ pub enum Statement {
with_options: Vec<SqlOption>,
/// Optional schema of the external source with which the table is created
source_schema: Option<SourceSchema>,
/// The watermark defined on source.
source_watermarks: Vec<SourceWatermark>,
/// `AS ( query )`
query: Option<Box<Query>>,
},
Expand Down Expand Up @@ -1300,6 +1302,7 @@ impl fmt::Display for Statement {
if_not_exists,
temporary,
source_schema,
source_watermarks,
query,
} => {
// We want to allow the following options
Expand All @@ -1318,11 +1321,7 @@ impl fmt::Display for Statement {
name = name,
)?;
if !columns.is_empty() || !constraints.is_empty() {
write!(f, " ({}", display_comma_separated(columns))?;
if !columns.is_empty() && !constraints.is_empty() {
write!(f, ", ")?;
}
write!(f, "{})", display_comma_separated(constraints))?;
write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks)?)?;
} else if query.is_none() {
// PostgreSQL allows `CREATE TABLE t ();`, but requires empty parens
write!(f, " ()")?;
Expand Down
47 changes: 22 additions & 25 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,37 +375,34 @@ impl ParseTo for CreateSourceStatement {
}
}

/// Renders the parenthesized item list of a `CREATE TABLE` / `CREATE SOURCE` statement.
///
/// Columns, constraints and watermarks are written in that order, each group
/// comma-separated, with a `", "` between adjacent non-empty groups, and the
/// whole list wrapped in a single pair of parentheses.
///
/// Returns an empty string when all three slices are empty, so callers can
/// skip the item list entirely.
///
/// # Errors
///
/// Propagates any `fmt::Error` produced while writing into the buffer,
/// including the ones for the opening and closing parentheses.
pub(super) fn fmt_create_items(
    columns: &[ColumnDef],
    constraints: &[TableConstraint],
    watermarks: &[SourceWatermark],
) -> std::result::Result<String, fmt::Error> {
    let mut items = String::new();
    let has_items = !columns.is_empty() || !constraints.is_empty() || !watermarks.is_empty();

    if has_items {
        write!(&mut items, "(")?;
    }

    write!(&mut items, "{}", display_comma_separated(columns))?;
    // Separator after the columns whenever anything follows them.
    if !columns.is_empty() && (!constraints.is_empty() || !watermarks.is_empty()) {
        write!(&mut items, ", ")?;
    }

    write!(&mut items, "{}", display_comma_separated(constraints))?;
    // Separator between constraints and watermarks. This must not depend on
    // `columns`: with no columns but both constraints and watermarks present,
    // the two groups would otherwise be glued together without a comma.
    if !constraints.is_empty() && !watermarks.is_empty() {
        write!(&mut items, ", ")?;
    }

    write!(&mut items, "{}", display_comma_separated(watermarks))?;

    if has_items {
        write!(&mut items, ")")?;
    }
    Ok(items)
}

impl fmt::Display for CreateSourceStatement {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut v: Vec<String> = vec![];
impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
impl_fmt_display!(source_name, v, self);

// Items
let mut items = String::new();
let has_items = !self.columns.is_empty()
|| !self.constraints.is_empty()
|| !self.source_watermarks.is_empty();
has_items.then(|| write!(&mut items, "("));
write!(&mut items, "{}", display_comma_separated(&self.columns))?;
if !self.columns.is_empty()
&& (!self.constraints.is_empty() || !self.source_watermarks.is_empty())
{
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", display_comma_separated(&self.constraints))?;
if !self.columns.is_empty()
&& !self.constraints.is_empty()
&& !self.source_watermarks.is_empty()
{
write!(&mut items, ", ")?;
}
write!(
&mut items,
"{}",
display_comma_separated(&self.source_watermarks)
)?;
has_items.then(|| write!(&mut items, ")"));
let items = fmt_create_items(&self.columns, &self.constraints, &self.source_watermarks)?;
if !items.is_empty() {
v.push(items);
}
Expand Down
Loading