Explicit keys #5533

Merged · merged 7 commits on Jun 3, 2020
Changes from 3 commits
17 changes: 4 additions & 13 deletions docs/developer-guide/create-a-stream.md
@@ -90,31 +90,22 @@ Your output should resemble:
Name : PAGEVIEWS
Field | Type
--------------------------------------
ROWKEY | VARCHAR(STRING) (key)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

You may notice that ksqlDB has added a key column named `ROWKEY`.
This is the default key column that ksqlDB adds if you don't provide one.
If your data doesn't contain a {{ site.ak }}-serialized
`STRING` in the {{ site.ak }} message key, don't use `ROWKEY` in your SQL statements,
because this may cause unexpected results.
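
For reference, a minimal sketch of a statement that yields the schema above under this default behavior; the `DELIMITED` value format is an assumption, not taken from this page:

```sql
-- No key column is declared, so ksqlDB adds the default STRING `ROWKEY`.
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='DELIMITED');
```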

### Create a Stream with a Specified Key

The previous SQL statement doesn't define a column to represent the data in the
{{ site.ak }} message key in the underlying {{ site.ak }} topic, so the system adds a
`ROWKEY` column with type `STRING`.

If the {{ site.ak }} message key is serialized in a key format that ksqlDB supports (currently `KAFKA`),
{{ site.ak }} message key in the underlying {{ site.ak }} topic. If the {{ site.ak }} message key
is serialized in a key format that ksqlDB supports (currently `KAFKA`),
you can specify the key in the column list of the CREATE STREAM statement.

For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the `viewtime`,
so you can write the CREATE STREAM statement like this:
For example, the {{ site.ak }} message key of the `pageviews` topic is a `BIGINT` containing the
`viewtime`, so you can write the CREATE STREAM statement like this:

```sql
CREATE STREAM pageviews_withkey
  (viewtime BIGINT KEY,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        VALUE_FORMAT='DELIMITED');
```
9 changes: 3 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-stream.md
Expand Up @@ -45,8 +45,7 @@ Each column is defined by:
If a column is not marked as a `KEY` column, ksqlDB loads it from the Kafka message's value.
Unlike a table's `PRIMARY KEY`, a stream's keys can be NULL.

ksqlDB adds an implicit `ROWKEY` system column to every stream and table, which represents the
corresponding Kafka message key. An implicit `ROWTIME` pseudo column is also available on every
ksqlDB adds an implicit `ROWTIME` pseudo column to every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the _event time_ of a stream row and the
_last modified time_ of a table row.
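
As an illustration (a sketch, not part of this change; the `pageviews` stream is assumed), `ROWTIME` can be read like any other column:

```sql
-- ROWTIME carries the Kafka message timestamp in milliseconds since the epoch.
SELECT ROWTIME, pageid FROM pageviews EMIT CHANGES;
```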
@@ -60,7 +59,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a STREAM without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. Do not use this hint if the message key format in Kafka is `AVRO` or `JSON`. See [Key Requirements](../syntax-reference.md#key-requirements) for more information. |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as a named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
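
As an illustrative sketch (stream, topic, and column names are assumed, not part of this change), TIMESTAMP and TIMESTAMP_FORMAT combine like this:

```sql
-- ROWTIME is taken from the string `ts` column instead of the message timestamp.
CREATE STREAM orders (ts VARCHAR, order_id BIGINT)
    WITH (KAFKA_TOPIC='orders',
          VALUE_FORMAT='JSON',
          TIMESTAMP='ts',
          TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
```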
@@ -83,10 +81,9 @@ Example
```sql
CREATE STREAM pageviews
(
rowkey BIGINT KEY,
page_id BIGINT KEY,
viewtime BIGINT,
user_id VARCHAR,
page_id VARCHAR
user_id VARCHAR
)
WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'my-pageviews-topic');
```
9 changes: 3 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-table.md
@@ -44,9 +44,8 @@ Each column is defined by:
`PRIMARY KEY` columns. If a column is not marked as a `PRIMARY KEY` column ksqlDB loads it
from the Kafka message's value. Unlike a stream's `KEY` column, a table's `PRIMARY KEY` column(s)
are NON NULL. Any records in the Kafka topic with NULL key columns are dropped.

ksqlDB adds an implicit `ROWKEY` system column to every stream and table, which represents the
corresponding Kafka message key. An implicit `ROWTIME` pseudo column is also available on every

ksqlDB adds an implicit `ROWTIME` pseudo column to every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the _event time_ of a stream row and the
_last modified time_ of a table row.
@@ -60,7 +59,6 @@ The WITH clause supports the following properties:
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a TABLE without an existing topic (the command will fail if the topic does not exist). |
| REPLICAS | The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic. |
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| KEY | **Optimization hint:** If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit `ROWKEY` column (message key). If set, ksqlDB uses it as an optimization hint to determine if repartitioning can be avoided when performing aggregations and joins. Do not use this hint if the message key format in kafka is `AVRO` or `JSON`. For more information, see [Key Requirements](../syntax-reference.md#key-requirements). |
| TIMESTAMP | By default, the implicit `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified field/column within the Kafka message value (similar to timestamp extractors in the Kafka Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a `bigint`. If it is set, then the TIMESTAMP field must be of type varchar and have a format that can be parsed with the Java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with two successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
| WRAP_SINGLE_VALUE | Controls how values are deserialized where the values schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single field.<br>If set to `true`, ksqlDB expects the field to have been serialized as named field within a record.<br>If set to `false`, ksqlDB expects the field to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-field schemas where the value can be `null`. For more information, see [Single field (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple fields, will result in an error. |
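
As an illustrative sketch (table, topic, and column names are assumed, not part of this change), WRAP_SINGLE_VALUE controls how a single-field value is deserialized:

```sql
-- With false, each value is an anonymous JSON value such as "alice",
-- rather than a named field inside a record such as {"NAME": "alice"}.
CREATE TABLE usernames (id BIGINT PRIMARY KEY, name VARCHAR)
    WITH (KAFKA_TOPIC='usernames',
          VALUE_FORMAT='JSON',
          WRAP_SINGLE_VALUE=false);
```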
@@ -79,9 +77,8 @@ Example
```sql
CREATE TABLE users
(
rowkey BIGINT PRIMARY KEY,
id BIGINT PRIMARY KEY,
usertimestamp BIGINT,
user_id VARCHAR,
gender VARCHAR,
region_id VARCHAR
)
@@ -117,6 +117,22 @@ public CreateTableCommand createTableCommand(
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
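// Reject CREATE TABLE statements that declare no PRIMARY KEY column; when schema
// inference is in use, the error suggests a partial schema that declares only the key.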
if (schema.key().isEmpty()) {
final boolean usingSchemaInference = statement.getProperties().getSchemaId().isPresent();

final String additional = usingSchemaInference
? System.lineSeparator()
+ "Use a partial schema to define the primary key and still load the value columns from "
+ "the Schema Registry, for example:"
+ System.lineSeparator()
+ "\tCREATE TABLE " + statement.getName().text() + " (ID INT PRIMARY KEY) WITH (...);"
: "";

throw new KsqlException(
"Tables require a PRIMARY KEY. Please define the PRIMARY KEY." + additional
);
}
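
For context, hypothetical statements (topic and column names assumed) that trigger and avoid this error:

```sql
-- Fails: no PRIMARY KEY is declared; value columns come from the Schema Registry.
CREATE TABLE users WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');

-- Works: a partial schema declares only the PRIMARY KEY and still loads the
-- value columns from the Schema Registry.
CREATE TABLE users (ID INT PRIMARY KEY) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
```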

final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
statement.getProperties(),
@@ -153,7 +169,7 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
}
});

return tableElements.toLogicalSchema(true);
return tableElements.toLogicalSchema();
}

private static KsqlTopic buildTopic(
@@ -38,7 +38,7 @@ public enum PlanJsonMapper {
new JavaTimeModule(),
new KsqlParserSerializationModule(),
new KsqlTypesSerializationModule(),
new KsqlTypesDeserializationModule(true)
new KsqlTypesDeserializationModule()
)
.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
@@ -128,6 +128,11 @@ public Stream<ColumnName> resolveSelectStar(

@Override
void validateKeyPresent(final SourceName sinkName, final Projection projection) {
if (getSchema().key().isEmpty()) {
// No key column.
return;
}

final ColumnName keyName = Iterables.getOnlyElement(getSchema().key()).name();

if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
@@ -68,7 +68,7 @@ private void registerForCreateSource(final ConfiguredStatement<? extends CreateS
// we can assume that the kafka topic is always present in the
// statement properties
registerSchema(
cs.getStatement().getElements().toLogicalSchema(false),
cs.getStatement().getElements().toLogicalSchema(),
cs.getStatement().getProperties().getKafkaTopic(),
cs.getStatement().getProperties().getFormatInfo(),
cs.getConfig(),
@@ -315,6 +315,11 @@ boolean repartitionNotNeeded(final List<Expression> expressions) {
// Note: A repartition is only not required if partitioning by the existing key column, or
// the existing keyField.

if (schema.key().isEmpty()) {
// No current key, so repartition needed:
return false;
}

if (schema.key().size() != 1) {
throw new UnsupportedOperationException("logic only supports single key column");
}
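
In SQL terms (an illustrative sketch; stream and column names are assumed), the distinction this check makes looks like:

```sql
-- No repartition needed: partitioning by the existing key column.
CREATE STREAM pv_by_page AS SELECT * FROM pageviews PARTITION BY page_id;

-- Repartition needed: partitioning by a value column.
CREATE STREAM pv_by_user AS SELECT * FROM pageviews PARTITION BY user_id;
```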
@@ -77,6 +77,10 @@ public class CommandFactoriesTest {
private static final TableElement ELEMENT1 =
tableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING));
private static final TableElements SOME_ELEMENTS = TableElements.of(ELEMENT1);
private static final TableElements ELEMENTS_WITH_PK = TableElements.of(
tableElement(Namespace.PRIMARY_KEY, "k", new Type(SqlTypes.STRING)),
ELEMENT1
);
private static final String TOPIC_NAME = "some topic";
private static final Map<String, Literal> MINIMIM_PROPS = ImmutableMap.of(
CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("JSON"),
@@ -325,7 +329,7 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi
);

final DdlStatement statement =
new CreateTable(SOME_NAME, SOME_ELEMENTS, true, withProperties);
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, true, withProperties);

// When:
final DdlCommand cmd = commandFactories