Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: primitive keys for simple queries #4096

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.SerdeOption;
Expand Down Expand Up @@ -179,12 +178,6 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
throw new KsqlException("'" + e.getName().name() + "' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");
}

if (e.getType().getSqlType().baseType() != SqlBaseType.STRING) {
throw new KsqlException("'" + e.getName().name()
+ "' is a KEY column with an unsupported type. "
+ "KSQL currently only supports KEY columns of type " + SqlBaseType.STRING + ".");
}
} else if (isRowKey) {
throw new KsqlException("'" + e.getName().name() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKGroupedStream;
Expand Down Expand Up @@ -108,6 +109,10 @@ public AggregateNode(
this.havingExpressions = havingExpressions;
this.keyField = KeyField.of(requireNonNull(keyFieldName, "keyFieldName"))
.validateKeyExistsIn(schema);

if (schema.key().get(0).type().baseType() != SqlBaseType.STRING) {
throw new KsqlException("GROUP BY is not supported with non-STRING keys");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
Expand Down Expand Up @@ -93,6 +94,10 @@ public JoinNode(
: KeyField.of(leftKeyCol.ref());

this.schema = JoinParamsFactory.createSchema(left.getSchema(), right.getSchema());

    if (schema.key().get(0).type().baseType() != SqlBaseType.STRING) {
      throw new KsqlException("JOIN is not supported with non-STRING keys");
Copy link
Member

@stevenpyzhang stevenpyzhang Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be "JOIN is not supported"?

}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Objects;

Expand All @@ -45,6 +47,10 @@ public RepartitionNode(
this.source = Objects.requireNonNull(source, "source");
this.partitionBy = Objects.requireNonNull(partitionBy, "partitionBy");
this.keyField = Objects.requireNonNull(keyField, "keyField");

    if (source.getSchema().key().get(0).type().baseType() != SqlBaseType.STRING) {
      throw new KsqlException("PARTITION BY is not supported with non-STRING keys");
Copy link
Member

@stevenpyzhang stevenpyzhang Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be "PARTITION BY is not supported"?

}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.confluent.ksql.model.WindowType.TUMBLING;
import static io.confluent.ksql.parser.tree.TableElement.Namespace.KEY;
import static io.confluent.ksql.parser.tree.TableElement.Namespace.VALUE;
import static io.confluent.ksql.schema.ksql.ColumnMatchers.keyColumn;
import static io.confluent.ksql.serde.Format.AVRO;
import static io.confluent.ksql.serde.Format.JSON;
import static io.confluent.ksql.serde.Format.KAFKA;
Expand Down Expand Up @@ -765,7 +766,7 @@ public void shouldNotThrowOnRowKeyKeyColumn() {
}

@Test
public void shouldThrowOnRowKeyIfNotString() {
public void shouldAllowNonStringKeyColumn() {
// Given:
final CreateStream statement = new CreateStream(
SOME_NAME,
Expand All @@ -774,13 +775,14 @@ public void shouldThrowOnRowKeyIfNotString() {
withProperties
);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("'ROWKEY' is a KEY column with an unsupported type. "
+ "KSQL currently only supports KEY columns of type STRING.");

// When:
createSourceFactory.createStreamCommand(statement, ksqlConfig);
final CreateStreamCommand cmd = createSourceFactory
.createStreamCommand(statement, ksqlConfig);

// Then:
assertThat(cmd.getSchema().key(), contains(
keyColumn(ROWKEY_NAME, SqlTypes.INTEGER)
));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,57 @@
]
},
{
"name": "explicit non-STRING ROWKEY",
"name": "stream explicit non-STRING ROWKEY",
"statements": [
"CREATE STREAM INPUT (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');"
"CREATE STREAM INPUT (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT ID, ROWKEY as KEY FROM INPUT;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "'ROWKEY' is a KEY column with an unsupported type. KSQL currently only supports KEY columns of type STRING."
"inputs": [
{"topic": "input", "key": 3, "value": {"id": 1}},
{"topic": "input", "key": 2, "value": {"id": 2}},
{"topic": "input", "key": null, "value": {"id": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 3, "value": {"ID": 1, "KEY": 3}},
{"topic": "OUTPUT", "key": 2, "value": {"ID": 2, "KEY": 2}},
{"topic": "OUTPUT", "key": null, "value": {"ID": 3, "KEY": null}}
],
"post": {
"sources": [
{
"name": "OUTPUT",
"type": "stream",
"keyFormat": {"format": "KAFKA"},
"schema": "ROWKEY INT KEY, ID BIGINT, KEY INT"
}
]
}
},
{
"name": "table explicit non-STRING ROWKEY",
"statements": [
"CREATE TABLE INPUT (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');",
"CREATE TABLE OUTPUT as SELECT ID, ROWKEY as KEY FROM INPUT;"
],
"inputs": [
{"topic": "input", "key": 3, "value": {"id": 1}},
{"topic": "input", "key": 2, "value": {"id": 2}},
{"topic": "input", "key": 1, "value": {"id": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 3, "value": {"ID": 1, "KEY": 3}},
{"topic": "OUTPUT", "key": 2, "value": {"ID": 2, "KEY": 2}},
{"topic": "OUTPUT", "key": 1, "value": {"ID": 3, "KEY": 1}}
],
"post": {
"sources": [
{
"name": "OUTPUT",
"type": "table",
"keyFormat": {"format": "KAFKA"},
"schema": "ROWKEY BIGINT KEY, ID BIGINT, KEY BIGINT"
}
]
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private static <K> KStream<K, GenericRow> buildKStream(
final AbstractStreamSource<?> streamSource,
final KsqlQueryBuilder queryBuilder,
final Consumed<K, GenericRow> consumed,
final Function<K, String> rowKeyGenerator
final Function<K, Object> rowKeyGenerator
) {
KStream<K, GenericRow> stream = queryBuilder.getStreamsBuilder()
.stream(streamSource.getTopicName(), consumed);
Expand All @@ -280,7 +280,7 @@ private static <K> KTable<K, GenericRow> buildKTable(
final AbstractStreamSource<?> streamSource,
final KsqlQueryBuilder queryBuilder,
final Consumed<K, GenericRow> consumed,
final Function<K, String> rowKeyGenerator,
final Function<K, Object> rowKeyGenerator,
final Materialized<K, GenericRow, KeyValueStore<Bytes, byte[]>> materialized
) {
final KTable<K, GenericRow> table = queryBuilder.getStreamsBuilder()
Expand Down Expand Up @@ -344,7 +344,7 @@ private static String tableChangeLogOpName(final ExecutionStepPropertiesV1 props
return StreamsUtil.buildOpName(stacker.push("Reduce").getQueryContext());
}

private static Function<Windowed<Struct>, String> windowedRowKeyGenerator(
private static Function<Windowed<Struct>, Object> windowedRowKeyGenerator(
final LogicalSchema schema
) {
final org.apache.kafka.connect.data.Field keyField = getKeySchemaSingleField(schema);
Expand All @@ -362,7 +362,7 @@ private static Function<Windowed<Struct>, String> windowedRowKeyGenerator(
};
}

private static Function<Struct, String> nonWindowedRowKeyGenerator(
private static Function<Struct, Object> nonWindowedRowKeyGenerator(
final LogicalSchema schema
) {
final org.apache.kafka.connect.data.Field keyField = getKeySchemaSingleField(schema);
Expand All @@ -371,19 +371,16 @@ private static Function<Struct, String> nonWindowedRowKeyGenerator(
return null;
}

final Object k = key.get(keyField);
return k == null
? null
: k.toString();
return key.get(keyField);
};
}

private static class AddKeyAndTimestampColumns<K>
implements ValueTransformerWithKeySupplier<K, GenericRow, GenericRow> {

private final Function<K, String> rowKeyGenerator;
private final Function<K, Object> rowKeyGenerator;

AddKeyAndTimestampColumns(final Function<K, String> rowKeyGenerator) {
AddKeyAndTimestampColumns(final Function<K, Object> rowKeyGenerator) {
this.rowKeyGenerator = requireNonNull(rowKeyGenerator, "rowKeyGenerator");
}

Expand Down