From f72d17b5cd5952280871bac635e396047395170d Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 10 Dec 2019 11:25:46 +0000 Subject: [PATCH] chore: primitive keys for simple queries First of a few commits to start introducing support for primitive keys in different query types. This commit opens the door for CREATE TABLE / CREATE STREAM (CT/CS) statements with primitive keys (`STRING`, `INT`, `BIGINT`, `BOOLEAN` and `DOUBLE`), and for using those sources in non-join, non-aggregate and non-partition-by queries. --- .../ddl/commands/CreateSourceFactory.java | 7 --- .../ksql/planner/plan/AggregateNode.java | 5 ++ .../confluent/ksql/planner/plan/JoinNode.java | 5 ++ .../ksql/planner/plan/RepartitionNode.java | 6 +++ .../ddl/commands/CreateSourceFactoryTest.java | 16 +++--- .../query-validation-tests/key-schemas.json | 54 +++++++++++++++++-- .../ksql/execution/streams/SourceBuilder.java | 17 +++--- 7 files changed, 81 insertions(+), 29 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java index 6acc1e4ab266..b272b21c2e33 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java @@ -36,7 +36,6 @@ import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.GenericRowSerDe; import io.confluent.ksql.serde.SerdeOption; @@ -179,12 +178,6 @@ private static LogicalSchema buildSchema(final TableElements tableElements) { throw new KsqlException("'" + e.getName().name() + "' is an invalid KEY column name. 
" + "KSQL currently only supports KEY columns named ROWKEY."); } - - if (e.getType().getSqlType().baseType() != SqlBaseType.STRING) { - throw new KsqlException("'" + e.getName().name() - + "' is a KEY column with an unsupported type. " - + "KSQL currently only supports KEY columns of type " + SqlBaseType.STRING + "."); - } } else if (isRowKey) { throw new KsqlException("'" + e.getName().name() + "' is a reserved column name. " + "It can only be used for KEY columns."); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index 6bcb6e98a85e..8600272e28ca 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -34,6 +34,7 @@ import io.confluent.ksql.parser.tree.WindowExpression; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKGroupedStream; @@ -108,6 +109,10 @@ public AggregateNode( this.havingExpressions = havingExpressions; this.keyField = KeyField.of(requireNonNull(keyFieldName, "keyFieldName")) .validateKeyExistsIn(schema); + + if (schema.key().get(0).type().baseType() != SqlBaseType.STRING) { + throw new KsqlException("GROUP BY is not supported with non-STRING keys"); + } } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 897479977e7b..40863a1c4958 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -31,6 +31,7 @@ import io.confluent.ksql.schema.ksql.ColumnRef; 
import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; @@ -93,6 +94,10 @@ public JoinNode( : KeyField.of(leftKeyCol.ref()); this.schema = JoinParamsFactory.createSchema(left.getSchema(), right.getSchema()); + + if (schema.key().get(0).type().baseType() != SqlBaseType.STRING) { + throw new KsqlException("JOIN is not supported with non-STRING keys"); + } } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java index d80bba1dd8e3..a10c0c07234d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java @@ -23,8 +23,10 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; +import io.confluent.ksql.util.KsqlException; import java.util.List; import java.util.Objects; @@ -45,6 +47,10 @@ public RepartitionNode( this.source = Objects.requireNonNull(source, "source"); this.partitionBy = Objects.requireNonNull(partitionBy, "partitionBy"); this.keyField = Objects.requireNonNull(keyField, "keyField"); + + if (source.getSchema().key().get(0).type().baseType() != SqlBaseType.STRING) { + throw new KsqlException("PARTITION BY is not supported with non-STRING keys"); + } } @Override diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java 
b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java index 28b46d5ac19a..abbb3cd9541b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java @@ -20,6 +20,7 @@ import static io.confluent.ksql.model.WindowType.TUMBLING; import static io.confluent.ksql.parser.tree.TableElement.Namespace.KEY; import static io.confluent.ksql.parser.tree.TableElement.Namespace.VALUE; +import static io.confluent.ksql.schema.ksql.ColumnMatchers.keyColumn; import static io.confluent.ksql.serde.Format.AVRO; import static io.confluent.ksql.serde.Format.JSON; import static io.confluent.ksql.serde.Format.KAFKA; @@ -765,7 +766,7 @@ public void shouldNotThrowOnRowKeyKeyColumn() { } @Test - public void shouldThrowOnRowKeyIfNotString() { + public void shouldAllowNonStringKeyColumn() { // Given: final CreateStream statement = new CreateStream( SOME_NAME, @@ -774,13 +775,14 @@ public void shouldThrowOnRowKeyIfNotString() { withProperties ); - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("'ROWKEY' is a KEY column with an unsupported type. 
" - + "KSQL currently only supports KEY columns of type STRING."); - // When: - createSourceFactory.createStreamCommand(statement, ksqlConfig); + final CreateStreamCommand cmd = createSourceFactory + .createStreamCommand(statement, ksqlConfig); + + // Then: + assertThat(cmd.getSchema().key(), contains( + keyColumn(ROWKEY_NAME, SqlTypes.INTEGER) + )); } @Test diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json b/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json index cc83364dabb7..c80206653832 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json @@ -73,13 +73,57 @@ ] }, { - "name": "explicit non-STRING ROWKEY", + "name": "stream explicit non-STRING ROWKEY", "statements": [ - "CREATE STREAM INPUT (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');" + "CREATE STREAM INPUT (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT ID, ROWKEY as KEY FROM INPUT;" ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "'ROWKEY' is a KEY column with an unsupported type. KSQL currently only supports KEY columns of type STRING." 
+ "inputs": [ + {"topic": "input", "key": 3, "value": {"id": 1}}, + {"topic": "input", "key": 2, "value": {"id": 2}}, + {"topic": "input", "key": null, "value": {"id": 3}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 3, "value": {"ID": 1, "KEY": 3}}, + {"topic": "OUTPUT", "key": 2, "value": {"ID": 2, "KEY": 2}}, + {"topic": "OUTPUT", "key": null, "value": {"ID": 3, "KEY": null}} + ], + "post": { + "sources": [ + { + "name": "OUTPUT", + "type": "stream", + "keyFormat": {"format": "KAFKA"}, + "schema": "ROWKEY INT KEY, ID BIGINT, KEY INT" + } + ] + } + }, + { + "name": "table explicit non-STRING ROWKEY", + "statements": [ + "CREATE TABLE INPUT (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');", + "CREATE TABLE OUTPUT as SELECT ID, ROWKEY as KEY FROM INPUT;" + ], + "inputs": [ + {"topic": "input", "key": 3, "value": {"id": 1}}, + {"topic": "input", "key": 2, "value": {"id": 2}}, + {"topic": "input", "key": 1, "value": {"id": 3}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 3, "value": {"ID": 1, "KEY": 3}}, + {"topic": "OUTPUT", "key": 2, "value": {"ID": 2, "KEY": 2}}, + {"topic": "OUTPUT", "key": 1, "value": {"ID": 3, "KEY": 1}} + ], + "post": { + "sources": [ + { + "name": "OUTPUT", + "type": "table", + "keyFormat": {"format": "KAFKA"}, + "schema": "ROWKEY BIGINT KEY, ID BIGINT, KEY BIGINT" + } + ] } }, { diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java index cb78a5011ed1..f3c7d585b7c3 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java @@ -267,7 +267,7 @@ private static KStream buildKStream( final AbstractStreamSource streamSource, final KsqlQueryBuilder queryBuilder, final Consumed consumed, - final Function rowKeyGenerator + final Function rowKeyGenerator ) { KStream 
stream = queryBuilder.getStreamsBuilder() .stream(streamSource.getTopicName(), consumed); @@ -280,7 +280,7 @@ private static KTable buildKTable( final AbstractStreamSource streamSource, final KsqlQueryBuilder queryBuilder, final Consumed consumed, - final Function rowKeyGenerator, + final Function rowKeyGenerator, final Materialized> materialized ) { final KTable table = queryBuilder.getStreamsBuilder() @@ -344,7 +344,7 @@ private static String tableChangeLogOpName(final ExecutionStepPropertiesV1 props return StreamsUtil.buildOpName(stacker.push("Reduce").getQueryContext()); } - private static Function, String> windowedRowKeyGenerator( + private static Function, Object> windowedRowKeyGenerator( final LogicalSchema schema ) { final org.apache.kafka.connect.data.Field keyField = getKeySchemaSingleField(schema); @@ -362,7 +362,7 @@ private static Function, String> windowedRowKeyGenerator( }; } - private static Function nonWindowedRowKeyGenerator( + private static Function nonWindowedRowKeyGenerator( final LogicalSchema schema ) { final org.apache.kafka.connect.data.Field keyField = getKeySchemaSingleField(schema); @@ -371,19 +371,16 @@ private static Function nonWindowedRowKeyGenerator( return null; } - final Object k = key.get(keyField); - return k == null - ? null - : k.toString(); + return key.get(keyField); }; } private static class AddKeyAndTimestampColumns implements ValueTransformerWithKeySupplier { - private final Function rowKeyGenerator; + private final Function rowKeyGenerator; - AddKeyAndTimestampColumns(final Function rowKeyGenerator) { + AddKeyAndTimestampColumns(final Function rowKeyGenerator) { this.rowKeyGenerator = requireNonNull(rowKeyGenerator, "rowKeyGenerator"); }