diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java index 768d19d9f844..1d9a081ee858 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlArray.java @@ -44,11 +44,6 @@ public SqlType getItemType() { return itemType; } - @Override - public boolean supportsCast() { - return false; - } - @Override public void validateValue(final Object value) { if (value == null) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java index 5ef1b6402a4f..379809da3874 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlDecimal.java @@ -50,11 +50,6 @@ public int getScale() { return scale; } - @Override - public boolean supportsCast() { - return true; - } - @Override public void validateValue(final Object value) { if (value == null) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java index 69b63f7ecaff..d0351aaf2f85 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlMap.java @@ -45,11 +45,6 @@ public SqlType getValueType() { return valueType; } - @Override - public boolean supportsCast() { - return false; - } - @Override public void validateValue(final Object value) { if (value == null) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java index 2e1d3ebff254..01927f39431f 
100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java @@ -79,11 +79,6 @@ private SqlPrimitiveType(final SqlBaseType baseType) { super(baseType); } - @Override - public boolean supportsCast() { - return true; - } - @Override public void validateValue(final Object value) { if (value == null) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java index 780905189686..3b7ccde9686e 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlStruct.java @@ -62,11 +62,6 @@ public Optional field(final String name) { return Optional.ofNullable(byName.get(name)); } - @Override - public boolean supportsCast() { - return false; - } - @Override public void validateValue(final Object value) { if (value == null) { diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java index c8e939c64c6f..be3cf52fb8b9 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/types/SqlType.java @@ -36,8 +36,6 @@ public SqlBaseType baseType() { return baseType; } - public abstract boolean supportsCast(); - public abstract void validateValue(Object value); public abstract String toString(FormatOptions formatOptions); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 93c517297330..aac464f8d1f5 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ 
b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -25,6 +25,7 @@ import io.confluent.ksql.execution.codegen.CodeGenRunner; import io.confluent.ksql.execution.codegen.ExpressionMetadata; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.NullLiteral; import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.NoopProcessingLogContext; @@ -588,8 +589,13 @@ protected Object visitExpression(final Expression expression, final Void context fieldName, valueSqlType, value)); + }) + .orElse(null); + } - }); + @Override + public Object visitNullLiteral(final NullLiteral node, final Void context) { + return null; } } } \ No newline at end of file diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java index 6ca850c5bd0a..a2d061cc24f8 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java @@ -112,6 +112,11 @@ public ExpressionMetadata buildCodeGenFromParseTree( final SqlType expressionType = expressionTypeManager .getExpressionSqlType(expression); + if (expressionType == null) { + // expressionType can be null if expression is NULL. + throw new KsqlException("NULL expression not supported"); + } + ee.setExpressionType(SQL_TO_JAVA_TYPE_CONVERTER.toJavaType(expressionType)); ee.cook(javaCode); @@ -123,8 +128,7 @@ public ExpressionMetadata buildCodeGenFromParseTree( expression ); } catch (KsqlException | CompileException e) { - throw new KsqlException("Code generation failed for " + type - + ": " + e.getMessage() + throw new KsqlException("Invalid " + type + ": " + e.getMessage() + ". 
expression:" + expression + ", schema:" + schema, e); } catch (final Exception e) { throw new RuntimeException("Unexpected error generating code for " + type diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 4a9489a7b511..7fb6e9b269bd 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -27,6 +27,7 @@ import io.confluent.ksql.execution.codegen.helpers.ArrayAccess; import io.confluent.ksql.execution.codegen.helpers.ArrayBuilder; import io.confluent.ksql.execution.codegen.helpers.LikeEvaluator; +import io.confluent.ksql.execution.codegen.helpers.MapBuilder; import io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction; import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; @@ -126,7 +127,8 @@ public class SqlToJavaVisitor { SchemaBuilder.class.getCanonicalName(), Struct.class.getCanonicalName(), ArrayBuilder.class.getCanonicalName(), - LikeEvaluator.class.getCanonicalName() + LikeEvaluator.class.getCanonicalName(), + MapBuilder.class.getCanonicalName() ); private static final Map DECIMAL_OPERATOR_NAME = ImmutableMap @@ -813,7 +815,9 @@ public Pair visitCreateMapExpression( final CreateMapExpression exp, final Void context ) { - final StringBuilder map = new StringBuilder("ImmutableMap.builder()"); + final StringBuilder map = new StringBuilder("new MapBuilder("); + map.append(exp.getMap().size()); + map.append((')')); for (Entry entry: exp.getMap().entrySet()) { map.append(".put("); @@ -900,27 +904,25 @@ private CastVisitor() { } static Pair getCast(final Pair expr, final SqlType sqlType) { - if (!sqlType.supportsCast()) { - throw new KsqlFunctionException( - "Only 
casts to primitive types and decimal are supported: " + sqlType); - } - - final SqlType rightSchema = expr.getRight(); - if (sqlType.equals(rightSchema) || rightSchema == null) { + final SqlType sourceType = expr.getRight(); + if (sourceType == null || sqlType.equals(sourceType)) { + // sourceType is null if source is SQL NULL return new Pair<>(expr.getLeft(), sqlType); } - return CASTERS.getOrDefault( - sqlType.baseType(), - (e, t, r) -> { - throw new KsqlException("Invalid cast operation: " + t); - } - ) - .cast(expr, sqlType, sqlType); + return CASTERS.getOrDefault(sqlType.baseType(), CastVisitor::unsupportedCast) + .cast(expr, sqlType); + } + + private static Pair unsupportedCast( + final Pair expr, final SqlType returnType + ) { + throw new KsqlFunctionException("Cast of " + expr.getRight() + + " to " + returnType + " is not supported"); } private static Pair castString( - final Pair expr, final SqlType sqltype, final SqlType returnType + final Pair expr, final SqlType returnType ) { final SqlType schema = expr.getRight(); final String exprStr; @@ -936,13 +938,13 @@ private static Pair castString( } private static Pair castBoolean( - final Pair expr, final SqlType sqltype, final SqlType returnType + final Pair expr, final SqlType returnType ) { return new Pair<>(getCastToBooleanString(expr.getRight(), expr.getLeft()), returnType); } private static Pair castInteger( - final Pair expr, final SqlType sqltype, final SqlType returnType + final Pair expr, final SqlType returnType ) { final String exprStr = getCastString( expr.getRight(), @@ -954,7 +956,7 @@ private static Pair castInteger( } private static Pair castLong( - final Pair expr, final SqlType sqltype, final SqlType returnType + final Pair expr, final SqlType returnType ) { final String exprStr = getCastString( expr.getRight(), @@ -966,7 +968,7 @@ private static Pair castLong( } private static Pair castDouble( - final Pair expr, final SqlType sqltype, final SqlType returnType + final Pair expr, final 
SqlType returnType ) { final String exprStr = getCastString( expr.getRight(), @@ -978,13 +980,13 @@ private static Pair castDouble( } private static Pair castDecimal( - final Pair expr, final SqlType sqltype, final SqlType returnType + final Pair expr, final SqlType returnType ) { - if (!(sqltype instanceof SqlDecimal)) { - throw new KsqlException("Expected decimal type: " + sqltype); + if (!(returnType instanceof SqlDecimal)) { + throw new KsqlException("Expected decimal type: " + returnType); } - final SqlDecimal sqlDecimal = (SqlDecimal) sqltype; + final SqlDecimal sqlDecimal = (SqlDecimal) returnType; if (expr.getRight().baseType() == SqlBaseType.DECIMAL && expr.right.equals(sqlDecimal)) { return expr; @@ -1024,7 +1026,6 @@ private static String getCastString( return "(new Double(" + exprStr + ")." + javaTypeMethod + ")"; case STRING: return javaStringParserMethod + "(" + exprStr + ")"; - default: throw new KsqlFunctionException( "Invalid cast operation: Cannot cast " @@ -1061,7 +1062,6 @@ private interface CastFunction { Pair cast( Pair expr, - SqlType sqltype, SqlType returnType ); } @@ -1079,5 +1079,4 @@ private CaseWhenProcessed( this.thenProcessResult = thenProcessResult; } } - } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/MapBuilder.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/MapBuilder.java new file mode 100644 index 000000000000..d183d1fbe4eb --- /dev/null +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/MapBuilder.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.codegen.helpers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Used to construct maps using the builder pattern. Note that we cannot use {@link + * com.google.common.collect.ImmutableMap} because it does not accept null values. + */ +public class MapBuilder { + + private final HashMap map; + + public MapBuilder(final int size) { + map = new HashMap<>(size); + } + + public MapBuilder put(final Object key, final Object value) { + map.put(key, value); + return this; + } + + public Map build() { + return Collections.unmodifiableMap(map); + } +} \ No newline at end of file diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index e6f1280959b4..6a2e3a11682f 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -55,7 +55,6 @@ import io.confluent.ksql.function.AggregateFunctionInitArguments; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; -import io.confluent.ksql.function.KsqlFunctionException; import io.confluent.ksql.function.KsqlTableFunction; import io.confluent.ksql.function.UdfFactory; import io.confluent.ksql.schema.ksql.Column; @@ -146,13 +145,7 @@ public Void visitNotExpression( @Override public Void visitCast(final 
Cast node, final ExpressionTypeContext expressionTypeContext) { - final SqlType sqlType = node.getType().getSqlType(); - if (!sqlType.supportsCast()) { - throw new KsqlFunctionException("Only casts to primitive types or decimals " - + "are supported: " + sqlType); - } - - expressionTypeContext.setSqlType(sqlType); + expressionTypeContext.setSqlType(node.getType().getSqlType()); return null; } @@ -404,7 +397,11 @@ public Void visitCreateMapExpression( .collect(Collectors.toList()); if (keyTypes.stream().anyMatch(type -> !SqlTypes.STRING.equals(type))) { - throw new KsqlException("Only STRING keys are supported in maps but got: " + keyTypes); + final String types = keyTypes.stream() + .map(type -> type == null ? "NULL" : type.toString()) + .collect(Collectors.joining(", ", "[", "]")); + + throw new KsqlException("Only STRING keys are supported in maps but got: " + types); } final List valueTypes = exp.getMap() @@ -414,9 +411,16 @@ public Void visitCreateMapExpression( process(val, context); return context.getSqlType(); }) + .filter(Objects::nonNull) .distinct() .collect(Collectors.toList()); + if (valueTypes.size() == 0) { + throw new KsqlException("Cannot construct a map with all NULL values " + + "(see https://github.com/confluentinc/ksql/issues/4239). As a workaround, you may " + + "cast a NULL value to the desired type."); + } + if (valueTypes.size() != 1) { throw new KsqlException( String.format( @@ -425,11 +429,6 @@ public Void visitCreateMapExpression( exp)); } - if (valueTypes.get(0) == null) { - throw new KsqlException("Cannot construct MAP with NULL values. 
As a workaround, you " - + "may cast a NULL value to the desired type."); - } - context.setSqlType(SqlMap.of(valueTypes.get(0))); return null; } diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java index 80aed138d992..d5d24b7ddec4 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java @@ -187,7 +187,7 @@ public void shouldProcessCreateMapExpressionCorrectly() { String java = sqlToJavaVisitor.process(expression); // Then: - assertThat(java, equalTo("((Map)ImmutableMap.builder().put(\"foo\", ((Double) ((java.util.Map)COL5).get(\"key1\"))).put(\"bar\", 1E0).build())")); + assertThat(java, equalTo("((Map)new MapBuilder(2).put(\"foo\", ((Double) ((java.util.Map)COL5).get(\"key1\"))).put(\"bar\", 1E0).build())")); } @Test diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index 6d8dfea47b4a..9a5cf26ea95b 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -470,7 +470,7 @@ public void shouldThrowOnMapOfNullValues() { // Expect expectedException.expect(KsqlException.class); - expectedException.expectMessage("Cannot construct MAP with NULL values"); + expectedException.expectMessage("Cannot construct a map with all NULL values"); // When: expressionTypeManager.getExpressionSqlType(expression); diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java 
b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java index 38e3f29d8ec3..2318a59ec328 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TopicInfoCache.java @@ -249,24 +249,19 @@ private Function keyCoercer() { .get(0) .type(); - return key -> { - if (key == null) { - return null; - } - - return DefaultSqlValueCoercer.INSTANCE - .coerce(key, keyType) - .orElseThrow(() -> new AssertionError("Invalid key value for topic " + topicName + "." - + System.lineSeparator() - + "Expected KeyType: " + keyType - + System.lineSeparator() - + "Actual KeyType: " + SchemaConverters.javaToSqlConverter() - .toSqlType(key.getClass()) - + ", key: " + key + "." - + System.lineSeparator() - + "This is likely caused by the key type in the test-case not matching the schema." - )); - }; + return key -> DefaultSqlValueCoercer.INSTANCE + .coerce(key, keyType) + .orElseThrow(() -> new AssertionError("Invalid key value for topic " + topicName + "." + + System.lineSeparator() + + "Expected KeyType: " + keyType + + System.lineSeparator() + + "Actual KeyType: " + SchemaConverters.javaToSqlConverter() + .toSqlType(key.getClass()) + + ", key: " + key + "." + + System.lineSeparator() + + "This is likely caused by the key type in the test-case not matching the schema." 
+ )) + .orElse(null); } } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/plan.json new file mode 100644 index 000000000000..a714dc94a83a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (F0 ARRAY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `F0` ARRAY", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT CAST(TEST.F0 AS ARRAY) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `KSQL_COL_0` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `F0` ARRAY" + }, + "selectExpressions" : [ "CAST(F0 AS ARRAY) AS KSQL_COL_0" ] + }, + 
"formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + 
"ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/spec.json new file mode 100644 index 000000000000..41fab0eafbf4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/spec.json @@ -0,0 +1,22 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271089814, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "f0" : [ 1, 3 ] + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "KSQL_COL_0" : [ 1, 3 ] + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ 
b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_array_to_array_with_same_schema/6.0.0_1586271089814/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/plan.json new file mode 100644 index 000000000000..b8a36e5c0fba --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (F0 MAP) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `F0` MAP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT CAST(TEST.F0 AS MAP) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `KSQL_COL_0` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + 
"queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `F0` MAP" + }, + "selectExpressions" : [ "CAST(F0 AS MAP) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + 
"ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/spec.json new file mode 100644 index 000000000000..7e643c18b66a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/spec.json @@ -0,0 +1,26 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271089858, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "f0" : { + "this" : 1 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" 
: { + "KSQL_COL_0" : { + "this" : 1 + } + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_map_to_map_with_same_schema/6.0.0_1586271089858/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/plan.json new file mode 100644 index 000000000000..6db0c3adda8f --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (IGNORED STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `IGNORED` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n CAST(null AS BOOLEAN) KSQL_COL_0,\n CAST(null AS INTEGER) KSQL_COL_1,\n CAST(null AS BIGINT) 
KSQL_COL_2,\n CAST(null AS DOUBLE) KSQL_COL_3,\n CAST(null AS STRING) KSQL_COL_4,\n CAST(null AS ARRAY) KSQL_COL_5,\n CAST(null AS MAP) KSQL_COL_6,\n CAST(null AS STRUCT) KSQL_COL_7\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` INTEGER, `KSQL_COL_2` BIGINT, `KSQL_COL_3` DOUBLE, `KSQL_COL_4` STRING, `KSQL_COL_5` ARRAY, `KSQL_COL_6` MAP, `KSQL_COL_7` STRUCT<`F0` INTEGER>", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `IGNORED` STRING" + }, + "selectExpressions" : [ "CAST(null AS BOOLEAN) AS KSQL_COL_0", "CAST(null AS INTEGER) AS KSQL_COL_1", "CAST(null AS BIGINT) AS KSQL_COL_2", "CAST(null AS DOUBLE) AS KSQL_COL_3", "CAST(null AS STRING) AS KSQL_COL_4", "CAST(null AS ARRAY) AS KSQL_COL_5", "CAST(null AS MAP) AS KSQL_COL_6", "CAST(null AS STRUCT) AS KSQL_COL_7" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + 
"ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/spec.json new file mode 100644 index 000000000000..de36729dd341 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/spec.json @@ -0,0 +1,27 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271089919, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT, KSQL_COL_6 MAP, KSQL_COL_7 STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "KSQL_COL_0" : null, + "KSQL_COL_1" : null, + "KSQL_COL_2" : null, + "KSQL_COL_3" : null, + "KSQL_COL_4" : null, + "KSQL_COL_5" : null, + "KSQL_COL_6" : null, + "KSQL_COL_7" : null + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_of_nulls/6.0.0_1586271089919/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project 
(stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/plan.json new file mode 100644 index 000000000000..8eacd8cc4fa1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (F0 STRUCT) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `F0` STRUCT<`F0` INTEGER>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT CAST(TEST.F0 AS STRUCT) KSQL_COL_0\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `KSQL_COL_0` STRUCT<`F0` INTEGER>", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + 
"format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `F0` STRUCT<`F0` INTEGER>" + }, + "selectExpressions" : [ "CAST(F0 AS STRUCT) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/spec.json new file mode 100644 index 000000000000..86fdd3f03285 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/spec.json @@ -0,0 +1,26 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271089871, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "f0" : { + "f0" : 1 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "KSQL_COL_0" : { + "F0" : 1 + } + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/topology 
b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/cast_-_struct_to_struct_with_same_schema/6.0.0_1586271089871/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/plan.json new file mode 100644 index 000000000000..266bd776dc6e --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (IGNORED STRING) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `IGNORED` STRING", + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT STRUCT(F1:=CAST(null AS INTEGER)) S\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `S` STRUCT<`F1` INTEGER>", + "topicName" : "OUTPUT", + "formats" : { + 
"keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `IGNORED` STRING" + }, + "selectExpressions" : [ "STRUCT(F1:=CAST(null AS INTEGER)) AS S" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : 
"io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/spec.json new file mode 100644 index 000000000000..1d50f2e4aa11 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/spec.json @@ -0,0 +1,29 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271092094, + "schemas" : { + 
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test", + "key" : "", + "value" : { } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "S" : { + "F1" : null + } + } + } ], + "postConditions" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "stream", + "schema" : "ROWKEY STRING KEY, S STRUCT" + } ] + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/topology new file mode 100644 index 000000000000..9396b997ae94 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_cast_null_values/6.0.0_1586271092094/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/plan.json new file mode 100644 index 000000000000..da9b8025f241 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K1 STRING, K2 STRING, V1 
INTEGER) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `K1` STRING, `K2` STRING, `V1` INTEGER", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT MAP(CAST(null AS STRING):=TEST.V1) M\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `M` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `K1` STRING, `K2` STRING, `V1` INTEGER" + }, + "selectExpressions" : [ "MAP(CAST(null AS STRING):=V1) AS M" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : 
"io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", 
+ "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/spec.json new file mode 100644 index 000000000000..31323214648a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/spec.json @@ -0,0 +1,26 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271091959, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "k1" : "foo", + "k2" : "bar", + "v1" : 10 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "M" : { + "null" : 10 + } + } + } ] +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_null_string_key/6.0.0_1586271091959/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + 
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/plan.json new file mode 100644 index 000000000000..ded5fb9bae30 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K1 STRING, K2 STRING, V1 INTEGER) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` STRING KEY, `K1` STRING, `K2` STRING, `V1` INTEGER", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT MAP(TEST.K1:=TEST.V1, TEST.K2:=null) M\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `M` MAP", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + 
} + }, + "sourceSchema" : "`ROWKEY` STRING KEY, `K1` STRING, `K2` STRING, `V1` INTEGER" + }, + "selectExpressions" : [ "MAP(K1:=V1, K2:=null) AS M" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : 
"_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/spec.json new file mode 100644 index 000000000000..0bedc5f59ef5 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/spec.json @@ -0,0 +1,27 @@ +{ + "version" : "6.0.0", + "timestamp" : 1586271091940, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "k1" : "foo", + "k2" : "bar", + "v1" : 10 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "M" : { + "foo" : 10, + "bar" : null + } + } + } ] +} \ No newline at end of file diff --git 
a/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create_map_-_create_map_from_named_tuples_and_some_values/6.0.0_1586271091940/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/cast.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/cast.json index ba29c8e451a9..4fd1e560b606 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/cast.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/cast.json @@ -4,36 +4,119 @@ ], "tests": [ { - "name": "to array", + "name": "non-array to array", "statements": [ - "CREATE STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT cast(f0 as ARRAY) FROM TEST;" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Only casts to primitive types or decimals are supported: ARRAY" + "message": "Invalid Select: Cast of STRING to ARRAY is not supported" } }, { - "name": "to map", + "name": "array to array with same schema", "statements": [ - "CREATE 
STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TEST (f0 ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as ARRAY) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"f0": [1, 3]}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"KSQL_COL_0": [1, 3]}} + ] + }, + { + "name": "array to array with different element type", + "statements": [ + "CREATE STREAM TEST (f0 ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as ARRAY) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Invalid Select: Cast of ARRAY to ARRAY is not supported" + } + }, + { + "name": "non-map to map", + "statements": [ + "CREATE STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT cast(f0 as MAP) FROM TEST;" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Only casts to primitive types or decimals are supported: MAP" + "message": "Invalid Select: Cast of STRING to MAP is not supported" + } + }, + { + "name": "map to map with same schema", + "statements": [ + "CREATE STREAM TEST (f0 MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as MAP) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"f0": {"this": 1}}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"KSQL_COL_0": {"this": 1}}} + ] + }, + { + "name": "map to map with different value type", + "statements": [ + "CREATE STREAM TEST (f0 MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as MAP) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Invalid Select: Cast of MAP to MAP is not supported" } }, { - 
"name": "to struct", + "name": "non-struct to struct", "statements": [ - "CREATE STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT cast(f0 as STRUCT) FROM TEST;" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Only casts to primitive types or decimals are supported: STRUCT<`F0` STRING, `F1` INTEGER>" + "message": "Invalid Select: Cast of STRING to STRUCT<`F0` STRING, `F1` INTEGER> is not supported" + } + }, + { + "name": "struct to struct with same schema", + "statements": [ + "CREATE STREAM TEST (f0 STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as STRUCT) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"f0": {"f0": 1}}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"KSQL_COL_0": {"F0": 1}}} + ] + }, + { + "name": "struct to struct with different schema", + "statements": [ + "CREATE STREAM TEST (f0 STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as STRUCT) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Invalid Select: Cast of STRUCT<`F0` BIGINT> to STRUCT<`F0` INTEGER> is not supported" + } + }, + { + "name": "to null", + "statements": [ + "CREATE STREAM TEST (f0 VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(f0 as NULL) FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "mismatched input 'NULL'" } }, { @@ -52,14 +135,14 @@ { "name": "of nulls", "statements": [ - "CREATE STREAM TEST (ignored VARCHAR) WITH (kafka_topic='test_topic', value_format='DELIMITED');", - "CREATE STREAM OUTPUT AS SELECT cast(null as BOOLEAN), cast(null 
as INT), cast(null as BIGINT), cast(null as DOUBLE), cast(null as STRING) FROM TEST;" + "CREATE STREAM TEST (ignored VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT cast(null as BOOLEAN), cast(null as INT), cast(null as BIGINT), cast(null as DOUBLE), cast(null as STRING), cast(null AS ARRAY), cast(null AS MAP), cast(null AS STRUCT) FROM TEST;" ], "inputs": [ - {"topic": "test_topic", "value": "-"} + {"topic": "test_topic", "value": {}} ], "outputs": [ - {"topic": "OUTPUT", "value": ",,,,"} + {"topic": "OUTPUT", "value": {"KSQL_COL_0": null, "KSQL_COL_1": null, "KSQL_COL_2": null, "KSQL_COL_3": null, "KSQL_COL_4": null, "KSQL_COL_5": null, "KSQL_COL_6": null, "KSQL_COL_7": null}} ] }, { diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/create-struct.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/create-struct.json index 46f1f8b5f8af..1a6738cfb2ad 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/create-struct.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/create-struct.json @@ -85,6 +85,24 @@ "type": "io.confluent.ksql.util.KsqlException", "message": "Duplicate field names found in STRUCT" } + }, + { + "name": "cast null values", + "statements": [ + "CREATE STREAM INPUT (ignored STRING) WITH (kafka_topic='test', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT STRUCT(F1 := CAST(NULL AS INT)) AS s FROM INPUT;" + ], + "inputs": [ + { "topic": "test", "value": {}} + ], + "outputs": [ + { "topic": "OUTPUT", "value": {"S": {"F1": null}}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, S STRUCT"} + ] + } } ] } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/create_map.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/create_map.json index 71b7f5a398d5..6037a051b3c6 100644 ---
a/ksqldb-functional-tests/src/test/resources/query-validation-tests/create_map.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/create_map.json @@ -42,21 +42,58 @@ "CREATE STREAM OUTPUT AS SELECT MAP(k1:=v1, k2:='hello') as M FROM TEST;" ], "expectedException": { - "type": "io.confluent.ksql.util.KsqlException", + "type": "io.confluent.ksql.util.KsqlStatementException", "message": "Cannot construct a map with mismatching value types ([INTEGER, STRING]) from expression MAP(K1:=V1, K2:='hello')." } }, { - "name": "create map from named tuples null values", + "name": "create map from named tuples all null values", + "statements": [ + "CREATE STREAM TEST (k1 VARCHAR, k2 VARCHAR, v1 INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT MAP(k1:=NULL, k2:=NULL) as M FROM TEST;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Cannot construct a map with all NULL values" + } + }, + { + "name": "create map from named tuples and some values", "statements": [ "CREATE STREAM TEST (k1 VARCHAR, k2 VARCHAR, v1 INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT MAP(k1:=v1, k2:=NULL) as M FROM TEST;" ], + "inputs": [ + {"topic": "test_topic", "value": {"k1": "foo", "k2": "bar", "v1": 10}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"M": {"foo": 10, "bar": null}}} + ] + }, + { + "name": "create map from named tuples and null key", + "statements": [ + "CREATE STREAM TEST (k1 VARCHAR, k2 VARCHAR, v1 INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT MAP(k1:=v1, NULL:=v1) as M FROM TEST;" + ], "expectedException": { - "type": "io.confluent.ksql.util.KsqlException", - "message": "Cannot construct a map with mismatching value types ([INTEGER, null]) from expression MAP(K1:=V1, K2:=null)." 
+ "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Only STRING keys are supported in maps but got: [STRING, NULL]" } }, + { + "name": "create map from named tuples and null string key", + "statements": [ + "CREATE STREAM TEST (k1 VARCHAR, k2 VARCHAR, v1 INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT MAP(CAST(NULL AS STRING) := v1) as M FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"k1": "foo", "k2": "bar", "v1": 10}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"M": {"null": 10}}} + ] + }, { "name": "create empty map", "statements": [ @@ -64,7 +101,7 @@ "CREATE STREAM OUTPUT AS SELECT MAP() as M FROM TEST;" ], "expectedException": { - "type": "io.confluent.ksql.util.KsqlException", + "type": "io.confluent.ksql.util.KsqlStatementException", "message": "Map constructor cannot be empty. Please supply at least one key value pair (see https://github.com/confluentinc/ksql/issues/4239)." } } diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/null.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/null.json index 64c2022f0c63..fe8882bfe719 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/null.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/null.json @@ -134,6 +134,46 @@ {"topic": "OUTPUT", "key": null, "value": null}, {"topic": "OUTPUT", "key": 2, "value": {"A": 2, "B": "not null", "C": [4, 5, 6]}} ] + }, + { + "name": "NULL column", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, COL0 NULL) WITH (kafka_topic='test_topic', value_format='JSON');" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "mismatched input 'NULL'" + } + }, + { + "name": "NULL element type", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, COL0 ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');" + ], + 
"expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "mismatched input 'NULL'" + } + }, + { + "name": "NULL value type", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, COL0 MAP) WITH (kafka_topic='test_topic', value_format='JSON');" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "mismatched input 'NULL'" + } + }, + { + "name": "NULL field type", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, COL0 STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "mismatched input 'NULL'" + } } ] } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json index bea23532e6fd..52671258c3fe 100644 --- a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json @@ -704,6 +704,66 @@ "outputs": [ {"topic": "test_topic", "key": null, "value": {"VAL": {"FOO": null, "bar": null}}} ] + }, + { + "name": "should insert nulls", + "statements": [ + "CREATE STREAM S (ROWKEY INT KEY, NAME STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "INSERT INTO S (ROWKEY, NAME) VALUES (NULL, NULL);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": null, "value": {"NAME": null}} + ] + }, + { + "name": "should insert array with null elements", + "statements": [ + "CREATE STREAM S (ROWKEY INT KEY, V0 ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "INSERT INTO S (ROWKEY, V0) VALUES (1, ARRAY[CAST(null AS INT)]);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": 1, "value": {"V0": [null]}} 
+ ] + }, + { + "name": "should insert map with null values", + "statements": [ + "CREATE STREAM S (ROWKEY INT KEY, V0 MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "INSERT INTO S (ROWKEY, V0) VALUES (1, MAP('k1' := CAST(null AS INT)));" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": 1, "value": {"V0": {"k1": null}}} + ] + }, + { + "name": "should insert map with null keys", + "statements": [ + "CREATE STREAM S (ROWKEY INT KEY, V0 MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "INSERT INTO S (ROWKEY, V0) VALUES (1, MAP(CAST(NULL AS STRING) := 1));" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": 1, "value": {"V0": {"null": 1}}} + ] + }, + { + "name": "should insert struct with null values", + "statements": [ + "CREATE STREAM S (ROWKEY INT KEY, V0 STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "INSERT INTO S (ROWKEY, V0) VALUES (1, STRUCT(f0 := CAST(null AS INT)));" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": 1, "value": {"V0": {"F0": null}}} + ] } ] } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index d8388b85692f..34605ccd13d2 100644 --- a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -1018,6 +1018,32 @@ {"row":{"columns":["10", 12365, 1]}} ]} ] + }, + { + "name": "fail on null key", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", + "SELECT * 
FROM AGGREGATE WHERE ROWKEY=NULL;" + ], + "expectedError": { + "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", + "message": "Primary key columns can not be NULL: (ROWKEY = null)", + "status": 400 + } + }, + { + "name": "fail on non-literal key", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", + "SELECT * FROM AGGREGATE WHERE ROWKEY=CAST(1 AS INT);" + ], + "expectedError": { + "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", + "message": "Ony comparison to literals is currently supported: (ROWKEY = CAST(1 AS INTEGER))", + "status": 400 + } } ] } \ No newline at end of file diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java b/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java index a55f3a9edf75..d2b4a8cda114 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java @@ -15,7 +15,6 @@ package io.confluent.ksql.schema.ksql; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.schema.ksql.types.SqlArray; import io.confluent.ksql.schema.ksql.types.SqlDecimal; @@ -25,6 +24,7 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.DecimalUtil; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,28 +37,33 @@ public enum DefaultSqlValueCoercer implements SqlValueCoercer { INSTANCE; - private static final Map>> UPCASTER = - ImmutableMap.>>builder() - .put(SqlBaseType.INTEGER, (num, type) -> Optional.of(num.intValue())) - .put(SqlBaseType.BIGINT, (num, type) -> Optional.of(num.longValue())) 
- .put(SqlBaseType.DOUBLE, (num, type) -> Optional.of(num.doubleValue())) + private static final Map> UPCASTER = + ImmutableMap.>builder() + .put(SqlBaseType.INTEGER, (num, type) -> Result.of(num.intValue())) + .put(SqlBaseType.BIGINT, (num, type) -> Result.of(num.longValue())) + .put(SqlBaseType.DOUBLE, (num, type) -> Result.of(num.doubleValue())) .put(SqlBaseType.DECIMAL, (num, type) -> { try { - return Optional.ofNullable( + return Result.of( DecimalUtil.ensureFit( new BigDecimal(String.format("%s", num)), (SqlDecimal) type)); } catch (final Exception e) { - return Optional.empty(); + return Result.failure(); } }).build(); @Override - public Optional coerce(final Object value, final SqlType targetType) { + public Result coerce(final Object value, final SqlType targetType) { return doCoerce(value, targetType); } - private static Optional doCoerce(final Object value, final SqlType targetType) { + private static Result doCoerce(final Object value, final SqlType targetType) { + if (value == null) { + // NULL can be cast to any type: + return Result.nullResult(); + } + switch (targetType.baseType()) { case ARRAY: return coerceArray(value, (SqlArray) targetType); @@ -67,26 +72,29 @@ private static Optional doCoerce(final Object value, final SqlType targetType case STRUCT: return coerceStruct(value, (SqlStruct) targetType); default: - break; + return coerceOther(value, targetType); } + } + private static Result coerceOther(final Object value, final SqlType targetType) { final SqlBaseType valueSqlType = SchemaConverters.javaToSqlConverter() .toSqlType(value.getClass()); if (valueSqlType.equals(targetType.baseType())) { - return Optional.of(value); + return Result.of(value); } if (!(value instanceof Number) || !valueSqlType.canImplicitlyCast(targetType.baseType())) { - return Optional.empty(); + return Result.failure(); } - return UPCASTER.get(targetType.baseType()).apply((Number) value, targetType); + return UPCASTER.get(targetType.baseType()) + .apply((Number) value, 
targetType); } - private static Optional coerceStruct(final Object value, final SqlStruct targetType) { + private static Result coerceStruct(final Object value, final SqlStruct targetType) { if (!(value instanceof Struct)) { - return Optional.empty(); + return Result.failure(); } final Struct struct = (Struct) value; @@ -101,53 +109,61 @@ private static Optional coerceStruct(final Object value, final SqlStruct targ if (!sqlField.isPresent()) { // if there was a field in the struct that wasn't in the schema // we cannot coerce - return Optional.empty(); - } else if (struct.schema().field(field.name()) == null) { + return Result.failure(); + } + + if (struct.schema().field(field.name()) == null) { // if we cannot find the field in the struct, we can ignore it continue; } - final Optional val = doCoerce(struct.get(field), sqlField.get().type()); - val.ifPresent(v -> coerced.put(field.name(), v)); + final Result val = doCoerce(struct.get(field), sqlField.get().type()); + if (val.failed()) { + return Result.failure(); + } + + val.value().ifPresent(v -> coerced.put(field.name(), v)); } - return Optional.of(coerced); + return Result.of(coerced); } - private static Optional coerceArray(final Object value, final SqlArray targetType) { + private static Result coerceArray(final Object value, final SqlArray targetType) { if (!(value instanceof List)) { - return Optional.empty(); + return Result.failure(); } final List list = (List) value; - final ImmutableList.Builder coerced = ImmutableList.builder(); + final List coerced = new ArrayList<>(list.size()); for (final Object el : list) { - final Optional coercedEl = doCoerce(el, targetType.getItemType()); - if (!coercedEl.isPresent()) { - return Optional.empty(); + final Result result = doCoerce(el, targetType.getItemType()); + if (result.failed()) { + return Result.failure(); } - coerced.add(coercedEl.get()); + + coerced.add(result.value().orElse(null)); } - return Optional.of(coerced.build()); + return Result.of(coerced); } - 
private static Optional coerceMap(final Object value, final SqlMap targetType) { + private static Result coerceMap(final Object value, final SqlMap targetType) { if (!(value instanceof Map)) { - return Optional.empty(); + return Result.failure(); } final Map map = (Map) value; final HashMap coerced = new HashMap<>(); - for (final Map.Entry entry : map.entrySet()) { - final Optional coercedKey = doCoerce(entry.getKey(), SqlTypes.STRING); - final Optional coercedValue = doCoerce(entry.getValue(), targetType.getValueType()); - if (!coercedKey.isPresent() || !coercedValue.isPresent()) { - return Optional.empty(); + for (final Map.Entry entry : map.entrySet()) { + final Result coercedKey = doCoerce(entry.getKey(), SqlTypes.STRING); + final Result coercedValue = doCoerce(entry.getValue(), targetType.getValueType()); + if (coercedKey.failed() || coercedValue.failed()) { + return Result.failure(); } - coerced.put(coercedKey.get(), coercedValue.get()); + + coerced.put(coercedKey.value().orElse(null), coercedValue.value().orElse(null)); } - return Optional.of(coerced); + return Result.of(coerced); } } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/SqlValueCoercer.java b/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/SqlValueCoercer.java index 221c5467edee..8a89fde9b6cc 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/SqlValueCoercer.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/schema/ksql/SqlValueCoercer.java @@ -16,7 +16,9 @@ package io.confluent.ksql.schema.ksql; import io.confluent.ksql.schema.ksql.types.SqlType; +import java.util.Objects; import java.util.Optional; +import java.util.function.Supplier; /** * Coerces values to {@link SqlBaseType SQL types}. @@ -26,11 +28,71 @@ public interface SqlValueCoercer { /** * Coerce the supplied {@code value} to the supplied {@code sqlType}. * - *

Complex SQL types are not supported, (yet). - * * @param value the value to try to coerce. * @param targetSchema the target SQL type. - * @return the coerced value if the value could be coerced, {@link Optional#empty()} otherwise. + * @return the Result of the coercion. */ - Optional coerce(Object value, SqlType targetSchema); + Result coerce(Object value, SqlType targetSchema); + + class Result { + + private final Optional> result; + + public static Result failure() { + return new Result(Optional.empty()); + } + + public static Result nullResult() { + return new Result(Optional.of(Optional.empty())); + } + + public static Result of(final Object result) { + return new Result(Optional.of(Optional.of(result))); + } + + private Result(final Optional> result) { + this.result = result; + } + + public boolean failed() { + return !result.isPresent(); + } + + public Optional value() { + return result.orElseThrow(IllegalStateException::new); + } + + public Optional orElseThrow( + final Supplier exceptionSupplier + ) throws X { + return result.orElseThrow(exceptionSupplier); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Result result1 = (Result) o; + return Objects.equals(result, result1.result); + } + + @Override + public int hashCode() { + return Objects.hash(result); + } + + @Override + public String toString() { + return "Result(" + + result + .orElse(Optional.of("FAILED")) + .map(Objects::toString) + .orElse("null") + + ')'; + } + } } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java index 62d8bfd1f50e..7814e40c6170 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java +++ 
b/ksqldb-parser/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java @@ -24,14 +24,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.schema.ksql.SqlValueCoercer.Result; import io.confluent.ksql.schema.ksql.types.SqlArray; import io.confluent.ksql.schema.ksql.types.SqlMap; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; -import io.confluent.ksql.util.DecimalUtil; -import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -87,115 +87,127 @@ public void setUp() { coercer = DefaultSqlValueCoercer.INSTANCE; } + @Test + public void shouldCoerceNullToAnything() { + TYPES.values().forEach(type -> + assertThat(type.toString(), coercer.coerce(null, type), is(Result.nullResult()))); + } + @Test public void shouldCoerceToBoolean() { - assertThat(coercer.coerce(true, SqlTypes.BOOLEAN), is(Optional.of(true))); + assertThat(coercer.coerce(true, SqlTypes.BOOLEAN), is(Result.of(true))); } @Test public void shouldNotCoerceToBoolean() { - assertThat(coercer.coerce("true", SqlTypes.BOOLEAN), is(Optional.empty())); - assertThat(coercer.coerce(1, SqlTypes.BOOLEAN), is(Optional.empty())); - assertThat(coercer.coerce(1L, SqlTypes.BOOLEAN), is(Optional.empty())); - assertThat(coercer.coerce(1.0d, SqlTypes.BOOLEAN), is(Optional.empty())); - assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.BOOLEAN), is(Optional.empty())); + assertThat(coercer.coerce("true", SqlTypes.BOOLEAN), is(Result.failure())); + assertThat(coercer.coerce(1, SqlTypes.BOOLEAN), is(Result.failure())); + assertThat(coercer.coerce(1L, SqlTypes.BOOLEAN), is(Result.failure())); + assertThat(coercer.coerce(1.0d, SqlTypes.BOOLEAN), is(Result.failure())); + 
assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.BOOLEAN), is(Result.failure())); } @Test public void shouldCoerceToInteger() { - assertThat(coercer.coerce(1, SqlTypes.INTEGER), is(Optional.of(1))); + assertThat(coercer.coerce(1, SqlTypes.INTEGER), is(Result.of(1))); } @Test public void shouldNotCoerceToInteger() { - assertThat(coercer.coerce(true, SqlTypes.INTEGER), is(Optional.empty())); - assertThat(coercer.coerce(1L, SqlTypes.INTEGER), is(Optional.empty())); - assertThat(coercer.coerce(1.0d, SqlTypes.INTEGER), is(Optional.empty())); - assertThat(coercer.coerce("1", SqlTypes.INTEGER), is(Optional.empty())); - assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.INTEGER), is(Optional.empty())); + assertThat(coercer.coerce(true, SqlTypes.INTEGER), is(Result.failure())); + assertThat(coercer.coerce(1L, SqlTypes.INTEGER), is(Result.failure())); + assertThat(coercer.coerce(1.0d, SqlTypes.INTEGER), is(Result.failure())); + assertThat(coercer.coerce("1", SqlTypes.INTEGER), is(Result.failure())); + assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.INTEGER), is(Result.failure())); } @Test public void shouldCoerceToBigInt() { - assertThat(coercer.coerce(1, SqlTypes.BIGINT), is(Optional.of(1L))); - assertThat(coercer.coerce(1L, SqlTypes.BIGINT), is(Optional.of(1L))); + assertThat(coercer.coerce(1, SqlTypes.BIGINT), is(Result.of(1L))); + assertThat(coercer.coerce(1L, SqlTypes.BIGINT), is(Result.of(1L))); } @Test public void shouldNotCoerceToBigInt() { - assertThat(coercer.coerce(true, SqlTypes.BIGINT), is(Optional.empty())); - assertThat(coercer.coerce(1.0d, SqlTypes.BIGINT), is(Optional.empty())); - assertThat(coercer.coerce("1", SqlTypes.BIGINT), is(Optional.empty())); - assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.BIGINT), is(Optional.empty())); + assertThat(coercer.coerce(true, SqlTypes.BIGINT), is(Result.failure())); + assertThat(coercer.coerce(1.0d, SqlTypes.BIGINT), is(Result.failure())); + assertThat(coercer.coerce("1", SqlTypes.BIGINT), 
is(Result.failure())); + assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.BIGINT), is(Result.failure())); } @Test public void shouldCoerceToDecimal() { final SqlType decimalType = SqlTypes.decimal(2, 1); - assertThat(coercer.coerce(1, decimalType), is(Optional.of(new BigDecimal("1.0")))); - assertThat(coercer.coerce(1L, decimalType), is(Optional.of(new BigDecimal("1.0")))); + assertThat(coercer.coerce(1, decimalType), is(Result.of(new BigDecimal("1.0")))); + assertThat(coercer.coerce(1L, decimalType), is(Result.of(new BigDecimal("1.0")))); assertThat(coercer.coerce(new BigDecimal("1.0"), decimalType), - is(Optional.of(new BigDecimal("1.0")))); + is(Result.of(new BigDecimal("1.0")))); } @Test public void shouldNotCoerceToDecimal() { final SqlType decimalType = SqlTypes.decimal(2, 1); - assertThat(coercer.coerce(true, decimalType), is(Optional.empty())); - assertThat(coercer.coerce("1.0", decimalType), is(Optional.empty())); - assertThat(coercer.coerce(1.0d, decimalType), is(Optional.empty())); - assertThat(coercer.coerce(1234L, decimalType), is(Optional.empty())); + assertThat(coercer.coerce(true, decimalType), is(Result.failure())); + assertThat(coercer.coerce("1.0", decimalType), is(Result.failure())); + assertThat(coercer.coerce(1.0d, decimalType), is(Result.failure())); + assertThat(coercer.coerce(1234L, decimalType), is(Result.failure())); } @Test public void shouldCoerceToDouble() { - assertThat(coercer.coerce(1, SqlTypes.DOUBLE), is(Optional.of(1.0d))); - assertThat(coercer.coerce(1L, SqlTypes.DOUBLE), is(Optional.of(1.0d))); - assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.DOUBLE), is(Optional.of(123.0d))); - assertThat(coercer.coerce(1.0d, SqlTypes.DOUBLE), is(Optional.of(1.0d))); + assertThat(coercer.coerce(1, SqlTypes.DOUBLE), is(Result.of(1.0d))); + assertThat(coercer.coerce(1L, SqlTypes.DOUBLE), is(Result.of(1.0d))); + assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.DOUBLE), is(Result.of(123.0d))); + 
assertThat(coercer.coerce(1.0d, SqlTypes.DOUBLE), is(Result.of(1.0d))); } @Test public void shouldNotCoerceToDouble() { - assertThat(coercer.coerce(true, SqlTypes.DOUBLE), is(Optional.empty())); - assertThat(coercer.coerce("1", SqlTypes.DOUBLE), is(Optional.empty())); + assertThat(coercer.coerce(true, SqlTypes.DOUBLE), is(Result.failure())); + assertThat(coercer.coerce("1", SqlTypes.DOUBLE), is(Result.failure())); } @Test public void shouldCoerceToArray() { final SqlType arrayType = SqlTypes.array(SqlTypes.DOUBLE); - assertThat(coercer.coerce(ImmutableList.of(1), arrayType), is(Optional.of(ImmutableList.of(1d)))); - assertThat(coercer.coerce(ImmutableList.of(1L), arrayType), is(Optional.of(ImmutableList.of(1d)))); - assertThat(coercer.coerce(ImmutableList.of(1.1), arrayType), is(Optional.of(ImmutableList.of(1.1d)))); + assertThat(coercer.coerce(ImmutableList.of(1), arrayType), is(Result.of(ImmutableList.of(1d)))); + assertThat(coercer.coerce(ImmutableList.of(1L), arrayType), is(Result.of(ImmutableList.of(1d)))); + assertThat(coercer.coerce(ImmutableList.of(1.1), arrayType), + is(Result.of(ImmutableList.of(1.1d)))); + assertThat(coercer.coerce(Collections.singletonList(null), arrayType), + is(Result.of(Collections.singletonList(null)))); } @Test public void shouldNotCoerceToArray() { final SqlType arrayType = SqlTypes.array(SqlTypes.DOUBLE); - assertThat(coercer.coerce(true, arrayType), is(Optional.empty())); - assertThat(coercer.coerce(1L, arrayType), is(Optional.empty())); - assertThat(coercer.coerce("foo", arrayType), is(Optional.empty())); - assertThat(coercer.coerce(ImmutableMap.of("foo", 1), arrayType), is(Optional.empty())); + assertThat(coercer.coerce(true, arrayType), is(Result.failure())); + assertThat(coercer.coerce(1L, arrayType), is(Result.failure())); + assertThat(coercer.coerce("foo", arrayType), is(Result.failure())); + assertThat(coercer.coerce(ImmutableMap.of("foo", 1), arrayType), is(Result.failure())); } @Test public void shouldCoerceToMap() { 
final SqlType mapType = SqlTypes.map(SqlTypes.DOUBLE); - assertThat(coercer.coerce(ImmutableMap.of("foo", 1), mapType), is(Optional.of(ImmutableMap.of("foo", 1d)))); - assertThat(coercer.coerce(ImmutableMap.of("foo", 1L), mapType), is(Optional.of(ImmutableMap.of("foo", 1d)))); - assertThat(coercer.coerce(ImmutableMap.of("foo", 1.1), mapType), is(Optional.of(ImmutableMap.of("foo", 1.1d)))); + assertThat(coercer.coerce(ImmutableMap.of("foo", 1), mapType), is(Result.of(ImmutableMap.of("foo", 1d)))); + assertThat(coercer.coerce(ImmutableMap.of("foo", 1L), mapType), is(Result.of(ImmutableMap.of("foo", 1d)))); + assertThat(coercer.coerce(ImmutableMap.of("foo", 1.1), mapType), + is(Result.of(ImmutableMap.of("foo", 1.1d)))); + assertThat(coercer.coerce(Collections.singletonMap("foo", null), mapType), + is(Result.of(Collections.singletonMap("foo", null)))); } @Test public void shouldNotCoerceToMap() { final SqlType mapType = SqlTypes.map(SqlTypes.DOUBLE); - assertThat(coercer.coerce(true, mapType), is(Optional.empty())); - assertThat(coercer.coerce(1L, mapType), is(Optional.empty())); - assertThat(coercer.coerce("foo", mapType), is(Optional.empty())); - assertThat(coercer.coerce(ImmutableList.of("foo"), mapType), is(Optional.empty())); + assertThat(coercer.coerce(true, mapType), is(Result.failure())); + assertThat(coercer.coerce(1L, mapType), is(Result.failure())); + assertThat(coercer.coerce("foo", mapType), is(Result.failure())); + assertThat(coercer.coerce(ImmutableList.of("foo"), mapType), is(Result.failure())); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"}) @Test public void shouldCoerceToStruct() { // Given: @@ -204,14 +216,32 @@ public void shouldCoerceToStruct() { final SqlType structType = SqlTypes.struct().field("foo", SqlTypes.decimal(2, 1)).build(); // When: - final Optional coerced = (Optional) coercer.coerce(struct, structType); + final Result result = coercer.coerce(struct, structType); // Then: - 
assertThat("", coerced.isPresent()); + assertThat("", !result.failed()); + final Optional coerced = (Optional) result.value(); assertThat(coerced.get().get("foo"), is(new BigDecimal("2.0"))); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"}) + @Test + public void shouldCoerceToStructWithNullValues() { + // Given: + final Schema schema = SchemaBuilder.struct().field("foo", Schema.OPTIONAL_INT64_SCHEMA); + final Struct struct = new Struct(schema).put("foo", null); + final SqlType structType = SqlTypes.struct().field("foo", SqlTypes.decimal(2, 1)).build(); + + // When: + final Result result = coercer.coerce(struct, structType); + + // Then: + assertThat("", !result.failed()); + final Optional coerced = (Optional) result.value(); + assertThat(coerced.get().get("foo"), is(nullValue())); + } + + @SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"}) @Test public void shouldSubsetCoerceToStruct() { // Given: @@ -222,26 +252,41 @@ public void shouldSubsetCoerceToStruct() { .field("bar", SqlTypes.STRING).build(); // When: - final Optional coerced = (Optional) coercer.coerce(struct, structType); + final Result result = coercer.coerce(struct, structType); // Then: - assertThat("", coerced.isPresent()); + assertThat("", !result.failed()); + final Optional coerced = (Optional) result.value(); assertThat(coerced.get().get("foo"), is("val1")); assertThat(coerced.get().get("bar"), nullValue()); } + @Test + public void shouldNotCoerceToStructIfAnyFieldFailsToCoerce() { + // Given: + final Schema schema = SchemaBuilder.struct().field("foo", Schema.INT64_SCHEMA); + final Struct struct = new Struct(schema).put("foo", 2L); + final SqlType structType = SqlTypes.struct().field("foo", SqlTypes.array(SqlTypes.INTEGER)).build(); + + // When: + final Result result = coercer.coerce(struct, structType); + + // Then: + assertThat(result, is(Result.failure())); + } + @Test public void shouldCoerceToString() { - 
assertThat(coercer.coerce("foobar", SqlTypes.STRING), is(Optional.of("foobar"))); + assertThat(coercer.coerce("foobar", SqlTypes.STRING), is(Result.of("foobar"))); } @Test public void shouldNotCoerceToString() { - assertThat(coercer.coerce(true, SqlTypes.STRING), is(Optional.empty())); - assertThat(coercer.coerce(1, SqlTypes.STRING), is(Optional.empty())); - assertThat(coercer.coerce(1L, SqlTypes.STRING), is(Optional.empty())); - assertThat(coercer.coerce(1.0d, SqlTypes.STRING), is(Optional.empty())); - assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.STRING), is(Optional.empty())); + assertThat(coercer.coerce(true, SqlTypes.STRING), is(Result.failure())); + assertThat(coercer.coerce(1, SqlTypes.STRING), is(Result.failure())); + assertThat(coercer.coerce(1L, SqlTypes.STRING), is(Result.failure())); + assertThat(coercer.coerce(1.0d, SqlTypes.STRING), is(Result.failure())); + assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.STRING), is(Result.failure())); } @Test @@ -259,13 +304,13 @@ public void shouldCoerceUsingSameRulesAsBaseTypeUpCastRules() { shouldUpCast.forEach(toBaseType -> assertThat( "should coerce " + fromBaseType + " to " + toBaseType, coercer.coerce(getInstance(fromBaseType), getType(toBaseType)), - is(not(Optional.empty())) + is(not(Result.failure())) )); shouldNotUpCast.forEach(toBaseType -> assertThat( "should not coerce " + fromBaseType + " to " + toBaseType, coercer.coerce(getInstance(fromBaseType), getType(toBaseType)), - is(Optional.empty()) + is(Result.failure()) )); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/KeyValueExtractor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/KeyValueExtractor.java index b9a428d021c4..068ec7f80d5c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/KeyValueExtractor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/KeyValueExtractor.java @@ -68,8 +68,11 @@ public static GenericRow extractValues(final 
JsonObject values, final LogicalSch return GenericRow.fromList(vals); } - private static Object coerceObject(final Object value, final SqlType sqlType, - final SqlValueCoercer sqlValueCoercer) { + private static Object coerceObject( + final Object value, + final SqlType sqlType, + final SqlValueCoercer sqlValueCoercer + ) { if (sqlType instanceof SqlDecimal) { // We have to handle this manually as SqlValueCoercer doesn't seem to do it final SqlDecimal decType = (SqlDecimal) sqlType; @@ -85,10 +88,12 @@ private static Object coerceObject(final Object value, final SqlType sqlType, } } return sqlValueCoercer.coerce(value, sqlType) + .orElseThrow(() -> new KsqlApiException( String.format("Can't coerce a field of type %s (%s) into type %s", value.getClass(), value, sqlType), - ErrorCodes.ERROR_CODE_CANNOT_COERCE_FIELD)); + ErrorCodes.ERROR_CODE_CANNOT_COERCE_FIELD)) + .orElse(null); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index a1b415cd420c..c922df2dad27 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -42,6 +42,7 @@ import io.confluent.ksql.execution.expression.tree.Literal; import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression; import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.execution.expression.tree.NullLiteral; import io.confluent.ksql.execution.expression.tree.QualifiedColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; @@ -564,8 +565,15 @@ private static Object extractKeyWhereClause( } final Expression other = getNonColumnRefSide(comparison); - final Object right = 
((Literal) other).getValue(); + if (!(other instanceof Literal)) { + throw new KsqlException("Only comparison to literals is currently supported: " + comparison); + } + if (other instanceof NullLiteral) { + throw new KsqlException("Primary key columns can not be NULL: " + comparison); + } + + final Object right = ((Literal) other).getValue(); return coerceKey(schema, right, windowed); } @@ -582,7 +590,8 @@ private static Object coerceKey( return DefaultSqlValueCoercer.INSTANCE.coerce(right, keyColumn.type()) .orElseThrow(() -> new KsqlException("'" + right + "' can not be converted " - + "to the type of the key column: " + keyColumn.toString(FormatOptions.noEscape()))); + + "to the type of the key column: " + keyColumn.toString(FormatOptions.noEscape()))) + .orElse(null); } private static Range extractWhereClauseWindowBounds(