From 0695213f757d457ca4fd6ab67952a982874e16bd Mon Sep 17 00:00:00 2001 From: Raul Estrada Date: Tue, 29 Sep 2020 11:25:46 -0500 Subject: [PATCH] feat: support Comparisons on complex types (#6149) --- .../execution/codegen/SqlToJavaVisitor.java | 47 +++++- .../expression/tree/ComparisonExpression.java | 3 +- .../ksql/execution/util/ComparisonUtil.java | 25 +++- .../execution/util/ComparisonUtilTest.java | 12 +- .../6.1.0_1601291312437/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291312437/spec.json | 106 +++++++++++++ .../6.1.0_1601291312437/topology | 13 ++ .../6.1.0_1601291311609/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291311609/spec.json | 136 +++++++++++++++++ .../6.1.0_1601291311609/topology | 13 ++ .../6.1.0_1601291312203/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291312203/spec.json | 136 +++++++++++++++++ .../6.1.0_1601291312203/topology | 13 ++ .../6.1.0_1601291312322/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291312322/spec.json | 136 +++++++++++++++++ .../6.1.0_1601291312322/topology | 13 ++ .../6.1.0_1601291312494/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291312494/spec.json | 134 +++++++++++++++++ .../6.1.0_1601291312494/topology | 13 ++ .../6.1.0_1601291312596/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291312596/spec.json | 128 ++++++++++++++++ .../6.1.0_1601291312596/topology | 13 ++ .../6.1.0_1601291311729/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291311729/spec.json | 136 +++++++++++++++++ .../6.1.0_1601291311729/topology | 13 ++ .../6.1.0_1601291312550/plan.json | 140 ++++++++++++++++++ .../6.1.0_1601291312550/spec.json | 134 +++++++++++++++++ .../6.1.0_1601291312550/topology | 13 ++ .../binary-comparison.json | 136 ++++++++++++++--- 29 files changed, 2457 insertions(+), 36 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/topology create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/topology diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index c847e5ab487d..a6362b9c519b 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -478,6 +478,42 @@ private String visitStringComparisonExpression(final ComparisonExpression.Type t } } + private String visitArrayComparisonExpression(final ComparisonExpression.Type type) { + switch (type) { + case EQUAL: + return "(%1$s.equals(%2$s))"; + case NOT_EQUAL: + case IS_DISTINCT_FROM: + return "(!%1$s.equals(%2$s))"; + default: + throw new KsqlException("Unexpected array comparison: " + type.getValue()); + } + } + + private String visitMapComparisonExpression(final ComparisonExpression.Type type) { + switch (type) { + case EQUAL: + return "(%1$s.equals(%2$s))"; + case NOT_EQUAL: + case IS_DISTINCT_FROM: + return "(!%1$s.equals(%2$s))"; + default: + throw new KsqlException("Unexpected map comparison: " + type.getValue()); + } + } + + private String visitStructComparisonExpression(final ComparisonExpression.Type type) { + switch (type) { + case EQUAL: + return "(%1$s.equals(%2$s))"; + case NOT_EQUAL: + case IS_DISTINCT_FROM: + return "(!%1$s.equals(%2$s))"; + default: + throw new KsqlException("Unexpected struct comparison: " + type.getValue()); + } + } + private String visitScalarComparisonExpression(final ComparisonExpression.Type type) { switch (type) { case EQUAL: @@ -552,10 +588,15 @@ public Pair visitComparisonExpression( case STRING: exprFormat += visitStringComparisonExpression(node.getType()); break; - case MAP: - throw new KsqlException("Cannot compare MAP values"); case ARRAY: - throw new KsqlException("Cannot compare ARRAY values"); + exprFormat += visitArrayComparisonExpression(node.getType()); + break; + case MAP: + exprFormat += visitMapComparisonExpression(node.getType()); + break; + case STRUCT: + exprFormat += visitStructComparisonExpression(node.getType()); + break; case BOOLEAN: exprFormat += visitBooleanComparisonExpression(node.getType()); break; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java index 853347e0a85f..e193bb96e9c6 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java @@ -32,7 +32,8 @@ public enum Type { LESS_THAN_OR_EQUAL("<="), GREATER_THAN(">"), GREATER_THAN_OR_EQUAL(">="), - IS_DISTINCT_FROM("IS DISTINCT FROM"); + IS_DISTINCT_FROM("IS DISTINCT FROM"), + IS_NOT_DISTINCT_FROM("IS NOT DISTINCT FROM"); private final String value; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ComparisonUtil.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ComparisonUtil.java index 9109ba85c38f..55e425273693 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ComparisonUtil.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ComparisonUtil.java @@ -33,6 +33,9 @@ final class ComparisonUtil { .add(handler(SqlBaseType::isNumber, ComparisonUtil::handleNumber)) .add(handler(SqlBaseType.STRING, ComparisonUtil::handleString)) .add(handler(SqlBaseType.BOOLEAN, ComparisonUtil::handleBoolean)) + .add(handler(SqlBaseType.ARRAY, ComparisonUtil::handleArray)) + .add(handler(SqlBaseType.MAP, ComparisonUtil::handleMap)) + .add(handler(SqlBaseType.STRUCT, ComparisonUtil::handleStruct)) .build(); private ComparisonUtil() { @@ -77,8 +80,26 @@ private static boolean handleString(final Type operator, final SqlType right) { } private static boolean handleBoolean(final Type operator, final SqlType right) { - return right.baseType() == SqlBaseType.BOOLEAN - && (operator == Type.EQUAL || operator == Type.NOT_EQUAL); + return right.baseType() == SqlBaseType.BOOLEAN && isEqualityOperator(operator); + } + + private static boolean handleArray(final Type operator, final SqlType right) { + return right.baseType() == SqlBaseType.ARRAY && isEqualityOperator(operator); + } + + private static boolean handleMap(final Type operator, final SqlType right) { + return right.baseType() == SqlBaseType.MAP && isEqualityOperator(operator); + } + + private static boolean handleStruct(final Type operator, final SqlType right) { + return right.baseType() == SqlBaseType.STRUCT && isEqualityOperator(operator); + } + + private static boolean isEqualityOperator(final Type operator) { + return operator == Type.EQUAL + || operator == Type.NOT_EQUAL + || operator == Type.IS_DISTINCT_FROM + || operator == Type.IS_NOT_DISTINCT_FROM; } private static Handler handler( diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ComparisonUtilTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ComparisonUtilTest.java index 37d2971f9193..84ce02031d67 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ComparisonUtilTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ComparisonUtilTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; -import io.confluent.ksql.schema.ksql.types.SqlBaseType; import io.confluent.ksql.schema.ksql.types.SqlDecimal; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -44,11 +43,6 @@ public class ComparisonUtilTest { SqlTypes.struct().field("foo", SqlTypes.BIGINT).build() ); - private static final SqlBaseType[] SCHEMA_TO_SQL_NAME = new SqlBaseType[] { - SqlBaseType.BOOLEAN, SqlBaseType.INTEGER, SqlBaseType.BIGINT, SqlBaseType.DOUBLE, - SqlBaseType.DECIMAL, SqlBaseType.STRING, SqlBaseType.ARRAY, SqlBaseType.MAP, SqlBaseType.STRUCT - }; - private static final List> expectedResults = ImmutableList.of( ImmutableList.of(true, false, false, false, false, false, false, false, false), // Boolean ImmutableList.of(false, true, true, true, true, false, false, false, false), // Int @@ -56,9 +50,9 @@ public class ComparisonUtilTest { ImmutableList.of(false, true, true, true, true, false, false, false, false), // Double ImmutableList.of(false, true, true, true, true, false, false, false, false), // Decimal ImmutableList.of(false, false, false, false, false, true, false, false, false), // String - ImmutableList.of(false, false, false, false, false, false, false, false, false), // Array - ImmutableList.of(false, false, false, false, false, false, false, false, false), // Map - ImmutableList.of(false, false, false, false, false, false, false, false, false) // Struct + ImmutableList.of(false, false, false, false, false, false, true, false, false), // Array + ImmutableList.of(false, false, false, false, false, false, false, true, false), // Map + ImmutableList.of(false, false, false, false, false, false, false, false, true) // Struct ); @Test diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/plan.json new file mode 100644 index 000000000000..5720ac95cf42 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B ARRAY, C ARRAY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` ARRAY, `C` ARRAY", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.B = INPUT.C) KSQL_COL_0,\n (INPUT.B <> INPUT.C) KSQL_COL_1,\n (NOT (INPUT.B IS DISTINCT FROM INPUT.C)) KSQL_COL_2,\n (INPUT.B IS DISTINCT FROM INPUT.C) KSQL_COL_3\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` ARRAY, `C` ARRAY" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(B = C) AS KSQL_COL_0", "(B <> C) AS KSQL_COL_1", "(NOT (B IS DISTINCT FROM C)) AS KSQL_COL_2", "(B IS DISTINCT FROM C) AS KSQL_COL_3" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/spec.json new file mode 100644 index 000000000000..417e03123769 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/spec.json @@ -0,0 +1,106 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291312437, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` ARRAY, `C` ARRAY", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "array equality", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : [ 1, 2 ], + "C" : [ 1, 2.0 ] + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : [ 1, 2 ], + "C" : [ 1 ] + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : false, + "KSQL_COL_2" : true, + "KSQL_COL_3" : false + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : true, + "KSQL_COL_2" : false, + "KSQL_COL_3" : true + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B ARRAY, C ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` ARRAY, `C` ARRAY", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_array_equality/6.1.0_1601291312437/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/plan.json new file mode 100644 index 000000000000..15a4187aa221 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4, 3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.A = 1) KSQL_COL_0,\n (INPUT.B = true) KSQL_COL_1,\n (INPUT.C = 11) KSQL_COL_2,\n (INPUT.D = 1.1) KSQL_COL_3,\n (INPUT.E = 1.20) KSQL_COL_4,\n (INPUT.F = 'foo') KSQL_COL_5,\n (INPUT.G = ARRAY[1, 2]) KSQL_COL_6,\n (INPUT.H = MAP('a':=1)) KSQL_COL_7,\n (INPUT.I = STRUCT(ID:=2)) KSQL_COL_8\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(A = 1) AS KSQL_COL_0", "(B = true) AS KSQL_COL_1", "(C = 11) AS KSQL_COL_2", "(D = 1.1) AS KSQL_COL_3", "(E = 1.20) AS KSQL_COL_4", "(F = 'foo') AS KSQL_COL_5", "(G = ARRAY[1, 2]) AS KSQL_COL_6", "(H = MAP('a':=1)) AS KSQL_COL_7", "(I = STRUCT(ID:=2)) AS KSQL_COL_8" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/spec.json new file mode 100644 index 000000000000..097c62009e51 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/spec.json @@ -0,0 +1,136 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291311609, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "equals", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : true, + "C" : 11, + "D" : 1.1, + "E" : 1.20, + "F" : "foo", + "G" : [ 1, 2 ], + "H" : { + "a" : 1 + }, + "I" : { + "id" : 2 + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : false, + "C" : 10, + "D" : 1.0, + "E" : 1.21, + "F" : "Foo", + "G" : [ 1 ], + "H" : { + "b" : 1 + }, + "I" : { + "id" : 3 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : true, + "KSQL_COL_2" : true, + "KSQL_COL_3" : true, + "KSQL_COL_4" : true, + "KSQL_COL_5" : true, + "KSQL_COL_6" : true, + "KSQL_COL_7" : true, + "KSQL_COL_8" : true + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : false, + "KSQL_COL_2" : false, + "KSQL_COL_3" : false, + "KSQL_COL_4" : false, + "KSQL_COL_5" : false, + "KSQL_COL_6" : false, + "KSQL_COL_7" : false, + "KSQL_COL_8" : false + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, A = 1, B = true, C = 11, D = 1.1, E = 1.20, F = 'foo', G = ARRAY[1,2], H = MAP('a':=1), I = STRUCT(ID:=2) FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_equals/6.1.0_1601291311609/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/plan.json new file mode 100644 index 000000000000..c482813d0dbf --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4, 3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.A IS DISTINCT FROM 1) KSQL_COL_0,\n (INPUT.B IS DISTINCT FROM true) KSQL_COL_1,\n (INPUT.C IS DISTINCT FROM 11) KSQL_COL_2,\n (INPUT.D IS DISTINCT FROM 1.1) KSQL_COL_3,\n (INPUT.E IS DISTINCT FROM 1.20) KSQL_COL_4,\n (INPUT.F IS DISTINCT FROM 'foo') KSQL_COL_5,\n (INPUT.G IS DISTINCT FROM ARRAY[1, 2]) KSQL_COL_6,\n (INPUT.H IS DISTINCT FROM MAP('a':=1)) KSQL_COL_7,\n (INPUT.I IS DISTINCT FROM STRUCT(ID:=2)) KSQL_COL_8\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(A IS DISTINCT FROM 1) AS KSQL_COL_0", "(B IS DISTINCT FROM true) AS KSQL_COL_1", "(C IS DISTINCT FROM 11) AS KSQL_COL_2", "(D IS DISTINCT FROM 1.1) AS KSQL_COL_3", "(E IS DISTINCT FROM 1.20) AS KSQL_COL_4", "(F IS DISTINCT FROM 'foo') AS KSQL_COL_5", "(G IS DISTINCT FROM ARRAY[1, 2]) AS KSQL_COL_6", "(H IS DISTINCT FROM MAP('a':=1)) AS KSQL_COL_7", "(I IS DISTINCT FROM STRUCT(ID:=2)) AS KSQL_COL_8" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/spec.json new file mode 100644 index 000000000000..a7124ad478d8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/spec.json @@ -0,0 +1,136 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291312203, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "is distinct from (2)", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : true, + "C" : 11, + "D" : 1.1, + "E" : 1.20, + "F" : "foo", + "G" : [ 1, 2 ], + "H" : { + "a" : 1 + }, + "I" : { + "id" : 2 + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : false, + "C" : 10, + "D" : 1.0, + "E" : 1.21, + "F" : "Foo", + "G" : [ 1 ], + "H" : { + "b" : 1 + }, + "I" : { + "id" : 3 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : false, + "KSQL_COL_2" : false, + "KSQL_COL_3" : false, + "KSQL_COL_4" : false, + "KSQL_COL_5" : false, + "KSQL_COL_6" : false, + "KSQL_COL_7" : false, + "KSQL_COL_8" : false + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : true, + "KSQL_COL_2" : true, + "KSQL_COL_3" : true, + "KSQL_COL_4" : true, + "KSQL_COL_5" : true, + "KSQL_COL_6" : true, + "KSQL_COL_7" : true, + "KSQL_COL_8" : true + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, A IS DISTINCT FROM 1, B IS DISTINCT FROM true, C IS DISTINCT FROM 11, D IS DISTINCT FROM 1.1, E IS DISTINCT FROM 1.20, F IS DISTINCT FROM 'foo', G IS DISTINCT FROM ARRAY[1,2], H IS DISTINCT FROM MAP('a':=1), I IS DISTINCT FROM STRUCT(ID:=2) FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_distinct_from_(2)/6.1.0_1601291312203/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/plan.json new file mode 100644 index 000000000000..d1f32e9844d4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4, 3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (NOT (INPUT.A IS DISTINCT FROM 1)) KSQL_COL_0,\n (NOT (INPUT.B IS DISTINCT FROM true)) KSQL_COL_1,\n (NOT (INPUT.C IS DISTINCT FROM 11)) KSQL_COL_2,\n (NOT (INPUT.D IS DISTINCT FROM 1.1)) KSQL_COL_3,\n (NOT (INPUT.E IS DISTINCT FROM 1.20)) KSQL_COL_4,\n (NOT (INPUT.F IS DISTINCT FROM 'foo')) KSQL_COL_5,\n (NOT (INPUT.G IS DISTINCT FROM ARRAY[1, 2])) KSQL_COL_6,\n (NOT (INPUT.H IS DISTINCT FROM MAP('a':=1))) KSQL_COL_7,\n (NOT (INPUT.I IS DISTINCT FROM STRUCT(ID:=2))) KSQL_COL_8\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(NOT (A IS DISTINCT FROM 1)) AS KSQL_COL_0", "(NOT (B IS DISTINCT FROM true)) AS KSQL_COL_1", "(NOT (C IS DISTINCT FROM 11)) AS KSQL_COL_2", "(NOT (D IS DISTINCT FROM 1.1)) AS KSQL_COL_3", "(NOT (E IS DISTINCT FROM 1.20)) AS KSQL_COL_4", "(NOT (F IS DISTINCT FROM 'foo')) AS KSQL_COL_5", "(NOT (G IS DISTINCT FROM ARRAY[1, 2])) AS KSQL_COL_6", "(NOT (H IS DISTINCT FROM MAP('a':=1))) AS KSQL_COL_7", "(NOT (I IS DISTINCT FROM STRUCT(ID:=2))) AS KSQL_COL_8" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/spec.json new file mode 100644 index 000000000000..6e743f38e0fb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/spec.json @@ -0,0 +1,136 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291312322, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "is not distinct from (2)", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : true, + "C" : 11, + "D" : 1.1, + "E" : 1.20, + "F" : "foo", + "G" : [ 1, 2 ], + "H" : { + "a" : 1 + }, + "I" : { + "id" : 2 + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : false, + "C" : 10, + "D" : 1.0, + "E" : 1.21, + "F" : "Foo", + "G" : [ 1 ], + "H" : { + "b" : 1 + }, + "I" : { + "id" : 3 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : true, + "KSQL_COL_2" : true, + "KSQL_COL_3" : true, + "KSQL_COL_4" : true, + "KSQL_COL_5" : true, + "KSQL_COL_6" : true, + "KSQL_COL_7" : true, + "KSQL_COL_8" : true + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : false, + "KSQL_COL_2" : false, + "KSQL_COL_3" : false, + "KSQL_COL_4" : false, + "KSQL_COL_5" : false, + "KSQL_COL_6" : false, + "KSQL_COL_7" : false, + "KSQL_COL_8" : false + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, A IS NOT DISTINCT FROM 1, B IS NOT DISTINCT FROM true, C IS NOT DISTINCT FROM 11, D IS NOT DISTINCT FROM 1.1, E IS NOT DISTINCT FROM 1.20, F IS NOT DISTINCT FROM 'foo', G IS NOT DISTINCT FROM ARRAY[1,2], H IS NOT DISTINCT FROM MAP('a':=1), I IS NOT DISTINCT FROM STRUCT(ID:=2) FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_is_not_distinct_from_(2)/6.1.0_1601291312322/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/plan.json new file mode 100644 index 000000000000..c2cb22f1e228 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B MAP, C MAP) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` MAP, `C` MAP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.B = INPUT.C) KSQL_COL_0,\n (INPUT.B <> INPUT.C) KSQL_COL_1,\n (NOT (INPUT.B IS DISTINCT FROM INPUT.C)) KSQL_COL_2,\n (INPUT.B IS DISTINCT FROM INPUT.C) KSQL_COL_3\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` MAP, `C` MAP" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(B = C) AS KSQL_COL_0", "(B <> C) AS KSQL_COL_1", "(NOT (B IS DISTINCT FROM C)) AS KSQL_COL_2", "(B IS DISTINCT FROM C) AS KSQL_COL_3" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/spec.json new file mode 100644 index 000000000000..9ed361248add --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/spec.json @@ -0,0 +1,134 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291312494, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` MAP, `C` MAP", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "map equality", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : { + "a" : 1 + }, + "C" : { + "a" : 1.0 + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : { + "a" : 1 + }, + "C" : { + "a" : 2 + } + } + }, { + "topic" : "test_topic", + "key" : 3, + "value" : { + "B" : { + "a" : 1 + }, + "C" : { + "b" : 1 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : false, + "KSQL_COL_2" : true, + "KSQL_COL_3" : false + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : true, + "KSQL_COL_2" : false, + "KSQL_COL_3" : true + } + }, { + "topic" : "OUTPUT", + "key" : 3, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : true, + "KSQL_COL_2" : false, + "KSQL_COL_3" : true + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B MAP, C MAP) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` MAP, `C` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_map_equality/6.1.0_1601291312494/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/plan.json new file mode 100644 index 000000000000..f4d343462b9c --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B ARRAY>, C ARRAY>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` ARRAY>, `C` ARRAY>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.B = INPUT.C) KSQL_COL_0\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` ARRAY>, `C` ARRAY>" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(B = C) AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/spec.json new file mode 100644 index 000000000000..f9c7006744cb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/spec.json @@ -0,0 +1,128 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291312596, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` ARRAY>, `C` ARRAY>", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "nested collections comparison", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : [ { + "X" : { + "foo" : 1 + } + }, { + "Y" : { + "bar" : 2 + } + } ], + "C" : [ { + "Y" : { + "foo" : 2 + } + }, { + "X" : { + "bar" : 1 + } + } ] + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : [ { + "X" : { + "foo" : 1 + } + }, { + "Y" : { + "bar" : 2 + } + } ], + "C" : [ { + "X" : { + "foo" : 1 + } + } ] + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : true + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : false + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B ARRAY>, C ARRAY>) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, B = C FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` ARRAY>, `C` ARRAY>", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_nested_collections_comparison/6.1.0_1601291312596/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/plan.json new file mode 100644 index 000000000000..ce83e1ff9584 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4, 3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.A <> 1) KSQL_COL_0,\n (INPUT.B <> true) KSQL_COL_1,\n (INPUT.C <> 11) KSQL_COL_2,\n (INPUT.D <> 1.1) KSQL_COL_3,\n (INPUT.E <> 1.20) KSQL_COL_4,\n (INPUT.F <> 'foo') KSQL_COL_5,\n (INPUT.G <> ARRAY[1, 2]) KSQL_COL_6,\n (INPUT.H <> MAP('a':=1)) KSQL_COL_7,\n (INPUT.I <> STRUCT(ID:=2)) KSQL_COL_8\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(A <> 1) AS KSQL_COL_0", "(B <> true) AS KSQL_COL_1", "(C <> 11) AS KSQL_COL_2", "(D <> 1.1) AS KSQL_COL_3", "(E <> 1.20) AS KSQL_COL_4", "(F <> 'foo') AS KSQL_COL_5", "(G <> ARRAY[1, 2]) AS KSQL_COL_6", "(H <> MAP('a':=1)) AS KSQL_COL_7", "(I <> STRUCT(ID:=2)) AS KSQL_COL_8" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/spec.json new file mode 100644 index 000000000000..174a5c72ea2b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/spec.json @@ -0,0 +1,136 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291311729, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "not equals", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : true, + "C" : 11, + "D" : 1.1, + "E" : 1.20, + "F" : "foo", + "G" : [ 1, 2 ], + "H" : { + "a" : 1 + }, + "I" : { + "id" : 2 + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : false, + "C" : 10, + "D" : 1.0, + "E" : 1.21, + "F" : "Foo", + "G" : [ 1 ], + "H" : { + "b" : 1 + }, + "I" : { + "id" : 3 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : false, + "KSQL_COL_2" : false, + "KSQL_COL_3" : false, + "KSQL_COL_4" : false, + "KSQL_COL_5" : false, + "KSQL_COL_6" : false, + "KSQL_COL_7" : false, + "KSQL_COL_8" : false + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : true, + "KSQL_COL_2" : true, + "KSQL_COL_3" : true, + "KSQL_COL_4" : true, + "KSQL_COL_5" : true, + "KSQL_COL_6" : true, + "KSQL_COL_7" : true, + "KSQL_COL_8" : true + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, A <> 1, B <> true, C <> 11, D <> 1.1, E <> 1.20, F <> 'foo', G <> ARRAY[1,2], H <> MAP('a':=1), I <> STRUCT(ID:=2) FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` BOOLEAN, `C` BIGINT, `D` DOUBLE, `E` DECIMAL(4, 3), `F` STRING, `G` ARRAY, `H` MAP, `I` STRUCT<`ID` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN, `KSQL_COL_4` BOOLEAN, `KSQL_COL_5` BOOLEAN, `KSQL_COL_6` BOOLEAN, `KSQL_COL_7` BOOLEAN, `KSQL_COL_8` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_not_equals/6.1.0_1601291311729/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/plan.json new file mode 100644 index 000000000000..8f97914309db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/plan.json @@ -0,0 +1,140 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B STRUCT, C STRUCT) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`A` INTEGER KEY, `B` STRUCT<`ID` INTEGER>, `C` STRUCT<`ID` INTEGER>", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.B = INPUT.C) KSQL_COL_0,\n (INPUT.B <> INPUT.C) KSQL_COL_1,\n (NOT (INPUT.B IS DISTINCT FROM INPUT.C)) KSQL_COL_2,\n (INPUT.B IS DISTINCT FROM INPUT.C) KSQL_COL_3\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` STRUCT<`ID` INTEGER>, `C` STRUCT<`ID` INTEGER>" + }, + "keyColumnNames" : [ "A" ], + "selectExpressions" : [ "(B = C) AS KSQL_COL_0", "(B <> C) AS KSQL_COL_1", "(NOT (B IS DISTINCT FROM C)) AS KSQL_COL_2", "(B IS DISTINCT FROM C) AS KSQL_COL_3" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.streams.max.task.idle.ms" : "0", + "ksql.query.error.max.queue.size" : "10", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.enable.metastore.backup" : "false", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/spec.json new file mode 100644 index 000000000000..a301871bd235 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/spec.json @@ -0,0 +1,134 @@ +{ + "version" : "6.1.0", + "timestamp" : 1601291312550, + "path" : "query-validation-tests/binary-comparison.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`A` INTEGER KEY, `B` STRUCT<`ID` INTEGER>, `C` STRUCT<`ID` INTEGER>", + "serdeOptions" : [ ] + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "serdeOptions" : [ ] + } + }, + "testCase" : { + "name" : "struct equality", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "B" : { + "id" : 2 + }, + "C" : { + "id" : 2.0 + } + } + }, { + "topic" : "test_topic", + "key" : 2, + "value" : { + "B" : { + "id" : 2 + }, + "C" : { + "id" : 1 + } + } + }, { + "topic" : "test_topic", + "key" : 3, + "value" : { + "B" : { + "id" : 2 + }, + "C" : { + "i" : 2 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "KSQL_COL_0" : true, + "KSQL_COL_1" : false, + "KSQL_COL_2" : true, + "KSQL_COL_3" : false + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : true, + "KSQL_COL_2" : false, + "KSQL_COL_3" : true + } + }, { + "topic" : "OUTPUT", + "key" : 3, + "value" : { + "KSQL_COL_0" : false, + "KSQL_COL_1" : true, + "KSQL_COL_2" : false, + "KSQL_COL_3" : true + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (A INT KEY, B STRUCT, C STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `B` STRUCT<`ID` INTEGER>, `C` STRUCT<`ID` INTEGER>", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN", + "keyFormat" : { + "format" : "KAFKA" + }, + "serdeOptions" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/binary-comparison_-_struct_equality/6.1.0_1601291312550/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/binary-comparison.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/binary-comparison.json index b52500555f4b..cdadbfc95c9f 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/binary-comparison.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/binary-comparison.json @@ -6,31 +6,31 @@ { "name": "equals", "statements": [ - "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT A, A = 1, B = true, C = 11, D = 1.1, E = 1.20, F = 'foo' FROM INPUT;" + "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, A = 1, B = true, C = 11, D = 1.1, E = 1.20, F = 'foo', G = ARRAY[1,2], H = MAP('a':=1), I = STRUCT(ID:=2) FROM INPUT;" ], "inputs": [ - {"topic": "test_topic", "key": 1, "value": {"B": true, "C": 11, "D": 1.1, "E": 1.20, "F": "foo"}}, - {"topic": "test_topic", "key": 2, "value": {"B": false, "C": 10, "D": 1.0, "E": 1.21, "F": "Foo"}} + {"topic": "test_topic", "key": 1, "value": {"B": true, "C": 11, "D": 1.1, "E": 1.20, "F": "foo", "G": [1,2], "H": {"a": 1}, "I": {"id": 2}}}, + {"topic": "test_topic", "key": 2, "value": {"B": false, "C": 10, "D": 1.0, "E": 1.21, "F": "Foo", "G": [1], "H": {"b": 1}, "I": {"id": 3}}} ], "outputs": [ - {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true, "KSQL_COL_1": true, "KSQL_COL_2": true, "KSQL_COL_3": true, "KSQL_COL_4": true, "KSQL_COL_5": true}}, - {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false, "KSQL_COL_1": false, "KSQL_COL_2": false, "KSQL_COL_3": false, "KSQL_COL_4": false, "KSQL_COL_5": false}} + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true, "KSQL_COL_1": true, "KSQL_COL_2": true, "KSQL_COL_3": true, "KSQL_COL_4": true, "KSQL_COL_5": true, "KSQL_COL_6": true, "KSQL_COL_7": true, "KSQL_COL_8": true}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false, "KSQL_COL_1": false, "KSQL_COL_2": false, "KSQL_COL_3": false, "KSQL_COL_4": false, "KSQL_COL_5": false, "KSQL_COL_6": false, "KSQL_COL_7": false, "KSQL_COL_8": false}} ] }, { "name": "not equals", "statements": [ - "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT A, A <> 1, B <> true, C <> 11, D <> 1.1, E <> 1.20, F <> 'foo' FROM INPUT;" + "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, A <> 1, B <> true, C <> 11, D <> 1.1, E <> 1.20, F <> 'foo', G <> ARRAY[1,2], H <> MAP('a':=1), I <> STRUCT(ID:=2) FROM INPUT;" ], "inputs": [ - {"topic": "test_topic", "key": 1, "value": {"B": true, "C": 11, "D": 1.1, "E": 1.20, "F": "foo"}}, - {"topic": "test_topic", "key": 2, "value": {"B": false, "C": 10, "D": 1.0, "E": 1.21, "F": "Foo"}} + {"topic": "test_topic", "key": 1, "value": {"B": true, "C": 11, "D": 1.1, "E": 1.20, "F": "foo", "G": [1,2], "H": {"a": 1}, "I": {"id": 2}}}, + {"topic": "test_topic", "key": 2, "value": {"B": false, "C": 10, "D": 1.0, "E": 1.21, "F": "Foo", "G": [1], "H": {"b": 1}, "I": {"id": 3}}} ], "outputs": [ - {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": false, "KSQL_COL_1": false, "KSQL_COL_2": false, "KSQL_COL_3": false, "KSQL_COL_4": false, "KSQL_COL_5": false}}, - {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": true, "KSQL_COL_1": true, "KSQL_COL_2": true, "KSQL_COL_3": true, "KSQL_COL_4": true, "KSQL_COL_5": true}} + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": false, "KSQL_COL_1": false, "KSQL_COL_2": false, "KSQL_COL_3": false, "KSQL_COL_4": false, "KSQL_COL_5": false, "KSQL_COL_6": false, "KSQL_COL_7": false, "KSQL_COL_8": false}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": true, "KSQL_COL_1": true, "KSQL_COL_2": true, "KSQL_COL_3": true, "KSQL_COL_4": true, "KSQL_COL_5": true, "KSQL_COL_6": true, "KSQL_COL_7": true, "KSQL_COL_8": true}} ] }, { @@ -160,6 +160,21 @@ {"topic": "OUTPUT", "key": null, "value": {"KSQL_COL_0": false}} ] }, + { + "name": "is distinct from (2)", + "statements": [ + "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, A IS DISTINCT FROM 1, B IS DISTINCT FROM true, C IS DISTINCT FROM 11, D IS DISTINCT FROM 1.1, E IS DISTINCT FROM 1.20, F IS DISTINCT FROM 'foo', G IS DISTINCT FROM ARRAY[1,2], H IS DISTINCT FROM MAP('a':=1), I IS DISTINCT FROM STRUCT(ID:=2) FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"B": true, "C": 11, "D": 1.1, "E": 1.20, "F": "foo", "G": [1,2], "H": {"a": 1}, "I": {"id": 2}}}, + {"topic": "test_topic", "key": 2, "value": {"B": false, "C": 10, "D": 1.0, "E": 1.21, "F": "Foo", "G": [1], "H": {"b": 1}, "I": {"id": 3}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": false, "KSQL_COL_1": false, "KSQL_COL_2": false, "KSQL_COL_3": false, "KSQL_COL_4": false, "KSQL_COL_5": false, "KSQL_COL_6": false, "KSQL_COL_7": false, "KSQL_COL_8": false}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": true, "KSQL_COL_1": true, "KSQL_COL_2": true, "KSQL_COL_3": true, "KSQL_COL_4": true, "KSQL_COL_5": true, "KSQL_COL_6": true, "KSQL_COL_7": true, "KSQL_COL_8": true}} + ] + }, { "name": "is not distinct from", "statements": [ @@ -182,37 +197,116 @@ ] }, { - "name": "comparison array fails", + "name": "is not distinct from (2)", + "statements": [ + "CREATE STREAM INPUT (A INT KEY, B BOOLEAN, C BIGINT, D DOUBLE, E DECIMAL(4,3), F STRING, G ARRAY, H MAP, I STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, A IS NOT DISTINCT FROM 1, B IS NOT DISTINCT FROM true, C IS NOT DISTINCT FROM 11, D IS NOT DISTINCT FROM 1.1, E IS NOT DISTINCT FROM 1.20, F IS NOT DISTINCT FROM 'foo', G IS NOT DISTINCT FROM ARRAY[1,2], H IS NOT DISTINCT FROM MAP('a':=1), I IS NOT DISTINCT FROM STRUCT(ID:=2) FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"B": true, "C": 11, "D": 1.1, "E": 1.20, "F": "foo", "G": [1,2], "H": {"a": 1}, "I": {"id": 2}}}, + {"topic": "test_topic", "key": 2, "value": {"B": false, "C": 10, "D": 1.0, "E": 1.21, "F": "Foo", "G": [1], "H": {"b": 1}, "I": {"id": 3}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true, "KSQL_COL_1": true, "KSQL_COL_2": true, "KSQL_COL_3": true, "KSQL_COL_4": true, "KSQL_COL_5": true, "KSQL_COL_6": true, "KSQL_COL_7": true, "KSQL_COL_8": true}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false, "KSQL_COL_1": false, "KSQL_COL_2": false, "KSQL_COL_3": false, "KSQL_COL_4": false, "KSQL_COL_5": false, "KSQL_COL_6": false, "KSQL_COL_7": false, "KSQL_COL_8": false}} + ] + }, + { + "name": "array comparison fails", "statements": [ "CREATE STREAM INPUT (A INT KEY, B ARRAY, C ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT A, B = C FROM INPUT;" + "CREATE STREAM OUTPUT AS SELECT A, B < C, B > C FROM INPUT;" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Cannot compare B (ARRAY) to C (ARRAY) with EQUAL." + "message": "Cannot compare B (ARRAY) to C (ARRAY) with LESS_THAN." } }, { - "name": "comparison map fails", + "name": "array equality", + "statements": [ + "CREATE STREAM INPUT (A INT KEY, B ARRAY, C ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"B": [1,2], "C": [1,2.0]}}, + {"topic": "test_topic", "key": 2, "value": {"B": [1,2], "C": [1]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true, "KSQL_COL_1": false, "KSQL_COL_2": true, "KSQL_COL_3": false}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false, "KSQL_COL_1": true, "KSQL_COL_2": false, "KSQL_COL_3": true}} + ] + }, + { + "name": "map comparison fails", "statements": [ "CREATE STREAM INPUT (A INT KEY, B MAP, C MAP) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT A, B = C FROM INPUT;" + "CREATE STREAM OUTPUT AS SELECT A, B < C, B > C FROM INPUT;" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Cannot compare B (MAP) to C (MAP) with EQUAL." + "message": "Cannot compare B (MAP) to C (MAP) with LESS_THAN." } }, { - "name": "comparison struct fails", + "name": "map equality", + "statements": [ + "CREATE STREAM INPUT (A INT KEY, B MAP, C MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"B": {"a": 1}, "C": {"a": 1.0}}}, + {"topic": "test_topic", "key": 2, "value": {"B": {"a": 1}, "C": {"a": 2}}}, + {"topic": "test_topic", "key": 3, "value": {"B": {"a": 1}, "C": {"b": 1}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true, "KSQL_COL_1": false, "KSQL_COL_2": true, "KSQL_COL_3": false}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false, "KSQL_COL_1": true, "KSQL_COL_2": false, "KSQL_COL_3": true}}, + {"topic": "OUTPUT", "key": 3, "value": {"KSQL_COL_0": false, "KSQL_COL_1": true, "KSQL_COL_2": false, "KSQL_COL_3": true}} + ] + }, + { + "name": "struct comparison fails", "statements": [ "CREATE STREAM INPUT (A INT KEY, B STRUCT, C STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT A, B = C FROM INPUT;" + "CREATE STREAM OUTPUT AS SELECT A, B < C, B > C FROM INPUT;" ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Cannot compare B (STRUCT<`ID` INTEGER>) to C (STRUCT<`ID` INTEGER>) with EQUAL" + "message": "Cannot compare B (STRUCT<`ID` INTEGER>) to C (STRUCT<`ID` INTEGER>) with LESS_THAN." } + }, + { + "name": "struct equality", + "statements": [ + "CREATE STREAM INPUT (A INT KEY, B STRUCT, C STRUCT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"B": {"id": 2}, "C": {"id": 2.0}}}, + {"topic": "test_topic", "key": 2, "value": {"B": {"id": 2}, "C": {"id": 1}}}, + {"topic": "test_topic", "key": 3, "value": {"B": {"id": 2}, "C": {"i": 2}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true, "KSQL_COL_1": false, "KSQL_COL_2": true, "KSQL_COL_3": false}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false, "KSQL_COL_1": true, "KSQL_COL_2": false, "KSQL_COL_3": true}}, + {"topic": "OUTPUT", "key": 3, "value": {"KSQL_COL_0": false, "KSQL_COL_1": true, "KSQL_COL_2": false, "KSQL_COL_3": true}} + ] + }, + { + "name": "nested collections comparison", + "statements": [ + "CREATE STREAM INPUT (A INT KEY, B ARRAY>, C ARRAY>) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT A, B = C FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"B": [{"X": {"foo": 1}},{"Y": {"bar": 2}}], "C": [{"Y": {"foo": 2}},{"X": {"bar": 1}}]}}, + {"topic": "test_topic", "key": 2, "value": {"B": [{"X": {"foo": 1}},{"Y": {"bar": 2}}], "C": [{"X": {"foo": 1}}]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": true}}, + {"topic": "OUTPUT", "key": 2, "value": {"KSQL_COL_0": false}} + ] } ] } \ No newline at end of file