fix: support partial schemas (#4625)
* fix: support partial schemas

fixes: #4566

With this change users can now supply just the key schema and use schema inference to get the value columns. For example, if the key is an `INT` serialized using Kafka's `IntegerSerializer` and the value is an Avro record whose schema is stored in the Schema Registry, such a stream can be registered in ksqlDB with a statement such as:

```sql
-- note: only the key columns are provided between the first set of brackets
-- the value columns will be inferred from the Avro schema in the Schema Registry
CREATE STREAM users (ROWKEY INT KEY) WITH (kafka_topic='users', value_format='avro');
```
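Suppose, for illustration, that the registered Avro value schema contains two fields, `NAME` (string) and `AGE` (int) — hypothetical names used only for this sketch. The statement above then behaves as if the value columns had been listed explicitly (the injector also records the schema id via the `AVRO_SCHEMA_ID` property, as the tests below show):

```sql
-- hypothetical equivalent once the value columns have been inferred:
-- the supplied key column is passed through unchanged and the value
-- columns are appended from the registered Avro schema
CREATE STREAM users (ROWKEY INT KEY, NAME STRING, AGE INT)
  WITH (kafka_topic='users', value_format='avro');
```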
big-andy-coates authored Feb 26, 2020
1 parent f6a5469 commit 4f1ce8a
Showing 16 changed files with 879 additions and 32 deletions.
@@ -15,14 +15,15 @@

package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option;
@@ -35,20 +36,25 @@
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;

/**
* An injector which injects the schema into the supplied {@code statement}.
* An injector which injects the value columns into the supplied {@code statement}.
*
* <p>The schema is only injected if:
* <p>The value columns are only injected if:
* <ul>
* <li>The statement is a CT/CS.</li>
* <li>The statement does not define a schema.</li>
* <li>The statement does not define any value columns.</li>
* <li>The format of the statement supports schema inference.</li>
* </ul>
*
* <p>Any key columns present are passed through unchanged.
*
* <p>If any of the above are not true then the {@code statement} is returned unchanged.
*/
public class DefaultSchemaInjector implements Injector {
Expand All @@ -62,7 +68,6 @@ public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) {
this.schemaSupplier = Objects.requireNonNull(schemaSupplier, "schemaSupplier");
}


@SuppressWarnings("unchecked")
@Override
public <T extends Statement> ConfiguredStatement<T> inject(
@@ -90,17 +95,18 @@ public <T extends Statement> ConfiguredStatement<T> inject(
private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
final ConfiguredStatement<CreateSource> statement
) {
if (hasElements(statement)
if (hasValueElements(statement)
|| !statement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) {
return Optional.empty();
}

final SchemaAndId valueSchema = getValueSchema(statement);
final CreateSource withSchema = addSchemaFields(statement, valueSchema);
final PreparedStatement<CreateSource> prepared =
buildPreparedStatement(withSchema);
return Optional.of(ConfiguredStatement.of(
prepared, statement.getOverrides(), statement.getConfig()));
final PreparedStatement<CreateSource> prepared = buildPreparedStatement(withSchema);
final ConfiguredStatement<CreateSource> configured = ConfiguredStatement
.of(prepared, statement.getOverrides(), statement.getConfig());

return Optional.of(configured);
}

private SchemaAndId getValueSchema(
@@ -123,10 +129,11 @@ private SchemaAndId getValueSchema(
return result.schemaAndId.get();
}

private static boolean hasElements(
private static boolean hasValueElements(
final ConfiguredStatement<CreateSource> statement
) {
return !Iterables.isEmpty(statement.getStatement().getElements());
return statement.getStatement().getElements().stream()
.anyMatch(e -> e.getNamespace().equals(Namespace.VALUE));
}

private static CreateSource addSchemaFields(
@@ -145,23 +152,44 @@ private static CreateSource addSchemaFields(
}

private static TableElements buildElements(
final Schema schema,
final Schema valueSchema,
final ConfiguredStatement<CreateSource> preparedStatement
) {
try {
throwOnInvalidSchema(schema);
// custom types cannot be injected, so we can pass in an EMPTY type registry
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY);
} catch (final Exception e) {
throw new KsqlStatementException(
"Failed to convert schema to KSQL model: " + e.getMessage(),
preparedStatement.getStatementText(),
e);
}
throwOnInvalidSchema(valueSchema);

final List<TableElement> elements = new ArrayList<>();

getKeyColumns(preparedStatement)
.forEach(elements::add);

getColumnsFromSchema(valueSchema)
.forEach(elements::add);

return TableElements.of(elements);
}

private static Stream<TableElement> getKeyColumns(
final ConfiguredStatement<CreateSource> preparedStatement
) {
return preparedStatement.getStatement().getElements().stream()
.filter(e -> e.getNamespace() == Namespace.KEY);
}

private static Stream<TableElement> getColumnsFromSchema(final Schema schema) {
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream();
}

private static void throwOnInvalidSchema(final Schema schema) {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
try {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
} catch (final Exception e) {
throw new KsqlException(
"Schema contains types not supported by KSQL: " + e.getMessage()
+ System.lineSeparator()
+ "Schema: " + schema,
e
);
}
}

private static PreparedStatement<CreateSource> buildPreparedStatement(
@@ -18,9 +18,7 @@
import static io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId.schemaAndId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.fail;
@@ -381,7 +379,7 @@ public void shouldAddSchemaIdIfNotPresentAlready() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(), is(SCHEMA_ID));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5"));
}
@@ -395,8 +393,7 @@ public void shouldNotOverwriteExistingSchemaId() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(),
is(42));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'"));
}
@@ -416,7 +413,7 @@ public void shouldThrowOnUnsupportedType() {
fail("Expected KsqlStatementException. schema: " + unsupportedSchema);
} catch (final KsqlStatementException e) {
assertThat(e.getRawMessage(),
containsString("Failed to convert schema to KSQL model:"));
containsString("Schema contains types not supported by KSQL:"));

assertThat(e.getSqlStatement(), is(csStatement.getStatementText()));
}
@@ -24,8 +24,9 @@ public class PlannedTestGeneratorTest {
/**
* Run this test to generate new query plans for the {@link QueryTranslationTest} test cases.
*
* <p>Check the new query plans in with your change. Otherwise, {@link PlannedTestsUpToDateTest}
* will fail if there are missing or changed query plans.
* <p>Ensure only the test plans you expected have changed, then check the new query plans in
* with your change. Otherwise, {@link PlannedTestsUpToDateTest} will fail if there are missing
* or changed query plans.
*/
@Test
@Ignore
@@ -0,0 +1,144 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 INTEGER) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='AvRo');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent3048982612619405901",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,22 @@
{
"version" : "6.0.0",
"timestamp" : 1582711670275,
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 INT> NOT NULL"
},
"inputs" : [ {
"topic" : "input",
"key" : 42,
"value" : {
"c1" : 4
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : 42,
"value" : {
"C1" : 4
}
} ]
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project
