Skip to content

Commit

Permalink
feat: support Comparisons on complex types (#6149)
Browse files Browse the repository at this point in the history
  • Loading branch information
uurl authored Sep 29, 2020
1 parent f97c9a6 commit 0695213
Show file tree
Hide file tree
Showing 29 changed files with 2,457 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,42 @@ private String visitStringComparisonExpression(final ComparisonExpression.Type t
}
}

private String visitArrayComparisonExpression(final ComparisonExpression.Type type) {
switch (type) {
case EQUAL:
return "(%1$s.equals(%2$s))";
case NOT_EQUAL:
case IS_DISTINCT_FROM:
return "(!%1$s.equals(%2$s))";
default:
throw new KsqlException("Unexpected array comparison: " + type.getValue());
}
}

private String visitMapComparisonExpression(final ComparisonExpression.Type type) {
switch (type) {
case EQUAL:
return "(%1$s.equals(%2$s))";
case NOT_EQUAL:
case IS_DISTINCT_FROM:
return "(!%1$s.equals(%2$s))";
default:
throw new KsqlException("Unexpected map comparison: " + type.getValue());
}
}

private String visitStructComparisonExpression(final ComparisonExpression.Type type) {
switch (type) {
case EQUAL:
return "(%1$s.equals(%2$s))";
case NOT_EQUAL:
case IS_DISTINCT_FROM:
return "(!%1$s.equals(%2$s))";
default:
throw new KsqlException("Unexpected struct comparison: " + type.getValue());
}
}

