diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java index 269807b20c66..f3f8aa63ccc5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java @@ -31,6 +31,8 @@ import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.services.DefaultServiceContext; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.DefaultTopicInjector; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; @@ -41,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +54,8 @@ public class KsqlContext { private final ServiceContext serviceContext; private final KsqlConfig ksqlConfig; private final KsqlEngine ksqlEngine; - private final SchemaInjector schemaInjector; + private final Function schemaInjectorFactory; + private final Function topicInjectorFactory; /** * Create a KSQL context object with the given properties. A KSQL context has it's own metastore @@ -72,10 +76,13 @@ public static KsqlContext create( functionRegistry, serviceId); - final DefaultSchemaInjector schemaInjector = new DefaultSchemaInjector( - new SchemaRegistryTopicSchemaSupplier(serviceContext.getSchemaRegistryClient())); - - return new KsqlContext(serviceContext, ksqlConfig, engine, schemaInjector); + return new KsqlContext( + serviceContext, + ksqlConfig, + engine, + sc -> new DefaultSchemaInjector( + new SchemaRegistryTopicSchemaSupplier(sc.getSchemaRegistryClient())), + DefaultTopicInjector::new); } @VisibleForTesting @@ -83,12 +90,16 @@ public static KsqlContext create( final ServiceContext serviceContext, final KsqlConfig ksqlConfig, final KsqlEngine ksqlEngine, - final SchemaInjector schemaInjector + final Function schemaInjectorFactory, + final Function topicInjectorFactory ) { this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); - this.schemaInjector = Objects.requireNonNull(schemaInjector, "schemaInjector"); + this.schemaInjectorFactory = Objects + .requireNonNull(schemaInjectorFactory, "schemaInjectorFactory"); + this.topicInjectorFactory = Objects + .requireNonNull(topicInjectorFactory, "topicInjectorFactory"); } public ServiceContext getServiceContext() { @@ -110,12 +121,25 @@ public List sql(final String sql, final Map overr final List statements = ksqlEngine.parse(sql); final KsqlExecutionContext sandbox = ksqlEngine.createSandbox(); + final SchemaInjector sandboxSchemaInjector = schemaInjectorFactory + .apply(sandbox.getServiceContext()); + final TopicInjector sandboxTopicInjector = topicInjectorFactory.apply(sandbox); + + for (ParsedStatement stmt : statements) { + execute( + sandbox, + stmt, + ksqlConfig, + overriddenProperties, + sandboxSchemaInjector, + sandboxTopicInjector); + } - statements.forEach(stmt -> execute(sandbox, stmt, ksqlConfig, overriddenProperties)); - + final SchemaInjector schemaInjector = schemaInjectorFactory.apply(serviceContext); + final TopicInjector topicInjector = topicInjectorFactory.apply(ksqlEngine); final List queries = new ArrayList<>(); for (final ParsedStatement parsed : statements) { - execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties) + execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties, schemaInjector, topicInjector) .getQuery() .ifPresent(queries::add); } @@ -157,10 +181,13 @@ private ExecuteResult execute( final KsqlExecutionContext executionContext, final ParsedStatement stmt, final KsqlConfig ksqlConfig, - final Map overriddenProperties - ) { + final Map overriddenProperties, + final SchemaInjector schemaInjector, + final TopicInjector topicInjector) { final PreparedStatement prepared = executionContext.prepare(stmt); final PreparedStatement withSchema = schemaInjector.forStatement(prepared); - return executionContext.execute(withSchema, ksqlConfig, overriddenProperties); + final PreparedStatement withInferredTopic = + topicInjector.forStatement(withSchema, ksqlConfig, overriddenProperties); + return executionContext.execute(withInferredTopic, ksqlConfig, overriddenProperties); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java index a23c5aa96a6e..3dc422713fcf 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java @@ -20,6 +20,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; @@ -46,6 +47,11 @@ public interface KsqlExecutionContext { */ MetaStore getMetaStore(); + /** + * @return the service context used for this execution context + */ + ServiceContext getServiceContext(); + /** * Retrieve the details of a persistent query. * diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 10c6e3a1e9b1..f52dac3789b3 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -57,6 +57,7 @@ import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.StringUtil; +import io.confluent.ksql.util.WithClauseUtil; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -136,34 +137,22 @@ private void setIntoProperties(final Sink sink) { } if (sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS) != null) { - try { - final int numberOfPartitions = Integer.parseInt( - sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS).toString() - ); - analysis.getIntoProperties().put( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, - numberOfPartitions - ); + final int numberOfPartitions = + WithClauseUtil.parsePartitions( + sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS)); - } catch (final NumberFormatException e) { - throw new KsqlException( - "Invalid number of partitions in WITH clause: " - + sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS).toString()); - } + analysis.getIntoProperties().put( + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, + numberOfPartitions + ); } if (sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS) != null) { - try { - final short numberOfReplications = - Short.parseShort( - sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString() - ); - analysis.getIntoProperties() - .put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications); - } catch (final NumberFormatException e) { - throw new KsqlException("Invalid number of replications in WITH clause: " + sink - .getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString()); - } + final short numberOfReplications = + WithClauseUtil.parseReplicas( + sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS)); + analysis.getIntoProperties() + .put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 8b10632f8637..b0586212e555 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -131,6 +131,11 @@ public MetaStore getMetaStore() { return primaryContext.getMetaStore(); } + @Override + public ServiceContext getServiceContext() { + return serviceContext; + } + public DdlCommandExec getDdlCommandExec() { return primaryContext.getDdlCommandExec(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index ac581531147c..9d7c38f37a53 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -21,6 +21,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.List; @@ -44,6 +45,11 @@ public MetaStore getMetaStore() { return engineContext.getMetaStore(); } + @Override + public ServiceContext getServiceContext() { + return engineContext.getServiceContext(); + } + @Override public KsqlExecutionContext createSandbox() { return new SandboxedExecutionContext(engineContext); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java index 82954e957a44..951877b7a0ba 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java @@ -31,6 +31,7 @@ import io.confluent.ksql.structured.QueryContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.structured.SchemaKTable; +import io.confluent.ksql.topic.TopicProperties; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.QueryIdGenerator; @@ -42,6 +43,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.connect.data.Field; @@ -141,19 +143,22 @@ public SchemaKStream buildStream( final KsqlStructuredDataOutputNode noRowKey = outputNodeBuilder.build(); if (doCreateInto) { - final SourceTopicProperties sourceTopicProperties = getSourceTopicProperties( - getTheSourceNode().getStructuredDataSource().getKsqlTopic().getKafkaTopicName(), - outputProperties, - serviceContext.getTopicClient(), - ksqlConfig - ); + final Supplier sourceTopicDescription = () -> + getSourceTopicPropertiesFromKafka( + getTheSourceNode().getStructuredDataSource().getKsqlTopic().getKafkaTopicName(), + serviceContext.getTopicClient()); + createSinkTopic( - noRowKey.getKafkaTopicName(), serviceContext.getTopicClient(), shouldBeCompacted(result), - sourceTopicProperties.partitions, - sourceTopicProperties.replicas); + new TopicProperties.Builder() + .withName(noRowKey.getKafkaTopicName()) + .withOverrides(outputProperties) + .withKsqlConfig(ksqlConfig) + .withSource(sourceTopicDescription) + .build()); } + result.into( noRowKey.getKafkaTopicName(), noRowKey.getKsqlTopic().getKsqlTopicSerDe() @@ -236,69 +241,22 @@ private void addAvroSchemaToResultTopic(final Builder builder) { } private static void createSinkTopic( - final String kafkaTopicName, final KafkaTopicClient kafkaTopicClient, final boolean isCompacted, - final int numberOfPartitions, - final short numberOfReplications + final TopicProperties topicProperties ) { final Map config = isCompacted ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) : Collections.emptyMap(); - kafkaTopicClient.createTopic(kafkaTopicName, - numberOfPartitions, - numberOfReplications, + kafkaTopicClient.createTopic( + topicProperties.getTopicName(), + topicProperties.getPartitions(), + topicProperties.getReplicas(), config ); } - private static SourceTopicProperties getSourceTopicProperties( - final String kafkaTopicName, - final Map sinkProperties, - final KafkaTopicClient kafkaTopicClient, - final KsqlConfig ksqlConfig - ) { - final Map ksqlProperties = ksqlConfig.values(); - if (ksqlProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null - || ksqlProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) { - return getSinkTopicPropertiesLegacyWay(sinkProperties, ksqlConfig); - } - // Don't request topic properties from Kafka if both are set in WITH clause. - if (sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null - && sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) { - return new SourceTopicProperties( - (Integer) sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY), - (Short) sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) - ); - } - final TopicDescription topicDescription = getSourceTopicPropertiesFromKafka( - kafkaTopicName, - kafkaTopicClient); - - final int partitions = (Integer) sinkProperties.getOrDefault( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, - topicDescription.partitions().size()); - final short replicas = (Short) sinkProperties.getOrDefault( - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, - (short) topicDescription.partitions().get(0).replicas().size()); - - return new SourceTopicProperties(partitions, replicas); - } - - private static SourceTopicProperties getSinkTopicPropertiesLegacyWay( - final Map sinkProperties, - final KsqlConfig ksqlConfig - ) { - final int partitions = (Integer) sinkProperties.getOrDefault( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, - ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)); - final short replicas = (Short) sinkProperties.getOrDefault( - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, - ksqlConfig.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)); - return new SourceTopicProperties(partitions, replicas); - } - private static TopicDescription getSourceTopicPropertiesFromKafka( final String kafkaTopicName, final KafkaTopicClient kafkaTopicClient @@ -318,17 +276,6 @@ private KsqlTopicSerDe getTopicSerde() { return ksqlTopic.getKsqlTopicSerDe(); } - private static class SourceTopicProperties { - - private final int partitions; - private final short replicas; - - SourceTopicProperties(final int partitions, final short replicas) { - this.partitions = partitions; - this.replicas = replicas; - } - } - public static class Builder { private PlanNodeId id; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/DefaultTopicInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/DefaultTopicInjector.java new file mode 100644 index 000000000000..6cc56aacc521 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/DefaultTopicInjector.java @@ -0,0 +1,135 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.topic; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.ddl.DdlConfig; +import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.metastore.model.StructuredDataSource; +import io.confluent.ksql.parser.DefaultTraversalVisitor; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.SqlFormatter; +import io.confluent.ksql.parser.tree.AliasedRelation; +import io.confluent.ksql.parser.tree.CreateAsSelect; +import io.confluent.ksql.parser.tree.Expression; +import io.confluent.ksql.parser.tree.IntegerLiteral; +import io.confluent.ksql.parser.tree.Join; +import io.confluent.ksql.parser.tree.Node; +import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.StringLiteral; +import io.confluent.ksql.parser.tree.Table; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.clients.admin.TopicDescription; + +public class DefaultTopicInjector implements TopicInjector { + + private final KafkaTopicClient topicClient; + private final MetaStore metaStore; + + public DefaultTopicInjector( + final KsqlExecutionContext executionContext + ) { + this(executionContext.getServiceContext().getTopicClient(), executionContext.getMetaStore()); + } + + DefaultTopicInjector( + final KafkaTopicClient topicClient, + final MetaStore metaStore) { + this.topicClient = Objects.requireNonNull(topicClient, "topicClient"); + this.metaStore = Objects.requireNonNull(metaStore, "metaStore"); + } + + @Override + public PreparedStatement forStatement( + final PreparedStatement statement, + final KsqlConfig ksqlConfig, + final Map propertyOverrides) { + return forStatement(statement, ksqlConfig, propertyOverrides, new TopicProperties.Builder()); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + PreparedStatement forStatement( + final PreparedStatement statement, + final KsqlConfig ksqlConfig, + final Map propertyOverrides, + final TopicProperties.Builder topicPropertiesBuilder) { + if (!(statement.getStatement() instanceof CreateAsSelect)) { + return statement; + } + + final PreparedStatement cas = + (PreparedStatement) statement; + + final TopicProperties info = topicPropertiesBuilder + .withName(ksqlConfig.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG) + + cas.getStatement().getName().getSuffix()) + .withWithClause(cas.getStatement().getProperties()) + .withOverrides(propertyOverrides) + .withKsqlConfig(ksqlConfig) + .withSource(() -> describeSource(topicClient, cas)) + .build(); + + final Map props = new HashMap<>(cas.getStatement().getProperties()); + props.put(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral(info.getTopicName())); + props.put(KsqlConstants.SINK_NUMBER_OF_REPLICAS, new IntegerLiteral(info.getReplicas())); + props.put(KsqlConstants.SINK_NUMBER_OF_PARTITIONS, new IntegerLiteral(info.getPartitions())); + + final CreateAsSelect withTopic = cas.getStatement().copyWith(props); + final String withTopicText = SqlFormatter.formatSql(withTopic) + ";"; + + return (PreparedStatement) PreparedStatement.of(withTopicText, withTopic); + } + + private TopicDescription describeSource( + final KafkaTopicClient topicClient, + final PreparedStatement cas + ) { + final SourceTopicExtractor extractor = new SourceTopicExtractor(); + extractor.process(cas.getStatement().getQuery(), null); + final String kafkaTopicName = extractor.primaryKafkaTopicName; + return topicClient.describeTopic(kafkaTopicName); + } + + private final class SourceTopicExtractor extends DefaultTraversalVisitor { + + private String primaryKafkaTopicName = null; + + @Override + protected Node visitJoin(final Join node, final Void context) { + process(node.getLeft(), context); + return null; + } + + @Override + protected Node visitAliasedRelation(final AliasedRelation node, final Void context) { + final String structuredDataSourceName = ((Table) node.getRelation()).getName().getSuffix(); + final StructuredDataSource source = metaStore.getSource(structuredDataSourceName); + if (source == null) { + throw new KsqlException(structuredDataSourceName + " does not exist."); + } + + primaryKafkaTopicName = source.getKsqlTopic().getKafkaTopicName(); + return node; + } + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicInjector.java new file mode 100644 index 000000000000..ea04ea66f066 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicInjector.java @@ -0,0 +1,49 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.topic; + +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; + +/** + * Injects the topic into the WITH clause for statements that have + * incomplete topic property information. + */ +public interface TopicInjector { + + /** + * Attempt to inject topic name, number of partitions and number of replicas into the topic + * properties of the supplied {@code statement}. + + *

If a statement that is not {@code CreateAsSelect} is passed in, this results in a + * no-op that returns the incoming statement.

+ * + * @see TopicProperties.Builder + * + * @param statement the statement to inject the topic properties into + * @param ksqlConfig the default configurations for the service + * @param propertyOverrides the overrides for this statement + * @param the type of statement, will do nothing unless + * {@code } + * + * @return a statement that has the kafka topic properties injected + */ + PreparedStatement forStatement( + PreparedStatement statement, + KsqlConfig ksqlConfig, + Map propertyOverrides); +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java new file mode 100644 index 000000000000..e5c3bee4e0cb --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java @@ -0,0 +1,194 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.topic; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Suppliers; +import io.confluent.ksql.ddl.DdlConfig; +import io.confluent.ksql.parser.tree.Expression; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.WithClauseUtil; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; +import java.util.stream.Stream; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.TopicDescription; + +/** + * A container for all properties required for creating/validating + * a kafka topic. + */ +public final class TopicProperties { + + private static final String INVALID_TOPIC_NAME = ":INVALID:"; + private static final int INVALID_PARTITIONS = -1; + private static final short INVALID_REPLICAS = -1; + + private final String topicName; + private final Integer partitions; + private final Short replicas; + + @VisibleForTesting + TopicProperties( + final String topicName, + final Integer partitions, + final Short replicas + ) { + this.topicName = topicName; + this.partitions = partitions; + this.replicas = replicas; + } + + @Override + public String toString() { + return "TopicProperties{" + "topicName='" + getTopicName() + '\'' + + ", partitions=" + getPartitions() + + ", replicas=" + getReplicas() + + '}'; + } + + public String getTopicName() { + return topicName == null ? INVALID_TOPIC_NAME : topicName; + } + + public int getPartitions() { + return partitions == null ? INVALID_PARTITIONS : partitions; + } + + public short getReplicas() { + return replicas == null ? INVALID_REPLICAS : replicas; + } + + /** + * Constructs a {@link TopicProperties} with the following precedence order: + * + *
    + *
  • The statement itself, if it has a WITH clause
  • + *
  • The overrides, if present (note that this is a legacy approach)
  • + *
  • The KsqlConfig property, if present (note that this is a legacy approach)
  • + *
  • The topic properties from the source that it is reading from. If the source is a join, + * then the left value is used as the source.
  • + *
  • Generated based on some recipe - this is the case for topic name, which will never + * use the source topic
  • + *
+ * + *

It is possible that only partial information exists at higher levels of precedence. If + * this is the case, the values will be inferred in cascading fashion (e.g. topic name from + * WITH clause, replicas from property overrides and partitions source topic).

+ */ + public static final class Builder { + + private String name; + private TopicProperties fromWithClause = new TopicProperties(null, null, null); + private TopicProperties fromOverrides = new TopicProperties(null, null, null); + private TopicProperties fromKsqlConfig = new TopicProperties(null, null, null); + private Supplier fromSource = () -> new TopicProperties(null, null, null); + + public Builder withName(final String name) { + this.name = name; + return this; + } + + public Builder withWithClause(final Map withClause) { + final Expression nameExpression = withClause.get(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY); + final String name = nameExpression == null + ? null + : StringUtils.strip(nameExpression.toString(), "'"); + + final Integer partitions = + WithClauseUtil.parsePartitions(withClause.get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS)); + final Short replicas = + WithClauseUtil.parseReplicas(withClause.get(KsqlConstants.SINK_NUMBER_OF_REPLICAS)); + + fromWithClause = new TopicProperties(name, partitions, replicas); + return this; + } + + public Builder withOverrides(final Map overrides) { + final Integer partitions = (Integer) overrides.get( + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY); + final Short replicas = (Short) overrides.get( + KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY); + + fromOverrides = new TopicProperties(null, partitions, replicas); + return this; + } + + public Builder withKsqlConfig(final KsqlConfig config) { + // requires check for containsKey because `getInt` will return 0 otherwise + Integer partitions = null; + if (config.values().containsKey(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)) { + partitions = config.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY); + } + + // requires check for containsKey because `getShort` will return 0 otherwise + Short replicas = null; + if (config.values().containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)) { + replicas = config.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY); + } + + fromKsqlConfig = new TopicProperties(null, partitions, replicas); + return this; + } + + public Builder withSource(final Supplier descriptionSupplier) { + fromSource = Suppliers.memoize(() -> { + final TopicDescription description = descriptionSupplier.get(); + final Integer partitions = description.partitions().size(); + final Short replicas = (short) description.partitions().get(0).replicas().size(); + + return new TopicProperties(null, partitions, replicas); + }); + return this; + } + + public TopicProperties build() { + // this method should use the field directly instead of accessors to force null checks + + final String name = ObjectUtils.firstNonNull(fromWithClause.topicName, this.name); + Objects.requireNonNull(name, "Was not supplied with any valid source for topic name!"); + if (StringUtils.strip(name).isEmpty()) { + throw new KsqlException("Must have non-empty topic name."); + } + + final Integer partitions = Stream.of( + fromWithClause.partitions, + fromOverrides.partitions, + fromKsqlConfig.partitions) + .filter(Objects::nonNull) + .findFirst() + .orElseGet(() -> fromSource.get().partitions); + Objects.requireNonNull(partitions, "Was not supplied with any valid source for partitions!"); + + final Short replicas = Stream.of( + fromWithClause.replicas, + fromOverrides.replicas, + fromKsqlConfig.replicas) + .filter(Objects::nonNull) + .findFirst() + .orElseGet(() -> fromSource.get().replicas); + Objects.requireNonNull(replicas, "Was not supplied with any valid source for replicas!"); + + return new TopicProperties(name, partitions, replicas); + } + + } + +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java index 22740d3ba0ec..0922904eda06 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java @@ -17,9 +17,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,12 +34,14 @@ import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.schema.inference.SchemaInjector; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueuedQueryMetadata; import java.util.Collections; +import java.util.function.Function; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -46,6 +50,7 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.verification.VerificationMode; @RunWith(MockitoJUnitRunner.class) public class KsqlContextTest { @@ -72,6 +77,12 @@ public class KsqlContextTest { private final static PreparedStatement STMT_1_WITH_SCHEMA = PreparedStatement .of("sql 1", mock(Statement.class)); + private final static PreparedStatement STMT_0_WITH_TOPIC = PreparedStatement + .of("sql 0", mock(Statement.class)); + + private final static PreparedStatement STMT_1_WITH_TOPIC = PreparedStatement + .of("sql 1", mock(Statement.class)); + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -87,14 +98,16 @@ public class KsqlContextTest { private QueuedQueryMetadata transientQuery; @Mock private SchemaInjector schemaInjector; + @Mock + private Function topicInjectorFactory; + @Mock + private TopicInjector topicInjector; private KsqlContext ksqlContext; @SuppressWarnings("unchecked") @Before public void setUp() { - ksqlContext = new KsqlContext(serviceContext, SOME_CONFIG, ksqlEngine, schemaInjector); - when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0)); when(ksqlEngine.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); @@ -111,6 +124,14 @@ public void setUp() { .thenReturn((PreparedStatement) STMT_0_WITH_SCHEMA); when(schemaInjector.forStatement(PREPARED_STMT_1)) .thenReturn((PreparedStatement) STMT_1_WITH_SCHEMA); + + when(topicInjectorFactory.apply(any())).thenReturn(topicInjector); + when(topicInjector.forStatement(any(), any(), any())) + .thenAnswer(inv -> inv.getArgument(0)); + + ksqlContext = new KsqlContext( + serviceContext, SOME_CONFIG, ksqlEngine, sc -> schemaInjector, topicInjectorFactory); + } @Test @@ -278,4 +299,60 @@ public void shouldThrowIfFailedToInferSchema() { // When: ksqlContext.sql("Some SQL", SOME_PROPERTIES); } + + @SuppressWarnings("unchecked") + @Test + public void shouldInferTopic() { + // Given: + when(topicInjector.forStatement(any(), any(), any())) + .thenReturn((PreparedStatement) STMT_0_WITH_TOPIC); + + // When: + ksqlContext.sql("Some SQL", SOME_PROPERTIES); + + // Then: + verify(ksqlEngine).execute(eq(STMT_0_WITH_TOPIC), any(), any()); + } + + @Test + public void shouldInferTopicWithValidArgs() { + // Given: + when(schemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); + + // When: + ksqlContext.sql("Some SQL", SOME_PROPERTIES); + + // Then: + verify(topicInjector, times(2) /* once to validate, once to execute */) + .forStatement(PREPARED_STMT_0, SOME_CONFIG, SOME_PROPERTIES); + } + + @Test + public void shouldThrowIfFailedToInferTopic() { + // Given: + when(topicInjector.forStatement(any(), any(), any())) + .thenThrow(new RuntimeException("Boom")); + + // Then: + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("Boom"); + + // When: + ksqlContext.sql("Some SQL", SOME_PROPERTIES); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldInferTopicAfterInferringSchema() { + // Given: + when(schemaInjector.forStatement(any())).thenReturn((PreparedStatement) STMT_1_WITH_SCHEMA); + when(topicInjector.forStatement(eq(STMT_1_WITH_SCHEMA), any(), any())) + .thenReturn((PreparedStatement) STMT_1_WITH_TOPIC); + + // When: + ksqlContext.sql("Some SQL", SOME_PROPERTIES); + + // Then: + verify(ksqlEngine).execute(eq(STMT_1_WITH_TOPIC), any(), any()); + } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java index 9df50e1bb3bb..29dde17bbc2d 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java @@ -20,6 +20,7 @@ import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.schema.inference.DefaultSchemaInjector; +import io.confluent.ksql.topic.DefaultTopicInjector; import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.KafkaTopicClientImpl; @@ -70,8 +71,8 @@ public static KsqlContext create( final DefaultSchemaInjector schemaInjector = new DefaultSchemaInjector( new SchemaRegistryTopicSchemaSupplier(serviceContext.getSchemaRegistryClient())); - - return new KsqlContext(serviceContext, ksqlConfig, engine, schemaInjector); + return new KsqlContext( + serviceContext, ksqlConfig, engine, sc -> schemaInjector, DefaultTopicInjector::new); } public static KsqlConfig createKsqlConfig(final EmbeddedSingleNodeKafkaCluster kafkaCluster) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index d28c297ac99e..8118fd48064b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -276,6 +276,8 @@ public void shouldCreateSinkWithCorrectCleanupPolicyNonWindowedTable() { public void shouldCreateSinkWithCorrectCleanupPolicyWindowedTable() { // Given: reset(mockTopicClient); + when(mockTopicClient.describeTopic(any())).thenReturn(topicDescription); + outputNode = getKsqlStructuredDataOutputNodeForTable( () -> WindowedSerdes.timeWindowedSerdeFrom(String.class)); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/DefaultTopicInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/DefaultTopicInjectorTest.java new file mode 100644 index 000000000000..82f4dbb01742 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/DefaultTopicInjectorTest.java @@ -0,0 +1,273 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.topic; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; + +import io.confluent.ksql.ddl.DdlConfig; +import io.confluent.ksql.function.InternalFunctionRegistry; +import io.confluent.ksql.metastore.MetaStoreImpl; +import io.confluent.ksql.metastore.MutableMetaStore; +import io.confluent.ksql.metastore.model.KsqlStream; +import io.confluent.ksql.metastore.model.KsqlTopic; +import io.confluent.ksql.parser.DefaultKsqlParser; +import io.confluent.ksql.parser.KsqlParser; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.CreateStreamAsSelect; +import io.confluent.ksql.parser.tree.IntegerLiteral; +import io.confluent.ksql.parser.tree.StringLiteral; +import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe; +import io.confluent.ksql.services.FakeKafkaTopicClient; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultTopicInjectorTest { + + private static final Schema SCHEMA = SchemaBuilder + .struct() + .field("F1", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + @Mock public TopicProperties.Builder builder; + + private KsqlParser parser; + private MutableMetaStore metaStore; + private DefaultTopicInjector injector; + private Map overrides; + private PreparedStatement statement; + private KsqlConfig config; + private TopicDescription sourceDescription; + + @Before + public void setUp() { + parser = new DefaultKsqlParser(); + metaStore = new MetaStoreImpl(new InternalFunctionRegistry()); + overrides = new HashMap<>(); + config = new KsqlConfig(new HashMap<>()); + + final KafkaTopicClient topicClient = new FakeKafkaTopicClient(); + injector = new DefaultTopicInjector(topicClient, metaStore); + + topicClient.createTopic("source", 1, (short) 1); + sourceDescription = topicClient.describeTopic("source"); + + topicClient.createTopic("jSource", 2, (short) 2); + + final KsqlTopic sourceTopic = + new KsqlTopic("SOURCE", "source", new KsqlJsonTopicSerDe(), false); + final KsqlStream source = new KsqlStream<>( + "", + "SOURCE", + SCHEMA, + Optional.empty(), + new MetadataTimestampExtractionPolicy(), + sourceTopic, + Serdes::String); + metaStore.putSource(source); + + final KsqlTopic joinTopic = + new KsqlTopic("J_SOURCE", "jSource", new KsqlJsonTopicSerDe(), false); + final KsqlStream joinSource = new KsqlStream<>( + "", + "J_SOURCE", + SCHEMA, + Optional.empty(), + new MetadataTimestampExtractionPolicy(), + joinTopic, + Serdes::String); + metaStore.putSource(joinSource); + + when(builder.withName(any())).thenReturn(builder); + when(builder.withWithClause(any())).thenReturn(builder); + when(builder.withOverrides(any())).thenReturn(builder); + when(builder.withKsqlConfig(any())).thenReturn(builder); + when(builder.withSource(any())).thenReturn(builder); + when(builder.build()).thenReturn(new TopicProperties("name", 1, (short) 1)); + } + + @Test + public void shouldDoNothingForNonCAS() { + // Given: + final PreparedStatement statement = givenStatement("LIST PROPERTIES;"); + + // When: + final PreparedStatement result = injector.forStatement(statement, config, overrides); + + // Then: + assertThat(result, is(sameInstance(statement))); + } + + @Test + public void shouldGenerateName() { + // Given: + givenStatement("CREATE STREAM x AS SELECT * FROM SOURCE;"); + + // When: + injector.forStatement(statement, config, overrides, builder); + + // Then: + verify(builder).withName("X"); + } + + @Test + public void shouldPassThroughWithClauseToBuilder() { + // Given: + givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); + + // When: + injector.forStatement(statement, config, overrides, builder); + + // Then: + verify(builder).withWithClause(statement.getStatement().getProperties()); + } + + @Test + public void shouldPassThroughOverridesToBuilder() { + // Given: + givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); + + // When: + injector.forStatement(statement, config, overrides, builder); + + // Then: + verify(builder).withOverrides(overrides); + } + + @Test + public void shouldPassThroughConfigToBuilder() { + // Given: + givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); + + // When: + injector.forStatement(statement, config, overrides, builder); + + // Then: + verify(builder).withKsqlConfig(config); + } + + @Test + public void shouldIdentifyAndUseCorrectSource() { + // Given: + givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); + + // When: + injector.forStatement(statement, config, overrides, builder); + + // Then: + verify(builder).withSource(argThat(supplierThatGets(sourceDescription))); + } + + @Test + public void shouldIdentifyAndUseCorrectSourceInJoin() { + // Given: + givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE " + + "JOIN J_SOURCE ON SOURCE.X = J_SOURCE.X;"); + + // When: + injector.forStatement(statement, config, overrides, builder); + + // Then: + verify(builder).withSource(argThat(supplierThatGets(sourceDescription))); + } + + @Test + public void shouldBuildWithClauseWithTopicProperties() { + // Given: + givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + + // When: + final PreparedStatement result = + injector.forStatement(statement, config, overrides, builder); + + // Then: + assertThat(result.getStatement().getProperties(), + hasEntry(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("expectedName"))); + assertThat(result.getStatement().getProperties(), + hasEntry(KsqlConstants.SINK_NUMBER_OF_PARTITIONS, new IntegerLiteral(10))); + assertThat(result.getStatement().getProperties(), + hasEntry(KsqlConstants.SINK_NUMBER_OF_REPLICAS, new IntegerLiteral(10))); + } + + @Test + public void shouldUpdateStatementText() { + // Given: + givenStatement("CREATE STREAM x AS SELECT * FROM SOURCE;"); + + // When: + final PreparedStatement result = + injector.forStatement(statement, config, overrides, builder); + + // Then: + assertThat(result.getStatementText(), + equalTo( + "CREATE STREAM X WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'name') AS SELECT *" + + "\nFROM SOURCE SOURCE;")); + } + + @SuppressWarnings("unchecked") + private PreparedStatement givenStatement(final String sql) { + final PreparedStatement preparedStatement = + parser.prepare(parser.parse(sql).get(0), metaStore); + if (preparedStatement.getStatement() instanceof CreateStreamAsSelect) { + statement = (PreparedStatement) preparedStatement; + } + return preparedStatement; + } + + private static TypeSafeMatcher> supplierThatGets( + final TopicDescription topicDescription) { + return new TypeSafeMatcher>() { + @Override + protected boolean matchesSafely(final Supplier item) { + return item.get().equals(topicDescription); + } + + @Override + public void describeTo(final Description description) { + description.appendText(topicDescription.toString()); + } + }; + } + +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java new file mode 100644 index 000000000000..a8072a855cf4 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java @@ -0,0 +1,473 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.topic; + +import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.confluent.ksql.ddl.DdlConfig; +import io.confluent.ksql.parser.tree.Expression; +import io.confluent.ksql.parser.tree.IntegerLiteral; +import io.confluent.ksql.parser.tree.StringLiteral; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Enclosed.class) +public class TopicPropertiesTest { + + public static class Tests { + + public @Rule ExpectedException expectedException = ExpectedException.none(); + + final KsqlConfig config = new KsqlConfig(ImmutableMap.of( + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 1, + KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 1 + )); + + @Test + public void shouldUseNameFromWithClause() { + // Given: + final Map withClause = ImmutableMap.of( + DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("name") + ); + + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause(withClause) + .withKsqlConfig(config) + .build(); + + // Then: + assertThat(properties.getTopicName(), equalTo("name")); + } + + @Test + public void shouldUseNameFromWithClauseWhenNameIsAlsoPresent() { + // Given: + final Map withClause = ImmutableMap.of( + DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("name") + ); + + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("oh no!") + .withWithClause(withClause) + .withKsqlConfig(config) + .build(); + + // Then: + assertThat(properties.getTopicName(), equalTo("name")); + } + + @Test + public void shouldUseNameIfNoWIthClause() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("name") + .withKsqlConfig(config) + .build(); + + // Then: + assertThat(properties.getTopicName(), equalTo("name")); + } + + @Test + public void shouldFailIfNoNameSupplied() { + // Expect: + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("Was not supplied with any valid source for topic name!"); + + // When: + new TopicProperties.Builder() + .withKsqlConfig(config) + .build(); + } + + @Test + public void shouldFailIfEmptyNameSupplied() { + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Must have non-empty topic name."); + + // When: + new TopicProperties.Builder() + .withName("") + .withKsqlConfig(config) + .build(); + } + + @Test + public void shouldFailIfNoPartitionsSupplied() { + // Given: + final KsqlConfig config = new KsqlConfig(ImmutableMap.of( + KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 1 + )); + + // Expect: + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("Was not supplied with any valid source for partitions!"); + + // When: + new TopicProperties.Builder() + .withName("name") + .withKsqlConfig(config) + .build(); + } + + @Test + public void shouldFailIfNoReplicasSupplied() { + // Given: + final KsqlConfig config = new KsqlConfig(ImmutableMap.of( + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 1 + )); + + // Expect: + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("Was not supplied with any valid source for replicas!"); + + // When: + new TopicProperties.Builder() + .withName("name") + .withKsqlConfig(config) + .build(); + } + + @Test + public void shouldNotMakeRemoteCallIfUnnecessary() { + // Given: + final Map withClause = ImmutableMap.of( + DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("name"), + KsqlConstants.SINK_NUMBER_OF_PARTITIONS, new IntegerLiteral(1), + KsqlConstants.SINK_NUMBER_OF_REPLICAS, new IntegerLiteral(1) + ); + + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause(withClause) + .withKsqlConfig(config) + .withSource(() -> { + throw new RuntimeException(); + }) + .build(); + + // Then: + assertThat(properties.getPartitions(), equalTo(1)); + assertThat(properties.getReplicas(), equalTo((short) 1)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldNotMakeMultipleRemoteCalls() { + // Given: + final Supplier source = mock(Supplier.class); + when(source.get()) + .thenReturn( + new TopicDescription( + "", + false, + ImmutableList.of( + new TopicPartitionInfo( + 0, + null, + ImmutableList.of(new Node(1, "", 1)), + ImmutableList.of())))) + .thenThrow(); + + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("name") + .withSource(source) + .build(); + + // Then: + assertThat(properties.getPartitions(), equalTo(1)); + assertThat(properties.getReplicas(), equalTo((short) 1)); + } + + } + + @RunWith(Parameterized.class) + public static class PartitionsAndReplicasPrecedence { + + @Parameters(name = "given {0} -> expect({2} partitions, {3} replicas)") + public static Iterable data() { + final Object[][] data = new Object[][]{ + // THIS LIST WAS GENERATED BY RUNNING Inject#main + // + // Given: Overrides Expect: [Partitions, Replicas ] + {new Inject[]{WITH, OVERRIDES, KSQL_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES, KSQL_CONFIG_P }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES, KSQL_CONFIG_R }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES, NO_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_P, KSQL_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_P, KSQL_CONFIG_P }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_P, KSQL_CONFIG_R }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_P, NO_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_R, KSQL_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_R, KSQL_CONFIG_P }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_R, KSQL_CONFIG_R }, WITH , WITH }, + {new Inject[]{WITH, OVERRIDES_R, NO_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, NO_OVERRIDES, KSQL_CONFIG }, WITH , WITH }, + {new Inject[]{WITH, NO_OVERRIDES, KSQL_CONFIG_P }, WITH , WITH }, + {new Inject[]{WITH, NO_OVERRIDES, KSQL_CONFIG_R }, WITH , WITH }, + {new Inject[]{WITH, NO_OVERRIDES, NO_CONFIG }, WITH , WITH }, + {new Inject[]{WITH_P, OVERRIDES, KSQL_CONFIG }, WITH_P , OVERRIDES }, + {new Inject[]{WITH_P, OVERRIDES, KSQL_CONFIG_P }, WITH_P , OVERRIDES }, + {new Inject[]{WITH_P, OVERRIDES, KSQL_CONFIG_R }, WITH_P , OVERRIDES }, + {new Inject[]{WITH_P, OVERRIDES, NO_CONFIG }, WITH_P , OVERRIDES }, + {new Inject[]{WITH_P, OVERRIDES_P, KSQL_CONFIG }, WITH_P , KSQL_CONFIG }, + {new Inject[]{WITH_P, OVERRIDES_P, KSQL_CONFIG_P }, WITH_P , SOURCE }, + {new Inject[]{WITH_P, OVERRIDES_P, KSQL_CONFIG_R }, WITH_P , KSQL_CONFIG_R }, + {new Inject[]{WITH_P, OVERRIDES_P, NO_CONFIG }, WITH_P , SOURCE }, + {new Inject[]{WITH_P, OVERRIDES_R, KSQL_CONFIG }, WITH_P , OVERRIDES_R }, + {new Inject[]{WITH_P, OVERRIDES_R, KSQL_CONFIG_P }, WITH_P , OVERRIDES_R }, + {new Inject[]{WITH_P, OVERRIDES_R, KSQL_CONFIG_R }, WITH_P , OVERRIDES_R }, + {new Inject[]{WITH_P, OVERRIDES_R, NO_CONFIG }, WITH_P , OVERRIDES_R }, + {new Inject[]{WITH_P, NO_OVERRIDES, KSQL_CONFIG }, WITH_P , KSQL_CONFIG }, + {new Inject[]{WITH_P, NO_OVERRIDES, KSQL_CONFIG_P }, WITH_P , SOURCE }, + {new Inject[]{WITH_P, NO_OVERRIDES, KSQL_CONFIG_R }, WITH_P , KSQL_CONFIG_R }, + {new Inject[]{WITH_P, NO_OVERRIDES, NO_CONFIG }, WITH_P , SOURCE }, + {new Inject[]{WITH_R, OVERRIDES, KSQL_CONFIG }, OVERRIDES , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES, KSQL_CONFIG_P }, OVERRIDES , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES, KSQL_CONFIG_R }, OVERRIDES , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES, NO_CONFIG }, OVERRIDES , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_P, KSQL_CONFIG }, OVERRIDES_P , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_P, KSQL_CONFIG_P }, OVERRIDES_P , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_P, KSQL_CONFIG_R }, OVERRIDES_P , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_P, NO_CONFIG }, OVERRIDES_P , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_R, KSQL_CONFIG }, KSQL_CONFIG , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_R, KSQL_CONFIG_P }, KSQL_CONFIG_P , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_R, KSQL_CONFIG_R }, SOURCE , WITH_R }, + {new Inject[]{WITH_R, OVERRIDES_R, NO_CONFIG }, SOURCE , WITH_R }, + {new Inject[]{WITH_R, NO_OVERRIDES, KSQL_CONFIG }, KSQL_CONFIG , WITH_R }, + {new Inject[]{WITH_R, NO_OVERRIDES, KSQL_CONFIG_P }, KSQL_CONFIG_P , WITH_R }, + {new Inject[]{WITH_R, NO_OVERRIDES, KSQL_CONFIG_R }, SOURCE , WITH_R }, + {new Inject[]{WITH_R, NO_OVERRIDES, NO_CONFIG }, SOURCE , WITH_R }, + {new Inject[]{NO_WITH, OVERRIDES, KSQL_CONFIG }, OVERRIDES , OVERRIDES }, + {new Inject[]{NO_WITH, OVERRIDES, KSQL_CONFIG_P }, OVERRIDES , OVERRIDES }, + {new Inject[]{NO_WITH, OVERRIDES, KSQL_CONFIG_R }, OVERRIDES , OVERRIDES }, + {new Inject[]{NO_WITH, OVERRIDES, NO_CONFIG }, OVERRIDES , OVERRIDES }, + {new Inject[]{NO_WITH, OVERRIDES_P, KSQL_CONFIG }, OVERRIDES_P , KSQL_CONFIG }, + {new Inject[]{NO_WITH, OVERRIDES_P, KSQL_CONFIG_P }, OVERRIDES_P , SOURCE }, + {new Inject[]{NO_WITH, OVERRIDES_P, KSQL_CONFIG_R }, OVERRIDES_P , KSQL_CONFIG_R }, + {new Inject[]{NO_WITH, OVERRIDES_P, NO_CONFIG }, OVERRIDES_P , SOURCE }, + {new Inject[]{NO_WITH, OVERRIDES_R, KSQL_CONFIG }, KSQL_CONFIG , OVERRIDES_R }, + {new Inject[]{NO_WITH, OVERRIDES_R, KSQL_CONFIG_P }, KSQL_CONFIG_P , OVERRIDES_R }, + {new Inject[]{NO_WITH, OVERRIDES_R, KSQL_CONFIG_R }, SOURCE , OVERRIDES_R }, + {new Inject[]{NO_WITH, OVERRIDES_R, NO_CONFIG }, SOURCE , OVERRIDES_R }, + {new Inject[]{NO_WITH, NO_OVERRIDES, KSQL_CONFIG }, KSQL_CONFIG , KSQL_CONFIG }, + {new Inject[]{NO_WITH, NO_OVERRIDES, KSQL_CONFIG_P }, KSQL_CONFIG_P , SOURCE }, + {new Inject[]{NO_WITH, NO_OVERRIDES, KSQL_CONFIG_R }, SOURCE , KSQL_CONFIG_R }, + {new Inject[]{NO_WITH, NO_OVERRIDES, NO_CONFIG }, SOURCE , SOURCE }, + }; + + // generate the description from the given injections and put it at the beginning + return Lists.newArrayList(data) + .stream() + .map(params -> Lists.asList( + Arrays.stream((Inject[]) params[0]) + .map(Objects::toString) + .collect(Collectors.joining(", ")), + params)) + .map(List::toArray) + .collect(Collectors.toList()); + } + + @Parameter + public String description; + + @Parameter(1) + public Inject[] injects; + + @Parameter(2) + public Inject expectedPartitions; + + @Parameter(3) + public Inject expectedReplicas; + + private KsqlConfig ksqlConfig = new KsqlConfig(new HashMap<>()); + private Map propertyOverrides = new HashMap<>(); + private Map withClause = new HashMap<>(); + + @Test + public void shouldInferCorrectPartitionsAndReplicas() { + // Given: + Arrays.stream(injects).forEach(this::givenInject); + + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("name") + .withWithClause(withClause) + .withOverrides(propertyOverrides) + .withKsqlConfig(ksqlConfig) + .withSource(() -> source(SOURCE)) + .build(); + + // Then: + assertThat(properties.getPartitions(), equalTo(expectedPartitions.partitions)); + assertThat(properties.getReplicas(), equalTo(expectedReplicas.replicas)); + } + + private void givenInject(final Inject inject) { + switch (inject.type) { + case WITH: + if (inject.partitions != null) { + withClause.put( + KsqlConstants.SINK_NUMBER_OF_PARTITIONS, + new IntegerLiteral(inject.partitions)); + } + if (inject.replicas != null) { + withClause.put( + KsqlConstants.SINK_NUMBER_OF_REPLICAS, + new IntegerLiteral(inject.replicas)); + } + break; + case OVERRIDES: + if (inject.partitions != null) { + propertyOverrides.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, inject.partitions); + } + if (inject.replicas != null) { + propertyOverrides.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, inject.replicas); + } + break; + case KSQL_CONFIG: + final Map cfg = new HashMap<>(); + if (inject.partitions != null) { + cfg.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, inject.partitions); + } + if (inject.replicas != null) { + cfg.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, inject.replicas); + } + ksqlConfig = new KsqlConfig(cfg); + break; + case SOURCE: + default: + throw new IllegalArgumentException(inject.type.toString()); + } + } + + public TopicDescription source(final Inject source) { + return new TopicDescription( + "source", + false, + Collections.nCopies(source.partitions, + new TopicPartitionInfo( + 0, + null, + Collections.nCopies(source.replicas, new Node(0, "", 0)), + ImmutableList.of()) + ) + ); + } + } + + enum Inject { + SOURCE(Type.SOURCE, 1, (short) 1), + SOURCE2(Type.SOURCE, 12, (short) 12), + + WITH(Type.WITH, 2, (short) 2), + OVERRIDES(Type.OVERRIDES, 3, (short) 3), + KSQL_CONFIG(Type.KSQL_CONFIG, 4, (short) 4), + + WITH_P(Type.WITH, 5, null), + OVERRIDES_P(Type.OVERRIDES, 6, null), + KSQL_CONFIG_P(Type.KSQL_CONFIG, 7, null), + + WITH_R(Type.WITH, null, (short) 8), + OVERRIDES_R(Type.OVERRIDES, null, (short) 9), + KSQL_CONFIG_R(Type.KSQL_CONFIG, null, (short) 10), + + NO_WITH(Type.WITH, null, null), + NO_OVERRIDES(Type.OVERRIDES, null, null), + NO_CONFIG(Type.KSQL_CONFIG, null, null) + ; + + final Type type; + final Integer partitions; + final Short replicas; + + Inject(final Type type, final Integer partitions, final Short replicas) { + this.type = type; + this.partitions = partitions; + this.replicas = replicas; + } + + enum Type { + WITH, + OVERRIDES, + KSQL_CONFIG, + SOURCE + } + + /** + * Generates code for all combinations of Injects + */ + public static void main(String[] args) { + final List withs = EnumSet.allOf(Inject.class) + .stream().filter(i -> i.type == Type.WITH).collect(Collectors.toList()); + final List overrides = EnumSet.allOf(Inject.class) + .stream().filter(i -> i.type == Type.OVERRIDES).collect(Collectors.toList()); + final List ksqlConfigs = EnumSet.allOf(Inject.class) + .stream().filter(i -> i.type == Type.KSQL_CONFIG).collect(Collectors.toList()); + + for (List injects : Lists.cartesianProduct(withs, overrides, ksqlConfigs)) { + // sort by precedence order + injects = new ArrayList<>(injects); + injects.sort(Comparator.comparing(i -> i.type)); + + final Inject expectedPartitions = + injects.stream().filter(i -> i.partitions != null).findFirst().orElse(Inject.SOURCE); + final Inject expectedReplicas = + injects.stream().filter(i -> i.replicas != null).findFirst().orElse(Inject.SOURCE); + + System.out.println(String.format("{new Inject[]{%-38s}, %-15s, %-15s},", + injects.stream().map(Objects::toString).collect(Collectors.joining(", ")), + expectedPartitions, + expectedReplicas) + ); + } + } + } +} diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java index 784c3a178d9b..5185f85a1173 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java @@ -15,16 +15,100 @@ package io.confluent.ksql.parser.tree; +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; -public interface CreateAsSelect extends QueryContainer { +public abstract class CreateAsSelect extends Statement implements QueryContainer { + + private final QualifiedName name; + private final Query query; + private final boolean notExists; + private final ImmutableMap properties; + private final Optional partitionByColumn; + + CreateAsSelect( + final Optional location, + final QualifiedName name, + final Query query, + final boolean notExists, + final Map properties, + final Optional partitionByColumn) { + super(location); + this.name = requireNonNull(name, "name is null"); + this.query = requireNonNull(query, "query is null"); + this.notExists = notExists; + this.properties = ImmutableMap + .copyOf(requireNonNull(properties, "properties is null")); + this.partitionByColumn = requireNonNull(partitionByColumn, "partitionByColumn"); + } + + CreateAsSelect( + final CreateAsSelect other, + final Map properties) { + this( + other.getLocation(), + other.name, + other.query, + other.notExists, + properties, + other.partitionByColumn); + } + + public abstract CreateAsSelect copyWith(Map properties); + + public QualifiedName getName() { + return name; + } + + @Override + public Query getQuery() { + return query; + } + + public boolean isNotExists() { + return notExists; + } + + public Map getProperties() { + return properties; + } - Query getQuery(); + public Optional getPartitionByColumn() { + return partitionByColumn; + } - Map getProperties(); + @Override + public int hashCode() { + return Objects.hash(name, query, properties, notExists, getClass()); + } - Optional getPartitionByColumn(); + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + final CreateAsSelect o = (CreateAsSelect) obj; + return Objects.equals(name, o.name) + && Objects.equals(query, o.query) + && Objects.equals(notExists, o.notExists) + && Objects.equals(properties, o.properties) + && Objects.equals(partitionByColumn, o.partitionByColumn); + } - QualifiedName getName(); + @Override + public String toString() { + return "CreateAsSelect{" + "name=" + name + + ", query=" + query + + ", notExists=" + notExists + + ", properties=" + properties + + ", partitionByColumn=" + partitionByColumn + + '}'; + } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java index 277b399c17f0..7ea058d3cc03 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java @@ -15,22 +15,14 @@ package io.confluent.ksql.parser.tree; -import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; - import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.ddl.DdlConfig; import java.util.Map; -import java.util.Objects; import java.util.Optional; -public class CreateStreamAsSelect extends Statement implements CreateAsSelect { - - private final QualifiedName name; - private final Query query; - private final boolean notExists; - private final ImmutableMap properties; - private final Optional partitionByColumn; +@Immutable +public class CreateStreamAsSelect extends CreateAsSelect { public CreateStreamAsSelect( final QualifiedName name, @@ -48,51 +40,33 @@ public CreateStreamAsSelect( final Query query, final boolean notExists, final Map properties, - final Optional partitionByColumn - ) { - super(location); - this.name = requireNonNull(name, "name"); - this.query = requireNonNull(query, "query"); - this.notExists = notExists; - this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties")); - this.partitionByColumn = requireNonNull(partitionByColumn, "partitionByColumn"); - } - - @Override - public QualifiedName getName() { - return name; + final Optional partitionByColumn) { + super(location, name, query, notExists, properties, partitionByColumn); } - @Override - public Query getQuery() { - return query; + private CreateStreamAsSelect( + final CreateStreamAsSelect other, + final Map properties + ) { + super(other, properties); } @Override public Sink getSink() { - final Map sinkProperties = partitionByColumn + final Map sinkProperties = getPartitionByColumn() .map(exp -> (Map)ImmutableMap.builder() - .putAll(properties) + .putAll(getProperties()) .put(DdlConfig.PARTITION_BY_PROPERTY, exp) .build() ) - .orElse(properties); - - return Sink.of(name.getSuffix(), true, sinkProperties); - } + .orElse(getProperties()); - public boolean isNotExists() { - return notExists; + return Sink.of(getName().getSuffix(), true, sinkProperties); } @Override - public Map getProperties() { - return properties; - } - - @Override - public Optional getPartitionByColumn() { - return partitionByColumn; + public CreateAsSelect copyWith(final Map properties) { + return new CreateStreamAsSelect(this, properties); } @Override @@ -100,35 +74,8 @@ public R accept(final AstVisitor visitor, final C context) { return visitor.visitCreateStreamAsSelect(this, context); } - @Override - public int hashCode() { - return Objects.hash(name, query, notExists, properties, partitionByColumn); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if ((obj == null) || (getClass() != obj.getClass())) { - return false; - } - final CreateStreamAsSelect o = (CreateStreamAsSelect) obj; - return Objects.equals(name, o.name) - && Objects.equals(query, o.query) - && Objects.equals(notExists, o.notExists) - && Objects.equals(partitionByColumn, o.partitionByColumn) - && Objects.equals(properties, o.properties); - } - @Override public String toString() { - return toStringHelper(this) - .add("name", name) - .add("query", query) - .add("notExists", notExists) - .add("properties", properties) - .add("partitionByColumn", partitionByColumn) - .toString(); + return "CreateStreamAsSelect{" + super.toString() + '}'; } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java index 48e630d8d050..001f7389a468 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java @@ -15,22 +15,12 @@ package io.confluent.ksql.parser.tree; -import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; - -import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.Immutable; import java.util.Map; -import java.util.Objects; import java.util.Optional; @Immutable -public class CreateTableAsSelect extends Statement implements CreateAsSelect { - - private final QualifiedName name; - private final Query query; - private final boolean notExists; - private final ImmutableMap properties; +public class CreateTableAsSelect extends CreateAsSelect { public CreateTableAsSelect( final QualifiedName name, @@ -48,40 +38,19 @@ public CreateTableAsSelect( final boolean notExists, final Map properties ) { - super(location); - this.name = requireNonNull(name, "name"); - this.query = requireNonNull(query, "query"); - this.notExists = notExists; - this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties")); - } - - @Override - public QualifiedName getName() { - return name; + super(location, name, query, notExists, properties, Optional.empty()); } - @Override - public Query getQuery() { - return query; - } - - @Override - public Sink getSink() { - return Sink.of(name.getSuffix(), true, properties); - } - - public boolean isNotExists() { - return notExists; - } - - @Override - public Map getProperties() { - return properties; + private CreateTableAsSelect( + final CreateTableAsSelect other, + final Map properties + ) { + super(other, properties); } @Override - public Optional getPartitionByColumn() { - return Optional.empty(); + public CreateAsSelect copyWith(final Map properties) { + return new CreateTableAsSelect(this, properties); } @Override @@ -90,32 +59,12 @@ public R accept(final AstVisitor visitor, final C context) { } @Override - public int hashCode() { - return Objects.hash(name, query, properties); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if ((obj == null) || (getClass() != obj.getClass())) { - return false; - } - final CreateTableAsSelect o = (CreateTableAsSelect) obj; - return Objects.equals(name, o.name) - && Objects.equals(query, o.query) - && Objects.equals(notExists, o.notExists) - && Objects.equals(properties, o.properties); + public Sink getSink() { + return Sink.of(getName().getSuffix(), true, getProperties()); } @Override public String toString() { - return toStringHelper(this) - .add("name", name) - .add("query", query) - .add("notExists", notExists) - .add("properties", properties) - .toString(); + return "CreateTableAsSelect{" + super.toString() + '}'; } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java index 249b4f49330e..0194523beae9 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableSet; import io.confluent.ksql.parser.SqlBaseLexer; - import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/WithClauseUtil.java b/ksql-parser/src/main/java/io/confluent/ksql/util/WithClauseUtil.java new file mode 100644 index 000000000000..70a6d1e1793f --- /dev/null +++ b/ksql-parser/src/main/java/io/confluent/ksql/util/WithClauseUtil.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.util; + +import io.confluent.ksql.parser.tree.Expression; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; + +public final class WithClauseUtil { + + private WithClauseUtil() { } + + public static Integer parsePartitions(@Nullable final Expression expression) { + if (expression == null) { + return null; + } + + final String expAsString = expression.toString(); + try { + final int partitions = Integer.parseInt(StringUtils.strip(expAsString, "'")); + if (partitions <= 0) { + throw new KsqlException("Invalid number of partitions in WITH clause (must be positive): " + + partitions); + } + return partitions; + } catch (NumberFormatException e) { + throw new KsqlException("Invalid number of partitions in WITH clause: " + expression, e); + } + } + + public static Short parseReplicas(@Nullable final Expression expression) { + if (expression == null) { + return null; + } + + final String expAsString = expression.toString(); + try { + final short replicas = Short.parseShort(StringUtils.strip(expAsString, "'")); + if (replicas <= 0) { + throw new KsqlException("Invalid number of replicas in WITH clause (must be positive): " + + replicas); + } + return replicas; + } catch (NumberFormatException e) { + throw new KsqlException("Invalid number of replicas in WITH clause: " + expression, e); + } + } + +} diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java index 6017d27966ae..feef362d2037 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java @@ -56,6 +56,9 @@ public void shouldImplementHashCodeAndEqualsProperty() { .addEqualityGroup( new CreateTableAsSelect(SOME_NAME, SOME_QUERY, true, Collections.emptyMap()) ) + .addEqualityGroup( + new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, Collections.emptyMap(), Optional.empty()) + ) .testEquals(); } } \ No newline at end of file diff --git a/ksql-parser/src/test/java/io/confluent/ksql/util/WithClauseUtilTest.java b/ksql-parser/src/test/java/io/confluent/ksql/util/WithClauseUtilTest.java new file mode 100644 index 000000000000..53b67c5548ba --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/util/WithClauseUtilTest.java @@ -0,0 +1,228 @@ +package io.confluent.ksql.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import io.confluent.ksql.parser.tree.Expression; +import io.confluent.ksql.parser.tree.IntegerLiteral; +import io.confluent.ksql.parser.tree.LongLiteral; +import io.confluent.ksql.parser.tree.StringLiteral; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class WithClauseUtilTest { + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldParseIntLiteralPartitions() { + // Given: + final Expression expression = new IntegerLiteral(1); + + // When: + final int partitions = WithClauseUtil.parsePartitions(expression); + + // Then: + assertThat(partitions, equalTo(1)); + } + + @Test + public void shouldParseLongLiteralPartitions() { + // Given: + final Expression expression = new LongLiteral(1); + + // When: + final int partitions = WithClauseUtil.parsePartitions(expression); + + // Then: + assertThat(partitions, equalTo(1)); + } + + @Test + public void shouldParseStringLiteralPartitions() { + // Given: + final Expression expression = new StringLiteral("1"); + + // When: + final int partitions = WithClauseUtil.parsePartitions(expression); + + // Then: + assertThat(partitions, equalTo(1)); + } + + @Test + public void shouldFailParseNonNumericPartitions() { + // Given: + final Expression expression = new StringLiteral("not a number"); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Invalid number of partitions in WITH clause"); + + // When: + WithClauseUtil.parsePartitions(expression); + } + + @Test + public void shouldFailParseFractionPartitions() { + // Given: + final Expression expression = new StringLiteral("0.5"); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of partitions in WITH clause"); + + // When: + WithClauseUtil.parsePartitions(expression); + } + + @Test + public void shouldFailParseNegativePartitions() { + // Given: + final Expression expression = new IntegerLiteral(-1); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of partitions in WITH clause (must be positive)"); + + // When: + WithClauseUtil.parsePartitions(expression); + } + + @Test + public void shouldFailParseZeroPartitions() { + // Given: + final Expression expression = new IntegerLiteral(0); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of partitions in WITH clause (must be positive)"); + + // When: + WithClauseUtil.parsePartitions(expression); + } + + @Test + public void shouldFailParsePartitionsOverflow() { + // Given: + final Expression expression = new StringLiteral("9999999999999999999999"); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of partitions in WITH clause"); + + // When: + WithClauseUtil.parsePartitions(expression); + } + + @Test + public void shouldParseIntLiteralReplicas() { + // Given: + final Expression expression = new IntegerLiteral(1); + + // When: + final short replicas = WithClauseUtil.parseReplicas(expression); + + // Then: + assertThat(replicas, equalTo((short) 1)); + } + + @Test + public void shouldParseLongLiteralReplicas() { + // Given: + final Expression expression = new LongLiteral(1); + + // When: + final short replicas = WithClauseUtil.parseReplicas(expression); + + // Then: + assertThat(replicas, equalTo((short) 1)); + } + + @Test + public void shouldParseStringLiteralReplicas() { + // Given: + final Expression expression = new StringLiteral("1"); + + // When: + final short replicas = WithClauseUtil.parseReplicas(expression); + + // Then: + assertThat(replicas, equalTo((short) 1)); + } + + @Test + public void shouldFailParseNonNumericReplicas() { + // Given: + final Expression expression = new StringLiteral("not a number"); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Invalid number of replicas in WITH clause"); + + // When: + WithClauseUtil.parseReplicas(expression); + } + + @Test + public void shouldFailParseFractionReplicas() { + // Given: + final Expression expression = new StringLiteral("0.5"); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of replicas in WITH clause"); + + // When: + WithClauseUtil.parseReplicas(expression); + } + + @Test + public void shouldFailParseZeroReplicas() { + // Given: + final Expression expression = new IntegerLiteral(0); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of replicas in WITH clause (must be positive)"); + + // When: + WithClauseUtil.parseReplicas(expression); + } + + @Test + public void shouldFailParseNegativeReplicas() { + // Given: + final Expression expression = new IntegerLiteral(-1); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of replicas in WITH clause (must be positive)"); + + // When: + WithClauseUtil.parseReplicas(expression); + } + + @Test + public void shouldFailParseReplicasOverflow() { + // Given: + final Expression expression = new StringLiteral("9999999999999999999999"); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Invalid number of replicas in WITH clause"); + + // When: + WithClauseUtil.parseReplicas(expression); + } + +} \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index eea7435e64c3..646b5cc2712e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -60,6 +60,7 @@ import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.services.DefaultServiceContext; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.DefaultTopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Version; @@ -410,8 +411,8 @@ static KsqlRestApplication buildApplication( commandStore, Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), versionChecker::updateLastRequestTime, - schemaInjectorFactory - ); + schemaInjectorFactory, + DefaultTopicInjector::new); final Optional processingLogTopic = ProcessingLogServerUtils.maybeCreateProcessingLogTopic( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java index 30f8d54f5733..d09b36b8cf24 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java @@ -34,8 +34,8 @@ import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.rest.util.ProcessingLogServerUtils; import io.confluent.ksql.schema.inference.SchemaInjector; -import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; @@ -78,6 +78,7 @@ public class StandaloneExecutor implements Executable { private final boolean failOnNoQueries; private final VersionCheckerAgent versionChecker; private final Function schemaInjectorFactory; + private final Function topicInjectorFactory; StandaloneExecutor( final ServiceContext serviceContext, @@ -88,7 +89,8 @@ public class StandaloneExecutor implements Executable { final UdfLoader udfLoader, final boolean failOnNoQueries, final VersionCheckerAgent versionChecker, - final Function schemaInjectorFactory + final Function schemaInjectorFactory, + final Function topicInjectorFactory ) { this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.processingLogConfig = Objects.requireNonNull(processingLogConfig, "processingLogConfig"); @@ -100,6 +102,8 @@ public class StandaloneExecutor implements Executable { this.versionChecker = Objects.requireNonNull(versionChecker, "versionChecker"); this.schemaInjectorFactory = Objects .requireNonNull(schemaInjectorFactory, "schemaInjectorFactory"); + this.topicInjectorFactory = Objects + .requireNonNull(topicInjectorFactory, "topicInjectorFactory"); } public void start() { @@ -166,22 +170,27 @@ private void processesQueryFile(final String queries) { validateStatements(preparedStatements); final SchemaInjector schemaInjector = schemaInjectorFactory.apply(serviceContext); + final TopicInjector topicInjector = topicInjectorFactory.apply(ksqlEngine); executeStatements( preparedStatements, - new StatementExecutor(ksqlEngine, schemaInjector, configProperties, ksqlConfig) + new StatementExecutor( + ksqlEngine, schemaInjector, topicInjector, configProperties, ksqlConfig) ); ksqlEngine.getPersistentQueries().forEach(QueryMetadata::start); } private void validateStatements(final List statements) { + final KsqlExecutionContext sandboxEngine = ksqlEngine.createSandbox(); final SchemaInjector schemaInjector = schemaInjectorFactory - .apply(SandboxedServiceContext.create(serviceContext)); + .apply(sandboxEngine.getServiceContext()); + final TopicInjector topicInjector = topicInjectorFactory.apply(sandboxEngine); final StatementExecutor sandboxExecutor = new StatementExecutor( - ksqlEngine.createSandbox(), + sandboxEngine, schemaInjector, + topicInjector, new HashMap<>(configProperties), ksqlConfig ); @@ -259,10 +268,12 @@ private static final class StatementExecutor { private final SchemaInjector schemaInjector; private final Map configProperties; private final KsqlConfig ksqlConfig; + private final TopicInjector topicInjector; private StatementExecutor( final KsqlExecutionContext executionContext, final SchemaInjector schemaInjector, + final TopicInjector topicInjector, final Map configProperties, final KsqlConfig ksqlConfig ) { @@ -270,6 +281,7 @@ private StatementExecutor( this.schemaInjector = Objects.requireNonNull(schemaInjector, "schemaInjector"); this.configProperties = Objects.requireNonNull(configProperties, "configProperties"); this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); + this.topicInjector = Objects.requireNonNull(topicInjector, "topicInjector"); } /** @@ -294,9 +306,15 @@ boolean execute(final ParsedStatement statement) { return prepared.getStatement() instanceof QueryContainer; } - private PreparedStatement prepare(final ParsedStatement statement) { + private PreparedStatement prepare( + final ParsedStatement statement + ) { final PreparedStatement prepared = executionContext.prepare(statement); - return schemaInjector.forStatement(prepared); + final PreparedStatement withSchema = schemaInjector.forStatement(prepared); + return topicInjector.forStatement( + withSchema, + ksqlConfig, + configProperties); } private static void throwOnMissingSchema(final PreparedStatement statement) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java index 6b0d356f79e6..56e931d48d47 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server; import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.function.MutableFunctionRegistry; @@ -30,6 +31,8 @@ import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.services.DefaultServiceContext; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.DefaultTopicInjector; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent; import io.confluent.ksql.version.metrics.VersionCheckerAgent; @@ -72,7 +75,8 @@ StandaloneExecutor create( UdfLoader udfLoader, boolean failOnNoQueries, VersionCheckerAgent versionChecker, - Function schemaInjectorFactory + Function schemaInjectorFactory, + Function topicInjectorFactory ); } @@ -132,7 +136,8 @@ static StandaloneExecutor create( udfLoader, true, versionChecker, - schemaInjectorFactory + schemaInjectorFactory, + DefaultTopicInjector::new ); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index fb2cca605ba7..fbb66f3b397d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -23,6 +23,7 @@ import io.confluent.ksql.rest.server.execution.StatementExecutor; import io.confluent.ksql.schema.inference.SchemaInjector; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlServerException; import java.time.Duration; @@ -42,14 +43,18 @@ public class DistributingExecutor implements StatementExecutor { private final CommandQueue commandQueue; private final Duration distributedCmdResponseTimeout; private final Function schemaInjectorFactory; + private final Function topicInjectorFactory; public DistributingExecutor( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, - final Function schemaInjectorFactory) { + final Function schemaInjectorFactory, + final Function topicInjectorFactory) { this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); this.schemaInjectorFactory = Objects.requireNonNull(schemaInjectorFactory, "schemaInjectorFactory"); + this.topicInjectorFactory = Objects + .requireNonNull(topicInjectorFactory, "topicInjectorFactory"); this.distributedCmdResponseTimeout = Objects.requireNonNull(distributedCmdResponseTimeout, "distributedCmdResponseTimeout"); } @@ -61,18 +66,20 @@ public Optional execute( final ServiceContext serviceContext, final KsqlConfig ksqlConfig, final Map propertyOverrides) { - final PreparedStatement withSchema = - schemaInjectorFactory.apply(serviceContext).forStatement(statement); + final PreparedStatement withSchema = schemaInjectorFactory + .apply(serviceContext).forStatement(statement); + final PreparedStatement withTopic = topicInjectorFactory + .apply(executionContext).forStatement(withSchema, ksqlConfig, propertyOverrides); try { final QueuedCommandStatus queuedCommandStatus = commandQueue - .enqueueCommand(withSchema, ksqlConfig, propertyOverrides); + .enqueueCommand(withTopic, ksqlConfig, propertyOverrides); final CommandStatus commandStatus = queuedCommandStatus .tryWaitForFinalStatus(distributedCmdResponseTimeout); return Optional.of(new CommandStatusEntity( - withSchema.getStatementText(), + withTopic.getStatementText(), queuedCommandStatus.getCommandId(), commandStatus, queuedCommandStatus.getCommandSequenceNumber() diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 88923deb1a6f..4bec05c64cc0 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -18,6 +18,7 @@ import static java.util.regex.Pattern.compile; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.parser.DefaultKsqlParser; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; @@ -44,6 +45,7 @@ import io.confluent.ksql.schema.inference.SchemaInjector; import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; @@ -96,7 +98,8 @@ public KsqlResource( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final ActivenessRegistrar activenessRegistrar, - final Function schemaInjectorFactory + final Function schemaInjectorFactory, + final Function topicInjectorFactory ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); @@ -108,6 +111,7 @@ public KsqlResource( this.validator = new RequestValidator( CustomValidators.VALIDATOR_MAP, schemaInjectorFactory, + topicInjectorFactory, ksqlEngine::createSandbox, SandboxedServiceContext.create(serviceContext), ksqlConfig); @@ -116,7 +120,8 @@ public KsqlResource( new DistributingExecutor( commandQueue, distributedCmdResponseTimeout, - schemaInjectorFactory), + schemaInjectorFactory, + topicInjectorFactory), ksqlEngine, ksqlConfig, serviceContext, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java index 0694a2d5ae1f..69c1e70d478c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java @@ -15,6 +15,8 @@ package io.confluent.ksql.rest.server.validation; +import static java.util.Objects.requireNonNull; + import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; @@ -26,13 +28,13 @@ import io.confluent.ksql.rest.util.QueryCapacityUtil; import io.confluent.ksql.schema.inference.SchemaInjector; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.Function; import java.util.function.Supplier; import org.slf4j.Logger; @@ -48,6 +50,7 @@ public class RequestValidator { private static final Logger LOG = LoggerFactory.getLogger(RequestValidator.class); private final Map, StatementValidator> customValidators; + private final Function topicInjectorFactory; private final Function schemaInjectorFactory; private final Supplier snapshotSupplier; private final ServiceContext serviceContext; @@ -65,15 +68,17 @@ public class RequestValidator { public RequestValidator( final Map, StatementValidator> customValidators, final Function schemaInjectorFactory, + final Function topicInjectorFactory, final Supplier snapshotSupplier, final ServiceContext serviceContext, final KsqlConfig ksqlConfig ) { - this.customValidators = Objects.requireNonNull(customValidators, "customValidators"); - this.schemaInjectorFactory = Objects.requireNonNull(schemaInjectorFactory, "schemaInjector"); - this.snapshotSupplier = Objects.requireNonNull(snapshotSupplier, "snapshotSupplier"); - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); - this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); + this.customValidators = requireNonNull(customValidators, "customValidators"); + this.schemaInjectorFactory = requireNonNull(schemaInjectorFactory, "schemaInjector"); + this.topicInjectorFactory = requireNonNull(topicInjectorFactory, "topicInjectorFactory"); + this.snapshotSupplier = requireNonNull(snapshotSupplier, "snapshotSupplier"); + this.serviceContext = requireNonNull(serviceContext, "serviceContext"); + this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); } /** @@ -96,7 +101,8 @@ public int validate( final String sql ) { final KsqlExecutionContext ctx = snapshotSupplier.get(); - final SchemaInjector injector = schemaInjectorFactory.apply(serviceContext); + final SchemaInjector schemaInjector = schemaInjectorFactory.apply(serviceContext); + final TopicInjector topicInjector = topicInjectorFactory.apply(ctx); int numPersistentQueries = 0; for (ParsedStatement parsed : statements) { @@ -104,7 +110,7 @@ public int validate( numPersistentQueries += (prepared.getStatement() instanceof RunScript) ? validateRunScript(prepared, propertyOverrides, ctx) - : validate(prepared, ksqlConfig, propertyOverrides, ctx, injector); + : validate(prepared, ksqlConfig, propertyOverrides, ctx, schemaInjector, topicInjector); } if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig, numPersistentQueries)) { @@ -125,8 +131,8 @@ private int validate( final KsqlConfig ksqlConfig, final Map propertyOverrides, final KsqlExecutionContext executionContext, - final SchemaInjector schemaInjector - ) throws KsqlStatementException { + final SchemaInjector schemaInjector, + final TopicInjector topicInjector) throws KsqlStatementException { final Statement statement = prepared.getStatement(); final Class statementClass = statement.getClass(); final StatementValidator customValidator = (StatementValidator) @@ -137,7 +143,8 @@ private int validate( prepared, executionContext, serviceContext, ksqlConfig, propertyOverrides); } else if (KsqlEngine.isExecutableStatement(prepared)) { executionContext.execute( - schemaInjector.forStatement(prepared), + topicInjector.forStatement( + schemaInjector.forStatement(prepared), ksqlConfig, propertyOverrides), ksqlConfig, propertyOverrides ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java b/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java index 129ae8db334b..0aa70f33dd8c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/parser/ParserMatchers.java @@ -57,6 +57,15 @@ public static Matcher> preparedStatem .and(StatementMatcher.statement(instanceOf(statementType))); } + @SuppressWarnings("unchecked") + public static Matcher> preparedStatement( + final Matcher statementText, + final Class statementType + ) { + return (Matcher) both(StatementTextMatcher.statementWithText(statementText)) + .and(StatementMatcher.statement(instanceOf(statementType))); + } + @SuppressWarnings("unchecked") public static Matcher> preparedStatement( final Matcher statementTextMatcher, @@ -98,8 +107,8 @@ public static Matcher> statementWithT public static final class StatementMatcher extends FeatureMatcher, Statement> { - public StatementMatcher(Matcher textMatcher) { - super(textMatcher, "a prepared statement", "statement"); + public StatementMatcher(Matcher statementMatcher) { + super(statementMatcher, "a prepared statement", "statement"); } @Override @@ -109,9 +118,9 @@ protected Statement featureValueOf(final PreparedStatement actual) { @Factory public static Matcher> statement( - final Matcher textMatcher + final Matcher statementMatcher ) { - return new StatementMatcher<>(textMatcher); + return new StatementMatcher<>(statementMatcher); } } } \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java index b1f1b937309e..f007b180a6bd 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java @@ -78,14 +78,14 @@ public void setup() { when(topicClient.isTopicExists(configTopicName)).thenReturn(false); when(configStore.getKsqlConfig()).thenReturn(mergedConfig); when(constructor - .create(any(), any(), any(), any(), anyString(), any(), anyBoolean(), any(), any())) + .create(any(), any(), any(), any(), anyString(), any(), anyBoolean(), any(), any(), any())) .thenReturn(standaloneExecutor); } @After public void tearDown() throws Exception { verify(constructor) - .create(any(), any(), any(), engineCaptor.capture(), any(), any(), anyBoolean(), any(), any()); + .create(any(), any(), any(), engineCaptor.capture(), any(), any(), anyBoolean(), any(), any(), any()); engineCaptor.getAllValues().forEach(KsqlEngine::close); } @@ -135,6 +135,6 @@ public void shouldCreateConfigTopicThenGetConfig() { inOrder.verify(topicClient).createTopic(eq(configTopicName), anyInt(), anyShort(), anyMap()); inOrder.verify(configStoreFactory).apply(eq(configTopicName), argThat(sameConfig(baseConfig))); inOrder.verify(constructor).create( - any(), any(), same(mergedConfig), any(), anyString(), any(), anyBoolean(), any(), any()); + any(), any(), same(mergedConfig), any(), anyString(), any(), anyBoolean(), any(), any(), any()); } } \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 1b9ab4cf6f76..0cc6b6b3f30c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -40,6 +40,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext; +import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.CreateTable; @@ -50,12 +51,15 @@ import io.confluent.ksql.parser.tree.PrimitiveType; import io.confluent.ksql.parser.tree.QualifiedName; import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.Select; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.StringLiteral; +import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.Type.SqlType; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.schema.inference.SchemaInjector; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; @@ -74,6 +78,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -115,18 +120,42 @@ public class StandaloneExecutorTest { private static final CreateStream CREATE_STREAM = new CreateStream( SOME_NAME, SOME_ELEMENTS, true, JSON_PROPS); + private static final CreateStreamAsSelect CREATE_STREAM_AS_SELECT = new CreateStreamAsSelect( + QualifiedName.of("stream"), + new Query( + Optional.empty(), + new Select(ImmutableList.of(new AllColumns(Optional.empty()))), + new Table(QualifiedName.of("sink")), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + OptionalInt.empty() + ), + false, + ImmutableMap.of(), + Optional.empty() + ); + + private final static ParsedStatement PARSED_STMT_0 = ParsedStatement .of("sql 0", mock(SingleStatementContext.class)); private final static ParsedStatement PARSED_STMT_1 = ParsedStatement .of("sql 1", mock(SingleStatementContext.class)); + private static final ParsedStatement PARSED_CSAS = ParsedStatement + .of("CSAS", mock(SingleStatementContext.class)); + private final static PreparedStatement PREPARED_STMT_0 = PreparedStatement .of("sql 0", CREATE_STREAM); private final static PreparedStatement PREPARED_STMT_1 = PreparedStatement .of("sql 1", CREATE_STREAM); + private final static PreparedStatement PREPARED_CSAS = PreparedStatement. + of("CSAS", CREATE_STREAM_AS_SELECT); + private final static PreparedStatement STMT_0_WITH_SCHEMA = PreparedStatement .of("sql 0", new CreateStream( QualifiedName.of("CS 0"), @@ -143,6 +172,10 @@ public class StandaloneExecutorTest { Collections.emptyMap() )); + + private final static PreparedStatement CSAS_WITH_TOPIC = PreparedStatement + .of("CSAS_TOPIC", CREATE_STREAM_AS_SELECT); + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -174,6 +207,12 @@ public class StandaloneExecutorTest { private SchemaInjector schemaInjector; @Mock private SchemaInjector sandBoxSchemaInjector; + @Mock + private Function topicInjectorFactory; + @Mock + private TopicInjector topicInjector; + @Mock + private TopicInjector sandBoxTopicInjector; private Path queriesFile; private StandaloneExecutor standaloneExecutor; @@ -184,7 +223,6 @@ public void before() throws Exception { givenQueryFileContains("something"); when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient); - when(serviceContext.getSchemaRegistryClient()).thenReturn(srClient); when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0)); @@ -199,12 +237,20 @@ public void before() throws Exception { when(sandBox.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); when(sandBox.execute(any(), any(), any())).thenReturn(ExecuteResult.of("success")); + when(sandBox.execute(eq(CSAS_WITH_TOPIC), any(), any())) + .thenReturn(ExecuteResult.of(persistentQuery)); when(schemaInjectorFactory.apply(any())).thenReturn(sandBoxSchemaInjector); when(schemaInjectorFactory.apply(serviceContext)).thenReturn(schemaInjector); when(sandBoxSchemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); when(schemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); + when(topicInjectorFactory.apply(any())).thenReturn(sandBoxTopicInjector); + when(topicInjectorFactory.apply(ksqlEngine)).thenReturn(topicInjector); + when(sandBoxTopicInjector.forStatement(any(), any(), any())) + .thenAnswer(inv -> inv.getArgument(0)); + when(topicInjector.forStatement(any(), any(), any())).thenAnswer(inv -> inv.getArgument(0)); + standaloneExecutor = new StandaloneExecutor( serviceContext, processingLogConfig, @@ -214,7 +260,8 @@ public void before() throws Exception { udfLoader, false, versionChecker, - schemaInjectorFactory); + schemaInjectorFactory, + topicInjectorFactory); } @Test @@ -282,7 +329,8 @@ public void shouldNotCreateProcessingLogTopicIfNotConfigured() { udfLoader, false, versionChecker, - schemaInjectorFactory + schemaInjectorFactory, + topicInjectorFactory ); // When: @@ -602,6 +650,21 @@ public void shouldSupportSchemaInference() { verify(ksqlEngine).execute(eq(STMT_1_WITH_SCHEMA), any(), any()); } + @Test + public void shouldSupportTopicInference() { + // Given: + givenQueryFileParsesTo(PREPARED_CSAS); + + when(sandBoxTopicInjector.forStatement(eq(PREPARED_CSAS), any(), any())) + .thenReturn((PreparedStatement) CSAS_WITH_TOPIC); + + // When: + standaloneExecutor.start(); + + // Then: + verify(sandBox).execute(eq(CSAS_WITH_TOPIC), any(), any()); + } + private void givenExecutorWillFailOnNoQueries() { standaloneExecutor = new StandaloneExecutor( serviceContext, @@ -612,7 +675,8 @@ private void givenExecutorWillFailOnNoQueries() { udfLoader, true, versionChecker, - schemaInjectorFactory + schemaInjectorFactory, + topicInjectorFactory ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 664b100fc9f8..dd35c7ca9b28 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.rest.server.computation.CommandId.Action; import io.confluent.ksql.rest.server.computation.CommandId.Type; import io.confluent.ksql.schema.inference.SchemaInjector; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; @@ -62,6 +63,7 @@ public class DistributingExecutorTest { @Mock QueuedCommandStatus status; @Mock ServiceContext serviceContext; @Mock SchemaInjector schemaInjector; + @Mock TopicInjector topicInjector; private DistributingExecutor distributor; private AtomicLong scnCounter; @@ -70,12 +72,17 @@ public class DistributingExecutorTest { public void setUp() throws InterruptedException { scnCounter = new AtomicLong(); when(schemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); + when(topicInjector.forStatement(any(), any(), any())).thenAnswer(inv -> inv.getArgument(0)); when(queue.enqueueCommand(any(), any(), any())).thenReturn(status); when(status.tryWaitForFinalStatus(any())).thenReturn(SUCCESS_STATUS); when(status.getCommandId()).thenReturn(CS_COMMAND); when(status.getCommandSequenceNumber()).thenAnswer(inv -> scnCounter.incrementAndGet()); - distributor = new DistributingExecutor(queue, DURATION_10_MS, sc -> schemaInjector); + distributor = new DistributingExecutor( + queue, + DURATION_10_MS, + sc -> schemaInjector, + ec -> topicInjector); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 4d48428b72d7..aa43c602994c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -40,6 +40,7 @@ import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.schema.inference.DefaultSchemaInjector; +import io.confluent.ksql.topic.DefaultTopicInjector; import io.confluent.ksql.schema.inference.SchemaInjector; import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.serde.KsqlTopicSerDe; @@ -181,8 +182,8 @@ private class KsqlServer { fakeCommandQueue, Duration.ofMillis(0), ()->{}, - schemaInjectorFactory - ); + schemaInjectorFactory, + DefaultTopicInjector::new); this.statementExecutor = new StatementExecutor( ksqlConfig, ksqlEngine, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 9d454765ff8b..0dfaa7a3732f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -111,6 +111,7 @@ import io.confluent.ksql.rest.util.EntityUtil; import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.schema.inference.SchemaInjector; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.serde.DataSource; import io.confluent.ksql.serde.DataSource.DataSourceType; import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe; @@ -221,6 +222,13 @@ public class KsqlResourceTest { private SchemaInjector schemaInjector; @Mock private SchemaInjector sandboxSchemaInjector; + @Mock + private Function topicInjectorFactory; + @Mock + private TopicInjector topicInjector; + @Mock + private TopicInjector sandboxTopicInjector; + private KsqlResource ksqlResource; private SchemaRegistryClient schemaRegistryClient; private QueuedCommandStatus commandStatus; @@ -269,9 +277,17 @@ public void setUp() throws IOException, RestClientException { when(schemaInjectorFactory.apply(any())).thenReturn(sandboxSchemaInjector); when(schemaInjectorFactory.apply(serviceContext)).thenReturn(schemaInjector); + when(topicInjectorFactory.apply(any())).thenReturn(sandboxTopicInjector); + when(topicInjectorFactory.apply(ksqlEngine)).thenReturn(topicInjector); + when(sandboxSchemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); when(schemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); + when(sandboxTopicInjector.forStatement(any(), any(), any())) + .thenAnswer(inv -> inv.getArgument(0)); + when(topicInjector.forStatement(any(), any(), any())) + .thenAnswer(inv -> inv.getArgument(0)); + setUpKsqlResource(); } @@ -649,6 +665,94 @@ public void shouldDistributeAvoCreateStatementWithColumns() { )), any(), any()); } + @Test + public void shouldSupportTopicInferenceInVerification() { + // Given: + final Schema schema = SchemaBuilder.struct().field("f1", Schema.OPTIONAL_STRING_SCHEMA).build(); + givenMockEngine(); + givenSource(DataSourceType.KSTREAM, "ORDERS1", "ORDERS1", "ORDERS1", schema); + + final String sql = "CREATE STREAM orders2 AS SELECT * FROM orders1;"; + final String sqlWithTopic = "CREATE STREAM orders2 WITH(kafka_topic='orders2') AS SELECT * FROM orders1;"; + + final PreparedStatement statementWithTopic = + ksqlEngine.prepare(ksqlEngine.parse(sqlWithTopic).get(0)); + + when(sandboxTopicInjector.forStatement(argThat(is(preparedStatementText(sql))), any(), any())) + .thenReturn(statementWithTopic); + + + // When: + makeRequest(sql); + + // Then: + verify(sandbox).execute(eq(statementWithTopic), any(), any()); + verify(commandStore).enqueueCommand(argThat(preparedStatementText(sql)), any(), any()); + } + + @Test + public void shouldSupportTopicInferenceInExecution() { + // Given: + final Schema schema = SchemaBuilder.struct().field("f1", Schema.OPTIONAL_STRING_SCHEMA).build(); + givenMockEngine(); + givenSource(DataSourceType.KSTREAM, "ORDERS1", "ORDERS1", "ORDERS1", schema); + + final String sql = "CREATE STREAM orders2 AS SELECT * FROM orders1;"; + final String sqlWithTopic = "CREATE STREAM orders2 WITH(kafka_topic='orders2') AS SELECT * FROM orders1;"; + + final PreparedStatement statementWithTopic = + ksqlEngine.prepare(ksqlEngine.parse(sqlWithTopic).get(0)); + + when(topicInjector.forStatement(argThat(is(preparedStatementText(sql))), any(), any())) + .thenReturn(statementWithTopic); + + + // When: + makeRequest(sql); + + // Then: + verify(commandStore).enqueueCommand(eq(statementWithTopic), any(), any()); + } + + @Test + public void shouldFailWhenTopicInferenceFailsDuringValidate() { + // Given: + final Schema schema = SchemaBuilder.struct().field("f1", Schema.OPTIONAL_STRING_SCHEMA).build(); + givenSource(DataSourceType.KSTREAM, "ORDERS1", "ORDERS1", "ORDERS1", schema); + when(sandboxTopicInjector.forStatement(any(), any(), any())) + .thenThrow(new KsqlStatementException("boom", "sql")); + + // When: + final KsqlErrorMessage result = makeFailingRequest( + "CREATE STREAM orders2 AS SELECT * FROM orders1;", + Code.BAD_REQUEST); + + // Then: + assertThat(result.getErrorCode(), is(Errors.ERROR_CODE_BAD_STATEMENT)); + assertThat(result.getMessage(), is("boom")); + } + + @Test + public void shouldFailWhenTopicInferenceFailsDuringExecute() { + // Given: + final Schema schema = SchemaBuilder.struct().field("f1", Schema.OPTIONAL_STRING_SCHEMA).build(); + givenSource(DataSourceType.KSTREAM, "ORDERS1", "ORDERS1", "ORDERS1", schema); + + when(topicInjector.forStatement(any(), any(), any())) + .thenThrow(new KsqlStatementException("boom", "some-sql")); + + // Then: + expectedException.expect(KsqlRestException.class); + expectedException.expect(exceptionStatusCode(is(Code.BAD_REQUEST))); + expectedException + .expect(exceptionErrorMessage(errorCode(is(Errors.ERROR_CODE_BAD_STATEMENT)))); + expectedException + .expect(exceptionStatementErrorMessage(errorMessage(is("boom")))); + + // When: + makeRequest("CREATE STREAM orders2 AS SELECT * FROM orders1;"); + } + @Test public void shouldSupportSchemaInference() { // Given: @@ -1623,6 +1727,7 @@ private void givenMockEngine() { when(sandbox.prepare(any())) .thenAnswer(invocation -> realEngine.createSandbox().prepare(invocation.getArgument(0))); when(ksqlEngine.createSandbox()).thenReturn(sandbox); + when(topicInjectorFactory.apply(ksqlEngine)).thenReturn(topicInjector); setUpKsqlResource(); } @@ -1769,7 +1874,7 @@ private static void validateQueryDescription( private void setUpKsqlResource() { ksqlResource = new KsqlResource( ksqlConfig, ksqlEngine, serviceContext, commandStore, DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT, - activenessRegistrar, schemaInjectorFactory); + activenessRegistrar, schemaInjectorFactory, topicInjectorFactory); } private void givenKsqlConfigWith(final Map additionalConfig) { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java index 684996b570f8..5f86e35a0917 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.schema.inference.SchemaInjector; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; +import io.confluent.ksql.topic.TopicInjector; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; @@ -76,6 +77,8 @@ public class RequestValidatorTest { StatementValidator statementValidator; @Mock SchemaInjector schemaInjector; + @Mock + TopicInjector topicInjector; private ServiceContext serviceContext; private MutableMetaStore metaStore; @@ -95,6 +98,7 @@ public void setUp() { when(ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)) .thenReturn(Integer.MAX_VALUE); when(schemaInjector.forStatement(any())).thenAnswer(inv -> inv.getArgument(0)); + when(topicInjector.forStatement(any(), any(), any())).thenAnswer(inv -> inv.getArgument(0)); final KsqlStream source = mock(KsqlStream.class); when(source.getName()).thenReturn("SOURCE"); @@ -258,6 +262,7 @@ private void givenRequestValidator( validator = new RequestValidator( customValidators, sc -> schemaInjector, + ec -> topicInjector, () -> executionContext, serviceContext, ksqlConfig