diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java index 3521d22cd0d9..66b9b0b6c848 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java @@ -281,30 +281,30 @@ public DataSource getDataSource() { @Immutable public static final class JoinInfo { - private final ColumnRef leftJoinField; - private final ColumnRef rightJoinField; + private final Expression leftJoinExpression; + private final Expression rightJoinExpression; private final JoinNode.JoinType type; private final Optional withinExpression; JoinInfo( - final ColumnRef leftJoinField, - final ColumnRef rightJoinField, + final Expression leftJoinExpression, + final Expression rightJoinExpression, final JoinType type, final Optional withinExpression ) { - this.leftJoinField = requireNonNull(leftJoinField, "leftJoinField"); - this.rightJoinField = requireNonNull(rightJoinField, "rightJoinField"); + this.leftJoinExpression = requireNonNull(leftJoinExpression, "leftJoinExpression"); + this.rightJoinExpression = requireNonNull(rightJoinExpression, "rightJoinExpression"); this.type = requireNonNull(type, "type"); this.withinExpression = requireNonNull(withinExpression, "withinExpression"); } - public ColumnRef getLeftJoinField() { - return leftJoinField; + public Expression getLeftJoinExpression() { + return leftJoinExpression; } - public ColumnRef getRightJoinField() { - return rightJoinField; + public Expression getRightJoinExpression() { + return rightJoinExpression; } public JoinType getType() { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 7f0c174529ce..1cffda3855da 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -18,7 +18,9 @@ import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MoreCollectors; import io.confluent.ksql.analyzer.Analysis.AliasedDataSource; import io.confluent.ksql.analyzer.Analysis.Into; import io.confluent.ksql.analyzer.Analysis.JoinInfo; @@ -67,7 +69,6 @@ import io.confluent.ksql.util.SchemaUtil; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -349,21 +350,28 @@ protected AstNode visitJoin(final Join node, final Void context) { ); } - final ColumnRef leftJoinField = getJoinFieldName( - comparisonExpression, - left.getAlias(), - left.getDataSource().getSchema() - ); + final Set colsUsedInLeft = new ExpressionAnalyzer(analysis.getFromSourceSchemas()) + .analyzeExpression(comparisonExpression.getLeft(), false); + final Set colsUsedInRight = new ExpressionAnalyzer(analysis.getFromSourceSchemas()) + .analyzeExpression(comparisonExpression.getRight(), false); - final ColumnRef rightJoinField = getJoinFieldName( - comparisonExpression, - right.getAlias(), - right.getDataSource().getSchema() - ); + final SourceName leftSourceName = getOnlySourceForJoin( + comparisonExpression.getLeft(), comparisonExpression, colsUsedInLeft); + final SourceName rightSourceName = getOnlySourceForJoin( + comparisonExpression.getRight(), 
comparisonExpression, colsUsedInRight); + + if (!validJoin(left.getAlias(), right.getAlias(), leftSourceName, rightSourceName)) { + throw new KsqlException( + "Each side of the join must reference exactly one source and not the same source. " + + "Left side references " + leftSourceName + + " and right references " + rightSourceName + ); + } + final boolean flipped = leftSourceName.equals(right.getAlias()); analysis.setJoin(new JoinInfo( - leftJoinField, - rightJoinField, + flipped ? comparisonExpression.getRight() : comparisonExpression.getLeft(), + flipped ? comparisonExpression.getLeft() : comparisonExpression.getRight(), joinType, node.getWithinExpression() )); @@ -371,6 +379,34 @@ protected AstNode visitJoin(final Join node, final Void context) { return null; } + private boolean validJoin( + final SourceName leftName, + final SourceName rightName, + final SourceName leftExpressionSource, + final SourceName rightExpressionSource + ) { + return ImmutableSet.of(leftExpressionSource, rightExpressionSource) + .containsAll(ImmutableList.of(leftName, rightName)); + } + + private SourceName getOnlySourceForJoin( + final Expression exp, + final ComparisonExpression join, + final Set columnRefs + ) { + try { + return columnRefs.stream() + .map(ColumnRef::source) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(MoreCollectors.onlyElement()); + } catch (final Exception e) { + throw new KsqlException("Invalid comparison expression '" + exp + "' in join '" + join + + "'. Each side of the join comparision must contain references from exactly one " + + "source."); + } + } + private JoinNode.JoinType getJoinType(final Join node) { final JoinNode.JoinType joinType; switch (node.getType()) { @@ -389,86 +425,6 @@ private JoinNode.JoinType getJoinType(final Join node) { return joinType; } - private ColumnReferenceExp checkExpressionType( - final ComparisonExpression comparisonExpression, - final Expression subExpression) { - - if (!(subExpression instanceof ColumnReferenceExp)) { - throw new KsqlException( - String.format( - "%s : Invalid comparison expression '%s' in join '%s'. Joins must only contain a " - + "field comparison.", - comparisonExpression.getLocation().map(Objects::toString).orElse(""), - subExpression, - comparisonExpression - ) - ); - } - return (ColumnReferenceExp) subExpression; - } - - private ColumnRef getJoinFieldName( - final ComparisonExpression comparisonExpression, - final SourceName sourceAlias, - final LogicalSchema sourceSchema - ) { - final ColumnReferenceExp left = - checkExpressionType(comparisonExpression, comparisonExpression.getLeft()); - - Optional joinFieldName = getJoinFieldNameFromExpr(left, sourceAlias); - - if (!joinFieldName.isPresent()) { - final ColumnReferenceExp right = - checkExpressionType(comparisonExpression, comparisonExpression.getRight()); - - joinFieldName = getJoinFieldNameFromExpr(right, sourceAlias); - - if (!joinFieldName.isPresent()) { - // Should never happen as only QualifiedNameReference are allowed - throw new IllegalStateException("Cannot find join field name"); - } - } - - final ColumnRef fieldName = joinFieldName.get(); - - final Optional joinField = - getJoinFieldNameFromSource(fieldName.withoutSource(), sourceAlias, sourceSchema); - - return joinField - .orElseThrow(() -> new KsqlException( - String.format( - "%s : Invalid join criteria %s. 
Column %s.%s does not exist.", - comparisonExpression.getLocation().map(Objects::toString).orElse(""), - comparisonExpression, - sourceAlias.name(), - fieldName.name().toString(FormatOptions.noEscape()) - ) - )); - } - - private Optional getJoinFieldNameFromExpr( - final ColumnReferenceExp nameRef, - final SourceName sourceAlias - ) { - if (nameRef.getReference().source().isPresent() - && !nameRef.getReference().source().get().equals(sourceAlias)) { - return Optional.empty(); - } - - final ColumnRef fieldName = nameRef.getReference(); - return Optional.of(fieldName); - } - - private Optional getJoinFieldNameFromSource( - final ColumnRef fieldName, - final SourceName sourceAlias, - final LogicalSchema sourceSchema - ) { - return sourceSchema.findColumn(fieldName) - .map(Column::ref) - .map(ref -> ref.withSource(sourceAlias)); - } - @Override protected AstNode visitAliasedRelation(final AliasedRelation node, final Void context) { final SourceName structuredDataSourceName = ((Table) node.getRelation()).getName(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java index e10ee0908c2f..0374ea0954e9 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java @@ -15,24 +15,16 @@ package io.confluent.ksql.analyzer; -import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression; -import io.confluent.ksql.execution.expression.tree.Cast; +import com.google.common.collect.Iterables; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; -import io.confluent.ksql.execution.expression.tree.ComparisonExpression; -import io.confluent.ksql.execution.expression.tree.DereferenceExpression; import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.execution.expression.tree.FunctionCall; -import io.confluent.ksql.execution.expression.tree.IsNotNullPredicate; -import io.confluent.ksql.execution.expression.tree.IsNullPredicate; -import io.confluent.ksql.execution.expression.tree.LikePredicate; -import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression; -import io.confluent.ksql.execution.expression.tree.NotExpression; -import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; +import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; +import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -48,99 +40,46 @@ class ExpressionAnalyzer { this.sourceSchemas = Objects.requireNonNull(sourceSchemas, "sourceSchemas"); } - void analyzeExpression(final Expression expression, final boolean allowWindowMetaFields) { - final Visitor visitor = new Visitor(allowWindowMetaFields); - visitor.process(expression, null); + Set analyzeExpression( + final Expression expression, + final boolean allowWindowMetaFields + ) { + final Set referencedColumns = new HashSet<>(); + final ColumnExtractor extractor = new ColumnExtractor(allowWindowMetaFields, referencedColumns); + extractor.process(expression, null); + return referencedColumns; } - private final class Visitor extends VisitParentExpressionVisitor { + 
private final class ColumnExtractor extends TraversalExpressionVisitor { + private final Set referencedColumns; private final boolean allowWindowMetaFields; - Visitor(final boolean allowWindowMetaFields) { + ColumnExtractor( + final boolean allowWindowMetaFields, + final Set referencedColumns + ) { this.allowWindowMetaFields = allowWindowMetaFields; - } - - public Object visitLikePredicate(final LikePredicate node, final Object context) { - process(node.getValue(), null); - return null; - } - - public Object visitFunctionCall(final FunctionCall node, final Object context) { - for (final Expression argExpr : node.getArguments()) { - process(argExpr, null); - } - return null; - } - - public Object visitArithmeticBinary( - final ArithmeticBinaryExpression node, - final Object context) { - process(node.getLeft(), null); - process(node.getRight(), null); - return null; - } - - public Object visitIsNotNullPredicate(final IsNotNullPredicate node, final Object context) { - return process(node.getValue(), context); - } - - public Object visitIsNullPredicate(final IsNullPredicate node, final Object context) { - return process(node.getValue(), context); - } - - public Object visitLogicalBinaryExpression( - final LogicalBinaryExpression node, - final Object context) { - process(node.getLeft(), null); - process(node.getRight(), null); - return null; + this.referencedColumns = referencedColumns; } @Override - public Object visitComparisonExpression( - final ComparisonExpression node, - final Object context) { - process(node.getLeft(), null); - process(node.getRight(), null); - return null; - } - - @Override - public Object visitNotExpression(final NotExpression node, final Object context) { - return process(node.getValue(), null); - } - - @Override - public Object visitCast(final Cast node, final Object context) { - process(node.getExpression(), context); - return null; - } - - @Override - public Object visitColumnReference( + public Void visitColumnReference( final ColumnReferenceExp node, final Object context ) { - throwOnUnknownOrAmbiguousColumn(node.getReference()); + final ColumnRef reference = node.getReference(); + referencedColumns.add(getQualifiedColumnRef(reference)); return null; } - @Override - public Object visitDereferenceExpression( - final DereferenceExpression node, - final Object context - ) { - process(node.getBase(), context); - return null; - } - - private void throwOnUnknownOrAmbiguousColumn(final ColumnRef name) { + private ColumnRef getQualifiedColumnRef(final ColumnRef name) { final Set sourcesWithField = sourceSchemas.sourcesWithField(name); if (sourcesWithField.isEmpty()) { if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) { - return; + // window start doesn't need a qualifier as it's a special hacky column + return name; } throw new KsqlException("Column '" + name.toString(FormatOptions.noEscape()) @@ -156,6 +95,8 @@ private void throwOnUnknownOrAmbiguousColumn(final ColumnRef name) { throw new KsqlException("Column '" + name.name().name() + "' is ambiguous. 
" + "Could be any of: " + possibilities); } + + return name.withSource(Iterables.getOnlyElement(sourcesWithField)); } } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index b6488ff65f79..e95cf32c886c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -84,7 +84,8 @@ public OutputNode buildPlan() { } if (analysis.getPartitionBy().isPresent()) { - currentNode = buildRepartitionNode(currentNode, analysis.getPartitionBy().get()); + currentNode = buildRepartitionNode( + "PartitionBy", currentNode, analysis.getPartitionBy().get()); } if (!analysis.getTableFunctions().isEmpty()) { @@ -207,6 +208,7 @@ private static FilterNode buildFilterNode( } private RepartitionNode buildRepartitionNode( + final String planId, final PlanNode sourceNode, final Expression partitionBy ) { @@ -219,7 +221,7 @@ private RepartitionNode buildRepartitionNode( final LogicalSchema sourceSchema = sourceNode.getSchema(); final Column proposedKey = sourceSchema - .findValueColumn(columnRef) + .findColumn(columnRef) .orElseThrow(() -> new KsqlException("Invalid identifier for PARTITION BY clause: '" + columnRef.name().toString(FormatOptions.noEscape()) + "' Only columns from the " + "source schema can be referenced in the PARTITION BY clause.")); @@ -240,7 +242,7 @@ private RepartitionNode buildRepartitionNode( final LogicalSchema schema = buildRepartitionedSchema(sourceNode, partitionBy); return new RepartitionNode( - new PlanNodeId("PartitionBy"), + new PlanNodeId(planId), sourceNode, schema, partitionBy, @@ -286,10 +288,13 @@ private PlanNode buildSourceNode() { new PlanNodeId("Join"), analysis.getSelectExpressions(), joinInfo.get().getType(), - leftSourceNode, - rightSourceNode, - joinInfo.get().getLeftJoinField(), - joinInfo.get().getRightJoinField(), + // it is always safe to build the repartition node - this operation will be + // a no-op if a repartition is not required. 
if the source is a table, and + // a repartition is needed, then an exception will be thrown + buildRepartitionNode( + "LeftSourceKeyed", leftSourceNode, joinInfo.get().getLeftJoinExpression()), + buildRepartitionNode( + "RightSourceKeyed", rightSourceNode, joinInfo.get().getRightJoinExpression()), joinInfo.get().getWithinExpression() ); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 5f07e36f585a..a1b0ce8dfd0a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -19,17 +19,11 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; -import io.confluent.ksql.execution.context.QueryContext.Stacker; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.streams.JoinParamsFactory; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.parser.tree.WithinExpression; -import io.confluent.ksql.schema.ksql.Column; -import io.confluent.ksql.schema.ksql.Column.Namespace; -import io.confluent.ksql.schema.ksql.ColumnRef; -import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.services.KafkaTopicClient; @@ -37,7 +31,6 @@ import io.confluent.ksql.structured.SchemaKTable; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; -import io.confluent.ksql.util.SchemaUtil; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -54,11 +47,9 @@ public enum JoinType { } private final JoinType joinType; - private final DataSourceNode left; - private final DataSourceNode right; + private final PlanNode left; + private final PlanNode right; private final LogicalSchema schema; - private final ColumnRef leftJoinFieldName; - private final ColumnRef rightJoinFieldName; private final KeyField keyField; private final Optional withinExpression; private final ImmutableList selectExpressions; @@ -67,32 +58,23 @@ public JoinNode( final PlanNodeId id, final List selectExpressions, final JoinType joinType, - final DataSourceNode left, - final DataSourceNode right, - final ColumnRef leftJoinFieldName, - final ColumnRef rightJoinFieldName, + final PlanNode left, + final PlanNode right, final Optional withinExpression ) { super(id, calculateSinkType(left, right)); this.joinType = Objects.requireNonNull(joinType, "joinType"); this.left = Objects.requireNonNull(left, "left"); this.right = Objects.requireNonNull(right, "right"); - this.leftJoinFieldName = Objects.requireNonNull(leftJoinFieldName, "leftJoinFieldName"); - this.rightJoinFieldName = Objects.requireNonNull(rightJoinFieldName, "rightJoinFieldName"); this.withinExpression = Objects.requireNonNull(withinExpression, "withinExpression"); this.selectExpressions = ImmutableList .copyOf(Objects.requireNonNull(selectExpressions, "selectExpressions")); - final Column leftKeyCol = validateSchemaColumn(leftJoinFieldName, left.getSchema()); - validateSchemaColumn(rightJoinFieldName, right.getSchema()); - this.keyField = joinType == JoinType.OUTER ? 
KeyField.none() // Both source key columns can be null, hence neither can be the keyField - : left.getSchema().isKeyColumn(leftKeyCol.name()) - ? left.getKeyField() - : KeyField.of(leftKeyCol.ref()); + : left.getKeyField(); - this.schema = buildJoinSchema(left, leftJoinFieldName, right, rightJoinFieldName); + this.schema = buildJoinSchema(left, right); } @Override @@ -120,11 +102,11 @@ public List getSelectExpressions() { return selectExpressions; } - public DataSourceNode getLeft() { + public PlanNode getLeft() { return left; } - public DataSourceNode getRight() { + public PlanNode getRight() { return right; } @@ -138,7 +120,7 @@ public SchemaKStream buildStream(final KsqlQueryBuilder builder) { this, builder.buildNodeContext(getId().toString())); - return joinerFactory.getJoiner(left.getDataSourceType(), right.getDataSourceType()).join(); + return joinerFactory.getJoiner(left.getNodeOutputType(), right.getNodeOutputType()).join(); } @Override @@ -161,14 +143,8 @@ private void ensureMatchingPartitionCounts(final KafkaTopicClient kafkaTopicClie } } - private static String getSourceName(final DataSourceNode node) { - return node.getDataSource().getName().name(); - } - - private static Column validateSchemaColumn(final ColumnRef column, final LogicalSchema schema) { - return schema.findValueColumn(column) - .orElseThrow(() -> new IllegalArgumentException( - "Invalid join field, not found in schema: " + column)); + private static String getSourceName(final PlanNode node) { + return node.getTheSourceNode().getAlias().name(); } private static class JoinerFactory { @@ -220,15 +196,9 @@ private abstract static class Joiner { public abstract SchemaKStream join(); - SchemaKStream buildStream( - final PlanNode node, - final ColumnRef joinFieldName - ) { - return maybeRePartitionByKey( - node.buildStream(builder), - joinFieldName, - contextStacker - ); + @SuppressWarnings("unchecked") + SchemaKStream buildStream(final PlanNode node) { + return (SchemaKStream) node.buildStream(builder); } @SuppressWarnings("unchecked") @@ -246,17 +216,9 @@ SchemaKTable buildTable(final PlanNode node) { return (SchemaKTable) schemaKStream; } - @SuppressWarnings("unchecked") - static SchemaKStream maybeRePartitionByKey( - final SchemaKStream stream, - final ColumnRef joinFieldName, - final Stacker contextStacker - ) { - return stream.selectKey(new ColumnReferenceExp(joinFieldName), contextStacker); - } - - static ValueFormat getFormatForSource(final DataSourceNode sourceNode) { - return sourceNode.getDataSource() + static ValueFormat getFormatForSource(final PlanNode sourceNode) { + return sourceNode.getTheSourceNode() + .getDataSource() .getKsqlTopic() .getValueFormat(); } @@ -282,10 +244,10 @@ public SchemaKStream join() { } final SchemaKStream leftStream = buildStream( - joinNode.getLeft(), joinNode.leftJoinFieldName); + joinNode.getLeft()); final SchemaKStream rightStream = buildStream( - joinNode.getRight(), joinNode.rightJoinFieldName); + joinNode.getRight()); switch (joinNode.joinType) { case LEFT: @@ -342,7 +304,7 @@ public SchemaKStream join() { final SchemaKTable rightTable = buildTable(joinNode.getRight()); final SchemaKStream leftStream = buildStream( - joinNode.getLeft(), joinNode.leftJoinFieldName); + joinNode.getLeft()); switch (joinNode.joinType) { case LEFT: @@ -414,90 +376,20 @@ public SchemaKTable join() { } private static DataSourceType calculateSinkType( - final DataSourceNode left, - final DataSourceNode right + final PlanNode left, + final PlanNode right ) { - final DataSourceType leftType = 
left.getDataSourceType(); - final DataSourceType rightType = right.getDataSourceType(); + final DataSourceType leftType = left.getNodeOutputType(); + final DataSourceType rightType = right.getNodeOutputType(); return leftType == DataSourceType.KTABLE && rightType == DataSourceType.KTABLE ? DataSourceType.KTABLE : DataSourceType.KSTREAM; } private static LogicalSchema buildJoinSchema( - final DataSourceNode left, - final ColumnRef leftJoinFieldName, - final DataSourceNode right, - final ColumnRef rightJoinFieldName - ) { - final LogicalSchema leftSchema = selectKey(left, leftJoinFieldName); - final LogicalSchema rightSchema = selectKey(right, rightJoinFieldName); - - return JoinParamsFactory.createSchema(leftSchema, rightSchema); - } - - /** - * Adjust the schema to take into account any change in key columns. - * - * @param source the source node - * @param joinColumnRef the join column - * @return the true source schema after any change of key columns. - */ - private static LogicalSchema selectKey( - final DataSourceNode source, - final ColumnRef joinColumnRef + final PlanNode left, + final PlanNode right ) { - final LogicalSchema sourceSchema = source.getSchema(); - - final Column joinCol = sourceSchema.findColumn(joinColumnRef) - .orElseThrow(() -> new KsqlException("Unknown join column: " + joinColumnRef)); - - if (sourceSchema.key().size() != 1) { - throw new UnsupportedOperationException("Only single key columns supported"); - } - - if (joinCol.namespace() == Namespace.KEY) { - // Join column is only key column, so no change of key columns required: - return sourceSchema; - } - - final Optional keyColumn = source - .getKeyField() - .resolve(sourceSchema); - - if (keyColumn.isPresent() && keyColumn.get().equals(joinCol)) { - // Join column is KEY field, which is an alias for the only key column, so no change of key - // columns required: - return sourceSchema; - } - - // Change of key columns required - - if (source.getDataSourceType() == DataSourceType.KTABLE) { - // Tables do not support rekey: - final String sourceName = source.getDataSource().getName().toString(FormatOptions.noEscape()); - - if (!keyColumn.isPresent()) { - throw new KsqlException( - "Invalid join criteria: Source table (" + sourceName + ") has no key column " - + "defined. Only 'ROWKEY' is supported in the join criteria for a TABLE." - ); - } - - throw new KsqlException( - "Invalid join criteria: Source table " - + "(" + sourceName + ") key column " - + "(" + keyColumn.get().ref().toString(FormatOptions.noEscape()) + ") " - + "is not the column used in the join criteria (" - + joinCol.ref().toString(FormatOptions.noEscape()) + "). " - + "Only the table's key column or 'ROWKEY' is supported in the join criteria " - + "for a TABLE." - ); - } - - return LogicalSchema.builder() - .keyColumn(source.getAlias(), SchemaUtil.ROWKEY_NAME, joinCol.type()) - .valueColumns(sourceSchema.value()) - .build(); + return JoinParamsFactory.createSchema(left.getSchema(), right.getSchema()); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 28762d95e2c8..9018192fc8ad 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -360,7 +360,7 @@ private KeyField getNewKeyField(final Expression expression) { return getSchema().isMetaColumn(columnRef.name()) ? 
KeyField.none() : newKeyField; } - private boolean needsRepartition(final Expression expression) { + protected boolean needsRepartition(final Expression expression) { if (!(expression instanceof ColumnReferenceExp)) { return true; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 68268f839070..b28976cddf73 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import org.apache.kafka.connect.data.Struct; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class SchemaKTable extends SchemaKStream { @@ -134,6 +135,20 @@ public SchemaKTable select( ); } + @SuppressWarnings("unchecked") + @Override + public SchemaKStream selectKey(final Expression keyExpression, + final Stacker contextStacker) { + if (!needsRepartition(keyExpression)) { + return (SchemaKStream) this; + } + + throw new UnsupportedOperationException("Cannot repartition a TABLE source. " + + "If this is a join, make sure that the criteria uses the TABLE key " + + this.keyField.ref().map(ColumnRef::toString).orElse("ROWKEY") + " instead of " + + keyExpression); + } + @Override public ExecutionStep getSourceStep() { return sourceTableStep; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java index 4c8587a61d2c..a6f4ee46f244 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java @@ -38,6 +38,8 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.expression.tree.IntegerLiteral; import io.confluent.ksql.execution.expression.tree.Literal; import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.execution.plan.SelectExpression; @@ -47,6 +49,7 @@ import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; @@ -171,8 +174,47 @@ public void testSimpleLeftJoinAnalysis() { assertThat(analysis.getFromDataSources().get(1).getAlias(), is(SourceName.of("T2"))); assertThat(analysis.getJoin(), is(not(Optional.empty()))); - assertThat(analysis.getJoin().get().getLeftJoinField(), is(ColumnRef.of(SourceName.of("T1"),ColumnName.of("COL1")))); - assertThat(analysis.getJoin().get().getRightJoinField(), is(ColumnRef.of(SourceName.of("T2"),ColumnName.of("COL1")))); + assertThat(analysis.getJoin().get().getLeftJoinExpression(), is(new ColumnReferenceExp(ColumnRef.of(SourceName.of("T1"),ColumnName.of("COL1"))))); + assertThat(analysis.getJoin().get().getRightJoinExpression(), is(new ColumnReferenceExp(ColumnRef.of(SourceName.of("T2"),ColumnName.of("COL1"))))); + + final List selects = 
analysis.getSelectExpressions().stream() + .map(SelectExpression::getExpression) + .map(Objects::toString) + .collect(Collectors.toList()); + + assertThat(selects, contains("T1.COL1", "T2.COL1", "T2.COL4", "T1.COL5", "T2.COL2")); + + final List aliases = analysis.getSelectExpressions().stream() + .map(SelectExpression::getAlias) + .collect(Collectors.toList()); + + assertThat(aliases.stream().map(ColumnName::name).collect(Collectors.toList()), + contains("T1_COL1", "T2_COL1", "T2_COL4", "COL5", "T2_COL2")); + } + + @Test + public void testExpressionLeftJoinAnalysis() { + // When: + final Analysis analysis = analyzeQuery( + "SELECT t1.col1, t2.col1, t2.col4, col5, t2.col2 " + + "FROM test1 t1 LEFT JOIN test2 t2 " + + "ON t1.col1 = SUBSTRING(t2.col1, 2) EMIT CHANGES;", jsonMetaStore); + + // Then: + assertThat(analysis.getFromDataSources(), hasSize(2)); + assertThat(analysis.getFromDataSources().get(0).getAlias(), is(SourceName.of("T1"))); + assertThat(analysis.getFromDataSources().get(1).getAlias(), is(SourceName.of("T2"))); + + assertThat(analysis.getJoin(), is(not(Optional.empty()))); + assertThat(analysis.getJoin().get().getLeftJoinExpression(), is(new ColumnReferenceExp(ColumnRef.of(SourceName.of("T1"),ColumnName.of("COL1"))))); + assertThat( + analysis.getJoin().get().getRightJoinExpression(), + is(new FunctionCall( + FunctionName.of("SUBSTRING"), + ImmutableList.of( + new ColumnReferenceExp(ColumnRef.of(SourceName.of("T2"),ColumnName.of("COL1"))), + new IntegerLiteral(2) + )))); final List selects = analysis.getSelectExpressions().stream() .map(SelectExpression::getExpression) @@ -200,8 +242,8 @@ public void shouldHandleJoinOnRowKey() { // Then: assertThat(join, is(not(Optional.empty()))); assertThat(join.get().getType(), is(JoinType.LEFT)); - assertThat(join.get().getLeftJoinField(), is(ColumnRef.of(SourceName.of("T1"),ColumnName.of("ROWKEY")))); - assertThat(join.get().getRightJoinField(), is(ColumnRef.of(SourceName.of("T2"), ColumnName.of("ROWKEY")))); + assertThat(join.get().getLeftJoinExpression(), is(new ColumnReferenceExp(ColumnRef.of(SourceName.of("T1"),ColumnName.of("ROWKEY"))))); + assertThat(join.get().getRightJoinExpression(), is(new ColumnReferenceExp(ColumnRef.of(SourceName.of("T2"), ColumnName.of("ROWKEY"))))); } @Test @@ -504,6 +546,71 @@ public void shouldThrowOnSelfJoin() { analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink())); } + @Test + public void shouldFailOnJoinWithoutSource() { + // Given: + final CreateStreamAsSelect createStreamAsSelect = parseSingle( + "CREATE STREAM FOO AS " + + "SELECT * FROM test1 t1 JOIN test2 t2 ON t1.rowkey = 'foo';" + ); + + final Query query = createStreamAsSelect.getQuery(); + + final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Invalid comparison expression ''foo'' in join " + + "'(T1.ROWKEY = 'foo')'. 
Each side of the join comparision must contain references " + + "from exactly one source."); + + // When: + analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink())); + } + + @Test + public void shouldFailOnJoinOnOverlappingSources() { + // Given: + final CreateStreamAsSelect createStreamAsSelect = parseSingle( + "CREATE STREAM FOO AS " + + "SELECT * FROM test1 t1 JOIN test2 t2 ON t1.rowkey + t2.rowkey = t1.rowkey;" + ); + + final Query query = createStreamAsSelect.getQuery(); + + final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Invalid comparison expression '(T1.ROWKEY + T2.ROWKEY)' in " + + "join '((T1.ROWKEY + T2.ROWKEY) = T1.ROWKEY)'. Each side of the join comparision must " + + "contain references from exactly one source."); + + // When: + analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink())); + } + + @Test + public void shouldFailOnSelfJoinInCondition() { + // Given: + final CreateStreamAsSelect createStreamAsSelect = parseSingle( + "CREATE STREAM FOO AS " + + "SELECT * FROM test1 t1 JOIN test2 t2 ON t1.rowkey = t1.rowkey;" + ); + + final Query query = createStreamAsSelect.getQuery(); + + final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Each side of the join must reference exactly one source " + + "and not the same source. Left side references `T1` and right references `T1`"); + + // When: + analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink())); + } + @SuppressWarnings("unchecked") private T parseSingle(final String simpleQuery) { return (T) Iterables.getOnlyElement(parse(simpleQuery, jsonMetaStore)); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java index a39383f6ef1e..187f9e0b4493 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java @@ -15,11 +15,14 @@ package io.confluent.ksql.analyzer; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type; @@ -33,6 +36,7 @@ import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -129,6 +133,25 @@ public void shouldThrowOnMultipleSources() { analyzer.analyzeExpression(expression, true); } + @Test + public void shouldAddQualifier() { + // Given: + final Expression expression = new ColumnReferenceExp( + ColumnRef.withoutSource(ColumnName.of("else")) + ); + + when(sourceSchemas.sourcesWithField(any())) + .thenReturn(ImmutableSet.of(SourceName.of("something"))); + + // When: + final Set columnRefs = analyzer.analyzeExpression(expression, true); + + // Then: + assertThat( + 
Iterables.getOnlyElement(columnRefs), + is(ColumnRef.of(SourceName.of("something"), ColumnName.of("else")))); + } + @Test public void shouldThrowOnNoSources() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index 6ef10d5f24d1..eef1046a9c51 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -76,11 +76,11 @@ public class PhysicalPlanBuilderTest { + " WITH (KAFKA_TOPIC = 'test3', VALUE_FORMAT = 'JSON', KEY='ID');"; private static final String CREATE_TABLE_TEST4 = "CREATE TABLE TEST4 " - + "(ROWKEY BIGINT KEY, ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) " + + "(ROWKEY BIGINT KEY, ID BIGINT, COL0 BIGINT, COL1 DOUBLE) " + " WITH (KAFKA_TOPIC = 'test4', VALUE_FORMAT = 'JSON', KEY='ID');"; private static final String CREATE_TABLE_TEST5 = "CREATE TABLE TEST5 " - + "(ROWKEY BIGINT KEY, ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) " + + "(ROWKEY BIGINT KEY, ID BIGINT, COL0 BIGINT, COL1 DOUBLE) " + " WITH (KAFKA_TOPIC = 'test5', VALUE_FORMAT = 'JSON', KEY='ID');"; private static final String CREATE_STREAM_TEST6 = "CREATE STREAM TEST6 " @@ -343,9 +343,8 @@ public void shouldThrowIfLeftTableNotJoiningOnTableKey() { // Then: expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Source table (TEST4) key column (TEST4.ID) is not the column " - + "used in the join criteria (TEST4.COL0)."); + expectedException.expectMessage("Cannot repartition a TABLE source. If this is a join, make " + + "sure that the criteria uses the TABLE key TEST4.ID instead of TEST4.COL0"); // When: execute("CREATE TABLE t1 AS " @@ -361,9 +360,8 @@ public void shouldThrowIfRightTableNotJoiningOnTableKey() { // Then: expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Source table (TEST5) key column (TEST5.ID) is not the column " - + "used in the join criteria (TEST5.COL0)."); + expectedException.expectMessage("Cannot repartition a TABLE source. 
If this is a join, make " + + "sure that the criteria uses the TABLE key TEST5.ID instead of TEST5.COL0"); // When: execute("CREATE TABLE t1 AS " diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java index c3d85e440171..e3d29f588810 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.planner.plan.JoinNode; import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.ProjectNode; +import io.confluent.ksql.planner.plan.RepartitionNode; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.testutils.AnalysisTestUtil; @@ -93,9 +94,9 @@ public void testSimpleLeftJoinLogicalPlan() { assertThat(logicalPlan.getSources().get(0), instanceOf(ProjectNode.class)); assertThat(logicalPlan.getSources().get(0).getSources().get(0), instanceOf(JoinNode.class)); assertThat(logicalPlan.getSources().get(0).getSources().get(0).getSources() - .get(0), instanceOf(DataSourceNode.class)); + .get(0), instanceOf(RepartitionNode.class)); assertThat(logicalPlan.getSources().get(0).getSources().get(0).getSources() - .get(1), instanceOf(DataSourceNode.class)); + .get(1), instanceOf(RepartitionNode.class)); assertThat(logicalPlan.getSchema().value().size(), equalTo(4)); @@ -121,8 +122,8 @@ public void testSimpleLeftJoinFilterLogicalPlan() { assertThat(filterNode.getSources().get(0), instanceOf(JoinNode.class)); final JoinNode joinNode = (JoinNode) filterNode.getSources().get(0); - assertThat(joinNode.getSources().get(0), instanceOf(DataSourceNode.class)); - assertThat(joinNode.getSources().get(1), instanceOf(DataSourceNode.class)); + assertThat(joinNode.getSources().get(0), instanceOf(RepartitionNode.class)); + assertThat(joinNode.getSources().get(1), instanceOf(RepartitionNode.class)); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index 3a5ecc64bd94..ed6d6c77ce43 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -33,7 +33,6 @@ import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.streams.KSPlanBuilder; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.InternalFunctionRegistry; @@ -172,9 +171,6 @@ public void setUp() { new QueryContext.Stacker() .push(inv.getArgument(0).toString())); - when(left.getDataSourceType()).thenReturn(DataSourceType.KSTREAM); - when(right.getDataSourceType()).thenReturn(DataSourceType.KTABLE); - when(left.getSchema()).thenReturn(LEFT_NODE_SCHEMA); when(right.getSchema()).thenReturn(RIGHT_NODE_SCHEMA); @@ -182,50 +178,11 @@ public void setUp() { when(right.getPartitions(mockKafkaTopicClient)).thenReturn(2); when(left.getKeyField()).thenReturn(KeyField.of(LEFT_JOIN_FIELD_REF)); - when(right.getKeyField()).thenReturn(KeyField.of(RIGHT_JOIN_FIELD_REF)); setUpSource(left, VALUE_FORMAT, leftSource, "Foobar1"); setUpSource(right, 
OTHER_FORMAT, rightSource, "Foobar2"); } - @Test - public void shouldThrowIfLeftKeyFieldNotInLeftSchema() { - // Then: - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Invalid join field"); - - // When: - new JoinNode( - nodeId, - Collections.emptyList(), - JoinNode.JoinType.LEFT, - left, - right, - ColumnRef.withoutSource(ColumnName.of("won't find me")), - RIGHT_JOIN_FIELD_REF, - Optional.empty() - ); - } - - @Test - public void shouldThrowIfRightKeyFieldNotInRightSchema() { - // Then: - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Invalid join field"); - - // When: - new JoinNode( - nodeId, - Collections.emptyList(), - JoinNode.JoinType.LEFT, - left, - right, - LEFT_JOIN_FIELD_REF, - ColumnRef.withoutSource(ColumnName.of("won't find me")), - Optional.empty() - ); - } - @Test public void shouldReturnLeftJoinKeyAsKeyField() { // When: @@ -235,8 +192,6 @@ public void shouldReturnLeftJoinKeyAsKeyField() { JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -275,8 +230,8 @@ public void shouldThrowOnPartitionMismatch() { // Then: expectedException.expect(KsqlException.class); expectedException.expectMessage( - "Can't join TEST1 with TEST2 since the number of partitions don't match. TEST1 " - + "partitions = 1; TEST2 partitions = 2. Please repartition either one so that the " + "Can't join T1 with T2 since the number of partitions don't match. T1 " + + "partitions = 1; T2 partitions = 2. Please repartition either one so that the " + "number of partitions match." ); @@ -299,8 +254,6 @@ public void shouldPerformStreamToStreamLeftJoin() { JoinNode.JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, WITHIN_EXPRESSION ); @@ -330,8 +283,6 @@ public void shouldPerformStreamToStreamInnerJoin() { JoinNode.JoinType.INNER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, WITHIN_EXPRESSION ); @@ -361,8 +312,6 @@ public void shouldPerformStreamToStreamOuterJoin() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, WITHIN_EXPRESSION ); @@ -383,8 +332,8 @@ public void shouldPerformStreamToStreamOuterJoin() { @Test public void shouldNotPerformStreamStreamJoinWithoutJoinWindow() { // Given: - when(left.getDataSourceType()).thenReturn(DataSourceType.KSTREAM); - when(right.getDataSourceType()).thenReturn(DataSourceType.KSTREAM); + when(left.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM); + when(right.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM); final JoinNode joinNode = new JoinNode( nodeId, @@ -392,8 +341,6 @@ public void shouldNotPerformStreamStreamJoinWithoutJoinWindow() { JoinNode.JoinType.INNER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -410,6 +357,10 @@ public void shouldNotPerformStreamStreamJoinWithoutJoinWindow() { @Test public void shouldNotPerformJoinIfInputPartitionsMisMatch() { // Given: + when(left.getTheSourceNode()).thenReturn(left); + when(right.getTheSourceNode()).thenReturn(right); + when(left.getAlias()).thenReturn(LEFT_ALIAS); + when(right.getAlias()).thenReturn(RIGHT_ALIAS); when(left.getPartitions(mockKafkaTopicClient)).thenReturn(3); final JoinNode joinNode = new JoinNode( @@ -418,81 +369,24 @@ public void shouldNotPerformJoinIfInputPartitionsMisMatch() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, WITHIN_EXPRESSION ); // Then: expectedException.expect(KsqlException.class); 
expectedException.expectMessage( - "Can't join Foobar1 with Foobar2 since the number of partitions don't match." + "Can't join left with right since the number of partitions don't match." ); // When: joinNode.buildStream(ksqlStreamBuilder); } - @Test - public void shouldFailJoinIfTableCriteriaColumnIsNotKey() { - // Given: - setupStream(left, leftSchemaKStream); - setupTable(right, rightSchemaKTable); - - final ColumnRef rightCriteriaColumn = - getNonKeyColumn(RIGHT_SOURCE_SCHEMA, RIGHT_ALIAS, RIGHT_JOIN_FIELD_REF); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Source table (Foobar2) key column (right.R1) is not the column used in the join criteria (right.C0). " - + "Only the table's key column or 'ROWKEY' is supported in the join criteria for a TABLE." - ); - - // When: - new JoinNode( - nodeId, - Collections.emptyList(), - JoinType.LEFT, - left, - right, - LEFT_JOIN_FIELD_REF, - rightCriteriaColumn, - Optional.empty() - ); - } - - @Test - public void shouldFailJoinIfTableHasNoKeyAndJoinFieldIsNotRowKey() { - // Given: - setupStream(left, leftSchemaKStream); - setupTable(right, rightSchemaKTable, NO_KEY_FIELD); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Source table (Foobar2) has no key column defined. " - + "Only 'ROWKEY' is supported in the join criteria for a TABLE." - ); - - // When: - new JoinNode( - nodeId, - Collections.emptyList(), - JoinNode.JoinType.LEFT, - left, - right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, - Optional.empty() - ); - } - @Test public void shouldHandleJoinIfTableHasNoKeyAndJoinFieldIsRowKey() { // Given: setupStream(left, leftSchemaKStream); - setupTable(right, rightSchemaKTable, NO_KEY_FIELD); + setupTable(right, rightSchemaKTable); final JoinNode joinNode = new JoinNode( nodeId, @@ -500,8 +394,6 @@ public void shouldHandleJoinIfTableHasNoKeyAndJoinFieldIsRowKey() { JoinNode.JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - ColumnRef.of(SourceName.of("right"), ColumnName.of("ROWKEY")), Optional.empty() ); @@ -529,8 +421,6 @@ public void shouldPerformStreamToTableLeftJoin() { JoinNode.JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -558,8 +448,6 @@ public void shouldPerformStreamToTableInnerJoin() { JoinNode.JoinType.INNER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -587,8 +475,6 @@ public void shouldNotAllowStreamToTableOuterJoin() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -605,8 +491,8 @@ public void shouldNotAllowStreamToTableOuterJoin() { @Test public void shouldNotPerformStreamToTableJoinIfJoinWindowIsSpecified() { // Given: - when(left.getDataSourceType()).thenReturn(DataSourceType.KSTREAM); - when(right.getDataSourceType()).thenReturn(DataSourceType.KTABLE); + when(left.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM); + when(right.getNodeOutputType()).thenReturn(DataSourceType.KTABLE); final WithinExpression withinExpression = new WithinExpression(10, TimeUnit.SECONDS); @@ -616,8 +502,6 @@ public void shouldNotPerformStreamToTableJoinIfJoinWindowIsSpecified() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.of(withinExpression) ); @@ -631,64 +515,6 @@ public void shouldNotPerformStreamToTableJoinIfJoinWindowIsSpecified() { joinNode.buildStream(ksqlStreamBuilder); } - @Test - public void 
shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() { - // Given: - setupTable(left, leftSchemaKTable); - setupTable(right, rightSchemaKTable); - - final ColumnRef leftCriteriaColumn = - getNonKeyColumn(LEFT_SOURCE_SCHEMA, LEFT_ALIAS, LEFT_JOIN_FIELD_REF); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Source table (Foobar1) key column (left.C0) is not the column used in the join criteria (left.L1). " - + "Only the table's key column or 'ROWKEY' is supported in the join criteria for a TABLE." - ); - - // When: - new JoinNode( - nodeId, - Collections.emptyList(), - JoinNode.JoinType.LEFT, - left, - right, - leftCriteriaColumn, - RIGHT_JOIN_FIELD_REF, - Optional.empty() - ); - } - - @Test - public void shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() { - // Given: - setupTable(left, leftSchemaKTable); - setupTable(right, rightSchemaKTable); - - final ColumnRef rightCriteriaColumn = - getNonKeyColumn(RIGHT_SOURCE_SCHEMA, RIGHT_ALIAS, RIGHT_JOIN_FIELD_REF); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Source table (Foobar2) key column (right.R1) is not the column used in the join criteria (right.C0). " - + "Only the table's key column or 'ROWKEY' is supported in the join criteria for a TABLE." - ); - - // When: - new JoinNode( - nodeId, - Collections.emptyList(), - JoinNode.JoinType.LEFT, - left, - right, - LEFT_JOIN_FIELD_REF, - rightCriteriaColumn, - Optional.empty() - ); - } - @Test public void shouldPerformTableToTableInnerJoin() { // Given: @@ -701,8 +527,6 @@ public void shouldPerformTableToTableInnerJoin() { JoinNode.JoinType.INNER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -728,8 +552,6 @@ public void shouldPerformTableToTableLeftJoin() { JoinNode.JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -755,8 +577,6 @@ public void shouldPerformTableToTableOuterJoin() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -773,8 +593,8 @@ public void shouldPerformTableToTableOuterJoin() { @Test public void shouldNotPerformTableToTableJoinIfJoinWindowIsSpecified() { // Given: - when(left.getDataSourceType()).thenReturn(DataSourceType.KTABLE); - when(right.getDataSourceType()).thenReturn(DataSourceType.KTABLE); + when(left.getNodeOutputType()).thenReturn(DataSourceType.KTABLE); + when(right.getNodeOutputType()).thenReturn(DataSourceType.KTABLE); final WithinExpression withinExpression = new WithinExpression(10, TimeUnit.SECONDS); @@ -784,8 +604,6 @@ public void shouldNotPerformTableToTableJoinIfJoinWindowIsSpecified() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.of(withinExpression) ); @@ -808,8 +626,6 @@ public void shouldHaveFullyQualifiedJoinSchema() { JoinNode.JoinType.OUTER, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, Optional.empty() ); @@ -828,33 +644,6 @@ public void shouldHaveFullyQualifiedJoinSchema() { )); } - @Test - public void shouldSelectLeftKeyField() { - // Given: - setupStream(left, leftSchemaKStream); - setupStream(right, rightSchemaKStream); - - final JoinNode joinNode = new JoinNode( - nodeId, - Collections.emptyList(), - JoinNode.JoinType.OUTER, - left, - right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, - WITHIN_EXPRESSION - ); - - // When: - joinNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(leftSchemaKStream).selectKey( - eq(new 
ColumnReferenceExp(LEFT_JOIN_FIELD_REF)), - any() - ); - } - @Test public void shouldNotUseSourceSerdeOptionsForInternalTopics() { // Given: @@ -867,8 +656,6 @@ public void shouldNotUseSourceSerdeOptionsForInternalTopics() { JoinNode.JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, WITHIN_EXPRESSION ); @@ -889,8 +676,6 @@ public void shouldReturnCorrectSchema() { JoinNode.JoinType.LEFT, left, right, - LEFT_JOIN_FIELD_REF, - RIGHT_JOIN_FIELD_REF, WITHIN_EXPRESSION ); @@ -908,17 +693,7 @@ private void setupTable( final SchemaKTable table ) { when(node.buildStream(ksqlStreamBuilder)).thenReturn((SchemaKTable) table); - when(node.getDataSourceType()).thenReturn(DataSourceType.KTABLE); - } - - private void setupTable( - final DataSourceNode node, - final SchemaKTable table, - final Optional keyFieldName - ) { - setupTable(node, table); - - when(node.getKeyField()).thenReturn(KeyField.of(keyFieldName)); + when(node.getNodeOutputType()).thenReturn(DataSourceType.KTABLE); } @SuppressWarnings("unchecked") @@ -927,8 +702,8 @@ private void setupStream( final SchemaKStream stream ) { when(node.buildStream(ksqlStreamBuilder)).thenReturn(stream); - when(stream.selectKey(any(), any())).thenReturn(stream); - when(node.getDataSourceType()).thenReturn(DataSourceType.KSTREAM); + when(node.getTheSourceNode()).thenReturn(node); + when(node.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM); } private void buildJoin() { @@ -1015,7 +790,6 @@ private static void setUpSource( final DataSource dataSource, final String name ) { - when(dataSource.getName()).thenReturn(SourceName.of(name)); when(node.getDataSource()).thenReturn((DataSource)dataSource); final KsqlTopic ksqlTopic = mock(KsqlTopic.class); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 88453d24e37b..e1d89e5e6edf 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -391,6 +391,21 @@ public void shouldHandleSourceWithoutKey() { assertThat(result.getKeyField(), is(KeyField.none())); } + @Test(expected = UnsupportedOperationException.class) + public void shouldFailRepartitionTable() { + // Given: + final PlanNode planNode = givenInitialKStreamOf("SELECT * FROM test2 EMIT CHANGES;"); + final RepartitionNode repartitionNode = new RepartitionNode( + planNode.getId(), + planNode, + schemaKTable.schema, + new ColumnReferenceExp(ColumnRef.withoutSource(ColumnName.of("COL2"))), + KeyField.none()); + + // When: + schemaKTable.selectKey(repartitionNode.getPartitionBy(), childContextStacker); + } + @Test public void testSelectWithExpression() { // Given: diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json index 125764a25f8f..fff0cfac10de 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -1286,7 +1286,7 @@ ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Source table (NO_KEY) has no key column defined. Only 'ROWKEY' is supported in the join criteria for a TABLE." + "message": "Cannot repartition a TABLE source. 
If this is a join, make sure that the criteria uses the TABLE key ROWKEY instead of T.ID"
       }
     },
     {
@@ -1534,7 +1534,7 @@
       ],
       "expectedException": {
         "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '0' in join '(T.ID = 0)'. Joins must only contain a field comparison."
+        "message": "Invalid comparison expression '0' in join '(T.ID = 0)'. Each side of the join comparision must contain references from exactly one source."
       }
     },
     {
@@ -1546,176 +1546,113 @@
       ],
       "expectedException": {
         "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '0' in join '(0 = T.ID)'. Joins must only contain a field comparison."
+        "message": "Invalid comparison expression '0' in join '(0 = T.ID)'. Each side of the join comparision must contain references from exactly one source."
       }
     },
     {
-      "name": "stream stream left join - invalid join field - contains function",
+      "name": "stream stream join - contains function",
       "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = test_udf(tt.id);"
+        "CREATE STREAM TEST1 (ID varchar) WITH (kafka_topic='left_topic', value_format='JSON');",
+        "CREATE STREAM TEST2 (ID varchar) WITH (kafka_topic='right_topic', value_format='JSON');",
+        "CREATE STREAM OUTPUT as SELECT T.ID FROM test1 t join test2 tt WITHIN 30 SECONDS ON t.id = SUBSTRING(tt.id, 2);"
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression 'TEST_UDF(TT.ID)' in join '(T.ID = TEST_UDF(TT.ID))'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains CAST",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = CAST(tt.id AS BIGINT);"
+      "inputs": [
+        {"topic": "left_topic", "key": "foo", "value": {"id": "foo"}, "timestamp": 0},
+        {"topic": "right_topic", "key": "!foo", "value": {"id": "!foo"}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression 'CAST(TT.ID AS BIGINT)' in join '(T.ID = CAST(TT.ID AS BIGINT))'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": "foo", "value": {"T_ID": "foo"}, "timestamp": 10}
+      ]
     },
     {
-      "name": "stream stream left join - invalid join field - contains subscript",
+      "name": "stream stream join - contains CAST",
       "statements": [
         "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = tt.id[0];"
+        "CREATE STREAM TEST2 (ID int) WITH (kafka_topic='right_topic', value_format='JSON');",
+        "CREATE STREAM OUTPUT as SELECT t.ID FROM test1 t JOIN test2 tt WITHIN 30 seconds ON t.id = CAST(tt.id AS BIGINT);"
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression 'TT.ID[0]' in join '(T.ID = TT.ID[0])'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains subexpression",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id = 0);"
+      "inputs": [
+        {"topic": "left_topic", "key": "1", "value": {"id": 1}, "timestamp": 10},
+        {"topic": "right_topic", "key": "1", "value": {"id": 1}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID = 0)' in join '(T.ID = (TT.ID = 0))'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": 1, "value": {"T_ID": 1}, "timestamp": 10}
+      ]
     },
     {
-      "name": "stream stream left join - invalid join field - contains arithmetic binary expression",
+      "name": "stream stream join - contains subscript",
       "statements": [
         "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = tt.id + 1;"
+        "CREATE STREAM TEST2 (ID ARRAY<BIGINT>) WITH (kafka_topic='right_topic', value_format='JSON');",
+        "CREATE STREAM OUTPUT as SELECT T.ID FROM test1 t JOIN test2 tt WITHIN 30 SECONDS ON t.id = tt.id[1];"
      ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID + 1)' in join '(T.ID = (TT.ID + 1))'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains IS NULL expression",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id IS NULL);"
+      "inputs": [
+        {"topic": "left_topic", "key": "1", "value": {"id": 1}, "timestamp": 0},
+        {"topic": "right_topic", "key": "1", "value": {"id": [1]}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID IS NULL)' in join '(T.ID = (TT.ID IS NULL))'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": 1, "value": {"T_ID": 1}, "timestamp": 10}
+      ]
     },
     {
-      "name": "stream stream left join - invalid join field - contains IS NOT NULL expression",
+      "name": "stream stream join - contains arithmetic binary expression",
       "statements": [
         "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
         "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id IS NOT NULL);"
+        "CREATE STREAM OUTPUT as SELECT T.ID FROM test1 t join test2 tt WITHIN 30 seconds ON t.id = tt.id + 1;"
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID IS NOT NULL)' in join '(T.ID = (TT.ID IS NOT NULL))'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains logical binary expression",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id AND tt.f1);"
+      "inputs": [
+        {"topic": "left_topic", "key": "1", "value": {"id": 1}, "timestamp": 0},
+        {"topic": "right_topic", "key": "0", "value": {"id": 0}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID AND TT.F1)' in join '(T.ID = (TT.ID AND TT.F1))'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": 1, "value": {"T_ID": 1}, "timestamp": 10}
+      ]
     },
     {
-      "name": "stream stream left join - invalid join field - contains not expression",
+      "name": "stream stream join - contains arithmetic unary expression",
       "statements": [
         "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
         "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (NOT tt.id);"
+        "CREATE STREAM OUTPUT as SELECT T.ID FROM test1 t join test2 tt WITHIN 30 seconds ON t.id = -tt.id;"
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(NOT TT.ID)' in join '(T.ID = (NOT TT.ID))'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains arithmetic unary expression",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = -tt.id;"
+      "inputs": [
+        {"topic": "left_topic", "key": "1", "value": {"id": 1}, "timestamp": 0},
+        {"topic": "right_topic", "key": "1", "value": {"id": -1}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '-TT.ID' in join '(T.ID = -TT.ID)'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": 1, "value": {"T_ID": 1}, "timestamp": 10}
+      ]
     },
     {
-      "name": "stream stream left join - invalid join field - contains LIKE expression",
+      "name": "stream stream join - contains CASE expression",
       "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id LIKE '%x');"
+        "CREATE STREAM TEST1 (ID int) WITH (kafka_topic='left_topic', value_format='JSON');",
+        "CREATE STREAM TEST2 (ID int) WITH (kafka_topic='right_topic', value_format='JSON');",
+        "CREATE STREAM OUTPUT as SELECT T.ID FROM test1 t join test2 tt WITHIN 30 SECONDS ON t.id = (CASE WHEN tt.id = 2 THEN 1 ELSE 3 END);"
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID LIKE '%x')' in join '(T.ID = (TT.ID LIKE '%x'))'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains CASE expression",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (CASE WHEN 1 THEN 2 END);"
+      "inputs": [
+        {"topic": "left_topic", "key": "1", "value": {"id": 1}, "timestamp": 0},
+        {"topic": "right_topic", "key": "1", "value": {"id": 2}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(CASE WHEN 1 THEN 2 END)' in join '(T.ID = (CASE WHEN 1 THEN 2 END))'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": 1, "value": {"T_ID": 1}, "timestamp": 10}
+      ]
     },
     {
-      "name": "stream stream left join - invalid join field - contains IN expression",
+      "name": "stream stream join - contains arithmetic unary expression flipped sides",
       "statements": [
         "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
         "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id IN (1, 2, 3));"
+        "CREATE STREAM OUTPUT as SELECT T.ID FROM test1 t join test2 tt WITHIN 30 seconds ON -tt.id = t.id;"
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID IN (1, 2, 3))' in join '(T.ID = (TT.ID IN (1, 2, 3)))'. Joins must only contain a field comparison."
-      }
-    },
-    {
-      "name": "stream stream left join - invalid join field - contains BETWEEN expression",
-      "statements": [
-        "CREATE STREAM TEST1 (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
-        "CREATE STREAM TEST2 (ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
-        "CREATE STREAM LEFT_OUTER_JOIN as SELECT * FROM test1 t left join test2 tt ON t.id = (tt.id BETWEEN 1 AND 3);"
+      "inputs": [
+        {"topic": "left_topic", "key": "1", "value": {"id": 1}, "timestamp": 0},
+        {"topic": "right_topic", "key": "1", "value": {"id": -1}, "timestamp": 10}
       ],
-      "expectedException": {
-        "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid comparison expression '(TT.ID BETWEEN 1 AND 3)' in join '(T.ID = (TT.ID BETWEEN 1 AND 3))'. Joins must only contain a field comparison."
-      }
+      "outputs": [
+        {"topic": "OUTPUT", "key": 1, "value": {"T_ID": 1}, "timestamp": 10}
+      ]
     },
     {
       "name": "stream stream left join - invalid left join expression - field does not exist",
@@ -1726,7 +1663,7 @@
       ],
       "expectedException": {
         "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid join criteria (T.IID = TT.ID). Column T.IID does not exist."
+        "message": "Column 'T.IID' cannot be resolved."
       }
     },
     {
@@ -1738,7 +1675,7 @@
       ],
      "expectedException": {
         "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid join criteria (T.ID = TT.IID). Column TT.IID does not exist."
+        "message": "Column 'TT.IID' cannot be resolved."
       }
     },
     {
@@ -1836,7 +1773,7 @@
       ],
       "expectedException": {
         "type": "io.confluent.ksql.util.KsqlStatementException",
-        "message": "Invalid join criteria: Source table (INPUT_TABLE) has no key column defined. Only 'ROWKEY' is supported in the join criteria for a TABLE."
+        "message": "Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE key ROWKEY instead of T.ID"
       }
     },
     {