Skip to content

Commit

Permalink
fix: Select star table scans need keys in the intermediate row (#6908)
Browse files Browse the repository at this point in the history
* fix: Select star table scans need keys in the intermediate row
  • Loading branch information
AlanConfluent authored Jan 28, 2021
1 parent 98c7d73 commit fcac459
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ private boolean shouldAddAdditionalColumnsInSchema() {
getSource().getSchema().isKeyColumn(cn)
);

return hasSystemColumns || hasKeyColumns;
// Select * also requires keys, in case it's not explicitly mentioned
return hasSystemColumns || hasKeyColumns || isSelectStar;
}

private boolean isSelectStar() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ public void shouldBuildPullQueryOutputSchemaSelectStar() {

// Then:
final LogicalSchema expectedSchema = INPUT_SCHEMA;
assertThat(expectedSchema, is(projectNode.getIntermediateSchema()));
assertThat(expectedSchema.withPseudoAndKeyColsInValue(false),
is(projectNode.getIntermediateSchema()));
assertThat(expectedSchema.withoutPseudoAndKeyColsInValue(), is(projectNode.getSchema()));
assertThrows(
IllegalStateException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,8 @@
"statements": [
"CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;",
"SELECT ID, COUNT, ROWTIME FROM AGGREGATE;"
"SELECT ID, COUNT, ROWTIME FROM AGGREGATE;",
"SELECT * FROM AGGREGATE;"
],
"inputs": [
{"topic": "test_topic", "timestamp": 12365, "key": "10", "value": {}},
Expand All @@ -2022,6 +2023,11 @@
{"header":{"schema":"`ID` STRING KEY, `COUNT` BIGINT, `ROWTIME` BIGINT"}},
{"row":{"columns":["11", 1, 12345]}},
{"row":{"columns":["10", 1, 12365]}}
]},
{"query": [
{"header":{"schema":"`ID` STRING KEY, `COUNT` BIGINT"}},
{"row":{"columns":["11", 1]}},
{"row":{"columns":["10", 1]}}
]}
]
},
Expand All @@ -2030,7 +2036,8 @@
"statements": [
"CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;",
"SELECT ID, WINDOWSTART, WINDOWEND, COUNT, ROWTIME FROM AGGREGATE;"
"SELECT ID, WINDOWSTART, WINDOWEND, COUNT, ROWTIME FROM AGGREGATE;",
"SELECT * FROM AGGREGATE;"
],
"inputs": [
{"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"val": 1}},
Expand All @@ -2047,6 +2054,13 @@
{"row":{"columns":["11", 13000, 14000, 1, 13346]}},
{"row":{"columns":["10", 12000, 13000, 1, 12345]}},
{"row":{"columns":["10", 13000, 14000, 1, 13345]}}
]},
{"query": [
{"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}},
{"row":{"columns":["11", 12000, 13000, 1]}},
{"row":{"columns":["11", 13000, 14000, 1]}},
{"row":{"columns":["10", 12000, 13000, 1]}},
{"row":{"columns":["10", 13000, 14000, 1]}}
]}
]
},
Expand Down

0 comments on commit fcac459

Please sign in to comment.