From 06657bafe73aa6066723dfbaf0742e92a289fb80 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 14 Jul 2021 11:26:15 -0700 Subject: [PATCH] feat: add BYTES type to ksqlDB (#7778) * feat: add BYTES type to ksqlDB * use byte array for ByteLIteral * return readonlybuffer * spotbugs --- .../ksql/function/types/BytesType.java | 35 +++++++++ .../ksql/function/types/ParamTypes.java | 2 + .../ksql/schema/ksql/SchemaConverters.java | 14 +++- .../ksql/schema/ksql/SqlTypeWalker.java | 5 ++ .../confluent/ksql/schema/OperatorTest.java | 2 + .../schema/ksql/SchemaConvertersTest.java | 5 ++ .../schema/ksql/DefaultSqlValueCoercer.java | 3 + .../ksql/DefaultSqlValueCoercerTest.java | 23 +++++- .../rewrite/ExpressionTreeRewriter.java | 8 +++ .../io/confluent/ksql/function/UdafTypes.java | 2 + .../execution/codegen/SqlToJavaVisitor.java | 11 +++ .../codegen/helpers/SqlTypeCodeGen.java | 5 ++ .../formatter/ExpressionFormatter.java | 8 +++ .../expression/tree/BytesLiteral.java | 72 +++++++++++++++++++ .../expression/tree/ExpressionVisitor.java | 2 + .../tree/TraversalExpressionVisitor.java | 5 ++ .../tree/VisitParentExpressionVisitor.java | 5 ++ .../ksql/execution/function/UdfUtil.java | 2 + .../execution/interpreter/TermCompiler.java | 9 +++ .../interpreter/terms/LiteralTerms.java | 24 +++++++ .../execution/util/ExpressionTypeManager.java | 9 +++ .../ksql/execution/util/Literals.java | 3 + .../ksql/execution/ImmutabilityTest.java | 2 + .../codegen/helpers/CastEvaluatorTest.java | 9 ++- .../codegen/helpers/SqlTypeCodeGenTest.java | 1 + .../formatter/ExpressionFormatterTest.java | 9 +++ .../util/ExpressionTypeManagerTest.java | 7 ++ .../ksql/api/util/ApiSqlValueCoercerTest.java | 32 +++++++++ .../ksql/schema/ksql/JavaToSqlConverter.java | 2 + .../ksql/schema/ksql/types/SqlBaseType.java | 3 +- .../schema/ksql/types/SqlPrimitiveType.java | 1 + .../ksql/schema/ksql/types/SqlTypes.java | 1 + 32 files changed, 315 insertions(+), 6 deletions(-) create mode 100644 ksqldb-common/src/main/java/io/confluent/ksql/function/types/BytesType.java create mode 100644 ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/BytesLiteral.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/BytesType.java b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/BytesType.java new file mode 100644 index 000000000000..d92868e0e380 --- /dev/null +++ b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/BytesType.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.types; + +public class BytesType extends ObjectType { + public static final BytesType INSTANCE = new BytesType(); + + @Override + public int hashCode() { + return 10; + } + + @Override + public boolean equals(final Object obj) { + return obj instanceof BytesType; + } + + @Override + public String toString() { + return "BYTES"; + } +} diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java index 02f101ee8ef3..a471645aa701 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/function/types/ParamTypes.java @@ -44,6 +44,7 @@ private ParamTypes() { public static final DateType DATE = DateType.INSTANCE; public static final TimestampType TIMESTAMP = TimestampType.INSTANCE; public static final IntervalUnitType INTERVALUNIT = IntervalUnitType.INSTANCE; + public static final BytesType BYTES = BytesType.INSTANCE; public static boolean areCompatible(final SqlArgument actual, final ParamType declared) { return areCompatible(actual, declared, false); @@ -159,6 +160,7 @@ private static boolean isPrimitiveMatch( || base == SqlBaseType.TIME && declared instanceof TimeType || base == SqlBaseType.DATE && declared instanceof DateType || base == SqlBaseType.TIMESTAMP && declared instanceof TimestampType + || base == SqlBaseType.BYTES && declared instanceof BytesType || allowCast && base.canImplicitlyCast(functionToSqlBaseConverter().toBaseType(declared)); // CHECKSTYLE_RULES.ON: BooleanExpressionComplexity } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java index 447cc08a412f..380dd8485534 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java @@ -36,6 +36,7 @@ import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -244,9 +245,12 @@ private static SqlType sqlType(final Schema schema) { return handler.apply(schema); } - private static SqlDecimal handleBytes(final Schema schema) { - DecimalUtil.requireDecimal(schema); - return SqlDecimal.of(DecimalUtil.precision(schema), DecimalUtil.scale(schema)); + private static SqlType handleBytes(final Schema schema) { + if (DecimalUtil.isDecimal(schema)) { + return SqlDecimal.of(DecimalUtil.precision(schema), DecimalUtil.scale(schema)); + } else { + return SqlTypes.BYTES; + } } private static SqlArray toSqlArray(final Schema schema) { @@ -293,6 +297,7 @@ private static final class ConnectFromSqlConverter implements SqlToConnectTypeCo .put(SqlBaseType.TIME, t -> Time.builder().optional()) .put(SqlBaseType.DATE, t -> Date.builder().optional()) .put(SqlBaseType.TIMESTAMP, t -> Timestamp.builder().optional()) + .put(SqlBaseType.BYTES, t -> SchemaBuilder.bytes().optional()) .build(); @Override @@ -360,6 +365,7 @@ private static class JavaToSqlConverter implements JavaToSqlTypeConverter { .put(java.sql.Time.class, SqlBaseType.TIME) .put(java.sql.Date.class, SqlBaseType.DATE) .put(java.sql.Timestamp.class, SqlBaseType.TIMESTAMP) + .put(ByteBuffer.class, SqlBaseType.BYTES) .build(); @Override @@ -400,6 +406,7 @@ private static class FunctionToSql implements FunctionToSqlConverter { .put(ParamTypes.TIME, SqlTypes.TIME) .put(ParamTypes.DATE, SqlTypes.DATE) .put(ParamTypes.TIMESTAMP, SqlTypes.TIMESTAMP) + .put(ParamTypes.BYTES, SqlTypes.BYTES) .build(); @Override @@ -444,6 +451,7 @@ private static class FunctionToSqlBase implements FunctionToSqlBaseConverter { .put(ParamTypes.TIME, SqlBaseType.TIME) .put(ParamTypes.DATE, SqlBaseType.DATE) .put(ParamTypes.TIMESTAMP, SqlBaseType.TIMESTAMP) + .put(ParamTypes.BYTES, SqlBaseType.BYTES) .build(); @Override diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SqlTypeWalker.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SqlTypeWalker.java index 077a191181a3..a212ee880f01 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SqlTypeWalker.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SqlTypeWalker.java @@ -46,6 +46,7 @@ public final class SqlTypeWalker { .put(SqlBaseType.TIME, (v, t) -> v.visitTime((SqlPrimitiveType) t)) .put(SqlBaseType.DATE, (v, t) -> v.visitDate((SqlPrimitiveType) t)) .put(SqlBaseType.TIMESTAMP, (v, t) -> v.visitTimestamp((SqlPrimitiveType) t)) + .put(SqlBaseType.BYTES, (v, t) -> v.visitBytes((SqlPrimitiveType) t)) .put(SqlBaseType.ARRAY, SqlTypeWalker::visitArray) .put(SqlBaseType.MAP, SqlTypeWalker::visitMap) .put(SqlBaseType.STRUCT, SqlTypeWalker::visitStruct) @@ -100,6 +101,10 @@ default S visitTimestamp(final SqlPrimitiveType type) { return visitPrimitive(type); } + default S visitBytes(final SqlPrimitiveType type) { + return visitPrimitive(type); + } + default S visitArray(final SqlArray type, final S element) { return visitType(type); } diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/schema/OperatorTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/schema/OperatorTest.java index d79d1b27f097..39610b444c84 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/schema/OperatorTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/schema/OperatorTest.java @@ -22,6 +22,7 @@ import static io.confluent.ksql.schema.Operator.SUBTRACT; import static io.confluent.ksql.schema.ksql.types.SqlTypes.BIGINT; import static io.confluent.ksql.schema.ksql.types.SqlTypes.BOOLEAN; +import static io.confluent.ksql.schema.ksql.types.SqlTypes.BYTES; import static io.confluent.ksql.schema.ksql.types.SqlTypes.DATE; import static io.confluent.ksql.schema.ksql.types.SqlTypes.DOUBLE; import static io.confluent.ksql.schema.ksql.types.SqlTypes.INTEGER; @@ -66,6 +67,7 @@ public class OperatorTest { .put(SqlBaseType.TIME, TIME) .put(SqlBaseType.DATE, DATE) .put(SqlBaseType.TIMESTAMP, TIMESTAMP) + .put(SqlBaseType.BYTES, BYTES) .put(SqlBaseType.ARRAY, SqlTypes.array(BIGINT)) .put(SqlBaseType.MAP, SqlTypes.map(SqlTypes.STRING, INTEGER)) .put(SqlBaseType.STRUCT, SqlTypes.struct().field("f", INTEGER).build()) diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java index 6a58ce83cb05..2a4e05b9f456 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/schema/ksql/SchemaConvertersTest.java @@ -42,6 +42,7 @@ import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -65,6 +66,7 @@ public class SchemaConvertersTest { private static final Schema CONNECT_BIGINT_SCHEMA = SchemaBuilder.int64().optional().build(); private static final Schema CONNECT_DOUBLE_SCHEMA = SchemaBuilder.float64().optional().build(); private static final Schema CONNECT_STRING_SCHEMA = SchemaBuilder.string().optional().build(); + private static final Schema CONNECT_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build(); private static final Schema CONNECT_TIME_SCHEMA = org.apache.kafka.connect.data.Time.builder().optional().schema(); private static final Schema CONNECT_DATE_SCHEMA = @@ -81,6 +83,7 @@ public class SchemaConvertersTest { .put(SqlTypes.TIME, CONNECT_TIME_SCHEMA) .put(SqlTypes.DATE, CONNECT_DATE_SCHEMA) .put(SqlTypes.TIMESTAMP, CONNECT_TIMESTAMP_SCHEMA) + .put(SqlTypes.BYTES, CONNECT_BYTES_SCHEMA) .put(SqlArray.of(SqlTypes.INTEGER), SchemaBuilder .array(Schema.OPTIONAL_INT32_SCHEMA) .optional() @@ -115,6 +118,7 @@ public class SchemaConvertersTest { .put(SqlBaseType.TIME, Time.class) .put(SqlBaseType.DATE, Date.class) .put(SqlBaseType.TIMESTAMP, Timestamp.class) + .put(SqlBaseType.BYTES, ByteBuffer.class) .build(); private static final BiMap SQL_TO_FUNCTION = ImmutableBiMap @@ -127,6 +131,7 @@ public class SchemaConvertersTest { .put(SqlTypes.TIME, ParamTypes.TIME) .put(SqlTypes.DATE, ParamTypes.DATE) .put(SqlTypes.TIMESTAMP, ParamTypes.TIMESTAMP) + .put(SqlTypes.BYTES, ParamTypes.BYTES) .put(SqlArray.of(SqlTypes.INTEGER), ArrayType.of(ParamTypes.INTEGER)) .put(SqlDecimal.of(2, 1), ParamTypes.DECIMAL) .put( diff --git a/ksqldb-engine-common/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java b/ksqldb-engine-common/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java index b07b2de0b4a5..ac401c8cf0e2 100644 --- a/ksqldb-engine-common/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java +++ b/ksqldb-engine-common/src/main/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercer.java @@ -18,6 +18,7 @@ import static io.confluent.ksql.schema.ksql.types.SqlBaseType.ARRAY; import static io.confluent.ksql.schema.ksql.types.SqlBaseType.BIGINT; import static io.confluent.ksql.schema.ksql.types.SqlBaseType.BOOLEAN; +import static io.confluent.ksql.schema.ksql.types.SqlBaseType.BYTES; import static io.confluent.ksql.schema.ksql.types.SqlBaseType.DATE; import static io.confluent.ksql.schema.ksql.types.SqlBaseType.DECIMAL; import static io.confluent.ksql.schema.ksql.types.SqlBaseType.DOUBLE; @@ -323,6 +324,8 @@ private static final class Rules { .put(key(TIMESTAMP, TIMESTAMP), Coercer.PASS_THROUGH) .put(key(TIME, TIME), Coercer.PASS_THROUGH) .put(key(DATE, DATE), Coercer.PASS_THROUGH) + // BYTES: + .put(key(BYTES, BYTES), Coercer.PASS_THROUGH) .build(); private static final ImmutableMap LAX_ADDITIONAL = diff --git a/ksqldb-engine-common/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java b/ksqldb-engine-common/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java index 31ea5d6de8f4..0589014a8c6a 100644 --- a/ksqldb-engine-common/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java +++ b/ksqldb-engine-common/src/test/java/io/confluent/ksql/schema/ksql/DefaultSqlValueCoercerTest.java @@ -17,6 +17,7 @@ import static io.confluent.ksql.schema.ksql.types.SqlTypes.BIGINT; import static io.confluent.ksql.schema.ksql.types.SqlTypes.BOOLEAN; +import static io.confluent.ksql.schema.ksql.types.SqlTypes.BYTES; import static io.confluent.ksql.schema.ksql.types.SqlTypes.DATE; import static io.confluent.ksql.schema.ksql.types.SqlTypes.DOUBLE; import static io.confluent.ksql.schema.ksql.types.SqlTypes.INTEGER; @@ -50,6 +51,7 @@ import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -315,6 +317,11 @@ public static class LaxValueCoercionTest { .put(STRING, laxOnly("1990-03-03")) .build() ) + // BYTES: + .put(ByteBuffer.wrap(new byte[] {110}), ImmutableMap.builder() + .put(BYTES, ByteBuffer.wrap(new byte[] {110})) + .build() + ) // ARRAY: .put(ImmutableList.of(), ImmutableMap.builder() .put(array(BOOLEAN), ImmutableList.of()) @@ -329,6 +336,7 @@ public static class LaxValueCoercionTest { .put(array(TIME), ImmutableList.of()) .put(array(DATE), ImmutableList.of()) .put(array(TIMESTAMP), ImmutableList.of()) + .put(array(BYTES), ImmutableList.of()) .build() ) .put(ImmutableList.of(true, false), ImmutableMap.builder() @@ -432,6 +440,10 @@ public static class LaxValueCoercionTest { struct().field("def", DATE).build(), createStruct(struct().field("def", DATE).build()) ) + .put( + struct().field("ghi", BYTES).build(), + createStruct(struct().field("ghi", BYTES).build()) + ) .build() ) .put(createStruct(SqlTypes.struct() @@ -1018,7 +1030,8 @@ public void shouldFailIfNewSqlBaseTypeAdded() { is(ImmutableSet.of( SqlBaseType.BOOLEAN, SqlBaseType.INTEGER, SqlBaseType.BIGINT, SqlBaseType.DECIMAL, SqlBaseType.DOUBLE, SqlBaseType.STRING, SqlBaseType.ARRAY, SqlBaseType.MAP, - SqlBaseType.STRUCT, SqlBaseType.TIME, SqlBaseType.DATE, SqlBaseType.TIMESTAMP + SqlBaseType.STRUCT, SqlBaseType.TIME, SqlBaseType.DATE, SqlBaseType.TIMESTAMP, + SqlBaseType.BYTES )) ); } @@ -1092,6 +1105,7 @@ private static final class TypeInstances { .put(SqlBaseType.TIME, SqlTypes.TIME) .put(SqlBaseType.DATE, SqlTypes.DATE) .put(SqlBaseType.TIMESTAMP, SqlTypes.TIMESTAMP) + .put(SqlBaseType.BYTES, BYTES) .build(); static SqlType typeInstanceFor(final SqlBaseType baseType) { @@ -1118,6 +1132,7 @@ private static final class InstanceInstances { .put(SqlBaseType.TIME, new Time(1000L)) .put(SqlBaseType.DATE, new Date(636451200000L)) .put(SqlBaseType.TIMESTAMP, new Timestamp(1535792475000L)) + .put(SqlBaseType.BYTES, ByteBuffer.wrap(new byte[] {88, 34, 120})) .build(); @SuppressWarnings("fallthrough") @@ -1225,6 +1240,9 @@ private static final class SupportedCoercions { .put(SqlBaseType.TIMESTAMP, ImmutableSet.builder() .add(SqlBaseType.TIMESTAMP) .build()) + .put(SqlBaseType.BYTES, ImmutableSet.builder() + .add(SqlBaseType.BYTES) + .build()) .build(); private static final ImmutableMap> LAX_ADDITIONAL = @@ -1266,6 +1284,9 @@ private static final class SupportedCoercions { .put(SqlBaseType.DATE, ImmutableSet.builder() .add(SqlBaseType.STRING) .build()) + .put(SqlBaseType.BYTES, ImmutableSet.builder() + .add(SqlBaseType.BYTES) + .build()) .build(); private static boolean supported( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java index 06ba25f39b43..82909aac20b7 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java @@ -22,6 +22,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.Cast; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -542,5 +543,12 @@ public Expression visitTimestampLiteral( final C context) { return plugin.apply(node, new Context<>(context, this)).orElse(node); } + + @Override + public Expression visitBytesLiteral( + final BytesLiteral node, + final C context) { + return plugin.apply(node, new Context<>(context, this)).orElse(node); + } } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java index 3874527692f1..ff2f267ab377 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/UdafTypes.java @@ -30,6 +30,7 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -65,6 +66,7 @@ class UdafTypes { .add(Function.class) .add(BiFunction.class) .add(TriFunction.class) + .add(ByteBuffer.class) .build(); private final Type inputType; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 673bb6d7e820..27deea00eea1 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -40,6 +40,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.Cast; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -333,6 +334,16 @@ public Pair visitInListExpression( return visitUnsupported(inListExpression); } + @Override + public Pair visitBytesLiteral( + final BytesLiteral node, final Context context + ) { + return new Pair<>( + node.toString(), + SqlTypes.BYTES + ); + } + @Override public Pair visitTimestampLiteral( final TimestampLiteral node, final Context context diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGen.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGen.java index a6b3479a4fb3..ea95c779b3de 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGen.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGen.java @@ -80,6 +80,11 @@ public String visitTimestamp(final SqlPrimitiveType type) { return "SqlTypes.TIMESTAMP"; } + @Override + public String visitBytes(final SqlPrimitiveType type) { + return "SqlTypes.BYTES"; + } + @Override public String visitDecimal(final SqlDecimal type) { return "SqlTypes.decimal(" + type.getPrecision() + "," + type.getScale() + ")"; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatter.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatter.java index ceaca1a81047..4cfc48802c1a 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatter.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatter.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.Cast; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -66,6 +67,7 @@ import java.util.List; import java.util.Locale; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; public final class ExpressionFormatter { @@ -190,6 +192,12 @@ public String visitDecimalLiteral(final DecimalLiteral node, final Context conte return node.getValue().toString(); } + @Override + public String visitBytesLiteral(final BytesLiteral bytesLiteral, final Context context) { + return "ByteBuffer.wrap(new byte[]{" + + StringUtils.join(bytesLiteral.getByteArray(), ',') + "})"; + } + @Override public String visitTimeLiteral(final TimeLiteral node, final Context context) { return SqlTimeTypes.formatTime(node.getValue()); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/BytesLiteral.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/BytesLiteral.java new file mode 100644 index 000000000000..2bf1a7da9d5c --- /dev/null +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/BytesLiteral.java @@ -0,0 +1,72 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.expression.tree; + +import static java.util.Objects.requireNonNull; + +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.parser.NodeLocation; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Optional; + +@Immutable +public class BytesLiteral extends Literal { + + private final byte[] value; + + public BytesLiteral(final ByteBuffer value) { + this(Optional.empty(), value); + } + + public BytesLiteral(final Optional location, final ByteBuffer value) { + super(location); + this.value = new byte[requireNonNull(value, "value").capacity()]; + value.get(this.value); + } + + @Override + public ByteBuffer getValue() { + return ByteBuffer.wrap(value).asReadOnlyBuffer(); + } + + public byte[] getByteArray() { + return value.clone(); + } + + @Override + public R accept(final ExpressionVisitor visitor, final C context) { + return visitor.visitBytesLiteral(this, context); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final BytesLiteral that = (BytesLiteral) o; + return Arrays.equals(value, that.value); + } + + @Override + public int hashCode() { + return Arrays.hashCode(value); + } +} diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ExpressionVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ExpressionVisitor.java index 7dbcb01be5e9..9f32331372f9 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ExpressionVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ExpressionVisitor.java @@ -94,4 +94,6 @@ default R process(final Expression node, final C context) { R visitLambdaVariable(LambdaVariable exp, C context); R visitIntervalUnit(IntervalUnit exp, C context); + + R visitBytesLiteral(BytesLiteral exp, C context); } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/TraversalExpressionVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/TraversalExpressionVisitor.java index 3a11b9540daa..1fea98434d2f 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/TraversalExpressionVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/TraversalExpressionVisitor.java @@ -249,6 +249,11 @@ public Void visitIntegerLiteral(final IntegerLiteral node, final C context) { return null; } + @Override + public Void visitBytesLiteral(final BytesLiteral node, final C context) { + return null; + } + @Override public Void visitType(final Type node, final C context) { return null; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/VisitParentExpressionVisitor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/VisitParentExpressionVisitor.java index a71fef9bce2d..fc4c0984fdc4 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/VisitParentExpressionVisitor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/expression/tree/VisitParentExpressionVisitor.java @@ -233,4 +233,9 @@ public R visitLambdaVariable(final LambdaVariable node, final C context) { public R visitIntervalUnit(final IntervalUnit node, final C context) { return visitExpression(node, context); } + + @Override + public R visitBytesLiteral(final BytesLiteral node, final C context) { + return visitLiteral(node, context); + } } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java index e30810dc4eca..f5602f58f3cd 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/function/UdfUtil.java @@ -32,6 +32,7 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -61,6 +62,7 @@ public final class UdfUtil { .put(Timestamp.class, ParamTypes.TIMESTAMP) .put(Time.class, ParamTypes.TIME) .put(TimeUnit.class, ParamTypes.INTERVALUNIT) + .put(ByteBuffer.class, ParamTypes.BYTES) .build(); private UdfUtil() { diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/TermCompiler.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/TermCompiler.java index 4c26a145c5d8..01996542b353 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/TermCompiler.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/TermCompiler.java @@ -29,6 +29,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.Cast; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -358,6 +359,14 @@ public Term visitIntegerLiteral( return LiteralTerms.of(node.getValue()); } + @Override + public Term visitBytesLiteral( + final BytesLiteral node, + final Context context + ) { + return LiteralTerms.of(node.getValue()); + } + @Override public Term visitFunctionCall(final FunctionCall node, final Context context) { final UdfFactory udfFactory = functionRegistry.getUdfFactory(node.getName()); diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/terms/LiteralTerms.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/terms/LiteralTerms.java index 007bfb3472ab..0d85b71bbea4 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/terms/LiteralTerms.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/interpreter/terms/LiteralTerms.java @@ -19,6 +19,7 @@ import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -69,6 +70,10 @@ public static Term of(final TimeUnit value) { return new IntervalUnitTermImpl(value); } + public static Term of(final ByteBuffer value) { + return new BytesTermImpl(value); + } + public static NullTerm ofNull() { return new NullTerm(); } @@ -263,6 +268,25 @@ public SqlType getSqlType() { } } + public static class BytesTermImpl implements Term { + + private final ByteBuffer value; + + public BytesTermImpl(final ByteBuffer bytes) { + this.value = bytes; + } + + @Override + public Object getValue(final TermEvaluationContext context) { + return value; + } + + @Override + public SqlType getSqlType() { + return SqlTypes.BYTES; + } + } + public static class IntervalUnitTermImpl implements Term { private final TimeUnit value; diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index 8a3fd6801d0a..b50a2b4bf47e 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -21,6 +21,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.Cast; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -306,6 +307,14 @@ public Void visitStringLiteral( return null; } + @Override + public Void visitBytesLiteral( + final BytesLiteral node, final Context context + ) { + context.setSqlType(SqlTypes.BYTES); + return null; + } + @Override public Void visitBooleanLiteral( final BooleanLiteral node, final Context context diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/Literals.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/Literals.java index ff3a87a9785b..bfcfffde68dc 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/Literals.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/util/Literals.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.DateLiteral; import io.confluent.ksql.execution.expression.tree.DecimalLiteral; import io.confluent.ksql.execution.expression.tree.DoubleLiteral; @@ -28,6 +29,7 @@ import io.confluent.ksql.execution.expression.tree.TimestampLiteral; import io.confluent.ksql.schema.ksql.types.SqlBaseType; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -51,6 +53,7 @@ public final class Literals { .put(SqlBaseType.TIME, v -> new TimeLiteral((Time) v)) .put(SqlBaseType.DATE, v -> new DateLiteral((Date) v)) .put(SqlBaseType.TIMESTAMP, v -> new TimestampLiteral((Timestamp) v)) + .put(SqlBaseType.BYTES, v -> new BytesLiteral((ByteBuffer) v)) .build(); private Literals() { diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/ImmutabilityTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/ImmutabilityTest.java index 05a0a0dd07d5..77227beda295 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/ImmutabilityTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/ImmutabilityTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.execution; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.test.util.ImmutableTester; import java.util.Collection; import java.util.Objects; @@ -75,6 +76,7 @@ public void shouldBeImmutable() { .withKnownImmutableType(KGroupedStream.class) .withKnownImmutableType(KGroupedTable.class) .withKnownImmutableType(Serde.class) + .withKnownImmutableType(BytesLiteral.class) .test(modelClass); } } diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/CastEvaluatorTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/CastEvaluatorTest.java index 0c502daab5ba..d89b742979f5 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/CastEvaluatorTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/CastEvaluatorTest.java @@ -53,6 +53,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -674,7 +675,8 @@ public void shouldFailIfNewSqlBaseTypeAdded() { is(ImmutableSet.of( SqlBaseType.BOOLEAN, SqlBaseType.INTEGER, SqlBaseType.BIGINT, SqlBaseType.DECIMAL, SqlBaseType.DOUBLE, SqlBaseType.STRING, SqlBaseType.ARRAY, MAP, - SqlBaseType.STRUCT, SqlBaseType.TIME, SqlBaseType.DATE, SqlBaseType.TIMESTAMP + SqlBaseType.STRUCT, SqlBaseType.TIME, SqlBaseType.DATE, SqlBaseType.TIMESTAMP, + SqlBaseType.BYTES )) ); } @@ -741,6 +743,7 @@ private static final class TypeInstances { .put(SqlBaseType.TIME, SqlTypes.TIME) .put(SqlBaseType.DATE, SqlTypes.DATE) .put(SqlBaseType.TIMESTAMP, SqlTypes.TIMESTAMP) + .put(SqlBaseType.BYTES, SqlTypes.BYTES) .build(); static SqlType typeInstanceFor(final SqlBaseType baseType) { @@ -767,6 +770,7 @@ private static final class InstanceInstances { .put(SqlBaseType.TIME, new Time(500L)) .put(SqlBaseType.DATE, new Date(500L)) .put(SqlBaseType.TIMESTAMP, new Timestamp(500)) + .put(SqlBaseType.BYTES, ByteBuffer.wrap(new byte[] {123})) .build(); @SuppressWarnings("fallthrough") @@ -900,6 +904,9 @@ private static final class SupportedCasts { .add(SqlBaseType.DATE) .add(SqlBaseType.STRING) .build()) + .put(SqlBaseType.BYTES, ImmutableSet.builder() + .add(SqlBaseType.BYTES) + .build()) .build(); private static boolean supported(final SqlBaseType from, final SqlBaseType to) { diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGenTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGenTest.java index bc40871867ca..163850dd0cc4 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGenTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/SqlTypeCodeGenTest.java @@ -46,6 +46,7 @@ private static final class TypeInstances { .put(SqlBaseType.STRUCT, SqlTypes.struct() .field("Bob", SqlTypes.STRING) .build()) + .put(SqlBaseType.BYTES, SqlTypes.BYTES) .build(); static SqlType typeInstanceFor(final SqlBaseType baseType) { diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatterTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatterTest.java index d7b59b0c9f90..2b55a5c71d61 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatterTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/expression/formatter/ExpressionFormatterTest.java @@ -26,6 +26,7 @@ import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.Cast; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -72,6 +73,7 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.schema.utils.FormatOptions; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -89,6 +91,13 @@ public void shouldFormatBooleanLiteral() { assertThat(ExpressionFormatter.formatExpression(new BooleanLiteral("true")), equalTo("true")); } + @Test + public void shouldFormatBytesLiteral() { + assertThat(ExpressionFormatter.formatExpression(new BytesLiteral(ByteBuffer.wrap(new byte[] {123, 45}))), equalTo("ByteBuffer.wrap(new byte[]{123,45})")); + assertThat(ExpressionFormatter.formatExpression(new BytesLiteral(ByteBuffer.wrap(new byte[] {}))), equalTo("ByteBuffer.wrap(new byte[]{})")); + assertThat(ExpressionFormatter.formatExpression(new BytesLiteral(ByteBuffer.wrap(new byte[] {123}))), equalTo("ByteBuffer.wrap(new byte[]{123})")); + } + @Test public void shouldFormatStringLiteral() { assertThat(ExpressionFormatter.formatExpression(new StringLiteral("string")), equalTo("'string'")); diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index c33a55a161dd..3c5991edee46 100644 --- a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression; import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.expression.tree.BytesLiteral; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type; import io.confluent.ksql.execution.expression.tree.CreateArrayExpression; @@ -90,6 +91,7 @@ import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.KsqlException; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.util.Optional; @@ -1069,6 +1071,11 @@ public void shouldProcessDateLiteral() { assertThat(expressionTypeManager.getExpressionSqlType(new DateLiteral(new Date(86400000))), is(SqlTypes.DATE)); } + @Test + public void shouldProcessBytesLiteral() { + assertThat(expressionTypeManager.getExpressionSqlType(new BytesLiteral(ByteBuffer.wrap(new byte[] {123}))), is(SqlTypes.BYTES)); + } + @Test public void shouldReturnBooleanForInPredicate() { // Given: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/util/ApiSqlValueCoercerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/util/ApiSqlValueCoercerTest.java index f012c04a73a3..670c7f32cbd5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/util/ApiSqlValueCoercerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/util/ApiSqlValueCoercerTest.java @@ -31,6 +31,7 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -59,6 +60,7 @@ public class ApiSqlValueCoercerTest { .put(SqlBaseType.TIME, SqlTypes.TIME) .put(SqlBaseType.DATE, SqlTypes.DATE) .put(SqlBaseType.TIMESTAMP, SqlTypes.TIMESTAMP) + .put(SqlBaseType.BYTES, SqlTypes.BYTES) .put(SqlBaseType.ARRAY, SqlTypes.array(SqlTypes.BIGINT)) .put(SqlBaseType.MAP, SqlTypes.map(SqlTypes.STRING, SqlTypes.BIGINT)) .put(SqlBaseType.STRUCT, SqlTypes.struct().field("fred", SqlTypes.INTEGER).build()) @@ -75,6 +77,7 @@ public class ApiSqlValueCoercerTest { .put(SqlBaseType.TIME, new Time(300)) .put(SqlBaseType.DATE, new Date(300)) .put(SqlBaseType.TIMESTAMP, new Timestamp(300)) + .put(SqlBaseType.BYTES, ByteBuffer.wrap(new byte[] {123})) .put(SqlBaseType.ARRAY, new JsonArray().add(1L).add(2L)) .put(SqlBaseType.MAP, new JsonObject().put("k", 1L)) .put(SqlBaseType.STRUCT, new JsonObject().put("fred", 11)) @@ -108,6 +111,7 @@ public void shouldNotCoerceToBoolean() { assertThat(coercer.coerce(new Timestamp(3213), SqlTypes.BOOLEAN), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), SqlTypes.BOOLEAN), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), SqlTypes.BOOLEAN), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.BOOLEAN), is(Result.failure())); } @Test @@ -125,6 +129,7 @@ public void shouldNotCoerceToInteger() { assertThat(coercer.coerce(new Timestamp(3213), SqlTypes.INTEGER), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), SqlTypes.INTEGER), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), SqlTypes.INTEGER), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.INTEGER), is(Result.failure())); } @Test @@ -142,6 +147,7 @@ public void shouldNotCoerceToBigInt() { assertThat(coercer.coerce(new Timestamp(3213), SqlTypes.BIGINT), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), SqlTypes.BIGINT), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), SqlTypes.BIGINT), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.BIGINT), is(Result.failure())); } @Test @@ -164,6 +170,7 @@ public void shouldNotCoerceToDecimal() { assertThat(coercer.coerce(new Timestamp(3213), decimalType), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), decimalType), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), decimalType), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), decimalType), is(Result.failure())); } @Test @@ -181,6 +188,7 @@ public void shouldNotCoerceToDouble() { assertThat(coercer.coerce(new Timestamp(3213), SqlTypes.DOUBLE), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), SqlTypes.DOUBLE), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), SqlTypes.DOUBLE), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.DOUBLE), is(Result.failure())); } @Test @@ -201,6 +209,7 @@ public void shouldNotCoerceToArray() { assertThat(coercer.coerce(new Timestamp(3213), arrayType), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), arrayType), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), arrayType), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), arrayType), is(Result.failure())); assertThat(coercer.coerce(ImmutableMap.of("foo", 1), arrayType), is(Result.failure())); assertThat(coercer.coerce(new JsonObject().put("foo", 1), arrayType), is(Result.failure())); } @@ -223,6 +232,7 @@ public void shouldNotCoerceToMap() { assertThat(coercer.coerce(new Timestamp(3213), mapType), is(Result.failure())); assertThat(coercer.coerce(new Time(3213), mapType), is(Result.failure())); assertThat(coercer.coerce(new Date(3213), mapType), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), mapType), is(Result.failure())); assertThat(coercer.coerce(ImmutableList.of("foo"), mapType), is(Result.failure())); assertThat(coercer.coerce(new JsonArray().add("foo"), mapType), is(Result.failure())); } @@ -312,6 +322,7 @@ public void shouldNotCoerceToString() { assertThat(coercer.coerce(new Time(1000L), SqlTypes.STRING), is(Result.failure())); assertThat(coercer.coerce(new Date(1000L), SqlTypes.STRING), is(Result.failure())); assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.STRING), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.STRING), is(Result.failure())); } @Test @@ -330,6 +341,7 @@ public void shouldNotCoerceToTimestamp() { assertThat(coercer.coerce(new Time(1000L), SqlTypes.TIMESTAMP), is(Result.failure())); assertThat(coercer.coerce(new Date(1000L), SqlTypes.TIMESTAMP), is(Result.failure())); assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.TIMESTAMP), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.TIMESTAMP), is(Result.failure())); } @Test @@ -348,6 +360,7 @@ public void shouldNotCoerceToTime() { assertThat(coercer.coerce(new Timestamp(1000L), SqlTypes.TIME), is(Result.failure())); assertThat(coercer.coerce(new Date(1000L), SqlTypes.TIME), is(Result.failure())); assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.TIME), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.TIME), is(Result.failure())); } @Test @@ -366,6 +379,25 @@ public void shouldNotCoerceToDate() { assertThat(coercer.coerce(new Time(1000L), SqlTypes.DATE), is(Result.failure())); assertThat(coercer.coerce(new Timestamp(1000L), SqlTypes.DATE), is(Result.failure())); assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.DATE), is(Result.failure())); + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.DATE), is(Result.failure())); + } + + @Test + public void shouldCoerceToBytes() { + assertThat(coercer.coerce(ByteBuffer.wrap(new byte[] {123}), SqlTypes.BYTES), is(Result.of(ByteBuffer.wrap(new byte[] {123})))); + } + + @Test + public void shouldNotCoerceToBytes() { + assertThat(coercer.coerce(true, SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(1, SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(1L, SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(1.0d, SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce("aaa", SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(new Time(1000L), SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(new Date(3213), SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(new Timestamp(1000L), SqlTypes.BYTES), is(Result.failure())); + assertThat(coercer.coerce(new BigDecimal(123), SqlTypes.BYTES), is(Result.failure())); } @Test diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java index d43a3563da16..20c315896fb1 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/JavaToSqlConverter.java @@ -21,6 +21,7 @@ import io.confluent.ksql.schema.utils.SchemaException; import io.confluent.ksql.types.KsqlStruct; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -47,6 +48,7 @@ class JavaToSqlConverter implements JavaToSqlTypeConverter { .put(Date.class, SqlBaseType.DATE) .put(Time.class, SqlBaseType.TIME) .put(Timestamp.class, SqlBaseType.TIMESTAMP) + .put(ByteBuffer.class, SqlBaseType.BYTES) .build(); @Override diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlBaseType.java b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlBaseType.java index 2ce024f50602..febd40d7ff44 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlBaseType.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlBaseType.java @@ -22,7 +22,8 @@ * The SQL types supported by KSQL. */ public enum SqlBaseType { - BOOLEAN, INTEGER, BIGINT, DECIMAL, DOUBLE, STRING, ARRAY, MAP, STRUCT, TIME, DATE, TIMESTAMP; + BOOLEAN, INTEGER, BIGINT, DECIMAL, DOUBLE, STRING, ARRAY, MAP, STRUCT, TIME, DATE, TIMESTAMP, + BYTES; /** * @return {@code true} if numeric type. diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java index 400fc983b1c7..41c958d90add 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlPrimitiveType.java @@ -39,6 +39,7 @@ public final class SqlPrimitiveType extends SqlType { .put(SqlBaseType.TIME, new SqlPrimitiveType(SqlBaseType.TIME)) .put(SqlBaseType.DATE, new SqlPrimitiveType(SqlBaseType.DATE)) .put(SqlBaseType.TIMESTAMP, new SqlPrimitiveType(SqlBaseType.TIMESTAMP)) + .put(SqlBaseType.BYTES, new SqlPrimitiveType(SqlBaseType.BYTES)) .build(); private static final ImmutableSet PRIMITIVE_TYPE_NAMES = ImmutableSet.builder() diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlTypes.java b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlTypes.java index 73f43c36de0d..17e65bfd04e4 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlTypes.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/schema/ksql/types/SqlTypes.java @@ -28,6 +28,7 @@ private SqlTypes() { public static final SqlPrimitiveType TIME = SqlPrimitiveType.of(SqlBaseType.TIME); public static final SqlPrimitiveType DATE = SqlPrimitiveType.of(SqlBaseType.DATE); public static final SqlPrimitiveType TIMESTAMP = SqlPrimitiveType.of(SqlBaseType.TIMESTAMP); + public static final SqlPrimitiveType BYTES = SqlPrimitiveType.of(SqlBaseType.BYTES); public static SqlDecimal decimal(final int precision, final int scale) { return SqlDecimal.of(precision, scale);