From 554aef7194776c573439537e68eb0172242a088f Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 17 Dec 2020 08:21:40 -0800 Subject: [PATCH] chore: fix bad error message on insert into (#6793) --- .../ksql/planner/plan/SelectionUtil.java | 8 +++++- .../query-validation-tests/insert-into.json | 26 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java index 8ba8d657736f..091987c805c8 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java @@ -96,7 +96,13 @@ private static Stream resolveSelectItem( if (selectItem instanceof SingleColumn) { final SingleColumn column = (SingleColumn) selectItem; - final Optional targetColumn = targetSchema.map(schema -> schema.columns().get(idx)); + // if the column we are trying to coerce into a target schema is beyond + // the target schema's max columns ignore it. this will generate a failure + // down the line when we check that the result schema is identical to + // the schema of the source we are attempting to fit + final Optional targetColumn = targetSchema + .filter(schema -> schema.columns().size() > idx) + .map(schema -> schema.columns().get(idx)); return resolveSingleColumn(idx, parentNode, column, targetColumn); } diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json index 3bda49b95d42..1e9e810e4774 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json @@ -183,6 +183,32 @@ "outputs": [ {"topic": "target", "value": {"C1": 1.00, "C2": 2.00}} ] + }, + { + "name": "join mismatch (fewer columns than expected)", + "statements": [ + "CREATE STREAM SOURCE1 (K STRING KEY, data VARCHAR) WITH (kafka_topic='stream-source', value_format='DELIMITED');", + "CREATE STREAM SOURCE2 (K STRING KEY, data VARCHAR) WITH (kafka_topic='insert-source', value_format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT K, DATA AS DATA_1, DATA AS DATA_2 FROM SOURCE1;", + "INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Result schema is `K` STRING KEY, `DATA_1` STRING\nSink schema is `K` STRING KEY, `DATA_1` STRING, `DATA_2` STRING" + } + }, + { + "name": "join mismatch (more columns than expected)", + "statements": [ + "CREATE STREAM SOURCE1 (K STRING KEY, data VARCHAR) WITH (kafka_topic='stream-source', value_format='DELIMITED');", + "CREATE STREAM SOURCE2 (K STRING KEY, data VARCHAR) WITH (kafka_topic='insert-source', value_format='DELIMITED');", + "CREATE STREAM OUTPUT AS SELECT K, DATA AS DATA_1, DATA AS DATA_2 FROM SOURCE1;", + "INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2, S2.DATA AS DATA_3 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Result schema is `K` STRING KEY, `DATA_1` STRING, `DATA_2` STRING, `DATA_3` STRING\nSink schema is `K` STRING KEY, `DATA_1` STRING, `DATA_2` STRING" + } } ] } \ No newline at end of file