From 8058c5def4476aee5b70b1294ee8ab8269c2fc3f Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 17 Sep 2019 20:45:06 +0100 Subject: [PATCH 1/4] feat(static): static select support static queries now support arbitrary select expressions. --- .../io/confluent/ksql/util/SchemaUtil.java | 1 + .../io/confluent/ksql/analyzer/Analyzer.java | 22 +- .../analyzer/ContinuousQueryValidator.java | 46 ++ .../ksql/analyzer/ExpressionAnalyzer.java | 57 ++- .../ksql/analyzer/QueryAnalyzer.java | 79 +-- .../ksql/analyzer/QueryValidator.java | 30 ++ .../ksql/analyzer/StaticQueryValidator.java | 77 +++ ...rTest.java => AnalyzerFunctionalTest.java} | 9 +- .../ContinuousQueryValidatorTest.java | 66 +++ .../ksql/analyzer/ExpressionAnalyzerTest.java | 163 ++++++ .../ksql/analyzer/QueryAnalyzerTest.java | 483 ++---------------- .../analyzer/QueryFunctionalAnalyzerTest.java | 477 +++++++++++++++++ .../analyzer/StaticQueryValidatorTest.java | 160 ++++++ ...materialized-aggregate-static-queries.json | 65 ++- .../server/execution/StaticQueryExecutor.java | 202 +++++--- 15 files changed, 1355 insertions(+), 582 deletions(-) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java rename ksql-engine/src/test/java/io/confluent/ksql/analyzer/{AnalyzerTest.java => AnalyzerFunctionalTest.java} (98%) create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java index 8438e4ac509e..34cd02471ba4 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java @@ -48,6 +48,7 @@ public final class SchemaUtil { public static final String ROWKEY_NAME = "ROWKEY"; public static final String ROWTIME_NAME = "ROWTIME"; + public static final String WINDOWSTART_NAME = "WINDOWSTART"; public static final int ROWKEY_INDEX = 1; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 52278fa96c08..190c5fddc1c5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -131,7 +131,7 @@ Analysis analyze( final Query query, final Optional sink ) { - final Visitor visitor = new Visitor(); + final Visitor visitor = new Visitor(query.isStatic()); visitor.process(query, null); sink.ifPresent(visitor::analyzeNonStdOutSink); @@ -146,9 +146,14 @@ private final class Visitor extends DefaultTraversalVisitor { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private final Analysis analysis = new Analysis(); + private final boolean staticQuery; private boolean isJoin = false; private boolean isGroupBy = false; + Visitor(final boolean staticQuery) { + this.staticQuery = staticQuery; + } + private void analyzeNonStdOutSink(final Sink sink) { analysis.setProperties(sink.getProperties()); sink.getPartitionBy().ifPresent(analysis::setPartitionBy); @@ -301,19 +306,26 @@ private void throwOnUnknownColumnReference() { new ExpressionAnalyzer(analysis.getFromSourceSchemas()); for (final Expression selectExpression : analysis.getSelectExpressions()) { - expressionAnalyzer.analyzeExpression(selectExpression); + expressionAnalyzer.analyzeExpression(selectExpression, false); } if (analysis.getWhereExpression() != null) { - expressionAnalyzer.analyzeExpression(analysis.getWhereExpression()); + final boolean allowWindowMetaFields = staticQuery + && analysis.getFromDataSources().get(0) + .getDataSource() + .getKsqlTopic() + .getKeyFormat() + .isWindowed(); + + expressionAnalyzer.analyzeExpression(analysis.getWhereExpression(), allowWindowMetaFields); } for (final Expression expression : analysis.getGroupByExpressions()) { - expressionAnalyzer.analyzeExpression(expression); + expressionAnalyzer.analyzeExpression(expression, false); } if (analysis.getHavingExpression() != null) { - expressionAnalyzer.analyzeExpression(analysis.getHavingExpression()); + expressionAnalyzer.analyzeExpression(analysis.getHavingExpression(), false); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java new file mode 100644 index 000000000000..57061aba217f --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.ResultMaterialization; +import io.confluent.ksql.parser.tree.Sink; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; + +public class ContinuousQueryValidator implements QueryValidator { + + @Override + public void preValidate( + final Query query, + final Optional sink + ) { + if (query.isStatic()) { + throw new IllegalArgumentException("static"); + } + + if (query.getResultMaterialization() != ResultMaterialization.CHANGES) { + throw new KsqlException("Continuous queries do not yet support `EMIT FINAL`. " + + "Consider changing to `EMIT CHANGES`." + + QueryAnalyzer.NEW_QUERY_SYNTAX_HELP + ); + } + } + + @Override + public void postValidate(final Analysis analysis) { + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java index 758c583b3829..0cded2fceb21 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java @@ -45,34 +45,18 @@ class ExpressionAnalyzer { this.sourceSchemas = Objects.requireNonNull(sourceSchemas, "sourceSchemas"); } - void analyzeExpression(final Expression expression) { - final Visitor visitor = new Visitor(); + void analyzeExpression(final Expression expression, final boolean allowWindowMetaFields) { + final Visitor visitor = new Visitor(allowWindowMetaFields); visitor.process(expression, null); } - private void throwOnUnknownField(final QualifiedName name) { - final Set sourcesWithField = sourceSchemas.sourcesWithField(name.name()); - if (sourcesWithField.isEmpty()) { - throw new KsqlException("Field '" + name + "' cannot be resolved."); - } + private final class Visitor extends VisitParentExpressionVisitor { - if (name.qualifier().isPresent()) { - if (!sourcesWithField.contains(name.qualifier().get())) { - throw new KsqlException("Source '" + name.qualifier() + "', " - + "used in '" + name + "' cannot be resolved."); - } - } else if (sourcesWithField.size() > 1) { - final String possibilities = sourcesWithField.stream() - .sorted() - .map(source -> SchemaUtil.buildAliasedFieldName(source, name.name())) - .collect(Collectors.joining(",")); - - throw new KsqlException("Field '" + name + "' is ambiguous. " - + "Could be any of: " + possibilities); - } - } + private final boolean allowWindowMetaFields; - private class Visitor extends VisitParentExpressionVisitor { + Visitor(final boolean allowWindowMetaFields) { + this.allowWindowMetaFields = allowWindowMetaFields; + } public Object visitLikePredicate(final LikePredicate node, final Object context) { process(node.getValue(), null); @@ -138,5 +122,32 @@ public Object visitQualifiedNameReference( throwOnUnknownField(node.getName()); return null; } + + private void throwOnUnknownField(final QualifiedName name) { + final Set sourcesWithField = sourceSchemas.sourcesWithField(name.name()); + if (sourcesWithField.isEmpty()) { + if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) { + return; + } + + throw new KsqlException("Field '" + name + "' cannot be resolved."); + } + + if (name.qualifier().isPresent()) { + final String qualifier = name.qualifier().get(); + if (!sourcesWithField.contains(qualifier)) { + throw new KsqlException("Source '" + qualifier + "', " + + "used in '" + name + "' cannot be resolved."); + } + } else if (sourcesWithField.size() > 1) { + final String possibilities = sourcesWithField.stream() + .sorted() + .map(source -> SchemaUtil.buildAliasedFieldName(source, name.name())) + .collect(Collectors.joining(", ")); + + throw new KsqlException("Field '" + name + "' is ambiguous. " + + "Could be any of: " + possibilities); + } + } } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java index 9d5d42d37cb2..46dd0f4545c3 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java @@ -17,6 +17,7 @@ import static java.util.Objects.requireNonNull; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; @@ -27,7 +28,6 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter; import io.confluent.ksql.parser.tree.Query; -import io.confluent.ksql.parser.tree.ResultMaterialization; import io.confluent.ksql.parser.tree.Sink; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.util.AggregateExpressionRewriter; @@ -40,33 +40,49 @@ public class QueryAnalyzer { - private static final String NEW_QUERY_SYNTAX_HELP = - "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes." - + System.lineSeparator() - + "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, " - + "static queries, i.e. they query the current state of the system and return a final " - + "result." - + System.lineSeparator() - + "To turn a static query into a streaming query, as was the default in older versions " - + "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause." - + System.lineSeparator() - + "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit " - + "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements " - + "as a this will be required in a future release."; - + static final String NEW_QUERY_SYNTAX_HELP = System.lineSeparator() + + "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes." + + System.lineSeparator() + + "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, " + + "static queries, i.e. they query the current state of the system and return a final " + + "result." + + System.lineSeparator() + + "To turn a static query into a streaming query, as was the default in older versions " + + "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause." + + System.lineSeparator() + + "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit " + + "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements " + + "as a this will be required in a future release."; + + private final Analyzer analyzer; private final MetaStore metaStore; - private final String outputTopicPrefix; - private final Set defaultSerdeOptions; + private final QueryValidator continuousValidator; + private final QueryValidator staticValidator; public QueryAnalyzer( final MetaStore metaStore, final String outputTopicPrefix, final Set defaultSerdeOptions + ) { + this( + metaStore, + new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions), + new ContinuousQueryValidator(), + new StaticQueryValidator() + ); + } + + @VisibleForTesting + QueryAnalyzer( + final MetaStore metaStore, + final Analyzer analyzer, + final QueryValidator continuousValidator, + final QueryValidator staticValidator ) { this.metaStore = requireNonNull(metaStore, "metaStore"); - this.outputTopicPrefix = requireNonNull(outputTopicPrefix, "outputTopicPrefix"); - this.defaultSerdeOptions = ImmutableSet.copyOf( - requireNonNull(defaultSerdeOptions, "defaultSerdeOptions")); + this.analyzer = requireNonNull(analyzer, "analyzer"); + this.continuousValidator = requireNonNull(continuousValidator, "continuousValidator"); + this.staticValidator = requireNonNull(staticValidator, "staticValidator"); } public Analysis analyze( @@ -74,23 +90,20 @@ public Analysis analyze( final Optional sink ) { if (query.isStatic()) { - throw new KsqlException("Static queries are not yet supported. " - + "Consider adding 'EMIT CHANGES' to any bare query, " - + System.lineSeparator() - + NEW_QUERY_SYNTAX_HELP - ); + staticValidator.preValidate(query, sink); + } else { + continuousValidator.preValidate(query, sink); } - if (query.getResultMaterialization() != ResultMaterialization.CHANGES) { - throw new KsqlException("Continous queries do not yet support `EMIT FINAL`. " - + "Consider changing to `EMIT CHANGES`." - + System.lineSeparator() - + NEW_QUERY_SYNTAX_HELP - ); + final Analysis analysis = analyzer.analyze(query, sink); + + if (query.isStatic()) { + staticValidator.postValidate(analysis); + } else { + continuousValidator.postValidate(analysis); } - return new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions) - .analyze(query, sink); + return analysis; } public AggregateAnalysis analyzeAggregate(final Query query, final Analysis analysis) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java new file mode 100644 index 000000000000..5a3b8f949323 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.Sink; +import java.util.Optional; + +/** + * Vaidator used by {@link QueryAnalyzer}. + */ +interface QueryValidator { + + void preValidate(Query query, Optional sink); + + void postValidate(Analysis analysis); +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java new file mode 100644 index 000000000000..58307c34c0ac --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.ResultMaterialization; +import io.confluent.ksql.parser.tree.Sink; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; + +public class StaticQueryValidator implements QueryValidator { + + @Override + public void preValidate( + final Query query, + final Optional sink + ) { + if (!query.isStatic()) { + throw new IllegalArgumentException("not static"); + } + + if (query.getResultMaterialization() != ResultMaterialization.FINAL) { + throw new KsqlException("Static queries do not yet support `EMIT CHANGES`. " + + "Consider removing 'EMIT CHANGES' to any bare query, " + + QueryAnalyzer.NEW_QUERY_SYNTAX_HELP + ); + } + + if (sink.isPresent()) { + throw new IllegalArgumentException("static queries should not have a sink"); + } + } + + @Override + public void postValidate(final Analysis analysis) { + if (analysis.getInto().isPresent()) { + throw new KsqlException("Static queries do not support outputting to sinks."); + } + + if (analysis.isJoin()) { + throw new KsqlException("Static queries do not support joins."); + } + + if (analysis.getWindowExpression() != null) { + throw new KsqlException("Static queries do not support WINDOW clauses."); + } + + if (!analysis.getGroupByExpressions().isEmpty()) { + throw new KsqlException("Static queries do not support GROUP BY clauses."); + } + + if (analysis.getPartitionBy().isPresent()) { + throw new KsqlException("Static queries do not support PARTITION BY clauses."); + } + + if (analysis.getHavingExpression() != null) { + throw new KsqlException("Static queries do not support HAVING clauses."); + } + + if (analysis.getLimitClause().isPresent()) { + throw new KsqlException("Static queries do not support LIMIT clauses."); + } + } +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java similarity index 98% rename from ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java rename to ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java index e4143e430416..dc0d143d8830 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java @@ -75,9 +75,16 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +/** + * DO NOT ADD NEW TESTS TO THIS FILE + * + *

Instead add new JSON based tests to QueryTranslationTest. + * + *

This test file is more of a functional test, which is better implemented using QTT. + */ @SuppressWarnings("OptionalGetWithoutIsPresent") @RunWith(MockitoJUnitRunner.class) -public class AnalyzerTest { +public class AnalyzerFunctionalTest { private static final Set DEFAULT_SERDE_OPTIONS = SerdeOption.none(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java new file mode 100644 index 000000000000..54c67aba6075 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import static org.mockito.Mockito.when; + +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.ResultMaterialization; +import io.confluent.ksql.parser.tree.Sink; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ContinuousQueryValidatorTest { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private Query query; + @Mock + private Sink sink; + + private QueryValidator validator; + + @Before + public void setUp() { + validator = new ContinuousQueryValidator(); + + when(query.isStatic()).thenReturn(false); + } + + @Test + public void shouldThrowOnContinuousQueryThatIsFinal() { + // Given: + when(query.isStatic()).thenReturn(false); + when(query.getResultMaterialization()).thenReturn(ResultMaterialization.FINAL); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Continuous queries do not yet support `EMIT FINAL`."); + + // When: + validator.preValidate(query, Optional.empty()); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java new file mode 100644 index 000000000000..21207a20de91 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java @@ -0,0 +1,163 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.execution.expression.tree.ComparisonExpression; +import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.QualifiedName; +import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; +import io.confluent.ksql.execution.expression.tree.StringLiteral; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.SchemaUtil; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ExpressionAnalyzerTest { + + private static final Expression WINDOW_START_EXP = new QualifiedNameReference( + QualifiedName.of("something", SchemaUtil.WINDOWSTART_NAME) + ); + + private static final Expression OTHER_EXP = new StringLiteral("foo"); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private SourceSchemas sourceSchemas; + private ExpressionAnalyzer analyzer; + + @Before + public void setUp() { + analyzer = new ExpressionAnalyzer(sourceSchemas); + } + + @Test + public void shouldNotThrowOnWindowStartIfAllowed() { + // Given: + final Expression expression = new ComparisonExpression( + Type.EQUAL, + WINDOW_START_EXP, + OTHER_EXP + ); + + // When: + analyzer.analyzeExpression(expression, true); + + // Then: did not throw + } + + @Test + public void shouldThrowOnWindowStartIfNotAllowed() { + // Given: + final Expression expression = new ComparisonExpression( + Type.EQUAL, + WINDOW_START_EXP, + OTHER_EXP + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Field 'something.WINDOWSTART' cannot be resolved."); + + // When: + analyzer.analyzeExpression(expression, false); + } + + @Test + public void shouldNotThrowOnMultipleSourcesIfFullyQualified() { + // Given: + final Expression expression = new QualifiedNameReference( + QualifiedName.of("fully", "qualified") + ); + + when(sourceSchemas.sourcesWithField("qualified")) + .thenReturn(ImmutableSet.of("multiple", "sources", "fully")); + + // When: + analyzer.analyzeExpression(expression, true); + + // Then: did not throw + } + + @Test + public void shouldThrowOnMultipleSourcesIfFullyQualifiedButNoMatch() { + // Given: + final Expression expression = new QualifiedNameReference( + QualifiedName.of("fully", "qualified") + ); + + when(sourceSchemas.sourcesWithField("qualified")) + .thenReturn(ImmutableSet.of("not-fully", "also-not-fully")); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Source 'fully', used in 'fully.qualified' cannot be resolved."); + + // When: + analyzer.analyzeExpression(expression, true); + } + + @Test + public void shouldThrowOnMultipleSourcesIfNotFullyQualified() { + // Given: + final Expression expression = new QualifiedNameReference( + QualifiedName.of("just-name") + ); + + when(sourceSchemas.sourcesWithField("just-name")) + .thenReturn(ImmutableSet.of("multiple", "sources")); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Field 'just-name' is ambiguous. Could be any of: multiple.just-name, sources.just-name"); + + // When: + analyzer.analyzeExpression(expression, true); + } + + @Test + public void shouldThrowOnNoSources() { + // Given: + final Expression expression = new QualifiedNameReference( + QualifiedName.of("just-name") + ); + + when(sourceSchemas.sourcesWithField("just-name")) + .thenReturn(ImmutableSet.of()); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Field 'just-name' cannot be resolved."); + + // When: + analyzer.analyzeExpression(expression, true); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java index fafe47ce1868..665542871de5 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2019 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -15,472 +15,85 @@ package io.confluent.ksql.analyzer; -import static io.confluent.ksql.util.ExpressionMatchers.qualifiedNameExpressions; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.mockito.Mockito.mock; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import io.confluent.ksql.analyzer.Analysis.AliasedDataSource; -import io.confluent.ksql.analyzer.Analysis.Into; -import io.confluent.ksql.execution.expression.tree.ComparisonExpression; -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.execution.expression.tree.IntegerLiteral; -import io.confluent.ksql.execution.expression.tree.QualifiedName; -import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; -import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.metastore.model.DataSource; -import io.confluent.ksql.metastore.model.KsqlStream; -import io.confluent.ksql.metastore.model.KsqlTable; -import io.confluent.ksql.parser.KsqlParser.PreparedStatement; -import io.confluent.ksql.parser.KsqlParserTestUtil; -import io.confluent.ksql.parser.tree.CreateStreamAsSelect; -import io.confluent.ksql.parser.tree.CreateTableAsSelect; -import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Sink; -import io.confluent.ksql.serde.Format; -import io.confluent.ksql.serde.SerdeOption; -import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.MetaStoreFixture; -import java.util.Arrays; -import java.util.Collections; import java.util.Optional; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; -@SuppressWarnings("OptionalGetWithoutIsPresent") +@RunWith(MockitoJUnitRunner.class) public class QueryAnalyzerTest { - private static final QualifiedNameReference ITEM_ID = - new QualifiedNameReference(QualifiedName.of("ORDERS", "ITEMID")); - - private static final QualifiedNameReference ORDER_ID = - new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERID")); - - private static final QualifiedNameReference ORDER_UNITS = - new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERUNITS")); - @Rule public final ExpectedException expectedException = ExpectedException.none(); - private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); - private final QueryAnalyzer queryAnalyzer = - new QueryAnalyzer(metaStore, "prefix-~", SerdeOption.none()); - - @Test - public void shouldCreateAnalysisForSimpleQuery() { - // Given: - final Query query = givenQuery("select orderid from orders EMIT CHANGES;"); - - // When: - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // Then: - final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); - assertThat(analysis.getSelectExpressions(), equalTo(Collections.singletonList(ORDER_ID))); - assertThat(analysis.getFromDataSources(), hasSize(1)); - assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class)); - assertThat(fromDataSource.getAlias(), equalTo("ORDERS")); - } - - @Test - public void shouldCreateAnalysisForCsas() { - // Given: - final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( - "create stream s as select col1 from test1 EMIT CHANGES;", metaStore); - final Query query = statement.getStatement().getQuery(); - final Optional sink = Optional.of(statement.getStatement().getSink()); - - // When: - final Analysis analysis = queryAnalyzer.analyze(query, sink); - - // Then: - assertThat(analysis.getSelectExpressions(), contains( - new QualifiedNameReference(QualifiedName.of("TEST1", "COL1")))); - - assertThat(analysis.getFromDataSources(), hasSize(1)); - - final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); - assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class)); - assertThat(fromDataSource.getAlias(), equalTo("TEST1")); - assertThat(analysis.getInto().get().getName(), is("S")); - } - - @Test - public void shouldCreateAnalysisForCtas() { - // Given: - final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( - "create table t as select col1 from test2 EMIT CHANGES;", metaStore); - final Query query = statement.getStatement().getQuery(); - final Optional sink = Optional.of(statement.getStatement().getSink()); - - // When: - final Analysis analysis = queryAnalyzer.analyze(query, sink); - - // Then: - assertThat(analysis.getSelectExpressions(), contains( - new QualifiedNameReference(QualifiedName.of("TEST2", "COL1")))); - - assertThat(analysis.getFromDataSources(), hasSize(1)); - - final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); - assertThat(fromDataSource.getDataSource(), instanceOf(KsqlTable.class)); - assertThat(fromDataSource.getAlias(), equalTo("TEST2")); - assertThat(analysis.getInto().get().getName(), is("T")); - } - - @Test - public void shouldCreateAnalysisForInsertInto() { - // Given: - final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( - "insert into test0 select col1 from test1 EMIT CHANGES;", metaStore); - final Query query = statement.getStatement().getQuery(); - final Optional sink = Optional.of(statement.getStatement().getSink()); - - // When: - final Analysis analysis = queryAnalyzer.analyze(query, sink); - - // Then: - assertThat(analysis.getSelectExpressions(), contains( - new QualifiedNameReference(QualifiedName.of("TEST1", "COL1")))); - - assertThat(analysis.getFromDataSources(), hasSize(1)); - - final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); - assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class)); - assertThat(fromDataSource.getAlias(), equalTo("TEST1")); - assertThat(analysis.getInto(), is(not(Optional.empty()))); - final Into into = analysis.getInto().get(); - final DataSource test0 = metaStore.getSource("TEST0"); - assertThat(into.getName(), is(test0.getName())); - assertThat(into.getKsqlTopic(), is(test0.getKsqlTopic())); - } - - @Test - public void shouldAnalyseWindowedAggregate() { - // Given: - final Query query = givenQuery( - "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " + - "where orderunits > 5 group by itemid EMIT CHANGES;"); - - // When: - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: - assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().get(ITEM_ID), contains(ITEM_ID)); - assertThat(aggregateAnalysis.getFinalSelectExpressions(), equalTo(Arrays.asList(ITEM_ID, new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_0"))))); - assertThat(aggregateAnalysis.getAggregateFunctionArguments(), equalTo(Collections.singletonList(ORDER_UNITS))); - assertThat(aggregateAnalysis.getRequiredColumns(), containsInAnyOrder(ITEM_ID, ORDER_UNITS)); - } - - @Test - public void shouldThrowIfAggregateAnalysisDoesNotHaveGroupBy() { - // Given: - final Query query = givenQuery("select itemid, sum(orderunits) from orders EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Use of aggregate functions requires a GROUP BY clause. Aggregate function(s): SUM"); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldThrowOnAdditionalNonAggregateSelects() { - // Given: - final Query query = givenQuery( - "select itemid, orderid, sum(orderunits) from orders group by itemid EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]"); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldThrowOnAdditionalNonAggregateHavings() { - // Given: - final Query query = givenQuery( - "select sum(orderunits) from orders group by itemid having orderid = 1 EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException - .expectMessage("Non-aggregate HAVING expression not part of GROUP BY: [ORDERS.ORDERID]"); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldProcessGroupByExpression() { - // Given: - final Query query = givenQuery( - "select sum(orderunits) from orders group by itemid EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // When: - final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: - assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID)); - } - - @Test - public void shouldProcessGroupByArithmetic() { - // Given: - final Query query = givenQuery( - "select sum(orderunits) from orders group by itemid + 1 EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // When: - final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: - assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID)); - } - - @Test - public void shouldProcessGroupByFunction() { - // Given: - final Query query = givenQuery( - "select sum(orderunits) from orders group by ucase(itemid) EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // When: - final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: - assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID)); - } - - @Test - public void shouldProcessGroupByConstant() { - // Given: - final Query query = givenQuery( - "select sum(orderunits) from orders group by 1 EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: did not throw. - } - - @Test - public void shouldThrowIfGroupByAggFunction() { - // Given: - final Query query = givenQuery( - "select sum(orderunits) from orders group by sum(orderid) EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "GROUP BY does not support aggregate functions: SUM is an aggregate function."); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldProcessHavingExpression() { - // Given: - final Query query = givenQuery( - "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " + - "where orderunits > 5 group by itemid having count(itemid) > 10 EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // When: - final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: - final Expression havingExpression = aggregateAnalysis.getHavingExpression(); - assertThat(havingExpression, equalTo(new ComparisonExpression( - ComparisonExpression.Type.GREATER_THAN, - new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_1")), - new IntegerLiteral(10)))); - } - - @Test - public void shouldFailOnSelectStarWithGroupBy() { - // Given: - final Query query = givenQuery("select *, count() from orders group by itemid EMIT CHANGES;"); - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Non-aggregate SELECT expression(s) not part of GROUP BY: " - + "[ORDERS.ADDRESS, ORDERS.ARRAYCOL, ORDERS.ITEMINFO, ORDERS.MAPCOL, ORDERS.ORDERID, " - + "ORDERS.ORDERTIME, ORDERS.ORDERUNITS, ORDERS.ROWKEY, ORDERS.ROWTIME]" - ); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldHandleSelectStarWithCorrectGroupBy() { - // Given: - final Query query = givenQuery("select *, count() from orders group by " - + "ROWTIME, ROWKEY, ITEMID, ORDERTIME, ORDERUNITS, MAPCOL, ORDERID, ITEMINFO, ARRAYCOL, ADDRESS" - + " EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - // When: - final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - - // Then: - assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().keySet(), containsInAnyOrder( - qualifiedNameExpressions( - "ORDERS.ROWTIME", "ORDERS.ROWKEY", "ORDERS.ITEMID", "ORDERS.ORDERTIME", - "ORDERS.ORDERUNITS", "ORDERS.MAPCOL", "ORDERS.ORDERID", "ORDERS.ITEMINFO", - "ORDERS.ARRAYCOL", "ORDERS.ADDRESS") - )); - } - - @Test - public void shouldThrowIfSelectContainsUdfNotInGroupBy() { - // Given: - final Query query = givenQuery("select substring(orderid, 1, 2), count(*) " - + "from orders group by substring(orderid, 2, 5) EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Non-aggregate SELECT expression(s) not part of GROUP BY: [SUBSTRING(ORDERS.ORDERID, 1, 2)]" - ); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldThrowIfSelectContainsReversedStringConcatExpression() { - // Given: - final Query query = givenQuery("select itemid + address->street, count(*) " - + "from orders group by address->street + itemid EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Non-aggregate SELECT expression(s) not part of GROUP BY: " - + "[(ORDERS.ITEMID + FETCH_FIELD_FROM_STRUCT(ORDERS.ADDRESS, 'STREET'))]" + @Mock + private MetaStore metaStore; + @Mock + private Analyzer analyzer; + @Mock + private Query query; + @Mock + private Analysis analysis; + @Mock + private QueryValidator continuousValidator; + @Mock + private QueryValidator staticValidator; + @Mock + private Sink sink; + private QueryAnalyzer queryAnalyzer; + + @Before + public void setUp() { + queryAnalyzer = new QueryAnalyzer( + metaStore, + analyzer, + continuousValidator, + staticValidator ); - // When: - queryAnalyzer.analyzeAggregate(query, analysis); + when(analyzer.analyze(any(), any())).thenReturn(analysis); } @Test - public void shouldThrowIfSelectContainsFieldsUsedInExpressionInGroupBy() { + public void shouldPreThenPostValidateContinuousQueries() { // Given: - final Query query = givenQuery("select orderId, count(*) " - + "from orders group by orderid + orderunits EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]" - ); + when(query.isStatic()).thenReturn(false); // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldThrowIfSelectContainsIncompatibleBinaryArithmetic() { - // Given: - final Query query = givenQuery("SELECT orderId - ordertime, COUNT(*) " - + "FROM ORDERS GROUP BY ordertime - orderId EMIT CHANGES;"); - - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Non-aggregate SELECT expression(s) not part of GROUP BY: " - + "[(ORDERS.ORDERID - ORDERS.ORDERTIME)]" - ); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldThrowIfGroupByMissingAggregateSelectExpressions() { - // Given: - final Query query = givenQuery("select orderid from orders group by orderid EMIT CHANGES;"); - final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); - - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "GROUP BY requires columns using aggregate functions in SELECT clause." - ); - - // When: - queryAnalyzer.analyzeAggregate(query, analysis); - } - - @Test - public void shouldHandleValueFormat() { - // Given: - final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( - "create stream s with(value_format='delimited') as select * from test1;", metaStore); - final Query query = statement.getStatement().getQuery(); - final Optional sink = Optional.of(statement.getStatement().getSink()); - - // When: - final Analysis analysis = queryAnalyzer.analyze(query, sink); + queryAnalyzer.analyze(query, Optional.of(sink)); // Then: - assertThat(analysis.getInto().get().getKsqlTopic().getValueFormat().getFormat(), - is(Format.DELIMITED)); + final InOrder inOrder = Mockito.inOrder(continuousValidator); + inOrder.verify(continuousValidator).preValidate(query, Optional.of(sink)); + inOrder.verify(continuousValidator).postValidate(analysis); + verifyNoMoreInteractions(staticValidator); } @Test - public void shouldRejectStaticQueries() { + public void shouldPreValidateStaticQueries() { // Given: - final Query query = mock(Query.class); when(query.isStatic()).thenReturn(true); - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("Static queries are not yet supported"); - // When: - queryAnalyzer.analyze(query, Optional.empty()); - } + queryAnalyzer.analyze(query, Optional.of(sink)); - private Query givenQuery(final String sql) { - return KsqlParserTestUtil.buildSingleAst(sql, metaStore).getStatement(); + // Then: + final InOrder inOrder = Mockito.inOrder(staticValidator); + inOrder.verify(staticValidator).preValidate(query, Optional.of(sink)); + inOrder.verify(staticValidator).postValidate(analysis); + verifyNoMoreInteractions(continuousValidator); } } \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java new file mode 100644 index 000000000000..b96bfd37cd40 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java @@ -0,0 +1,477 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import static io.confluent.ksql.util.ExpressionMatchers.qualifiedNameExpressions; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +import io.confluent.ksql.analyzer.Analysis.AliasedDataSource; +import io.confluent.ksql.analyzer.Analysis.Into; +import io.confluent.ksql.execution.expression.tree.ComparisonExpression; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.IntegerLiteral; +import io.confluent.ksql.execution.expression.tree.QualifiedName; +import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; +import io.confluent.ksql.function.InternalFunctionRegistry; +import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.metastore.model.KsqlStream; +import io.confluent.ksql.metastore.model.KsqlTable; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.KsqlParserTestUtil; +import io.confluent.ksql.parser.tree.CreateStreamAsSelect; +import io.confluent.ksql.parser.tree.CreateTableAsSelect; +import io.confluent.ksql.parser.tree.InsertInto; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.Sink; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.MetaStoreFixture; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * DO NOT ADD NEW TESTS TO THIS FILE + * + *

Instead add new JSON based tests to QueryTranslationTest + * + *

This test file is more of a functional test, which is better implemented using QTT. + */ +@SuppressWarnings("OptionalGetWithoutIsPresent") +public class QueryFunctionalAnalyzerTest { + + private static final QualifiedNameReference ITEM_ID = + new QualifiedNameReference(QualifiedName.of("ORDERS", "ITEMID")); + + private static final QualifiedNameReference ORDER_ID = + new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERID")); + + private static final QualifiedNameReference ORDER_UNITS = + new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERUNITS")); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); + private final QueryAnalyzer queryAnalyzer = + new QueryAnalyzer(metaStore, "prefix-~", SerdeOption.none()); + + @Test + public void shouldCreateAnalysisForSimpleQuery() { + // Given: + final Query query = givenQuery("select orderid from orders EMIT CHANGES;"); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // Then: + final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); + assertThat(analysis.getSelectExpressions(), equalTo(Collections.singletonList(ORDER_ID))); + assertThat(analysis.getFromDataSources(), hasSize(1)); + assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class)); + assertThat(fromDataSource.getAlias(), equalTo("ORDERS")); + } + + @Test + public void shouldCreateAnalysisForCsas() { + // Given: + final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( + "create stream s as select col1 from test1 EMIT CHANGES;", metaStore); + final Query query = statement.getStatement().getQuery(); + final Optional sink = Optional.of(statement.getStatement().getSink()); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, sink); + + // Then: + assertThat(analysis.getSelectExpressions(), contains( + new QualifiedNameReference(QualifiedName.of("TEST1", "COL1")))); + + assertThat(analysis.getFromDataSources(), hasSize(1)); + + final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); + assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class)); + assertThat(fromDataSource.getAlias(), equalTo("TEST1")); + assertThat(analysis.getInto().get().getName(), is("S")); + } + + @Test + public void shouldCreateAnalysisForCtas() { + // Given: + final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( + "create table t as select col1 from test2 EMIT CHANGES;", metaStore); + final Query query = statement.getStatement().getQuery(); + final Optional sink = Optional.of(statement.getStatement().getSink()); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, sink); + + // Then: + assertThat(analysis.getSelectExpressions(), contains( + new QualifiedNameReference(QualifiedName.of("TEST2", "COL1")))); + + assertThat(analysis.getFromDataSources(), hasSize(1)); + + final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); + assertThat(fromDataSource.getDataSource(), instanceOf(KsqlTable.class)); + assertThat(fromDataSource.getAlias(), equalTo("TEST2")); + assertThat(analysis.getInto().get().getName(), is("T")); + } + + @Test + public void shouldCreateAnalysisForInsertInto() { + // Given: + final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( + "insert into test0 select col1 from test1 EMIT CHANGES;", metaStore); + final Query query = statement.getStatement().getQuery(); + final Optional sink = Optional.of(statement.getStatement().getSink()); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, sink); + + // Then: + assertThat(analysis.getSelectExpressions(), contains( + new QualifiedNameReference(QualifiedName.of("TEST1", "COL1")))); + + assertThat(analysis.getFromDataSources(), hasSize(1)); + + final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0); + assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class)); + assertThat(fromDataSource.getAlias(), equalTo("TEST1")); + assertThat(analysis.getInto(), is(not(Optional.empty()))); + final Into into = analysis.getInto().get(); + final DataSource test0 = metaStore.getSource("TEST0"); + assertThat(into.getName(), is(test0.getName())); + assertThat(into.getKsqlTopic(), is(test0.getKsqlTopic())); + } + + @Test + public void shouldAnalyseWindowedAggregate() { + // Given: + final Query query = givenQuery( + "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " + + "where orderunits > 5 group by itemid EMIT CHANGES;"); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: + assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().get(ITEM_ID), contains(ITEM_ID)); + assertThat(aggregateAnalysis.getFinalSelectExpressions(), equalTo(Arrays.asList(ITEM_ID, new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_0"))))); + assertThat(aggregateAnalysis.getAggregateFunctionArguments(), equalTo(Collections.singletonList(ORDER_UNITS))); + assertThat(aggregateAnalysis.getRequiredColumns(), containsInAnyOrder(ITEM_ID, ORDER_UNITS)); + } + + @Test + public void shouldThrowIfAggregateAnalysisDoesNotHaveGroupBy() { + // Given: + final Query query = givenQuery("select itemid, sum(orderunits) from orders EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Use of aggregate functions requires a GROUP BY clause. Aggregate function(s): SUM"); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldThrowOnAdditionalNonAggregateSelects() { + // Given: + final Query query = givenQuery( + "select itemid, orderid, sum(orderunits) from orders group by itemid EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]"); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldThrowOnAdditionalNonAggregateHavings() { + // Given: + final Query query = givenQuery( + "select sum(orderunits) from orders group by itemid having orderid = 1 EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException + .expectMessage("Non-aggregate HAVING expression not part of GROUP BY: [ORDERS.ORDERID]"); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldProcessGroupByExpression() { + // Given: + final Query query = givenQuery( + "select sum(orderunits) from orders group by itemid EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // When: + final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: + assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID)); + } + + @Test + public void shouldProcessGroupByArithmetic() { + // Given: + final Query query = givenQuery( + "select sum(orderunits) from orders group by itemid + 1 EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // When: + final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: + assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID)); + } + + @Test + public void shouldProcessGroupByFunction() { + // Given: + final Query query = givenQuery( + "select sum(orderunits) from orders group by ucase(itemid) EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // When: + final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: + assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID)); + } + + @Test + public void shouldProcessGroupByConstant() { + // Given: + final Query query = givenQuery( + "select sum(orderunits) from orders group by 1 EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: did not throw. + } + + @Test + public void shouldThrowIfGroupByAggFunction() { + // Given: + final Query query = givenQuery( + "select sum(orderunits) from orders group by sum(orderid) EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "GROUP BY does not support aggregate functions: SUM is an aggregate function."); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldProcessHavingExpression() { + // Given: + final Query query = givenQuery( + "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " + + "where orderunits > 5 group by itemid having count(itemid) > 10 EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // When: + final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: + final Expression havingExpression = aggregateAnalysis.getHavingExpression(); + assertThat(havingExpression, equalTo(new ComparisonExpression( + ComparisonExpression.Type.GREATER_THAN, + new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_1")), + new IntegerLiteral(10)))); + } + + @Test + public void shouldFailOnSelectStarWithGroupBy() { + // Given: + final Query query = givenQuery("select *, count() from orders group by itemid EMIT CHANGES;"); + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Non-aggregate SELECT expression(s) not part of GROUP BY: " + + "[ORDERS.ADDRESS, ORDERS.ARRAYCOL, ORDERS.ITEMINFO, ORDERS.MAPCOL, ORDERS.ORDERID, " + + "ORDERS.ORDERTIME, ORDERS.ORDERUNITS, ORDERS.ROWKEY, ORDERS.ROWTIME]" + ); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldHandleSelectStarWithCorrectGroupBy() { + // Given: + final Query query = givenQuery("select *, count() from orders group by " + + "ROWTIME, ROWKEY, ITEMID, ORDERTIME, ORDERUNITS, MAPCOL, ORDERID, ITEMINFO, ARRAYCOL, ADDRESS" + + " EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + // When: + final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); + + // Then: + assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().keySet(), containsInAnyOrder( + qualifiedNameExpressions( + "ORDERS.ROWTIME", "ORDERS.ROWKEY", "ORDERS.ITEMID", "ORDERS.ORDERTIME", + "ORDERS.ORDERUNITS", "ORDERS.MAPCOL", "ORDERS.ORDERID", "ORDERS.ITEMINFO", + "ORDERS.ARRAYCOL", "ORDERS.ADDRESS") + )); + } + + @Test + public void shouldThrowIfSelectContainsUdfNotInGroupBy() { + // Given: + final Query query = givenQuery("select substring(orderid, 1, 2), count(*) " + + "from orders group by substring(orderid, 2, 5) EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Non-aggregate SELECT expression(s) not part of GROUP BY: [SUBSTRING(ORDERS.ORDERID, 1, 2)]" + ); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldThrowIfSelectContainsReversedStringConcatExpression() { + // Given: + final Query query = givenQuery("select itemid + address->street, count(*) " + + "from orders group by address->street + itemid EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Non-aggregate SELECT expression(s) not part of GROUP BY: " + + "[(ORDERS.ITEMID + FETCH_FIELD_FROM_STRUCT(ORDERS.ADDRESS, 'STREET'))]" + ); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldThrowIfSelectContainsFieldsUsedInExpressionInGroupBy() { + // Given: + final Query query = givenQuery("select orderId, count(*) " + + "from orders group by orderid + orderunits EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]" + ); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldThrowIfSelectContainsIncompatibleBinaryArithmetic() { + // Given: + final Query query = givenQuery("SELECT orderId - ordertime, COUNT(*) " + + "FROM ORDERS GROUP BY ordertime - orderId EMIT CHANGES;"); + + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Non-aggregate SELECT expression(s) not part of GROUP BY: " + + "[(ORDERS.ORDERID - ORDERS.ORDERTIME)]" + ); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldThrowIfGroupByMissingAggregateSelectExpressions() { + // Given: + final Query query = givenQuery("select orderid from orders group by orderid EMIT CHANGES;"); + final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty()); + + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "GROUP BY requires columns using aggregate functions in SELECT clause." + ); + + // When: + queryAnalyzer.analyzeAggregate(query, analysis); + } + + @Test + public void shouldHandleValueFormat() { + // Given: + final PreparedStatement statement = KsqlParserTestUtil.buildSingleAst( + "create stream s with(value_format='delimited') as select * from test1;", metaStore); + final Query query = statement.getStatement().getQuery(); + final Optional sink = Optional.of(statement.getStatement().getSink()); + + // When: + final Analysis analysis = queryAnalyzer.analyze(query, sink); + + // Then: + assertThat(analysis.getInto().get().getKsqlTopic().getValueFormat().getFormat(), + is(Format.DELIMITED)); + } + + private Query givenQuery(final String sql) { + return KsqlParserTestUtil.buildSingleAst(sql, metaStore).getStatement(); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java new file mode 100644 index 000000000000..7043d6905dd9 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java @@ -0,0 +1,160 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.analyzer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.ResultMaterialization; +import io.confluent.ksql.parser.tree.Sink; +import io.confluent.ksql.parser.tree.WindowExpression; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; +import java.util.OptionalInt; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StaticQueryValidatorTest { + + private static final Expression AN_EXPRESSION = mock(Expression.class); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private Query query; + @Mock + private Analysis analysis; + @Mock + private WindowExpression windowExpression; + @Mock + private Sink sink; + + private QueryValidator validator; + + @Before + public void setUp() { + validator = new StaticQueryValidator(); + + when(query.isStatic()).thenReturn(true); + } + + @Test + public void shouldThrowOnStaticQueryThatIsNotFinal() { + // Given: + when(query.getResultMaterialization()).thenReturn(ResultMaterialization.CHANGES); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not yet support `EMIT CHANGES`"); + + // When: + validator.preValidate(query, Optional.empty()); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnStaticQueryIfSinkSupplied() { + validator.preValidate(query, Optional.of(sink)); + } + + @Test + public void shouldThrowOnStaticQueryThatIsJoin() { + // Given: + when(analysis.isJoin()).thenReturn(true); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not support joins."); + + // When: + validator.postValidate(analysis); + } + + @Test + public void shouldThrowOnStaticQueryThatIsWindowed() { + // Given: + + when(analysis.getWindowExpression()).thenReturn(windowExpression); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not support WINDOW clauses."); + + // When: + validator.postValidate(analysis); + } + + @Test + public void shouldThrowOnStaticQueryThatHasGroupBy() { + // Given: + when(analysis.getGroupByExpressions()).thenReturn(ImmutableList.of(AN_EXPRESSION)); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not support GROUP BY clauses."); + + // When: + validator.postValidate(analysis); + } + + @Test + public void shouldThrowOnStaticQueryThatHasPartitionBy() { + // Given: + when(analysis.getPartitionBy()).thenReturn(Optional.of("Something")); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not support PARTITION BY clauses."); + + // When: + validator.postValidate(analysis); + } + + @Test + public void shouldThrowOnStaticQueryThatHasHavingClause() { + // Given: + when(analysis.getHavingExpression()).thenReturn(AN_EXPRESSION); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not support HAVING clauses."); + + // When: + validator.postValidate(analysis); + } + + @Test + public void shouldThrowOnStaticQueryThatHasLimitClause() { + // Given: + when(analysis.getLimitClause()).thenReturn(OptionalInt.of(1)); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Static queries do not support LIMIT clauses."); + + // When: + validator.postValidate(analysis); + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json index 9b33d7581318..5112adbe74fd 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json @@ -24,6 +24,45 @@ {"@type": "row", "rows": []} ] }, + { + "name": "non-windowed with projection", + "statements": [ + "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", + "SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10';" + ], + "inputs": [ + {"topic": "test_topic", "key": "11", "value": {}}, + {"topic": "test_topic", "key": "10", "value": {}} + ], + "responses": [ + {"@type": "currentStatus"}, + {"@type": "currentStatus"}, + {"@type": "row", "rows": [ + {"window": null, "key": {"ROWKEY": "10"}, "value": {"COUNT": 1, "ID": "10x", "KSQL_COL_2": 2}} + ]} + ] + }, + { + "name": "windowed with projection", + "statements": [ + "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", + "SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10' AND WindowStart=12000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}}, + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}} + ], + "responses": [ + {"@type": "currentStatus"}, + {"@type": "currentStatus"}, + {"@type": "row", "rows": [ + {"window": {"start": 12000, "end": null}, "key": {"ROWKEY": "10"}, "value": {"COUNT": 1, "ID": "10x", "KSQL_COL_2": 2}} + ]} + ] + }, { "name": "tumbling windowed single key lookup with exact window start", "statements": [ @@ -248,19 +287,6 @@ {"@type": "row", "rows": []} ] }, - { - "name": "fail on unsupported query feature: non-select-star projections", - "statements": [ - "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", - "SELECT COUNT FROM AGGREGATE WHERE ROWKEY='10';" - ], - "expectedError": { - "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Static queries currently only support a 'SELECT *' projections", - "status": 400 - } - }, { "name": "fail on unsupported query feature: join", "statements": [ @@ -389,6 +415,19 @@ "message": "WHERE clause missing WINDOWSTART", "status": 400 } + }, + { + "name": "fail if WINDOWSTART used in non-windowed static query", + "statements": [ + "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", + "SELECT * FROM AGGREGATE WHERE ROWKEY='10' AND WINDOWSTART=10;" + ], + "expectedError": { + "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", + "message": "Field 'AGGREGATE.WINDOWSTART' cannot be resolved", + "status": 400 + } } ] } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index 0d66858a626b..9a3182974c06 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -26,6 +26,8 @@ import com.google.common.collect.Sets.SetView; import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.analyzer.Analysis; +import io.confluent.ksql.analyzer.QueryAnalyzer; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryContext.Stacker; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; @@ -37,6 +39,11 @@ import io.confluent.ksql.execution.expression.tree.LongLiteral; import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; import io.confluent.ksql.execution.expression.tree.StringLiteral; +import io.confluent.ksql.execution.plan.SelectExpression; +import io.confluent.ksql.execution.streams.SelectValueMapper; +import io.confluent.ksql.execution.streams.SelectValueMapperFactory; +import io.confluent.ksql.execution.util.ExpressionTypeManager; +import io.confluent.ksql.logging.processing.NoopProcessingLogContext; import io.confluent.ksql.materialization.Locator; import io.confluent.ksql.materialization.Locator.KsqlNode; import io.confluent.ksql.materialization.Materialization; @@ -44,14 +51,9 @@ import io.confluent.ksql.materialization.Window; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; -import io.confluent.ksql.parser.tree.AliasedRelation; -import io.confluent.ksql.parser.tree.Join; +import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.Query; -import io.confluent.ksql.parser.tree.Relation; -import io.confluent.ksql.parser.tree.Select; import io.confluent.ksql.parser.tree.SelectItem; -import io.confluent.ksql.parser.tree.SingleColumn; -import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.client.KsqlRestClient; @@ -61,9 +63,13 @@ import io.confluent.ksql.rest.entity.QueryResultEntity; import io.confluent.ksql.rest.entity.QueryResultEntityFactory; import io.confluent.ksql.rest.server.resources.KsqlRestException; +import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; @@ -73,9 +79,12 @@ import io.confluent.ksql.util.timestamp.StringToTimestampParser; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -108,20 +117,11 @@ public static void validate( } try { - validateSelects(queryStmt.getSelect()); + final Analysis analysis = analyze(statement, executionContext); - final PersistentQueryMetadata query = findMaterializingQuery(executionContext, queryStmt); - - extractWhereInfo(queryStmt.getWhere(), query); - - if (queryStmt.getWindow().isPresent()) { - throw new KsqlException("Static queries do not support WINDOW clauses."); - } - - if (queryStmt.getGroupBy().isPresent()) { - throw new KsqlException("Static queries do not support GROUP BY clauses."); - } + final PersistentQueryMetadata query = findMaterializingQuery(executionContext, analysis); + extractWhereInfo(analysis, query); } catch (final Exception e) { throw new KsqlStatementException( e.getMessage(), @@ -137,17 +137,17 @@ public static Optional execute( final ServiceContext serviceContext ) { try { - final Query queryStmt = statement.getStatement(); + final Analysis analysis = analyze(statement, executionContext); - final PersistentQueryMetadata query = findMaterializingQuery(executionContext, queryStmt); + final PersistentQueryMetadata query = findMaterializingQuery(executionContext, analysis); - final WhereInfo whereInfo = extractWhereInfo(queryStmt.getWhere(), query); + final WhereInfo whereInfo = extractWhereInfo(analysis, query); final QueryContext.Stacker contextStacker = new Stacker(new QueryId("static-query")); final Materialization mat = query .getMaterialization(contextStacker) - .orElseThrow(() -> notMaterializedException(getSourceRelation(queryStmt.getFrom()))); + .orElseThrow(() -> notMaterializedException(getSourceName(analysis))); final Struct rowKey = asKeyStruct(whereInfo.rowkey, query.getPhysicalSchema()); @@ -156,7 +156,7 @@ public static Optional execute( return Optional.of(proxyTo(owner, statement)); } - final Map, GenericRow> result; + Result result; if (whereInfo.windowBounds.isPresent()) { final WindowBounds windowBounds = whereInfo.windowBounds.get(); @@ -164,18 +164,29 @@ public static Optional execute( mat.windowed().get(rowKey, windowBounds.lower, windowBounds.upper) .forEach((k, v) -> builder.put(Optional.of(k), v)); - result = builder.build(); + result = new Result( + mat.schema(), + builder.build() + ); } else { - result = mat.nonWindowed().get(rowKey) + final ImmutableMap, GenericRow> rows = mat + .nonWindowed().get(rowKey) .map(v -> ImmutableMap.of(Optional.empty(), v)) .orElse(ImmutableMap.of()); + + result = new Result( + mat.schema(), + rows + ); } + result = handleSelects(result, statement, executionContext, analysis); + final QueryResultEntity entity = new QueryResultEntity( statement.getStatementText(), mat.windowType(), - mat.schema(), - QueryResultEntityFactory.createRows(rowKey, result, mat.schema()) + result.schema, + QueryResultEntityFactory.createRows(rowKey, result.rows, result.schema) ); return Optional.of(entity); @@ -188,6 +199,19 @@ public static Optional execute( } } + private static Analysis analyze( + final ConfiguredStatement statement, + final KsqlExecutionContext executionContext + ) { + final QueryAnalyzer queryAnalyzer = new QueryAnalyzer( + executionContext.getMetaStore(), + "", + SerdeOption.none() + ); + + return queryAnalyzer.analyze(statement.getStatement(), Optional.empty()); + } + private static final class WindowBounds { private final Instant lower; @@ -217,14 +241,30 @@ private WhereInfo( } } + private static final class Result { + + private final LogicalSchema schema; + private final Map, GenericRow> rows; + + private Result( + final LogicalSchema schema, + final Map, GenericRow> rows + ) { + this.schema = Objects.requireNonNull(schema, "schema"); + this.rows = Objects.requireNonNull(rows, "rows"); + } + } + private static WhereInfo extractWhereInfo( - final Optional possibleWhere, + final Analysis analysis, final PersistentQueryMetadata query ) { final boolean windowed = query.getResultTopic().getKeyFormat().isWindowed(); - final Expression where = possibleWhere - .orElseThrow(() -> invalidWhereClauseException("missing WHERE clause", windowed)); + final Expression where = analysis.getWhereExpression(); + if (where == null) { + throw invalidWhereClauseException("missing WHERE clause", windowed); + } final Map> comparisons = extractComparisons(where); @@ -237,7 +277,7 @@ private static WhereInfo extractWhereInfo( if (!windowed) { if (comparisons.size() > 1) { - throw invalidWhereClauseException("Unsupported WHERE clause", windowed); + throw invalidWhereClauseException("Unsupported WHERE clause", false); } return new WhereInfo(rowKey, Optional.empty()); @@ -249,7 +289,7 @@ private static WhereInfo extractWhereInfo( if (windowBoundsComparison == null) { throw invalidWhereClauseException( "WHERE clause missing " + ComparisonTarget.WINDOWSTART, - windowed + true ); } @@ -465,32 +505,73 @@ private static ComparisonTarget extractWhereClauseTarget(final ComparisonExpress } } - private static void validateSelects(final Select select) { - final List selectItems = select.getSelectItems(); + private static boolean isSelectStar(final List selects) { + return selects.size() == 1 && selects.get(0) instanceof AllColumns; + } - if (selectItems.size() != 1 - || selectItems.get(0) instanceof SingleColumn - ) { - throw new KsqlException("Static queries currently only support a 'SELECT *' projections"); + private static Result handleSelects( + final Result input, + final ConfiguredStatement statement, + final KsqlExecutionContext executionContext, + final Analysis analysis + ) { + final List selectItems = statement.getStatement().getSelect().getSelectItems(); + if (input.rows.isEmpty() || isSelectStar(selectItems)) { + return input; + } + + final LogicalSchema.Builder schema = LogicalSchema.builder(); + schema.keyColumns(input.schema.key()); + + final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager( + input.schema, + executionContext.getMetaStore() + ); + + final List selects = new ArrayList<>(selectItems.size()); + for (int idx = 0; idx < analysis.getSelectExpressions().size(); idx++) { + final Expression exp = analysis.getSelectExpressions().get(idx); + final String alias = analysis.getSelectExpressionAlias().get(idx); + selects.add(SelectExpression.of(alias, exp)); + final SqlType type = expressionTypeManager.getExpressionSqlType(exp); + schema.valueColumn(alias, type); } + + final String sourceName = getSourceName(analysis); + + final KsqlConfig ksqlConfig = statement.getConfig() + .cloneWithPropertyOverwrite(statement.getOverrides()); + + final SelectValueMapper mapper = SelectValueMapperFactory.create( + selects, + input.schema.withAlias(sourceName), + ksqlConfig, + executionContext.getMetaStore(), + NoopProcessingLogContext.INSTANCE.getLoggerFactory().getLogger("any") + ); + + final Map, GenericRow> output = new LinkedHashMap<>(); + input.rows.forEach((k, v) -> output.put(k, mapper.apply(v))); + return new Result( + schema.build(), + output + ); } private static PersistentQueryMetadata findMaterializingQuery( final KsqlExecutionContext executionContext, - final Query query + final Analysis analysis ) { final MetaStore metaStore = executionContext.getMetaStore(); - final Table sourceTable = getSourceRelation(query.getFrom()); + final String sourceName = getSourceName(analysis); - final DataSource source = getSource(sourceTable, metaStore); - - final Set queries = metaStore.getQueriesWithSink(source.getName()); + final Set queries = metaStore.getQueriesWithSink(sourceName); if (queries.isEmpty()) { - throw notMaterializedException(sourceTable); + throw notMaterializedException(sourceName); } if (queries.size() > 1) { - throw new KsqlException("Multiple queries currently materialize '" + sourceTable + "'." + throw new KsqlException("Multiple queries currently materialize '" + sourceName + "'." + " KSQL currently only supports static queries when the table has only been" + " materialized once."); } @@ -501,32 +582,9 @@ private static PersistentQueryMetadata findMaterializingQuery( .orElseThrow(() -> new KsqlException("Materializing query has been stopped")); } - private static DataSource getSource( - final Table sourceTable, - final MetaStore metaStore - ) { - final DataSource source = metaStore.getSource(sourceTable.getName().toString()); - if (source == null) { - throw new KsqlException("Unknown source: " + sourceTable.getName()); - } - - return source; - } - - private static Table getSourceRelation(final Relation from) { - if (from instanceof Join) { - throw new KsqlException("Static queries do not support joins."); - } - - if (from instanceof Table) { - return (Table) from; - } - - if (from instanceof AliasedRelation) { - return getSourceRelation(((AliasedRelation) from).getRelation()); - } - - throw new KsqlException("Unsupported source type: " + from.getClass().getSimpleName()); + private static String getSourceName(final Analysis analysis) { + final DataSource source = analysis.getFromDataSources().get(0).getDataSource(); + return source.getName(); } private static KsqlNode getOwner(final Struct rowKey, final Materialization mat) { @@ -567,7 +625,7 @@ private static KsqlEntity proxyTo( } } - private static KsqlException notMaterializedException(final Table sourceTable) { + private static KsqlException notMaterializedException(final String sourceTable) { return new KsqlException( "Table '" + sourceTable + "' is not materialized." + " KSQL currently only supports static queries on materialized aggregate tables." From e6ca17bcdfed6e8fc0d32f9734c52f5f8a5206bd Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 17 Sep 2019 20:53:02 +0100 Subject: [PATCH 2/4] fix: test class name --- ...tionalAnalyzerTest.java => QueryAnalyzerFunctionalTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename ksql-engine/src/test/java/io/confluent/ksql/analyzer/{QueryFunctionalAnalyzerTest.java => QueryAnalyzerFunctionalTest.java} (99%) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java similarity index 99% rename from ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java rename to ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java index b96bfd37cd40..8e1792183d6f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryFunctionalAnalyzerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java @@ -64,7 +64,7 @@ *

This test file is more of a functional test, which is better implemented using QTT. */ @SuppressWarnings("OptionalGetWithoutIsPresent") -public class QueryFunctionalAnalyzerTest { +public class QueryAnalyzerFunctionalTest { private static final QualifiedNameReference ITEM_ID = new QualifiedNameReference(QualifiedName.of("ORDERS", "ITEMID")); From a5648ad89e2c81241785b79c1c093439f7fc3c60 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 18 Sep 2019 17:40:15 +0100 Subject: [PATCH 3/4] chore: fix test --- .../io/confluent/ksql/analyzer/StaticQueryValidatorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java index 7043d6905dd9..ce50e0df6b85 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java @@ -59,6 +59,7 @@ public void setUp() { validator = new StaticQueryValidator(); when(query.isStatic()).thenReturn(true); + when(query.getResultMaterialization()).thenReturn(ResultMaterialization.FINAL); } @Test From 98a9a140c93384434c7f98d421030f1addcc32fa Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 19 Sep 2019 15:22:22 +0100 Subject: [PATCH 4/4] chore: tim's requested changes --- .../main/java/io/confluent/ksql/analyzer/QueryValidator.java | 2 +- .../java/io/confluent/ksql/analyzer/StaticQueryValidator.java | 2 +- .../confluent/ksql/analyzer/ContinuousQueryValidatorTest.java | 1 - .../ksql/rest/server/execution/StaticQueryExecutor.java | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java index 5a3b8f949323..9147f90182f5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java @@ -20,7 +20,7 @@ import java.util.Optional; /** - * Vaidator used by {@link QueryAnalyzer}. + * Validator used by {@link QueryAnalyzer}. */ interface QueryValidator { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java index 58307c34c0ac..a8edafa8b36e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java @@ -34,7 +34,7 @@ public void preValidate( if (query.getResultMaterialization() != ResultMaterialization.FINAL) { throw new KsqlException("Static queries do not yet support `EMIT CHANGES`. " - + "Consider removing 'EMIT CHANGES' to any bare query, " + + "Consider removing 'EMIT CHANGES' to any bare query." + QueryAnalyzer.NEW_QUERY_SYNTAX_HELP ); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java index 54c67aba6075..eefa57980263 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java @@ -53,7 +53,6 @@ public void setUp() { @Test public void shouldThrowOnContinuousQueryThatIsFinal() { // Given: - when(query.isStatic()).thenReturn(false); when(query.getResultMaterialization()).thenReturn(ResultMaterialization.FINAL); // Then: diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index 9a3182974c06..611cdbcab880 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -263,7 +263,7 @@ private static WhereInfo extractWhereInfo( final Expression where = analysis.getWhereExpression(); if (where == null) { - throw invalidWhereClauseException("missing WHERE clause", windowed); + throw invalidWhereClauseException("Missing WHERE clause", windowed); } final Map> comparisons = extractComparisons(where);