Skip to content

Commit

Permalink
chore: victorias comments
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Nov 5, 2020
1 parent 20d2c78 commit 3362d09
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public FinalProjectNode(
this.schema = result.left;
this.selectExpressions = ImmutableList.copyOf(result.right);

validate();
throwOnEmptyValueOrUnknownColumns();
}

@Override
Expand Down Expand Up @@ -107,7 +107,7 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
SelectionUtil.buildProjectionSchema(parentSchema, selectExpressions, metaStore);

if (into.isPresent()) {
// Persistent queries have key columns as key columns - so final projection can exclude them:
// Persistent queries have key columns as value columns - final projection can exclude them:
final Map<ColumnName, Set<ColumnName>> seenKeyColumns = new HashMap<>();
selectExpressions.removeIf(se -> {
if (se.getExpression() instanceof UnqualifiedColumnReferenceExp) {
Expand Down Expand Up @@ -162,7 +162,7 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
return Pair.of(nodeSchema, selectExpressions);
}

private void validate() {
private void throwOnEmptyValueOrUnknownColumns() {
final LogicalSchema schema = getSchema();

if (schema.value().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@
}
},
{
"name": "nested struct key",
"name": "struct as column in multi-column key",
"properties": {
"ksql.key.format.enabled": true
},
Expand Down Expand Up @@ -366,6 +366,20 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Cannot repartition a TABLE source. If this is a join, joins on tables with multiple columns is not yet supported."
}
},
{
"name": "multi-column partition",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM S (K STRING KEY, ID VARCHAR) WITH (kafka_topic='S', format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM S PARTITION BY K, ID;"
],
"expectedException": {
"type": "io.confluent.ksql.parser.exception.ParseFailedException",
"message": "mismatched input ',' expecting ';'"
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public void shouldHandleEmptyKey() {
}

@Test
public void shouldHandleMultiField() {
public void shouldHandleMultiKeyField() {
// Given:
givenMultiColumnSourceStream();
final ValueTransformerWithKey<Struct, GenericRow, GenericRow> transformer =
Expand All @@ -576,7 +576,7 @@ public void shouldHandleMultiField() {
}

@Test
public void shouldHandleMultiFieldWithNullCol() {
public void shouldHandleMultiKeyFieldWithNullCol() {
// Given:
givenMultiColumnSourceStream();
final ValueTransformerWithKey<Struct, GenericRow, GenericRow> transformer =
Expand All @@ -592,7 +592,7 @@ public void shouldHandleMultiFieldWithNullCol() {
}

@Test
public void shouldHandleMultiFieldEmptyStruct() {
public void shouldHandleMultiKeyFieldEmptyStruct() {
// Given:
givenMultiColumnSourceStream();
final ValueTransformerWithKey<Struct, GenericRow, GenericRow> transformer =
Expand All @@ -608,7 +608,7 @@ public void shouldHandleMultiFieldEmptyStruct() {
}

@Test
public void shouldHandleMultiFieldEntirelyNull() {
public void shouldHandleMultiKeyFieldEntirelyNull() {
// Given:
givenMultiColumnSourceStream();
final ValueTransformerWithKey<Struct, GenericRow, GenericRow> transformer =
Expand Down Expand Up @@ -849,20 +849,6 @@ private void givenWindowedSourceTable() {
);
}

private void givenWindowedMultiKeySourceTable() {
when(queryBuilder.buildKeySerde(any(), any(), any(), any())).thenReturn(windowedKeySerde);
givenConsumed(consumedWindowed, windowedKeySerde);
givenConsumed(consumedWindowed, windowedKeySerde);
windowedTableSource = new WindowedTableSource(
new ExecutionStepPropertiesV1(ctx),
TOPIC_NAME,
Formats.of(keyFormatInfo, valueFormatInfo, KEY_FEATURES, VALUE_FEATURES),
windowInfo,
TIMESTAMP_COLUMN,
MULTI_COL_SOURCE_SCHEMA
);
}

private void givenUnwindowedSourceTable(final Boolean forceChangelog) {
when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
givenConsumed(consumed, keySerde);
Expand Down

0 comments on commit 3362d09

Please sign in to comment.