From 2bc15ddeec9938e3613cce20d1dbbd7af120c899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Tue, 18 Aug 2020 10:52:28 -0500 Subject: [PATCH] fix: allow implicit cast of numbers literals to decimals on insert/select (#6005) --- .../io/confluent/ksql/schema/ksql/Column.java | 22 ++- .../ksql/schema/ksql/LogicalSchema.java | 21 +++ .../io/confluent/ksql/util/DecimalUtil.java | 15 ++ .../ksql/schema/ksql/LogicalSchemaTest.java | 66 ++++++++ .../confluent/ksql/util/DecimalUtilTest.java | 52 +++++++ .../confluent/ksql/engine/EngineExecutor.java | 2 +- .../ksql/planner/LogicalPlanner.java | 37 +++-- .../ksql/planner/plan/FinalProjectNode.java | 33 ++-- .../planner/plan/ImplicitlyCastResolver.java | 69 +++++++++ .../ksql/planner/plan/SelectionUtil.java | 24 ++- .../planner/plan/FinalProjectNodeTest.java | 27 ++-- .../plan/ImplicitlyCastResolverTest.java | 105 +++++++++++++ .../6.1.0_1597343542697/plan.json | 141 ++++++++++++++++++ .../6.1.0_1597343542697/spec.json | 114 ++++++++++++++ .../6.1.0_1597343542697/topology | 13 ++ .../query-validation-tests/insert-into.json | 14 ++ 16 files changed, 704 insertions(+), 51 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolver.java create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolverTest.java create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/topology diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/Column.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/Column.java index 674fc4f0afc4..6fbe040ae8ae 100644 --- 
a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/Column.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/Column.java @@ -17,8 +17,11 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.utils.FormatOptions; +import io.confluent.ksql.util.DecimalUtil; + import java.util.Objects; /** @@ -111,6 +114,20 @@ public int index() { return index; } + public boolean canImplicitlyCast(final SqlType toType) { + if (type instanceof SqlDecimal && toType instanceof SqlDecimal) { + return DecimalUtil.canImplicitlyCast((SqlDecimal)type, (SqlDecimal)toType); + } + + return type.equals(toType); + } + + public boolean equalsIgnoreType(final Column that) { + return Objects.equals(index, that.index) + && Objects.equals(namespace, that.namespace) + && Objects.equals(name, that.name); + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -120,10 +137,7 @@ public boolean equals(final Object o) { return false; } final Column that = (Column) o; - return Objects.equals(index, that.index) - && Objects.equals(namespace, that.namespace) - && Objects.equals(type, that.type) - && Objects.equals(name, that.name); + return equalsIgnoreType(that) && Objects.equals(type, that.type); } @Override diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index 7d198831501a..7c402c63d1ba 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -159,6 +159,27 @@ public boolean isKeyColumn(final ColumnName columnName) { .isPresent(); } + /** + * Returns True if this schema is compatible with {@code other} schema. 
+ */ + public boolean compatibleSchema(final LogicalSchema other) { + if (columns().size() != other.columns().size()) { + return false; + } + + for (int i = 0; i < columns().size(); i++) { + final Column s1Column = columns().get(i); + final Column s2Column = other.columns().get(i); + final SqlType s2Type = s2Column.type(); + + if (!s1Column.equalsIgnoreType(s2Column) || !s1Column.canImplicitlyCast(s2Type)) { + return false; + } + } + + return true; + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/DecimalUtil.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/DecimalUtil.java index a9553913d0ef..a5d012b2df89 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/DecimalUtil.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/DecimalUtil.java @@ -188,6 +188,21 @@ public static SqlDecimal toSqlDecimal(final SqlType schema) { } } + /** + * Returns True if {@code s1} can be implicitly cast to {@code s2}. + *
<p>
+ * A decimal {@code s1} can be implicitly cast if precision/scale fits into the {@code s2} + * precision/scale. + * + */ + public static boolean canImplicitlyCast(final SqlDecimal s1, final SqlDecimal s2) { + return s1.getScale() <= s2.getScale() + && (s1.getPrecision() - s1.getScale()) <= (s2.getPrecision() - s2.getScale()); + } + public static BigDecimal cast(final long value, final int precision, final int scale) { validateParameters(precision, scale); final BigDecimal decimal = new BigDecimal(value, new MathContext(precision)); diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index 0138b95dff6c..ac0a5a8f32e0 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.Column.Namespace; import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.schema.utils.FormatOptions; import io.confluent.ksql.util.KsqlException; @@ -746,4 +747,69 @@ private static org.apache.kafka.connect.data.Field connectField( ) { return new org.apache.kafka.connect.data.Field(fieldName, index, schema); } + + @Test + public void shouldSchemaNoCompatibleWithDifferentSizes() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, BIGINT) + .build(); + final LogicalSchema otherSchema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, BIGINT) + .valueColumn(V1, BIGINT) + .build(); + + // Then: + assertThat(schema.compatibleSchema(otherSchema), is(false)); + } + + @Test + public void shouldSchemaNoCompatibleOnDifferentColumnName() { + // 
Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, BIGINT) + .build(); + final LogicalSchema otherSchema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(V1, BIGINT) + .build(); + + // Then: + assertThat(schema.compatibleSchema(otherSchema), is(false)); + } + + @Test + public void shouldSchemaNoCompatibleWhenCannotCastType() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, BIGINT) + .build(); + final LogicalSchema otherSchema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, INTEGER) + .build(); + + // Then: + assertThat(schema.compatibleSchema(otherSchema), is(false)); + } + + @Test + public void shouldSchemaCompatibleWithImplicitlyCastType() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, SqlDecimal.of(5, 2)) + .build(); + final LogicalSchema otherSchema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .valueColumn(F1, SqlDecimal.of(6, 3)) + .build(); + + // Then: + assertThat(schema.compatibleSchema(otherSchema), is(true)); + } } diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/util/DecimalUtilTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/util/DecimalUtilTest.java index c4666376c9b4..ade75aa34c23 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/util/DecimalUtilTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/util/DecimalUtilTest.java @@ -510,4 +510,56 @@ public void shouldNotCastStringNonNumber() { () -> cast("abc", 2, 1) ); } + + @Test + public void shouldAllowImplicitlyCastOnEqualSchema() { + // Given: + final SqlDecimal s1 = SqlTypes.decimal(5, 2); + final SqlDecimal s2 = SqlTypes.decimal(5, 2); + + // When: + final boolean compatible = DecimalUtil.canImplicitlyCast(s1, s2); + + // Then: + assertThat(compatible, is(true)); + } + + @Test + public void 
shouldAllowImplicitlyCastOnHigherPrecisionAndScale() { + // Given: + final SqlDecimal s1 = SqlTypes.decimal(5, 2); + final SqlDecimal s2 = SqlTypes.decimal(6, 3); + + // When: + final boolean compatible = DecimalUtil.canImplicitlyCast(s1, s2); + + // Then: + assertThat(compatible, is(true)); + } + + @Test + public void shouldAllowImplicitlyCastOnHigherScale() { + // Given: + final SqlDecimal s1 = SqlTypes.decimal(2, 1); + final SqlDecimal s2 = SqlTypes.decimal(2, 2); + + // When: + final boolean compatible = DecimalUtil.canImplicitlyCast(s1, s2); + + // Then: + assertThat(compatible, is(false)); + } + + @Test + public void shouldAllowImplicitlyCastOnLowerPrecision() { + // Given: + final SqlDecimal s1 = SqlTypes.decimal(2, 1); + final SqlDecimal s2 = SqlTypes.decimal(1, 1); + + // When: + final boolean compatible = DecimalUtil.canImplicitlyCast(s1, s2); + + // Then: + assertThat(compatible, is(false)); + } } \ No newline at end of file diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 3cf66e4ff522..35c7d1dab7bc 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -305,7 +305,7 @@ private void validateExistingSink( final LogicalSchema resultSchema = outputNode.getSchema(); final LogicalSchema existingSchema = existing.getSchema(); - if (!resultSchema.equals(existingSchema)) { + if (!resultSchema.compatibleSchema(existingSchema)) { throw new KsqlException("Incompatible schema between results and sink." 
+ System.lineSeparator() + "Result schema is " + resultSchema diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index f7abf3702ec8..36956cfba81b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -37,8 +37,8 @@ import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory; import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.execution.util.ExpressionTypeManager; -import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.udf.AsValue; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.NodeLocation; @@ -93,21 +93,21 @@ public class LogicalPlanner { private final KsqlConfig ksqlConfig; private final RewrittenAnalysis analysis; - private final FunctionRegistry functionRegistry; + private final MetaStore metaStore; private final AggregateAnalyzer aggregateAnalyzer; private final ColumnReferenceRewriter refRewriter; public LogicalPlanner( final KsqlConfig ksqlConfig, final ImmutableAnalysis analysis, - final FunctionRegistry functionRegistry + final MetaStore metaStore ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.refRewriter = new ColumnReferenceRewriter(analysis.getFromSourceSchemas(false).isJoin()); this.analysis = new RewrittenAnalysis(analysis, refRewriter::process); - this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); - this.aggregateAnalyzer = new AggregateAnalyzer(functionRegistry); + this.metaStore = Objects.requireNonNull(metaStore, "metaStore"); + this.aggregateAnalyzer = new AggregateAnalyzer(metaStore); } // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @@ -227,13 +227,20 @@ 
private Optional getTimestampColumn( return timestampColumn; } + private Optional getTargetSchema() { + return analysis.getInto().filter(i -> !i.isCreate()) + .map(i -> metaStore.getSource(i.getName())) + .map(target -> target.getSchema()); + } + private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) { final GroupBy groupBy = analysis.getGroupBy() .orElseThrow(IllegalStateException::new); final List projectionExpressions = SelectionUtil.buildSelectExpressions( sourcePlanNode, - analysis.getSelectItems() + analysis.getSelectItems(), + getTargetSchema() ); final LogicalSchema schema = @@ -247,7 +254,7 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) { if (analysis.getHavingExpression().isPresent()) { final FilterTypeValidator validator = new FilterTypeValidator( sourcePlanNode.getSchema(), - functionRegistry, + metaStore, FilterType.HAVING); validator.validateFilterExpression(analysis.getHavingExpression().get()); @@ -258,7 +265,7 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) { sourcePlanNode, schema, groupBy, - functionRegistry, + metaStore, analysis, aggregateAnalysis, projectionExpressions, @@ -271,8 +278,8 @@ private ProjectNode buildUserProjectNode(final PlanNode parentNode) { new PlanNodeId("Project"), parentNode, analysis.getSelectItems(), - analysis.getInto().isPresent(), - functionRegistry + analysis.getInto(), + metaStore ); } @@ -294,7 +301,7 @@ private FilterNode buildFilterNode( ) { final FilterTypeValidator validator = new FilterTypeValidator( sourcePlanNode.getSchema(), - functionRegistry, + metaStore, FilterType.WHERE); validator.validateFilterExpression(filterExpression); @@ -342,7 +349,7 @@ private RepartitionNode buildInternalRepartitionNode( } private FlatMapNode buildFlatMapNode(final PlanNode sourcePlanNode) { - return new FlatMapNode(new PlanNodeId("FlatMap"), sourcePlanNode, functionRegistry, analysis); + return new FlatMapNode(new PlanNodeId("FlatMap"), sourcePlanNode, 
metaStore, analysis); } private PlanNode buildSourceForJoin( @@ -511,7 +518,7 @@ private LogicalSchema buildAggregateSchema( sourceSchema .withPseudoAndKeyColsInValue(analysis.getWindowExpression().isPresent()), projectionExpressions, - functionRegistry + metaStore ); final List groupByExps = groupBy.getGroupingExpressions(); @@ -555,7 +562,7 @@ private LogicalSchema buildAggregateSchema( ); } else { final ExpressionTypeManager typeManager = - new ExpressionTypeManager(sourceSchema, functionRegistry); + new ExpressionTypeManager(sourceSchema, metaStore); final Expression expression = groupByExps.get(0); @@ -607,7 +614,7 @@ private LogicalSchema buildRepartitionedSchema( return PartitionByParamsFactory.buildSchema( sourceSchema, partitionBy, - functionRegistry + metaStore ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/FinalProjectNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/FinalProjectNode.java index e213aa5ec901..f0ab7783e396 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/FinalProjectNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/FinalProjectNode.java @@ -16,11 +16,12 @@ package io.confluent.ksql.planner.plan; import com.google.common.collect.ImmutableList; +import io.confluent.ksql.analyzer.Analysis; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.plan.SelectExpression; -import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.udf.AsValue; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.NodeLocation; @@ -35,6 +36,7 @@ import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import java.util.List; +import java.util.Optional; import java.util.Set; import 
java.util.stream.Collectors; @@ -47,7 +49,7 @@ public class FinalProjectNode extends ProjectNode implements VerifiableNode { private final Projection projection; - private final boolean persistent; + private final Optional into; private final LogicalSchema schema; private final ImmutableList selectExpressions; @@ -55,14 +57,14 @@ public FinalProjectNode( final PlanNodeId id, final PlanNode source, final List selectItems, - final boolean persistent, - final FunctionRegistry functionRegistry + final Optional into, + final MetaStore metaStore ) { super(id, source); this.projection = Projection.of(selectItems); - this.persistent = persistent; + this.into = into; - final Pair> result = build(functionRegistry); + final Pair> result = build(metaStore); this.schema = result.left; this.selectExpressions = ImmutableList.copyOf(result.right); @@ -84,18 +86,23 @@ public void validateKeyPresent(final SourceName sinkName) { getSource().validateKeyPresent(sinkName, projection); } - private Pair> build( - final FunctionRegistry functionRegistry - ) { + private Optional getTargetSchema(final MetaStore metaStore) { + return into.filter(i -> !i.isCreate()) + .map(i -> metaStore.getSource(i.getName())) + .map(target -> target.getSchema()); + } + + private Pair> build(final MetaStore metaStore) { final LogicalSchema parentSchema = getSource().getSchema(); + final Optional targetSchema = getTargetSchema(metaStore); final List selectExpressions = SelectionUtil - .buildSelectExpressions(getSource(), projection.selectItems()); + .buildSelectExpressions(getSource(), projection.selectItems(), targetSchema); final LogicalSchema schema = - SelectionUtil.buildProjectionSchema(parentSchema, selectExpressions, functionRegistry); + SelectionUtil.buildProjectionSchema(parentSchema, selectExpressions, metaStore); - if (persistent) { + if (into.isPresent()) { // Persistent queries have key columns as key columns - so final projection can exclude them: selectExpressions.removeIf(se -> { if 
(se.getExpression() instanceof UnqualifiedColumnReferenceExp) { @@ -114,7 +121,7 @@ private Pair> build( } final LogicalSchema nodeSchema; - if (persistent) { + if (into.isPresent()) { nodeSchema = schema.withoutPseudoAndKeyColsInValue(); } else { // Transient queries return key columns in the value, so the projection includes them, and diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolver.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolver.java new file mode 100644 index 000000000000..3728ecfe0d30 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolver.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.planner.plan; + +import io.confluent.ksql.execution.expression.tree.DecimalLiteral; +import io.confluent.ksql.execution.expression.tree.DoubleLiteral; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.IntegerLiteral; +import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.util.DecimalUtil; + +import java.math.BigDecimal; + +public final class ImplicitlyCastResolver { + private ImplicitlyCastResolver() {} + + public static Expression resolve(final Expression expression, final SqlType sqlType) { + if (sqlType instanceof SqlDecimal) { + return resolveToDecimal(expression, (SqlDecimal)sqlType); + } + + return expression; + } + + @SuppressWarnings("CyclomaticComplexiIntegerLiteralty") + private static Expression resolveToDecimal( + final Expression expression, + final SqlDecimal toDecimalType + ) { + final BigDecimal literalValue; + + if (expression instanceof IntegerLiteral) { + literalValue = BigDecimal.valueOf(((IntegerLiteral) expression).getValue()); + } else if (expression instanceof LongLiteral) { + literalValue = BigDecimal.valueOf(((LongLiteral) expression).getValue()); + } else if (expression instanceof DoubleLiteral) { + literalValue = BigDecimal.valueOf(((DoubleLiteral) expression).getValue()); + } else if (expression instanceof DecimalLiteral) { + literalValue = ((DecimalLiteral) expression).getValue(); + } else { + return expression; + } + + final SqlDecimal fromDecimalType = (SqlDecimal) DecimalUtil.fromValue(literalValue); + if (DecimalUtil.canImplicitlyCast(fromDecimalType, toDecimalType)) { + return new DecimalLiteral( + expression.getLocation(), + DecimalUtil.cast(literalValue, toDecimalType.getPrecision(), toDecimalType.getScale()) + ); + } + + return expression; + } +} diff --git 
a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java index 415a76935f78..8ba8d657736f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java @@ -26,10 +26,12 @@ import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.SelectItem; import io.confluent.ksql.parser.tree.SingleColumn; +import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; import io.confluent.ksql.schema.ksql.types.SqlType; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -75,24 +77,28 @@ public static LogicalSchema buildProjectionSchema( public static List buildSelectExpressions( final PlanNode parentNode, - final List selectItems + final List selectItems, + final Optional targetSchema ) { return IntStream.range(0, selectItems.size()) .boxed() - .flatMap(idx -> resolveSelectItem(idx, selectItems, parentNode)) + .flatMap(idx -> resolveSelectItem(idx, selectItems, parentNode, targetSchema)) .collect(Collectors.toList()); } private static Stream resolveSelectItem( final int idx, final List selectItems, - final PlanNode parentNode + final PlanNode parentNode, + final Optional targetSchema ) { final SelectItem selectItem = selectItems.get(idx); if (selectItem instanceof SingleColumn) { final SingleColumn column = (SingleColumn) selectItem; - return resolveSingleColumn(idx, parentNode, column); + final Optional targetColumn = targetSchema.map(schema -> schema.columns().get(idx)); + + return resolveSingleColumn(idx, parentNode, column, targetColumn); } if (selectItem instanceof AllColumns) { @@ -106,13 +112,19 @@ private static Stream 
resolveSelectItem( private static Stream resolveSingleColumn( final int idx, final PlanNode parentNode, - final SingleColumn column + final SingleColumn column, + final Optional targetColumn ) { final Expression expression = parentNode.resolveSelect(idx, column.getExpression()); final ColumnName alias = column.getAlias() .orElseThrow(() -> new IllegalStateException("Alias should be present by this point")); - return Stream.of(SelectExpression.of(alias, expression)); + return Stream.of(SelectExpression.of( + alias, + targetColumn + .map(col -> ImplicitlyCastResolver.resolve(expression, col.type())) + .orElse(expression)) + ); } private static Stream resolveAllColumns( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/FinalProjectNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/FinalProjectNodeTest.java index b8bee4066381..d1822703a315 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/FinalProjectNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/FinalProjectNodeTest.java @@ -27,8 +27,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.analyzer.Analysis; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; -import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; @@ -66,7 +67,9 @@ public class FinalProjectNodeTest { @Mock private PlanNode source; @Mock - private FunctionRegistry functionRegistry; + private MetaStore metaStore; + @Mock + private Analysis.Into into; private List selects; private FinalProjectNode projectNode; @@ -88,8 +91,8 @@ public void setUp() { NODE_ID, source, selects, - true, - functionRegistry); + Optional.of(into), + metaStore); } @Test @@ -116,8 +119,8 @@ 
public void shouldNotThrowOnSyntheticKeyColumnInProjection() { NODE_ID, source, selects, - true, - functionRegistry); + Optional.of(into), + metaStore); // Then: verify(source).validateColumns(RequiredColumns.builder().add(syntheticKeyRef).build()); @@ -142,8 +145,8 @@ public void shouldThrowOnUnknownSyntheticKeyLikeColumnInProjection() { NODE_ID, source, selects, - true, - functionRegistry) + Optional.of(into), + metaStore) ); // Then: @@ -162,8 +165,8 @@ public void shouldThrowOnValidateIfSchemaHasNoValueColumns() { NODE_ID, source, selects, - true, - functionRegistry + Optional.of(into), + metaStore ) ); @@ -186,8 +189,8 @@ public void shouldThrowOnValidateIfMultipleKeyColumns() { NODE_ID, source, selects, - true, - functionRegistry + Optional.of(into), + metaStore ) ); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolverTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolverTest.java new file mode 100644 index 000000000000..49a78ca7d971 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/ImplicitlyCastResolverTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.planner.plan; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.DecimalLiteral; +import io.confluent.ksql.execution.expression.tree.DoubleLiteral; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.IntegerLiteral; +import io.confluent.ksql.execution.expression.tree.Literal; +import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.execution.expression.tree.StringLiteral; +import io.confluent.ksql.schema.ksql.types.SqlDecimal; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.junit.Test; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; + +public class ImplicitlyCastResolverTest { + private static final SqlDecimal DECIMAL_5_2 = SqlDecimal.of(5, 2); + + @Test + public void shouldNotResolveNonDecimalTarget() { + // When + final Expression expression = + ImplicitlyCastResolver.resolve(new IntegerLiteral(5), SqlTypes.STRING); + + // Then + assertThat(expression, instanceOf(IntegerLiteral.class)); + assertThat(((IntegerLiteral)expression).getValue(), is(5)); + } + + @Test + public void shouldCastToDecimal() { + // Given + final Map fromLiterals = ImmutableMap.of( + new IntegerLiteral(5), new BigDecimal("5.00"), + new LongLiteral(5), new BigDecimal("5.00"), + new DoubleLiteral(5), new BigDecimal("5.00"), + new DecimalLiteral(BigDecimal.TEN), new BigDecimal("10.00"), + new DecimalLiteral(new BigDecimal("10.1")), new BigDecimal("10.10") + ); + + for (final Map.Entry entry : fromLiterals.entrySet()) { + final Literal literal = entry.getKey(); + final BigDecimal expected = entry.getValue(); + + // When + 
final Expression expression = + ImplicitlyCastResolver.resolve(literal, DECIMAL_5_2); + + // Then + assertThat("Should cast " + literal.getClass().getSimpleName() + " to " + DECIMAL_5_2, + expression, instanceOf(DecimalLiteral.class)); + assertThat("Should cast " + literal.getClass().getSimpleName() + " to " + DECIMAL_5_2, + ((DecimalLiteral)expression).getValue(), + is(expected) + ); + } + } + + @Test + public void shouldNotCastToDecimal() { + // Given + final List fromLiterals = Arrays.asList( + new BooleanLiteral("true"), + new StringLiteral("10.2"), + new DecimalLiteral(BigDecimal.valueOf(10.133)) + ); + + for (final Literal literal : fromLiterals) { + // When + final Expression expression = + ImplicitlyCastResolver.resolve(literal, DECIMAL_5_2); + + // Then + assertThat("Should not cast " + literal.getClass().getSimpleName() + " to " + DECIMAL_5_2, + expression, instanceOf(literal.getClass())); + assertThat("Should not cast " + literal.getClass().getSimpleName() + " to " + DECIMAL_5_2, + expression.equals(literal), is(true)); + } + } +} diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/plan.json new file mode 100644 index 000000000000..b313b166ae92 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/plan.json @@ -0,0 +1,141 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM SOURCE (IGNORED STRING) WITH (KAFKA_TOPIC='source', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "SOURCE", + "schema" : "`IGNORED` STRING", + "topicName" : "source", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + 
"statementText" : "CREATE STREAM TARGET (C1 DECIMAL(5, 2), C2 DECIMAL(5, 2)) WITH (KAFKA_TOPIC='target', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TARGET", + "schema" : "`C1` DECIMAL(5, 2), `C2` DECIMAL(5, 2)", + "topicName" : "target", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "INSERT INTO TARGET SELECT\n 1 C1,\n 2.0 C2\nFROM SOURCE SOURCE\nEMIT CHANGES", + "queryPlan" : { + "sources" : [ "SOURCE" ], + "sink" : "TARGET", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TARGET" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "source", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`IGNORED` STRING" + }, + "selectExpressions" : [ "1.00 AS C1", "2.00 AS C2" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "target" + }, + "queryId" : "INSERTQUERY_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + 
"ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "false", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.suppress.buffer.size" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + 
"ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/spec.json new file mode 100644 index 000000000000..918d34435c20 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/spec.json @@ -0,0 +1,114 @@ +{ + "version" : "6.1.0", + "timestamp" : 1597343542697, + "path" : "query-validation-tests/insert-into.json", + "schemas" : { + "INSERTQUERY_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "INSERTQUERY_0.TARGET" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "implicitly casts", + "inputs" : [ { + "topic" : "source", + "key" : null, + "value" : { + "ignored" : "v1" + } + } ], + "outputs" : [ { + "topic" : "target", + "key" : null, + "value" : { + "C1" : 1.00, + "C2" : 2.00 + } + } ], + "topics" : [ { + "name" : "source", + "schema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "IGNORED", + "type" : [ "null", "string" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "format" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "target", + "schema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "C1", + "type" : [ "null", { + "type" : "bytes", + "scale" : 2, + "precision" : 5, + "connect.version" : 1, + "connect.parameters" : { + "scale" : "2", + "connect.decimal.precision" : "5" + }, 
+ "connect.name" : "org.apache.kafka.connect.data.Decimal", + "logicalType" : "decimal" + } ], + "default" : null + }, { + "name" : "C2", + "type" : [ "null", { + "type" : "bytes", + "scale" : 2, + "precision" : 5, + "connect.version" : 1, + "connect.parameters" : { + "scale" : "2", + "connect.decimal.precision" : "5" + }, + "connect.name" : "org.apache.kafka.connect.data.Decimal", + "logicalType" : "decimal" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "format" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM SOURCE (ignored VARCHAR) WITH (kafka_topic='source', value_format='AVRO');", "CREATE STREAM TARGET (c1 DECIMAL(5,2), c2 DECIMAL(5,2)) WITH (kafka_topic='target', value_format='AVRO');", "INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "source", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4 + }, { + "name" : "target", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/topology new file mode 100644 index 000000000000..46b5c8788c24 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/insert-into_-_implicitly_casts/6.1.0_1597343542697/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [source]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: 
Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: target) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json index 93b51f958ddd..e21fc67fecfe 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/insert-into.json @@ -173,6 +173,20 @@ {"topic": "_confluent-ksql-some.ksql.service.idquery_INSERTQUERY_0-KSTREAM-JOINOTHER-0000000017-store-changelog", "window": {"start": 0, "end": 1000, "type": "time"}, "key": "v1", "value": {"S2_K": "v1", "S2_ROWTIME": 0, "S2_ID": "s2-key"}}, {"topic": "OUTPUT", "key": "v1", "value": {"DATA": "s1-keys2-key", "I": 1}} ] + }, + { + "name": "implicitly casts", + "statements": [ + "CREATE STREAM SOURCE (ignored VARCHAR) WITH (kafka_topic='source', value_format='AVRO');", + "CREATE STREAM TARGET (c1 DECIMAL(5,2), c2 DECIMAL(5,2)) WITH (kafka_topic='target', value_format='AVRO');", + "INSERT INTO TARGET SELECT 1 as c1, 2.0 as c2 FROM SOURCE;" + ], + "inputs": [ + {"topic": "source", "value": {"ignored": "v1"}} + ], + "outputs": [ + {"topic": "target", "value": {"C1": 1.00, "C2": 2.00}} + ] } ] } \ No newline at end of file