private String visitScalarComparisonExpression(final ComparisonExpression.Type type) {
switch (type) {
case EQUAL:
Expand Down Expand Up @@ -552,10 +588,15 @@ public Pair<String, SqlType> visitComparisonExpression(
case STRING:
exprFormat += visitStringComparisonExpression(node.getType());
break;
case MAP:
throw new KsqlException("Cannot compare MAP values");
case ARRAY:
throw new KsqlException("Cannot compare ARRAY values");
exprFormat += visitArrayComparisonExpression(node.getType());
break;
case MAP:
exprFormat += visitMapComparisonExpression(node.getType());
break;
case STRUCT:
exprFormat += visitStructComparisonExpression(node.getType());
break;
case BOOLEAN:
exprFormat += visitBooleanComparisonExpression(node.getType());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public enum Type {
LESS_THAN_OR_EQUAL("<="),
GREATER_THAN(">"),
GREATER_THAN_OR_EQUAL(">="),
IS_DISTINCT_FROM("IS DISTINCT FROM");
IS_DISTINCT_FROM("IS DISTINCT FROM"),
IS_NOT_DISTINCT_FROM("IS NOT DISTINCT FROM");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ final class ComparisonUtil {
.add(handler(SqlBaseType::isNumber, ComparisonUtil::handleNumber))
.add(handler(SqlBaseType.STRING, ComparisonUtil::handleString))
.add(handler(SqlBaseType.BOOLEAN, ComparisonUtil::handleBoolean))
.add(handler(SqlBaseType.ARRAY, ComparisonUtil::handleArray))
.add(handler(SqlBaseType.MAP, ComparisonUtil::handleMap))
.add(handler(SqlBaseType.STRUCT, ComparisonUtil::handleStruct))
.build();

private ComparisonUtil() {
Expand Down Expand Up @@ -77,8 +80,26 @@ private static boolean handleString(final Type operator, final SqlType right) {
}

private static boolean handleBoolean(final Type operator, final SqlType right) {
return right.baseType() == SqlBaseType.BOOLEAN
&& (operator == Type.EQUAL || operator == Type.NOT_EQUAL);
return right.baseType() == SqlBaseType.BOOLEAN && isEqualityOperator(operator);
}

private static boolean handleArray(final Type operator, final SqlType right) {
return right.baseType() == SqlBaseType.ARRAY && isEqualityOperator(operator);
}

private static boolean handleMap(final Type operator, final SqlType right) {
return right.baseType() == SqlBaseType.MAP && isEqualityOperator(operator);
}

private static boolean handleStruct(final Type operator, final SqlType right) {
return right.baseType() == SqlBaseType.STRUCT && isEqualityOperator(operator);
}

private static boolean isEqualityOperator(final Type operator) {
return operator == Type.EQUAL
|| operator == Type.NOT_EQUAL
|| operator == Type.IS_DISTINCT_FROM
|| operator == Type.IS_NOT_DISTINCT_FROM;
}

private static Handler handler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.schema.ksql.types.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
Expand All @@ -44,21 +43,16 @@ public class ComparisonUtilTest {
SqlTypes.struct().field("foo", SqlTypes.BIGINT).build()
);

private static final SqlBaseType[] SCHEMA_TO_SQL_NAME = new SqlBaseType[] {
SqlBaseType.BOOLEAN, SqlBaseType.INTEGER, SqlBaseType.BIGINT, SqlBaseType.DOUBLE,
SqlBaseType.DECIMAL, SqlBaseType.STRING, SqlBaseType.ARRAY, SqlBaseType.MAP, SqlBaseType.STRUCT
};

private static final List<List<Boolean>> expectedResults = ImmutableList.of(
ImmutableList.of(true, false, false, false, false, false, false, false, false), // Boolean
ImmutableList.of(false, true, true, true, true, false, false, false, false), // Int
ImmutableList.of(false, true, true, true, true, false, false, false, false), // BigInt
ImmutableList.of(false, true, true, true, true, false, false, false, false), // Double
ImmutableList.of(false, true, true, true, true, false, false, false, false), // Decimal
ImmutableList.of(false, false, false, false, false, true, false, false, false), // String
ImmutableList.of(false, false, false, false, false, false, false, false, false), // Array
ImmutableList.of(false, false, false, false, false, false, false, false, false), // Map
ImmutableList.of(false, false, false, false, false, false, false, false, false) // Struct
ImmutableList.of(false, false, false, false, false, false, true, false, false), // Array
ImmutableList.of(false, false, false, false, false, false, false, true, false), // Map
ImmutableList.of(false, false, false, false, false, false, false, false, true) // Struct
);

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (A INTEGER KEY, B ARRAY<INTEGER>, C ARRAY<INTEGER>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`A` INTEGER KEY, `B` ARRAY<INTEGER>, `C` ARRAY<INTEGER>",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.A A,\n (INPUT.B = INPUT.C) KSQL_COL_0,\n (INPUT.B <> INPUT.C) KSQL_COL_1,\n (NOT (INPUT.B IS DISTINCT FROM INPUT.C)) KSQL_COL_2,\n (INPUT.B IS DISTINCT FROM INPUT.C) KSQL_COL_3\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`A` INTEGER KEY, `B` ARRAY<INTEGER>, `C` ARRAY<INTEGER>"
},
"keyColumnNames" : [ "A" ],
"selectExpressions" : [ "(B = C) AS KSQL_COL_0", "(B <> C) AS KSQL_COL_1", "(NOT (B IS DISTINCT FROM C)) AS KSQL_COL_2", "(B IS DISTINCT FROM C) AS KSQL_COL_3" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.streams.max.task.idle.ms" : "0",
"ksql.query.error.max.queue.size" : "10",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.enable.metastore.backup" : "false",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"version" : "6.1.0",
"timestamp" : 1601291312437,
"path" : "query-validation-tests/binary-comparison.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`A` INTEGER KEY, `B` ARRAY<INTEGER>, `C` ARRAY<INTEGER>",
"serdeOptions" : [ ]
},
"CSAS_OUTPUT_0.OUTPUT" : {
"schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN",
"serdeOptions" : [ ]
}
},
"testCase" : {
"name" : "array equality",
"inputs" : [ {
"topic" : "test_topic",
"key" : 1,
"value" : {
"B" : [ 1, 2 ],
"C" : [ 1, 2.0 ]
}
}, {
"topic" : "test_topic",
"key" : 2,
"value" : {
"B" : [ 1, 2 ],
"C" : [ 1 ]
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : 1,
"value" : {
"KSQL_COL_0" : true,
"KSQL_COL_1" : false,
"KSQL_COL_2" : true,
"KSQL_COL_3" : false
}
}, {
"topic" : "OUTPUT",
"key" : 2,
"value" : {
"KSQL_COL_0" : false,
"KSQL_COL_1" : true,
"KSQL_COL_2" : false,
"KSQL_COL_3" : true
}
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT (A INT KEY, B ARRAY<INT>, C ARRAY<INT>) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT A, B = C, B <> C, B IS NOT DISTINCT FROM C, B IS DISTINCT FROM C FROM INPUT;" ],
"post" : {
"sources" : [ {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`A` INTEGER KEY, `B` ARRAY<INTEGER>, `C` ARRAY<INTEGER>",
"keyFormat" : {
"format" : "KAFKA"
},
"serdeOptions" : [ ]
}, {
"name" : "OUTPUT",
"type" : "STREAM",
"schema" : "`A` INTEGER KEY, `KSQL_COL_0` BOOLEAN, `KSQL_COL_1` BOOLEAN, `KSQL_COL_2` BOOLEAN, `KSQL_COL_3` BOOLEAN",
"keyFormat" : {
"format" : "KAFKA"
},
"serdeOptions" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Loading

0 comments on commit 0695213

Please sign in to comment.