feat: NONE format for key-less streams (#6349)
* feat: add NONE format

fixes: #6221

Introduce a `NONE` format, used to indicate that the key data in key-less streams should not be deserialized.

Co-authored-by: Andy Coates <[email protected]>
big-andy-coates and big-andy-coates authored Oct 2, 2020
1 parent 0d60246 commit 25bb352
Showing 21 changed files with 1,263 additions and 18 deletions.
87 changes: 75 additions & 12 deletions docs/developer-guide/serialization.md
@@ -11,17 +11,16 @@ at runtime. ksqlDB offers several mechanisms for controlling serialization
and deserialization.

The primary mechanism is by choosing the serialization format when you
create a stream or table and specify the `VALUE_FORMAT` in the `WITH`
create a stream or table and specify `FORMAT`, `KEY_FORMAT` or `VALUE_FORMAT` in the `WITH`
clause.

While ksqlDB supports different value formats, it requires keys to be `KAFKA` format.

```sql
-- create table with JSON value format:
CREATE TABLE ORDERS (
F0 INT PRIMARY KEY,
F1 STRING
) WITH (
KEY_FORMAT='KAFKA',
VALUE_FORMAT='JSON',
...
);
@@ -32,15 +31,77 @@ Serialization Formats

ksqlDB supports these serialization formats:

- `DELIMITED` supports comma separated values. See [DELIMITED](#delimited) below.
- `JSON` and `JSON_SR` support JSON values. See [JSON](#json) below.
- `AVRO` supports AVRO serialized values. See [AVRO](#avro) below.
- `KAFKA` supports primitives serialized using the standard Kafka
serializers. See [KAFKA](#kafka) below.
- `PROTOBUF` supports Protocol Buffers. See [Protobuf](#protobuf) below.
- [`NONE`](#none) used to indicate the data should not be deserialized.
- [`DELIMITED`](#delimited) supports comma separated values.
- [`JSON`](#json) and [`JSON_SR`](#json) support JSON values, with and without schema registry integration.
- [`AVRO`](#avro) supports AVRO serialized values.
- [`KAFKA`](#kafka) supports primitives serialized using the standard Kafka serializers.
- [`PROTOBUF`](#protobuf) supports Protocol Buffers.


Not all formats can be used as both key and value formats. See individual formats for details.

### NONE

| Feature | Supported |
|------------------------------|-----------|
| As value format | No |
| As key format | Yes |
| [Schema Registry required][0]| No |
| [Schema inference][1] | No |
| [Single field wrapping][2] | No |
| [Single field unwrapping][2] | No |

The `NONE` format is a special marker format used to indicate that ksqlDB should not attempt to
deserialize that part of the {{ site.ak }} record.

Its main use is as the `KEY_FORMAT` of key-less streams, especially where a default key format
that supports schema inference has been set via [`ksql.persistence.default.format.key`][1]. If the
key format were not overridden, the server would attempt to load the key schema from the {{ site.sr }}.
If the schema existed, the key columns would be inferred from it, which may not be the intent.
If the schema did not exist, the statement would be rejected. In such situations, the key format can
be set to `NONE`:

```sql
CREATE STREAM KEYLESS_STREAM (
VAL STRING
) WITH (
KEY_FORMAT='NONE',
VALUE_FORMAT='JSON',
KAFKA_TOPIC='foo'
);
```

Any statement that sets the key format to `NONE` and has key columns defined will result in an error.
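
For example, a statement like the following sketch (the stream and topic names are illustrative)
would be rejected, because it declares a key column while setting the key format to `NONE`:

```sql
-- Rejected: declares a key column, but sets the key format to NONE:
CREATE STREAM INVALID_STREAM (
   ID INT KEY,
   VAL STRING
 ) WITH (
   KEY_FORMAT='NONE',
   VALUE_FORMAT='JSON',
   KAFKA_TOPIC='bar'
 );
```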

If a `CREATE TABLE AS` or `CREATE STREAM AS` statement has a source with a key format of `NONE`, but
the newly created table or stream has key columns, you can either declare the key format explicitly
in the `WITH` clause, or rely on the default key format set in [`ksql.persistence.default.format.key`][1].

Conversely, a `CREATE STREAM AS` statement that removes the key columns, i.e., via `PARTITION BY null`,
automatically sets the key format to `NONE`.
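
For example (a sketch, assuming a source stream `KEYED_STREAM` that has a key column):

```sql
-- The key columns are dropped via PARTITION BY null, so the key format becomes NONE:
CREATE STREAM NO_KEY_STREAM AS
   SELECT VAL FROM KEYED_STREAM
   PARTITION BY null;
```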

All formats are supported as value formats. Only a subset of formats are
currently supported as key formats. See individual formats for details.
```sql
-- keyless stream with NONE key format:
CREATE STREAM KEYLESS_STREAM (
VAL STRING
) WITH (
KEY_FORMAT='NONE',
VALUE_FORMAT='JSON',
KAFKA_TOPIC='foo'
);

-- Table created from stream with explicit key format declared in WITH clause:
CREATE TABLE T WITH (KEY_FORMAT='KAFKA') AS
SELECT VAL, COUNT() FROM KEYLESS_STREAM
GROUP BY VAL;

-- or, using the default key format set in the ksql.persistence.default.format.key config:
CREATE TABLE T AS
SELECT VAL, COUNT() FROM KEYLESS_STREAM
GROUP BY VAL;
```

### DELIMITED

@@ -556,4 +617,6 @@ CREATE STREAM BAD_SINK WITH(WRAP_SINGLE_VALUE=true) AS SELECT ID, COST FROM S EM

## Suggested Reading

- Blog post: [I’ve Got the Key, I’ve Got the Secret. Here’s How Keys Work in ksqlDB 0.10](https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/)

[1]: ../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencedefaultformatkey
@@ -296,6 +296,12 @@ If not set and no explicit key format is provided in the statement, via either t

For supported formats, see [Serialization Formats](../../../developer-guide/serialization.md#serialization-formats).

[CREATE STREAM AS SELECT](../../../developer-guide/ksqldb-reference/create-stream-as-select.md) and
[CREATE TABLE AS SELECT](../../../developer-guide/ksqldb-reference/create-table-as-select.md)
statements that create streams or tables with key columns, where the source stream or table
has a [NONE](../../../developer-guide/serialization.md#none) key format, will also use the default
key format set in this configuration if no explicit key format is declared in the `WITH` clause.
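
For example, the default can be set in the server properties file (the choice of `JSON` here is
illustrative; any supported key format can be used):

```properties
ksql.persistence.default.format.key=JSON
```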

### ksql.persistence.default.format.value

Sets the default value for the `VALUE_FORMAT` property if one is
@@ -76,11 +76,13 @@
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.none.NoneFormat;
import io.confluent.ksql.util.GrammaticalJoiner;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
@@ -202,10 +204,11 @@ private KsqlTopic getSinkTopic(final Into into, final LogicalSchema schema) {
}

final NewTopic newTopic = into.getNewTopic().orElseThrow(IllegalStateException::new);
final FormatInfo keyFormat = getSinkKeyFormat(schema, newTopic);

final SerdeFeatures keyFeatures = SerdeFeaturesFactory.buildKeyFeatures(
schema,
FormatFactory.of(newTopic.getKeyFormat())
FormatFactory.of(keyFormat)
);

final SerdeFeatures valFeatures = SerdeFeaturesFactory.buildValueFeatures(
@@ -217,11 +220,25 @@ private KsqlTopic getSinkTopic(final Into into, final LogicalSchema schema) {

return new KsqlTopic(
newTopic.getTopicName(),
KeyFormat.of(newTopic.getKeyFormat(), keyFeatures, newTopic.getWindowInfo()),
KeyFormat.of(keyFormat, keyFeatures, newTopic.getWindowInfo()),
ValueFormat.of(newTopic.getValueFormat(), valFeatures)
);
}

private FormatInfo getSinkKeyFormat(final LogicalSchema schema, final NewTopic newTopic) {
// If the inherited key format is NONE, and the result has key columns, use default key format:
final boolean resultHasKeyColumns = !schema.key().isEmpty();
final boolean inheritedNone = !analysis.getProperties().getKeyFormat().isPresent()
&& newTopic.getKeyFormat().getFormat().equals(NoneFormat.NAME);

if (!inheritedNone || !resultHasKeyColumns) {
return newTopic.getKeyFormat();
}

final String defaultKeyFormat = ksqlConfig.getString(KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG);
return FormatInfo.of(defaultKeyFormat);
}

private Optional<TimestampColumn> getTimestampColumn(
final LogicalSchema inputSchema,
final ImmutableAnalysis analysis
@@ -16,6 +16,8 @@
package io.confluent.ksql.serde;

import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.serde.json.JsonFormat;
import io.confluent.ksql.serde.none.NoneFormat;

/**
* Util class for creating internal formats.
@@ -50,10 +52,15 @@ private InternalFormats() {
* @see SerdeFeaturesFactory#buildInternal
*/
public static Formats of(final KeyFormat keyFormat, final ValueFormat valueFormat) {
// Do not use NONE format for internal topics:
final FormatInfo formatInfo = keyFormat.getFormatInfo().getFormat().equals(NoneFormat.NAME)
? FormatInfo.of(JsonFormat.NAME)
: keyFormat.getFormatInfo();

return Formats.of(
keyFormat.getFormatInfo(),
formatInfo,
valueFormat.getFormatInfo(),
SerdeFeaturesFactory.buildInternal(FormatFactory.of(keyFormat.getFormatInfo())),
SerdeFeaturesFactory.buildInternal(FormatFactory.of(formatInfo)),
SerdeFeatures.of()
);
}
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/


package io.confluent.ksql.test.serde.none;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.voids.KsqlVoidSerde;
import io.confluent.ksql.test.serde.SerdeSupplier;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class NoneSerdeSupplier implements SerdeSupplier<Void> {

@Override
public Serializer<Void> getSerializer(final SchemaRegistryClient schemaRegistryClient) {
return new KsqlVoidSerde<Void>().serializer();
}

@Override
public Deserializer<Void> getDeserializer(final SchemaRegistryClient schemaRegistryClient) {
return new KsqlVoidSerde<Void>().deserializer();
}
}
@@ -35,11 +35,13 @@
import io.confluent.ksql.serde.json.JsonFormat;
import io.confluent.ksql.serde.json.JsonSchemaFormat;
import io.confluent.ksql.serde.kafka.KafkaFormat;
import io.confluent.ksql.serde.none.NoneFormat;
import io.confluent.ksql.serde.protobuf.ProtobufFormat;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.serde.avro.ValueSpecAvroSerdeSupplier;
import io.confluent.ksql.test.serde.json.ValueSpecJsonSerdeSupplier;
import io.confluent.ksql.test.serde.kafka.KafkaSerdeSupplier;
import io.confluent.ksql.test.serde.none.NoneSerdeSupplier;
import io.confluent.ksql.test.serde.protobuf.ValueSpecProtobufSerdeSupplier;
import io.confluent.ksql.test.serde.string.StringSerdeSupplier;
import io.confluent.ksql.test.tools.exceptions.InvalidFieldException;
@@ -72,6 +74,7 @@ public static SerdeSupplier<?> getSerdeSupplier(
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true);
case DelimitedFormat.NAME: return new StringSerdeSupplier();
case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema);
case NoneFormat.NAME: return new NoneSerdeSupplier();
default:
throw new InvalidFieldException("format", "unsupported value: " + format);
}
