Skip to content

Commit

Permalink
Drop requirement that CREATE TABLE statements have a KEY set in `…
Browse files Browse the repository at this point in the history
…WITH` clause.

Fixes confluentinc#2745
  • Loading branch information
big-andy-coates committed Apr 29, 2019
1 parent 68afee4 commit e27b22c
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 139 deletions.
6 changes: 5 additions & 1 deletion docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ Version 5.3.0

KSQL 5.3.0 includes new features, including:

* Drop the requirement that ``CREATE TABLE`` statements must have a ``KEY`` set in their ``WITH`` clause.
This is now an optional optimisation to avoid unnecessary repartition steps.
See `Github issue #2745 <https://github.com/confluentinc/ksql/pull/2745>`_ for more info.

* Improved handling of ``KEY`` fields. The ``KEY`` field is an optional copy of the Kafka record's key held
within the record's value. Users can supply the name of the field that holds the copy of the key within
the ``WITH`` clause.
The improved handling may eliminate unnecessary repartition steps in certain queries.
Please note that preexisting persistent queries, e.g. those created via ``CREATE TABLE AS SELECT ...`` or
``CREATE STREAM AS SELECT ...`` or ``INSERT INTO ...``, will continue to have the unnecessary repartition step.
This is required to avoid the potential for data loss should this step be dropped.
See `#2280 <https://github.com/confluentinc/ksql/pull/2636>`_ for more info.
See `Github issue #2636 <https://github.com/confluentinc/ksql/pull/2636>`_ for more info.


Version 5.2.0
Expand Down
27 changes: 11 additions & 16 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,7 @@ timestamp and message key, respectively. The timestamp has milliseconds accuracy

KSQL has currently the following requirements for creating a table from a Kafka topic:

1. The Kafka message key must also be present as a field/column in the Kafka message value. The ``KEY`` property (see
below) must be defined to inform KSQL which field/column in the message value represents the key. If the message key
is not present in the message value, follow the instructions in :ref:`ksql_key_requirements`.
2. The message key must be in ``VARCHAR`` aka ``STRING`` format. If the message key is not in this format, follow the
1. The message key must be in ``VARCHAR`` aka ``STRING`` format. If the message key is not in this format, follow the
instructions in :ref:`ksql_key_requirements`.

The WITH clause supports the following properties:
Expand All @@ -395,13 +392,13 @@ The WITH clause supports the following properties:
| VALUE_FORMAT (required) | Specifies the serialization format of message values in the topic. Supported formats: |
| | ``JSON``, ``DELIMITED`` (comma-separated value), and ``AVRO``. |
+-------------------------+--------------------------------------------------------------------------------------------+
| KEY (required) | Associates a field/column within the Kafka message value with the implicit ``ROWKEY`` |
| | column (message key) in the KSQL table. |
| | |
| | KSQL currently requires that the Kafka message key, which will be available as the |
| | implicit ``ROWKEY`` column in the table, must also be present as a field/column in the |
| | message value. You must set the KEY property to this corresponding field/column in the |
| | message value, and this column must be in ``VARCHAR`` aka ``STRING`` format. |
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka |
| | message value, you may set this property to associate the corresponding field/column with |
| | the implicit ``ROWKEY`` column (message key). |
| | If set, KSQL uses it as an optimization hint to determine if repartitioning can be avoided |
| | when performing aggregations and joins. |
| | You can only use this if the key format in kafka is ``VARCHAR`` or ``STRING``. Do not use |
| | this hint if the message key format in kafka is AVRO or JSON. |
| | See :ref:`ksql_key_requirements` for more information. |
+-------------------------+--------------------------------------------------------------------------------------------+
| TIMESTAMP | By default, the implicit ``ROWTIME`` column is the timestamp of the message in the Kafka |
Expand Down Expand Up @@ -1649,7 +1646,8 @@ Key Requirements
Message Keys
------------

The ``CREATE STREAM`` and ``CREATE TABLE`` statements, which read data from a Kafka topic into a stream or table, allow you to specify a field/column in the Kafka message value that corresponds to the Kafka message key by setting the ``KEY`` property of the ``WITH`` clause.
The ``CREATE STREAM`` and ``CREATE TABLE`` statements, which read data from a Kafka topic into a stream or table,
allow you to specify a field/column in the Kafka message value that corresponds to the Kafka message key by setting the ``KEY`` property of the ``WITH`` clause.

Example:

Expand All @@ -1659,10 +1657,7 @@ Example:
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY = 'userid');
The ``KEY`` property is:

- Required for tables.
- Optional for streams. Here, KSQL uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins.
The ``KEY`` property is optional. KSQL uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins.

