Skip to content

Commit

Permalink
chore: fix bad error message on insert into (#6793)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Dec 17, 2020
1 parent cb0e6f2 commit 554aef7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ private static Stream<SelectExpression> resolveSelectItem(

if (selectItem instanceof SingleColumn) {
final SingleColumn column = (SingleColumn) selectItem;
final Optional<Column> targetColumn = targetSchema.map(schema -> schema.columns().get(idx));
// if the column we are trying to coerce into a target schema is beyond
// the target schema's max columns ignore it. this will generate a failure
// down the line when we check that the result schema is identical to
// the schema of the source we are attempting to fit
final Optional<Column> targetColumn = targetSchema
.filter(schema -> schema.columns().size() > idx)
.map(schema -> schema.columns().get(idx));

return resolveSingleColumn(idx, parentNode, column, targetColumn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,32 @@
"outputs": [
{"topic": "target", "value": {"C1": 1.00, "C2": 2.00}}
]
},
{
"name": "join mismatch (fewer columns than expected)",
"statements": [
"CREATE STREAM SOURCE1 (K STRING KEY, data VARCHAR) WITH (kafka_topic='stream-source', value_format='DELIMITED');",
"CREATE STREAM SOURCE2 (K STRING KEY, data VARCHAR) WITH (kafka_topic='insert-source', value_format='DELIMITED');",
"CREATE STREAM OUTPUT AS SELECT K, DATA AS DATA_1, DATA AS DATA_2 FROM SOURCE1;",
"INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Result schema is `K` STRING KEY, `DATA_1` STRING\nSink schema is `K` STRING KEY, `DATA_1` STRING, `DATA_2` STRING"
}
},
{
"name": "join mismatch (more columns than expected)",
"statements": [
"CREATE STREAM SOURCE1 (K STRING KEY, data VARCHAR) WITH (kafka_topic='stream-source', value_format='DELIMITED');",
"CREATE STREAM SOURCE2 (K STRING KEY, data VARCHAR) WITH (kafka_topic='insert-source', value_format='DELIMITED');",
"CREATE STREAM OUTPUT AS SELECT K, DATA AS DATA_1, DATA AS DATA_2 FROM SOURCE1;",
"INSERT INTO OUTPUT SELECT S1.K AS K, S1.DATA AS DATA_1, S2.DATA AS DATA_2, S2.DATA AS DATA_3 FROM SOURCE1 S1 JOIN SOURCE2 S2 WITHIN 1 SECOND ON S1.K = S2.K;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Result schema is `K` STRING KEY, `DATA_1` STRING, `DATA_2` STRING, `DATA_3` STRING\nSink schema is `K` STRING KEY, `DATA_1` STRING, `DATA_2` STRING"
}
}
]
}

0 comments on commit 554aef7

Please sign in to comment.