fix: support partial schemas (#4625)
fixes: #4566

With this change, users can now supply just the key schema and use schema inference to get the value columns. For example, if the key is an `INT` serialized using Kafka's `IntegerSerializer` and the value is an Avro record with the schema stored in the Schema Registry, then such a stream can be registered in ksqlDB with a statement such as:

```sql
-- note: only the key columns are provided between the first set of brackets
-- the value columns will be inferred from the Avro schema in the Schema Registry
CREATE STREAM users (ROWKEY INT KEY) WITH (kafka_topic='users', value_format='avro');
```
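For comparison, a sketch of the statement this is equivalent to once inference has run, assuming a hypothetical registered Avro value schema with fields `id` (long) and `name` (string):

```sql
-- hypothetical: the actual value columns come from the registered Avro schema
CREATE STREAM users (ROWKEY INT KEY, ID BIGINT, NAME VARCHAR)
  WITH (kafka_topic='users', value_format='avro');
```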

(cherry picked from commit 4f1ce8a)
big-andy-coates authored and colinhicks committed Feb 27, 2020
1 parent 04fcf8d commit 7cc19a0
Showing 20 changed files with 1,070 additions and 50 deletions.
2 changes: 1 addition & 1 deletion docs-md/developer-guide/create-a-stream.md
@@ -34,7 +34,7 @@ named `pageviews`. To see these examples in action, create the
The following example creates a stream that has three columns from the
`pageviews` topic: `viewtime`, `userid`, and `pageid`.

ksqlDB can't infer the topic's data format, so you must provide the
ksqlDB can't infer the topic value's data format, so you must provide the
format of the values that are stored in the topic. In this example, the
data format is `DELIMITED`. Other options are `Avro`, `JSON` and `KAFKA`.
See [Serialization Formats](serialization.md#serialization-formats) for more
2 changes: 1 addition & 1 deletion docs-md/developer-guide/create-a-table.md
@@ -39,7 +39,7 @@ the `userid` field is assigned as the table's KEY property.
The KEY field is optional. For more information, see
[Key Requirements](syntax-reference.md#key-requirements).

ksqlDB can't infer the topic's data format, so you must provide the
ksqlDB can't infer the topic value's data format, so you must provide the
format of the values that are stored in the topic. In this example, the
data format is `JSON`. Other options are `Avro`, `DELIMITED` and `KAFKA`. For
more information, see
@@ -94,7 +94,7 @@ CREATE STREAM pageviews
### Create a New Table by Reading Avro-formatted Data

The following statement shows how to create a new `users` table by
reading from a {{ site.ak }} topic that has Avro-formatted messages.
reading from a {{ site.ak }} topic that has Avro-formatted message values.

```sql
CREATE TABLE users
@@ -109,6 +109,23 @@ the latest registered Avro schema for the `pageviews-avro-topic` topic.
ksqlDB uses the most recent schema at the time the statement is first
executed.

Note: the Avro schema must be registered in the {{ site.sr }} under the
subject `users-avro-topic-value`.

By default, the key of the data is assumed to be a single `KAFKA`-serialized
`STRING` named `ROWKEY`. If the key schema differs, you can provide just the
key column in the statement. For example, the following creates the `users`
table with a 64-bit integer key and infers the value columns from the Avro
schema.

```sql
CREATE TABLE users (ROWKEY BIGINT KEY)
WITH (KAFKA_TOPIC='users-avro-topic',
VALUE_FORMAT='AVRO',
KEY='userid');
```


### Create a New Stream with Selected Fields of Avro-formatted Data

If you want to create a STREAM or TABLE with only a subset of all the
7 changes: 5 additions & 2 deletions docs-md/tutorials/basics-docker.md
@@ -274,8 +274,11 @@ Your output should resemble:
ksqlDB retrieves the schema from {{ site.sr }} and uses this to build
the SQL schema for the table. You may still provide the schema if you wish.
Until [Github issue #4462](https://github.com/confluentinc/ksql/issues/4462)
is complete, schema inference is only available where the key of the data
is a STRING, as is the case here.
is complete, schema inference is only available for the value columns. By
default, the key is assumed to be a single `KAFKA`-formatted `STRING`
column named `ROWKEY`. You can also supply just the key column in the
statement, which lets you specify the key column type. For example:
`CREATE TABLE users_original (ROWKEY INT KEY) WITH (...);`
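A fuller sketch of that statement, with assumed `WITH` properties (`KAFKA_TOPIC`
and `VALUE_FORMAT` here are assumptions; the tutorial's actual clause may differ):

```sql
-- assumed topic name and value format; value columns are inferred from the Schema Registry
CREATE TABLE users_original (ROWKEY INT KEY)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
```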

!!! note
The data generated has the same value in the {{ site.ak }} record's key
7 changes: 5 additions & 2 deletions docs-md/tutorials/basics-local.md
@@ -248,8 +248,11 @@ These examples query messages from Kafka topics called `pageviews` and
ksqlDB retrieves the schema from {{ site.sr }} and uses this to build
the SQL schema for the table. You may still provide the schema if you wish.
Until [Github issue #4462](https://github.com/confluentinc/ksql/issues/4462)
is complete, schema inference is only available where the key of the data
is a STRING, as is the case here.
is complete, schema inference is only available for the value columns. By
default, the key is assumed to be a single `KAFKA`-formatted `STRING`
column named `ROWKEY`. You can also supply just the key column in the
statement, which lets you specify the key column type. For example:
`CREATE TABLE users_original (ROWKEY INT KEY) WITH (...);`
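A fuller sketch of that statement, again with assumed `WITH` properties (the
tutorial's actual clause may differ):

```sql
-- assumed topic name and value format; value columns are inferred from the Schema Registry
CREATE TABLE users_original (ROWKEY INT KEY)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
```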
!!! note
The data generated has the same value in the {{ site.ak }} record's key
8 changes: 6 additions & 2 deletions docs-md/tutorials/examples.md
@@ -24,13 +24,17 @@ Prerequisite:
Create a stream with three columns on the Kafka topic that is named
`pageviews`.

ksqlDB can't infer the topic's data format, so you must provide the format
ksqlDB can't infer the topic value's data format, so you must provide the format
of the values that are stored in the topic. In this example, the values
format is `DELIMITED`.

ksqlDB requires keys to have been serialized using {{ site.ak }}'s own serializers or compatible
serializers. ksqlDB supports `INT`, `BIGINT`, `DOUBLE`, and `STRING` key types.

```sql
CREATE STREAM pageviews
(viewtime BIGINT,
(rowkey STRING KEY,
viewtime BIGINT,
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
@@ -15,14 +15,15 @@

package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option;
@@ -35,20 +36,25 @@
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;

/**
* An injector which injects the schema into the supplied {@code statement}.
* An injector which injects the value columns into the supplied {@code statement}.
*
* <p>The schema is only injected if:
* <p>The value columns are only injected if:
* <ul>
* <li>The statement is a CT/CS.</li>
* <li>The statement does not define a schema.</li>
* <li>The statement does not define any value columns.</li>
* <li>The format of the statement supports schema inference.</li>
* </ul>
*
* <p>Any key columns present are passed through unchanged.
*
* <p>If any of the above are not true then the {@code statement} is returned unchanged.
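*
* <p>An illustrative sketch only (the value columns shown are hypothetical and
* would come from the registered schema): a statement such as
* {@code CREATE STREAM users (ROWKEY INT KEY) WITH (...)} is expanded to
* {@code CREATE STREAM users (ROWKEY INT KEY, NAME STRING, AGE INT) WITH (...)}.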
*/
public class DefaultSchemaInjector implements Injector {
Expand All @@ -62,7 +68,6 @@ public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) {
this.schemaSupplier = Objects.requireNonNull(schemaSupplier, "schemaSupplier");
}


@SuppressWarnings("unchecked")
@Override
public <T extends Statement> ConfiguredStatement<T> inject(
@@ -90,17 +95,18 @@ public <T extends Statement> ConfiguredStatement<T> inject(
private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
final ConfiguredStatement<CreateSource> statement
) {
if (hasElements(statement)
if (hasValueElements(statement)
|| !statement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) {
return Optional.empty();
}

final SchemaAndId valueSchema = getValueSchema(statement);
final CreateSource withSchema = addSchemaFields(statement, valueSchema);
final PreparedStatement<CreateSource> prepared =
buildPreparedStatement(withSchema);
return Optional.of(ConfiguredStatement.of(
prepared, statement.getOverrides(), statement.getConfig()));
final PreparedStatement<CreateSource> prepared = buildPreparedStatement(withSchema);
final ConfiguredStatement<CreateSource> configured = ConfiguredStatement
.of(prepared, statement.getOverrides(), statement.getConfig());

return Optional.of(configured);
}

private SchemaAndId getValueSchema(
@@ -123,10 +129,11 @@ private SchemaAndId getValueSchema(
return result.schemaAndId.get();
}

private static boolean hasElements(
private static boolean hasValueElements(
final ConfiguredStatement<CreateSource> statement
) {
return !Iterables.isEmpty(statement.getStatement().getElements());
return statement.getStatement().getElements().stream()
.anyMatch(e -> e.getNamespace().equals(Namespace.VALUE));
}

private static CreateSource addSchemaFields(
@@ -145,23 +152,44 @@ private static CreateSource addSchemaFields(
}

private static TableElements buildElements(
final Schema schema,
final Schema valueSchema,
final ConfiguredStatement<CreateSource> preparedStatement
) {
try {
throwOnInvalidSchema(schema);
// custom types cannot be injected, so we can pass in an EMPTY type registry
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY);
} catch (final Exception e) {
throw new KsqlStatementException(
"Failed to convert schema to KSQL model: " + e.getMessage(),
preparedStatement.getStatementText(),
e);
}
throwOnInvalidSchema(valueSchema);

final List<TableElement> elements = new ArrayList<>();

getKeyColumns(preparedStatement)
.forEach(elements::add);

getColumnsFromSchema(valueSchema)
.forEach(elements::add);

return TableElements.of(elements);
}

private static Stream<TableElement> getKeyColumns(
final ConfiguredStatement<CreateSource> preparedStatement
) {
return preparedStatement.getStatement().getElements().stream()
.filter(e -> e.getNamespace() == Namespace.KEY);
}

private static Stream<TableElement> getColumnsFromSchema(final Schema schema) {
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream();
}

private static void throwOnInvalidSchema(final Schema schema) {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
try {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
} catch (final Exception e) {
throw new KsqlException(
"Schema contains types not supported by KSQL: " + e.getMessage()
+ System.lineSeparator()
+ "Schema: " + schema,
e
);
}
}

private static PreparedStatement<CreateSource> buildPreparedStatement(
@@ -18,9 +18,7 @@
import static io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId.schemaAndId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.fail;
@@ -381,7 +379,7 @@ public void shouldAddSchemaIdIfNotPresentAlready() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(), is(SCHEMA_ID));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5"));
}
@@ -395,8 +393,7 @@ public void shouldNotOverwriteExistingSchemaId() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(),
is(42));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'"));
}
@@ -416,7 +413,7 @@ public void shouldThrowOnUnsupportedType() {
fail("Expected KsqlStatementException. schema: " + unsupportedSchema);
} catch (final KsqlStatementException e) {
assertThat(e.getRawMessage(),
containsString("Failed to convert schema to KSQL model:"));
containsString("Schema contains types not supported by KSQL:"));

assertThat(e.getSqlStatement(), is(csStatement.getStatementText()));
}
@@ -20,6 +20,14 @@
import org.junit.Test;

public class PlannedTestGeneratorTest {

/**
* Run this test to generate new query plans for the {@link QueryTranslationTest} test cases.
*
* <p>Ensure only the test plans you expected have changed, then check the new query plans in
* with your change. Otherwise, {@link PlannedTestsUpToDateTest} will fail if there are missing
* or changed query plans.
*/
@Test
@Ignore
public void manuallyGeneratePlans() {