chore: enforce WITH KEY column type matches ROWKEY type (#4147)
* chore: enforce WITH KEY column type matches ROWKEY type

BREAKING CHANGE: Any `KEY` column identified in the `WITH` clause must have the same SQL type as `ROWKEY`.

The `KEY` property in the `WITH` clause lets users name a value column that duplicates the key column, e.g.

```sql
CREATE STREAM S (ID INT, NAME STRING) WITH (KEY='ID', ...);
```

Before primitive keys were introduced, all keys were treated as `STRING`. With primitive keys, `ROWKEY` can be a type other than `STRING`, e.g. `BIGINT`.
It therefore follows that any `KEY` column identified in the `WITH` clause must have the same SQL type as the _actual_ key, i.e. `ROWKEY`.

With this change, the above example statement will fail with the error:

```
The KEY field (ID) identified in the WITH clause is of a different type to the actual key column.
Either change the type of the KEY field to match ROWKEY, or explicitly set ROWKEY to the type of the KEY field by adding 'ROWKEY INTEGER KEY' in the schema.
KEY field type: INTEGER
ROWKEY type: STRING
```

As the error message says, this can be resolved by changing the statement to:

```sql
CREATE STREAM S (ROWKEY INT KEY, ID INT, NAME STRING) WITH (KEY='ID', ...);
```
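
The error message also offers an alternative resolution: change the declared type of the `KEY` column itself to match `ROWKEY`, which defaults to `STRING`. A minimal sketch of that option, keeping the elided `WITH` properties as above:

```sql
CREATE STREAM S (ID STRING, NAME STRING) WITH (KEY='ID', ...);
```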
big-andy-coates authored Dec 19, 2019
1 parent 56ac607 commit 6c6695c
Showing 91 changed files with 1,271 additions and 1,396 deletions.
```diff
@@ -20,7 +20,6 @@
 import io.confluent.ksql.cli.console.table.Table.Builder;
 import io.confluent.ksql.rest.entity.PropertiesList;
 import io.confluent.ksql.rest.entity.PropertiesList.Property;
-
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
```
13 changes: 6 additions & 7 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
```diff
@@ -113,7 +113,7 @@
  */
 @SuppressWarnings("SameParameterValue")
 @RunWith(MockitoJUnitRunner.class)
-@Category({IntegrationTest.class})
+@Category(IntegrationTest.class)
 public class CliTest {

   private static final EmbeddedSingleNodeKafkaCluster CLUSTER = EmbeddedSingleNodeKafkaCluster.build();
```
```diff
@@ -171,7 +171,7 @@ public class CliTest {
   private String tableName;

   @BeforeClass
-  public static void classSetUp() throws Exception {
+  public static void classSetUp() {
     restClient = KsqlRestClient.create(
         REST_APP.getHttpListener().toString(),
         ImmutableMap.of(),
```
```diff
@@ -236,15 +236,14 @@ private static void run(final String command, final Cli localCli) {
     }
   }

-  private static void produceInputStream(final TestDataProvider dataProvider) throws Exception {
+  private static void produceInputStream(final TestDataProvider dataProvider) {
     topicProducer.produceInputData(dataProvider);
   }

   private static void createKStream(final TestDataProvider dataProvider, final Cli cli) {
     run(String.format(
-        "CREATE STREAM %s %s WITH (value_format = 'json', kafka_topic = '%s' , key='%s');",
-        dataProvider.kstreamName(), dataProvider.ksqlSchemaString(), dataProvider.topicName(),
-        dataProvider.key()),
+        "CREATE STREAM %s %s WITH (value_format = 'json', kafka_topic = '%s');",
+        dataProvider.kstreamName(), dataProvider.ksqlSchemaString(), dataProvider.topicName()),
         cli);
   }

```
```diff
@@ -1206,7 +1205,7 @@ private static Matcher<Iterable<? extends Iterable<? extends String>>> isRow(
   }

   @SafeVarargs
-  @SuppressWarnings({"varargs", "unchecked"})
+  @SuppressWarnings({"varargs", "unchecked", "rawtypes"})
   private static Matcher<Iterable<? extends Iterable<? extends String>>> hasRow(
       final Matcher<String>... expected
   ) {
```
```diff
@@ -103,13 +103,12 @@ public class SslFunctionalTest {
   private SslContextFactory sslContextFactory;

   @BeforeClass
-  public static void classSetUp() throws Exception {
+  public static void classSetUp() {
     final OrderDataProvider dataProvider = new OrderDataProvider();
     CLUSTER.createTopic(TOPIC_NAME);
     new TopicProducer(CLUSTER).produceInputData(dataProvider);
   }

-  @SuppressWarnings("deprecation")
   @Before
   public void setUp() {
     clientProps = Collections.emptyMap();
```
```diff
@@ -350,9 +350,9 @@ private static void handleExplicitKeyField(
       if (keyValue == null) {
         values.put(key.name(), rowKeyValue);
       } else {
-        values.put(SchemaUtil.ROWKEY_NAME, keyValue.toString());
+        values.put(SchemaUtil.ROWKEY_NAME, keyValue);
       }
-    } else if (keyValue != null && !Objects.equals(keyValue.toString(), rowKeyValue)) {
+    } else if (keyValue != null && !Objects.equals(keyValue, rowKeyValue)) {
       throw new KsqlException(String.format(
           "Expected ROWKEY and %s to match but got %s and %s respectively.",
           key.toString(FormatOptions.noEscape()), rowKeyValue, keyValue));
```
```diff
@@ -21,7 +21,6 @@
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.name.SourceName;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
 import io.confluent.ksql.schema.ksql.types.SqlTypes;
 import io.confluent.ksql.serde.Format;
 import io.confluent.ksql.serde.FormatInfo;
@@ -30,6 +29,7 @@
 import io.confluent.ksql.serde.ValueFormat;
 import io.confluent.ksql.serde.WindowInfo;
 import io.confluent.ksql.util.MetaStoreFixture;
+import io.confluent.ksql.util.SchemaUtil;
 import java.util.Optional;
 import java.util.Set;
 import org.hamcrest.MatcherAssert;
```
```diff
@@ -47,8 +47,9 @@ public class DdlCommandExecTest {
   private static final SourceName TABLE_NAME = SourceName.of("t1");
   private static final String TOPIC_NAME = "topic";
   private static final LogicalSchema SCHEMA = new LogicalSchema.Builder()
-      .valueColumn(ColumnName.of("F1"), SqlPrimitiveType.of("INTEGER"))
-      .valueColumn(ColumnName.of("F2"), SqlPrimitiveType.of("VARCHAR"))
+      .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT)
+      .valueColumn(ColumnName.of("F1"), SqlTypes.BIGINT)
+      .valueColumn(ColumnName.of("F2"), SqlTypes.STRING)
       .build();
   private static final ValueFormat VALUE_FORMAT = ValueFormat.of(FormatInfo.of(Format.JSON));
   private static final KeyFormat KEY_FORMAT = KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA));
```
```diff
@@ -279,15 +279,15 @@ public void shouldThrowOnInsertIntoWithKeyMismatch() {
     expectedException.expect(rawMessage(containsString(
         "Incompatible key fields for sink and results. "
             + "Sink key field is ORDERTIME (type: BIGINT) "
-            + "while result key field is ITEMID (type: STRING)")));
+            + "while result key field is ORDERID (type: BIGINT)")));
     expectedException.expect(statementText(
-        is("insert into bar select * from orders partition by itemid;")));
+        is("insert into bar select * from orders partition by orderid;")));

     // When:
     KsqlEngineTestUtil.execute(
         serviceContext,
         ksqlEngine,
-        "insert into bar select * from orders partition by itemid;",
+        "insert into bar select * from orders partition by orderid;",
         KSQL_CONFIG,
         Collections.emptyMap()
     );
@@ -767,7 +767,7 @@ public void shouldHandleMultipleStatements() {
         + "CREATE STREAM S0 (a INT, b VARCHAR) "
         + " WITH (kafka_topic='s0_topic', value_format='DELIMITED');\n"
         + "\n"
-        + "CREATE TABLE T1 (f0 BIGINT, f1 DOUBLE) "
+        + "CREATE TABLE T1 (ROWKEY BIGINT KEY, f0 BIGINT, f1 DOUBLE) "
         + " WITH (kafka_topic='t1_topic', value_format='JSON', key = 'f0');\n"
         + "\n"
         + "CREATE STREAM S1 AS SELECT * FROM S0;\n"
```
```diff
@@ -144,7 +144,8 @@ private void produceInitData() throws Exception {
   }

   private void execInitCreateStreamQueries() {
-    final String ordersStreamStr = String.format("CREATE STREAM %s (ORDERTIME bigint, ORDERID varchar, "
+    final String ordersStreamStr = String.format("CREATE STREAM %s ("
+        + "ROWKEY BIGINT KEY, ORDERTIME bigint, ORDERID varchar, "
         + "ITEMID varchar, ORDERUNITS double, PRICEARRAY array<double>, KEYVALUEMAP "
         + "map<varchar, double>) WITH (value_format = 'json', "
         + "kafka_topic='%s' , "
```
```diff
@@ -320,7 +320,7 @@ private static Map<String, Object> getKsqlConfig(final Credentials user) {
     return configs;
   }

-  private void produceInitData() throws Exception {
+  private void produceInitData() {
     if (topicClient.isTopicExists(INPUT_TOPIC)) {
       return;
     }
@@ -340,11 +340,11 @@ private void awaitAsyncInputTopicCreation() {
   }

   private void execInitCreateStreamQueries() {
-    final String ordersStreamStr = String.format("CREATE STREAM %s (ORDERTIME bigint, ORDERID varchar, "
-        + "ITEMID varchar, ORDERUNITS double, PRICEARRAY array<double>, KEYVALUEMAP "
-        + "map<varchar, double>) WITH (value_format = 'json', "
-        + "kafka_topic='%s' , "
-        + "key='ordertime');", INPUT_STREAM, INPUT_TOPIC);
+    final String ordersStreamStr =
+        "CREATE STREAM " + INPUT_STREAM + " (ORDERTIME bigint, ORDERID varchar, "
+            + "ITEMID varchar, ORDERUNITS double, PRICEARRAY array<double>, KEYVALUEMAP "
+            + "map<varchar, double>) WITH (value_format = 'json', "
+            + "kafka_topic='" + INPUT_TOPIC + "');";

     KsqlEngineTestUtil.execute(
         serviceContext,
```
```diff
@@ -432,13 +432,13 @@ private void createOrdersStream() {
         + " KEYVALUEMAP map<varchar, double>";

     ksqlContext.sql("CREATE STREAM " + JSON_STREAM_NAME + " (" + columns + ") WITH "
-        + "(kafka_topic='" + jsonTopicName + "', value_format='JSON', key='ordertime');");
+        + "(kafka_topic='" + jsonTopicName + "', value_format='JSON');");

     ksqlContext.sql("CREATE STREAM " + AVRO_STREAM_NAME + " (" + columns + ") WITH "
-        + "(kafka_topic='" + avroTopicName + "', value_format='AVRO', key='ordertime');");
+        + "(kafka_topic='" + avroTopicName + "', value_format='AVRO');");

     ksqlContext.sql("CREATE STREAM " + AVRO_TIMESTAMP_STREAM_NAME + " (" + columns + ") WITH "
-        + "(kafka_topic='" + avroTopicName + "', value_format='AVRO', key='ordertime', "
+        + "(kafka_topic='" + avroTopicName + "', value_format='AVRO', "
         + "timestamp='timestamp', timestamp_format='yyyy-MM-dd');");
   }

```
```diff
@@ -269,9 +269,9 @@ private Set<String> getTopicNames() {
     return names;
   }

-  private static Deserializer getKeyDeserializerFor(final Object key) {
+  private static Deserializer<?> getKeyDeserializerFor(final Object key) {
     if (key instanceof Windowed) {
-      if (((Windowed) key).window() instanceof SessionWindow) {
+      if (((Windowed<?>) key).window() instanceof SessionWindow) {
         return SESSION_WINDOWED_DESERIALIZER;
       }
       return TIME_WINDOWED_DESERIALIZER;
@@ -288,6 +288,6 @@ private void createOrdersStream() {
         + "ORDERUNITS double, "
         + "PRICEARRAY array<double>, "
         + "KEYVALUEMAP map<varchar, double>) "
-        + "WITH (kafka_topic='" + sourceTopicName + "', value_format='JSON', key='ordertime');");
+        + "WITH (kafka_topic='" + sourceTopicName + "', value_format='JSON');");
   }
 }
```
```diff
@@ -48,7 +48,6 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Optional;
-
 import org.apache.avro.Schema;
 import org.junit.Before;
 import org.junit.Rule;
```
```diff
@@ -19,9 +19,13 @@
 import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.name.SourceName;
+import io.confluent.ksql.schema.ksql.Column;
 import io.confluent.ksql.schema.ksql.ColumnRef;
+import io.confluent.ksql.schema.ksql.FormatOptions;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlType;
 import io.confluent.ksql.serde.WindowInfo;
+import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.SchemaUtil;
 import java.util.Objects;
 import java.util.Optional;
@@ -56,10 +60,7 @@ public abstract class CreateSourceCommand implements DdlCommand {
     this.formats = Objects.requireNonNull(formats, "formats");
     this.windowInfo = Objects.requireNonNull(windowInfo, "windowInfo");

-    if (schema.findValueColumn(ColumnRef.withoutSource(SchemaUtil.ROWKEY_NAME)).isPresent()
-        || schema.findValueColumn(ColumnRef.withoutSource(SchemaUtil.ROWTIME_NAME)).isPresent()) {
-      throw new IllegalArgumentException("Schema contains implicit columns in value schema");
-    }
+    validate(schema, keyField);
   }

   public SourceName getSourceName() {
@@ -89,4 +90,38 @@ public Formats getFormats() {
   public Optional<WindowInfo> getWindowInfo() {
     return windowInfo;
   }
+
+  private static void validate(final LogicalSchema schema, final Optional<ColumnName> keyField) {
+    if (schema.findValueColumn(ColumnRef.withoutSource(SchemaUtil.ROWKEY_NAME)).isPresent()
+        || schema.findValueColumn(ColumnRef.withoutSource(SchemaUtil.ROWTIME_NAME)).isPresent()) {
+      throw new IllegalArgumentException("Schema contains implicit columns in value schema");
+    }
+
+    if (schema.key().size() != 1) {
+      throw new UnsupportedOperationException("Only single key columns supported");
+    }
+
+    if (keyField.isPresent()) {
+      final SqlType keyFieldType = schema.findColumn(ColumnRef.withoutSource(keyField.get()))
+          .map(Column::type)
+          .orElseThrow(IllegalArgumentException::new);
+
+      final SqlType keyType = schema.key().get(0).type();
+
+      if (!keyFieldType.equals(keyType)) {
+        throw new KsqlException("The KEY field ("
+            + keyField.get().toString(FormatOptions.noEscape())
+            + ") identified in the WITH clause is of a different type to the actual key column."
+            + System.lineSeparator()
+            + "Either change the type of the KEY field to match ROWKEY, "
+            + "or explicitly set ROWKEY to the type of the KEY field by adding "
+            + "'ROWKEY " + keyFieldType + " KEY' in the schema."
+            + System.lineSeparator()
+            + "KEY field type: " + keyFieldType
+            + System.lineSeparator()
+            + "ROWKEY type: " + keyType
+        );
+      }
+    }
+  }
 }
```
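
As a usage sketch of the new validation (assuming the default `STRING` `ROWKEY` when none is declared; the topic names and formats below are placeholders):

```sql
-- Rejected by the new check: KEY column ID is INTEGER, but the implicit ROWKEY is STRING.
CREATE STREAM BAD (ID INT, NAME STRING)
  WITH (kafka_topic='bad_topic', value_format='JSON', KEY='ID');

-- Accepted: ROWKEY is explicitly declared with the same SQL type as the KEY column.
CREATE STREAM GOOD (ROWKEY INT KEY, ID INT, NAME STRING)
  WITH (kafka_topic='good_topic', value_format='JSON', KEY='ID');
```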