Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(static): static select support #3369

Merged
merged 5 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class SchemaUtil {

public static final String ROWKEY_NAME = "ROWKEY";
public static final String ROWTIME_NAME = "ROWTIME";
public static final String WINDOWSTART_NAME = "WINDOWSTART";

public static final int ROWKEY_INDEX = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
final Visitor visitor = new Visitor();
final Visitor visitor = new Visitor(query.isStatic());
visitor.process(query, null);

sink.ifPresent(visitor::analyzeNonStdOutSink);
Expand All @@ -146,9 +146,14 @@ private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final Analysis analysis = new Analysis();
private final boolean staticQuery;
private boolean isJoin = false;
private boolean isGroupBy = false;

Visitor(final boolean staticQuery) {
this.staticQuery = staticQuery;
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
}

private void analyzeNonStdOutSink(final Sink sink) {
analysis.setProperties(sink.getProperties());
sink.getPartitionBy().ifPresent(analysis::setPartitionBy);
Expand Down Expand Up @@ -301,19 +306,26 @@ private void throwOnUnknownColumnReference() {
new ExpressionAnalyzer(analysis.getFromSourceSchemas());

for (final Expression selectExpression : analysis.getSelectExpressions()) {
expressionAnalyzer.analyzeExpression(selectExpression);
expressionAnalyzer.analyzeExpression(selectExpression, false);
}

if (analysis.getWhereExpression() != null) {
expressionAnalyzer.analyzeExpression(analysis.getWhereExpression());
final boolean allowWindowMetaFields = staticQuery
&& analysis.getFromDataSources().get(0)
.getDataSource()
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
.getKsqlTopic()
.getKeyFormat()
.isWindowed();

expressionAnalyzer.analyzeExpression(analysis.getWhereExpression(), allowWindowMetaFields);
}

for (final Expression expression : analysis.getGroupByExpressions()) {
expressionAnalyzer.analyzeExpression(expression);
expressionAnalyzer.analyzeExpression(expression, false);
}

if (analysis.getHavingExpression() != null) {
expressionAnalyzer.analyzeExpression(analysis.getHavingExpression());
expressionAnalyzer.analyzeExpression(analysis.getHavingExpression(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;

public class ContinuousQueryValidator implements QueryValidator {

@Override
public void preValidate(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new IllegalArgumentException("static");
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continuous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
);
}
}

@Override
public void postValidate(final Analysis analysis) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,18 @@ class ExpressionAnalyzer {
this.sourceSchemas = Objects.requireNonNull(sourceSchemas, "sourceSchemas");
}

void analyzeExpression(final Expression expression) {
final Visitor visitor = new Visitor();
void analyzeExpression(final Expression expression, final boolean allowWindowMetaFields) {
final Visitor visitor = new Visitor(allowWindowMetaFields);
visitor.process(expression, null);
}

private void throwOnUnknownField(final QualifiedName name) {
final Set<String> sourcesWithField = sourceSchemas.sourcesWithField(name.name());
if (sourcesWithField.isEmpty()) {
throw new KsqlException("Field '" + name + "' cannot be resolved.");
}
private final class Visitor extends VisitParentExpressionVisitor<Object, Object> {

if (name.qualifier().isPresent()) {
if (!sourcesWithField.contains(name.qualifier().get())) {
throw new KsqlException("Source '" + name.qualifier() + "', "
+ "used in '" + name + "' cannot be resolved.");
}
} else if (sourcesWithField.size() > 1) {
final String possibilities = sourcesWithField.stream()
.sorted()
.map(source -> SchemaUtil.buildAliasedFieldName(source, name.name()))
.collect(Collectors.joining(","));

throw new KsqlException("Field '" + name + "' is ambiguous. "
+ "Could be any of: " + possibilities);
}
}
private final boolean allowWindowMetaFields;

private class Visitor extends VisitParentExpressionVisitor<Object, Object> {
Visitor(final boolean allowWindowMetaFields) {
this.allowWindowMetaFields = allowWindowMetaFields;
}

public Object visitLikePredicate(final LikePredicate node, final Object context) {
process(node.getValue(), null);
Expand Down Expand Up @@ -138,5 +122,32 @@ public Object visitQualifiedNameReference(
throwOnUnknownField(node.getName());
return null;
}

private void throwOnUnknownField(final QualifiedName name) {
final Set<String> sourcesWithField = sourceSchemas.sourcesWithField(name.name());
if (sourcesWithField.isEmpty()) {
if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) {
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
return;
}

throw new KsqlException("Field '" + name + "' cannot be resolved.");
}

if (name.qualifier().isPresent()) {
final String qualifier = name.qualifier().get();
if (!sourcesWithField.contains(qualifier)) {
throw new KsqlException("Source '" + qualifier + "', "
+ "used in '" + name + "' cannot be resolved.");
}
} else if (sourcesWithField.size() > 1) {
final String possibilities = sourcesWithField.stream()
.sorted()
.map(source -> SchemaUtil.buildAliasedFieldName(source, name.name()))
.collect(Collectors.joining(", "));

throw new KsqlException("Field '" + name + "' is ambiguous. "
+ "Could be any of: " + possibilities);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
Expand All @@ -27,7 +28,6 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.AggregateExpressionRewriter;
Expand All @@ -40,57 +40,70 @@

public class QueryAnalyzer {

private static final String NEW_QUERY_SYNTAX_HELP =
"'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

static final String NEW_QUERY_SYNTAX_HELP = System.lineSeparator()
+ "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

private final Analyzer analyzer;
private final MetaStore metaStore;
private final String outputTopicPrefix;
private final Set<SerdeOption> defaultSerdeOptions;
private final QueryValidator continuousValidator;
private final QueryValidator staticValidator;

public QueryAnalyzer(
final MetaStore metaStore,
final String outputTopicPrefix,
final Set<SerdeOption> defaultSerdeOptions
) {
this(
metaStore,
new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions),
new ContinuousQueryValidator(),
new StaticQueryValidator()
);
}

@VisibleForTesting
QueryAnalyzer(
final MetaStore metaStore,
final Analyzer analyzer,
final QueryValidator continuousValidator,
final QueryValidator staticValidator
) {
this.metaStore = requireNonNull(metaStore, "metaStore");
this.outputTopicPrefix = requireNonNull(outputTopicPrefix, "outputTopicPrefix");
this.defaultSerdeOptions = ImmutableSet.copyOf(
requireNonNull(defaultSerdeOptions, "defaultSerdeOptions"));
this.analyzer = requireNonNull(analyzer, "analyzer");
this.continuousValidator = requireNonNull(continuousValidator, "continuousValidator");
this.staticValidator = requireNonNull(staticValidator, "staticValidator");
}

public Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new KsqlException("Static queries are not yet supported. "
+ "Consider adding 'EMIT CHANGES' to any bare query, "
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_HELP
);
staticValidator.preValidate(query, sink);
} else {
continuousValidator.preValidate(query, sink);
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ System.lineSeparator()
+ NEW_QUERY_SYNTAX_HELP
);
final Analysis analysis = analyzer.analyze(query, sink);

if (query.isStatic()) {
staticValidator.postValidate(analysis);
} else {
continuousValidator.postValidate(analysis);
}

return new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions)
.analyze(query, sink);
return analysis;
}

public AggregateAnalysis analyzeAggregate(final Query query, final Analysis analysis) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import java.util.Optional;

/**
* Vaidator used by {@link QueryAnalyzer}.
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
*/
interface QueryValidator {

void preValidate(Query query, Optional<Sink> sink);

void postValidate(Analysis analysis);
}
Loading