.. important::
Don't set the KEY property, unless you have validated that your stream doesn't need to be re-partitioned for future joins.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ public static TimestampExtractionPolicy create(
final String fieldName = StringUtil.cleanQuotes(timestampColumnName.toUpperCase());
final Field timestampField = SchemaUtil.getFieldByName(schema,
fieldName)
.orElseThrow(() -> new KsqlException(String.format(
"No column with the provided timestamp column name in the "
+ "WITH clause, %s, exists in the defined schema.",
fieldName
)));
.orElseThrow(() -> new KsqlException(
"The TIMESTAMP column '" + fieldName
+ "', set in the WITH clause, does not exist in the schema"));

final Schema.Type timestampFieldType = timestampField.schema().type();
if (timestampFieldType == Schema.Type.STRING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ abstract class AbstractCreateStreamCommand implements DdlCommand {
final String keyFieldName = StringUtil.cleanQuotes(name);
final Field keyField = SchemaUtil.getFieldByName(schema, keyFieldName)
.orElseThrow(() -> new KsqlException(
"No column with the provided key column name in the WITH "
+ "clause, " + keyFieldName + ", exists in the defined schema."
"The KEY column '" + keyFieldName
+ "', set in the WITH clause, does not exist in the schema"
));

this.keyField = KeyField.of(keyFieldName, keyField);
Expand All @@ -120,7 +120,6 @@ abstract class AbstractCreateStreamCommand implements DdlCommand {
}

private static void checkTopicNameNotNull(final Map<String, Expression> properties) {
// TODO: move the check to grammar
if (properties.get(DdlConfig.TOPIC_NAME_PROPERTY) == null) {
throw new KsqlException("Topic name should be set in WITH clause.");
}
Expand Down Expand Up @@ -155,7 +154,6 @@ static void checkMetaData(
final String sourceName,
final String topicName
) {
// TODO: move the check to the runtime since it accesses metaStore
if (metaStore.getSource(sourceName) != null) {
throw new KsqlException(String.format("Source already exists: %s", sourceName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@

package io.confluent.ksql.ddl.commands;

import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Map;

public class CreateTableCommand extends AbstractCreateStreamCommand {

Expand All @@ -33,14 +30,6 @@ public class CreateTableCommand extends AbstractCreateStreamCommand {
final KafkaTopicClient kafkaTopicClient
) {
super(sqlExpression, createTable, kafkaTopicClient);

final Map<String, Expression> properties = createTable.getProperties();

if (!properties.containsKey(DdlConfig.KEY_NAME_PROPERTY)) {
throw new KsqlException(
"Cannot define a TABLE without providing the KEY column name in the WITH clause."
);
}
}

@Override
Expand All @@ -55,6 +44,7 @@ public DdlCommandResult run(final MutableMetaStore metaStore) {
}
}
checkMetaData(metaStore, sourceName, topicName);

final KsqlTable ksqlTable = new KsqlTable<>(
sqlExpression,
sourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,38 @@ public void shouldNotThrowIfTopicDoesExist() {
verify(kafkaTopicClient).isTopicExists(TOPIC_NAME);
}

@Test
public void shouldThrowIfKeyFieldNotInSchema() {
// Given:
when(statement.getProperties()).thenReturn(minValidProps());
givenPropertiesWith(ImmutableMap.of(
DdlConfig.KEY_NAME_PROPERTY, new StringLiteral("will-not-find-me")));

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"The KEY column 'WILL-NOT-FIND-ME', set in the WITH clause, does not exist in the schema");

// When:
new TestCmd("key not in schema!", statement, kafkaTopicClient);
}

@Test
public void shouldThrowIfTimestampColumnDoesNotExist() {
// Given:
when(statement.getProperties()).thenReturn(minValidProps());
givenPropertiesWith(ImmutableMap.of(
DdlConfig.TIMESTAMP_NAME_PROPERTY, new StringLiteral("will-not-find-me")));

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"The TIMESTAMP column 'WILL-NOT-FIND-ME', set in the WITH clause, does not exist in the schema");

// When:
new TestCmd("key not in schema!", statement, kafkaTopicClient);
}

private static Map<String, Expression> minValidProps() {
return ImmutableMap.of(
DdlConfig.VALUE_FORMAT_PROPERTY, new StringLiteral("json"),
Expand All @@ -160,6 +192,12 @@ private static Map<String, Expression> propsWithout(final String name) {
return ImmutableMap.copyOf(props);
}

private void givenPropertiesWith(final Map<String, Expression> additionalProps) {
final Map<String, Expression> allProps = new HashMap<>(minValidProps());
allProps.putAll(additionalProps);
when(statement.getProperties()).thenReturn(allProps);
}

private static final class TestCmd extends AbstractCreateStreamCommand {

private TestCmd(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;

Expand Down Expand Up @@ -106,64 +105,6 @@ public void shouldCreateCommandForCreateTable() {
assertThat(result, instanceOf(CreateTableCommand.class));
}

@Test
public void shouldFailCreateTableIfKeyNameIsIncorrect() {
final HashMap<String, Expression> tableProperties = validTableProps();
tableProperties.put(DdlConfig.KEY_NAME_PROPERTY, new StringLiteral("COL3"));

try {
commandFactories
.create(sqlExpression, createTable(tableProperties), NO_PROPS);

} catch (final KsqlException e) {
assertThat(e.getMessage(), equalTo("No column with the provided key column name in the "
+ "WITH clause, COL3, exists in the defined schema."));
}

}

@Test
public void shouldFailCreateTableIfTimestampColumnNameIsIncorrect() {
final HashMap<String, Expression> tableProperties = validTableProps();
tableProperties.put(DdlConfig.TIMESTAMP_NAME_PROPERTY, new StringLiteral("COL3"));

try {
commandFactories
.create(sqlExpression, createTable(tableProperties), NO_PROPS);

} catch (final KsqlException e) {
assertThat(e.getMessage(), equalTo("No column with the provided timestamp column name in the WITH clause, COL3, exists in the defined schema."));
}
}

@Test
public void shouldFailCreateTableIfKeyIsNotProvided() {
final HashMap<String, Expression> tableProperties = validTableProps();
tableProperties.remove(DdlConfig.KEY_NAME_PROPERTY);

try {
commandFactories.create(sqlExpression, createTable(properties), NO_PROPS);

} catch (final KsqlException e) {
assertThat(e.getMessage(), equalTo("Cannot define a TABLE without providing the KEY column name in the WITH clause."));
}
}

@Test
public void shouldFailCreateTableIfTopicNotExist() {
final HashMap<String, Expression> tableProperties = validTableProps();

givenTopicsDoNotExist();

try {
commandFactories.create(sqlExpression, createTable(tableProperties),
NO_PROPS);

} catch (final KsqlException e) {
assertThat(e.getMessage(), equalTo("Kafka topic does not exist: topic"));
}
}

@Test
public void shouldCreateCommandForDropStream() {
final DdlCommand result = commandFactories.create(sqlExpression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.ddl.commands;

import static io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers.hasLegacyName;
import static io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers.hasName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand All @@ -41,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.junit.Before;
Expand All @@ -54,8 +57,11 @@
@RunWith(MockitoJUnitRunner.class)
public class CreateStreamCommandTest {

private static final String STREAM_NAME = "s1";
private static final List<TableElement> SOME_ELEMENTS = ImmutableList.of(
new TableElement("bob", PrimitiveType.of(SqlType.STRING)));
new TableElement("ID", PrimitiveType.of(SqlType.BIGINT)),
new TableElement("bob", PrimitiveType.of(SqlType.STRING))
);

@Mock
private KafkaTopicClient topicClient;
Expand All @@ -71,7 +77,7 @@ public class CreateStreamCommandTest {
@Before
public void setUp() {
givenPropertiesWith((Collections.emptyMap()));
when(createStreamStatement.getName()).thenReturn(QualifiedName.of("name"));
when(createStreamStatement.getName()).thenReturn(QualifiedName.of(STREAM_NAME));
when(createStreamStatement.getElements()).thenReturn(SOME_ELEMENTS);
when(topicClient.isTopicExists(any())).thenReturn(true);
}
Expand Down Expand Up @@ -172,19 +178,47 @@ public void shouldThrowIfTopicDoesNotExist() {
}

@Test
public void testCreateAlreadyRegisteredStreamThrowsException() {
public void shouldThrowIfAlreadyRegistered() {
// Given:
final CreateStreamCommand cmd = createCmd();
cmd.run(metaStore);

// Then:
expectedException.expectMessage("Cannot create stream 'name': A stream " +
"with name 'name' already exists");
expectedException.expectMessage("Cannot create stream 's1': A stream " +
"with name 's1' already exists");

// When:
cmd.run(metaStore);
}

@Test
public void shouldAddSourceWithKeyField() {
// Given:
givenPropertiesWith(ImmutableMap.of(
"KEY", new StringLiteral("id")));
final CreateStreamCommand cmd = createCmd();

// When:
cmd.run(metaStore);

// Then:
assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasName("ID"));
assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasLegacyName("ID"));
}

@Test
public void shouldAddSourceWithNoKeyField() {
// Given:
final CreateStreamCommand cmd = createCmd();

// When:
cmd.run(metaStore);

// Then:
assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasName(Optional.empty()));
assertThat(metaStore.getSource(STREAM_NAME).getKeyField(), hasLegacyName(Optional.empty()));
}

private CreateStreamCommand createCmd() {
return new CreateStreamCommand("some sql", createStreamStatement, topicClient);
}
Expand Down
Loading

0 comments on commit e27b22c

Please sign in to comment.