Skip to content

Commit

Permalink
feat: add BYTES type to ksqlDB (#7778)
Browse files Browse the repository at this point in the history
* feat: add BYTES type to ksqlDB

* use byte array for ByteLIteral

* return readonlybuffer

* spotbugs
  • Loading branch information
Zara Lim authored Jul 14, 2021
1 parent 1fd2ff1 commit 06657ba
Show file tree
Hide file tree
Showing 32 changed files with 315 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.types;

public class BytesType extends ObjectType {
public static final BytesType INSTANCE = new BytesType();

@Override
public int hashCode() {
return 10;
}

@Override
public boolean equals(final Object obj) {
return obj instanceof BytesType;
}

@Override
public String toString() {
return "BYTES";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private ParamTypes() {
public static final DateType DATE = DateType.INSTANCE;
public static final TimestampType TIMESTAMP = TimestampType.INSTANCE;
public static final IntervalUnitType INTERVALUNIT = IntervalUnitType.INSTANCE;
public static final BytesType BYTES = BytesType.INSTANCE;

public static boolean areCompatible(final SqlArgument actual, final ParamType declared) {
return areCompatible(actual, declared, false);
Expand Down Expand Up @@ -159,6 +160,7 @@ private static boolean isPrimitiveMatch(
|| base == SqlBaseType.TIME && declared instanceof TimeType
|| base == SqlBaseType.DATE && declared instanceof DateType
|| base == SqlBaseType.TIMESTAMP && declared instanceof TimestampType
|| base == SqlBaseType.BYTES && declared instanceof BytesType
|| allowCast && base.canImplicitlyCast(functionToSqlBaseConverter().toBaseType(declared));
// CHECKSTYLE_RULES.ON: BooleanExpressionComplexity
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -244,9 +245,12 @@ private static SqlType sqlType(final Schema schema) {
return handler.apply(schema);
}

private static SqlDecimal handleBytes(final Schema schema) {
DecimalUtil.requireDecimal(schema);
return SqlDecimal.of(DecimalUtil.precision(schema), DecimalUtil.scale(schema));
private static SqlType handleBytes(final Schema schema) {
if (DecimalUtil.isDecimal(schema)) {
return SqlDecimal.of(DecimalUtil.precision(schema), DecimalUtil.scale(schema));
} else {
return SqlTypes.BYTES;
}
}

private static SqlArray toSqlArray(final Schema schema) {
Expand Down Expand Up @@ -293,6 +297,7 @@ private static final class ConnectFromSqlConverter implements SqlToConnectTypeCo
.put(SqlBaseType.TIME, t -> Time.builder().optional())
.put(SqlBaseType.DATE, t -> Date.builder().optional())
.put(SqlBaseType.TIMESTAMP, t -> Timestamp.builder().optional())
.put(SqlBaseType.BYTES, t -> SchemaBuilder.bytes().optional())
.build();

@Override
Expand Down Expand Up @@ -360,6 +365,7 @@ private static class JavaToSqlConverter implements JavaToSqlTypeConverter {
.put(java.sql.Time.class, SqlBaseType.TIME)
.put(java.sql.Date.class, SqlBaseType.DATE)
.put(java.sql.Timestamp.class, SqlBaseType.TIMESTAMP)
.put(ByteBuffer.class, SqlBaseType.BYTES)
.build();

@Override
Expand Down Expand Up @@ -400,6 +406,7 @@ private static class FunctionToSql implements FunctionToSqlConverter {
.put(ParamTypes.TIME, SqlTypes.TIME)
.put(ParamTypes.DATE, SqlTypes.DATE)
.put(ParamTypes.TIMESTAMP, SqlTypes.TIMESTAMP)
.put(ParamTypes.BYTES, SqlTypes.BYTES)
.build();

@Override
Expand Down Expand Up @@ -444,6 +451,7 @@ private static class FunctionToSqlBase implements FunctionToSqlBaseConverter {
.put(ParamTypes.TIME, SqlBaseType.TIME)
.put(ParamTypes.DATE, SqlBaseType.DATE)
.put(ParamTypes.TIMESTAMP, SqlBaseType.TIMESTAMP)
.put(ParamTypes.BYTES, SqlBaseType.BYTES)
.build();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class SqlTypeWalker {
.put(SqlBaseType.TIME, (v, t) -> v.visitTime((SqlPrimitiveType) t))
.put(SqlBaseType.DATE, (v, t) -> v.visitDate((SqlPrimitiveType) t))
.put(SqlBaseType.TIMESTAMP, (v, t) -> v.visitTimestamp((SqlPrimitiveType) t))
.put(SqlBaseType.BYTES, (v, t) -> v.visitBytes((SqlPrimitiveType) t))
.put(SqlBaseType.ARRAY, SqlTypeWalker::visitArray)
.put(SqlBaseType.MAP, SqlTypeWalker::visitMap)
.put(SqlBaseType.STRUCT, SqlTypeWalker::visitStruct)
Expand Down Expand Up @@ -100,6 +101,10 @@ default S visitTimestamp(final SqlPrimitiveType type) {
return visitPrimitive(type);
}

default S visitBytes(final SqlPrimitiveType type) {
return visitPrimitive(type);
}

default S visitArray(final SqlArray type, final S element) {
return visitType(type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static io.confluent.ksql.schema.Operator.SUBTRACT;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.BIGINT;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.BOOLEAN;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.BYTES;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.DATE;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.DOUBLE;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.INTEGER;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class OperatorTest {
.put(SqlBaseType.TIME, TIME)
.put(SqlBaseType.DATE, DATE)
.put(SqlBaseType.TIMESTAMP, TIMESTAMP)
.put(SqlBaseType.BYTES, BYTES)
.put(SqlBaseType.ARRAY, SqlTypes.array(BIGINT))
.put(SqlBaseType.MAP, SqlTypes.map(SqlTypes.STRING, INTEGER))
.put(SqlBaseType.STRUCT, SqlTypes.struct().field("f", INTEGER).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
Expand All @@ -65,6 +66,7 @@ public class SchemaConvertersTest {
private static final Schema CONNECT_BIGINT_SCHEMA = SchemaBuilder.int64().optional().build();
private static final Schema CONNECT_DOUBLE_SCHEMA = SchemaBuilder.float64().optional().build();
private static final Schema CONNECT_STRING_SCHEMA = SchemaBuilder.string().optional().build();
private static final Schema CONNECT_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
private static final Schema CONNECT_TIME_SCHEMA =
org.apache.kafka.connect.data.Time.builder().optional().schema();
private static final Schema CONNECT_DATE_SCHEMA =
Expand All @@ -81,6 +83,7 @@ public class SchemaConvertersTest {
.put(SqlTypes.TIME, CONNECT_TIME_SCHEMA)
.put(SqlTypes.DATE, CONNECT_DATE_SCHEMA)
.put(SqlTypes.TIMESTAMP, CONNECT_TIMESTAMP_SCHEMA)
.put(SqlTypes.BYTES, CONNECT_BYTES_SCHEMA)
.put(SqlArray.of(SqlTypes.INTEGER), SchemaBuilder
.array(Schema.OPTIONAL_INT32_SCHEMA)
.optional()
Expand Down Expand Up @@ -115,6 +118,7 @@ public class SchemaConvertersTest {
.put(SqlBaseType.TIME, Time.class)
.put(SqlBaseType.DATE, Date.class)
.put(SqlBaseType.TIMESTAMP, Timestamp.class)
.put(SqlBaseType.BYTES, ByteBuffer.class)
.build();

private static final BiMap<SqlType, ParamType> SQL_TO_FUNCTION = ImmutableBiMap
Expand All @@ -127,6 +131,7 @@ public class SchemaConvertersTest {
.put(SqlTypes.TIME, ParamTypes.TIME)
.put(SqlTypes.DATE, ParamTypes.DATE)
.put(SqlTypes.TIMESTAMP, ParamTypes.TIMESTAMP)
.put(SqlTypes.BYTES, ParamTypes.BYTES)
.put(SqlArray.of(SqlTypes.INTEGER), ArrayType.of(ParamTypes.INTEGER))
.put(SqlDecimal.of(2, 1), ParamTypes.DECIMAL)
.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.ARRAY;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.BIGINT;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.BOOLEAN;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.BYTES;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.DATE;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.DECIMAL;
import static io.confluent.ksql.schema.ksql.types.SqlBaseType.DOUBLE;
Expand Down Expand Up @@ -323,6 +324,8 @@ private static final class Rules {
.put(key(TIMESTAMP, TIMESTAMP), Coercer.PASS_THROUGH)
.put(key(TIME, TIME), Coercer.PASS_THROUGH)
.put(key(DATE, DATE), Coercer.PASS_THROUGH)
// BYTES:
.put(key(BYTES, BYTES), Coercer.PASS_THROUGH)
.build();

private static final ImmutableMap<SupportedCoercion, Coercer> LAX_ADDITIONAL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static io.confluent.ksql.schema.ksql.types.SqlTypes.BIGINT;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.BOOLEAN;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.BYTES;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.DATE;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.DOUBLE;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.INTEGER;
Expand Down Expand Up @@ -50,6 +51,7 @@
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
Expand Down Expand Up @@ -315,6 +317,11 @@ public static class LaxValueCoercionTest {
.put(STRING, laxOnly("1990-03-03"))
.build()
)
// BYTES:
.put(ByteBuffer.wrap(new byte[] {110}), ImmutableMap.<SqlType, Object>builder()
.put(BYTES, ByteBuffer.wrap(new byte[] {110}))
.build()
)
// ARRAY:
.put(ImmutableList.of(), ImmutableMap.<SqlType, Object>builder()
.put(array(BOOLEAN), ImmutableList.of())
Expand All @@ -329,6 +336,7 @@ public static class LaxValueCoercionTest {
.put(array(TIME), ImmutableList.of())
.put(array(DATE), ImmutableList.of())
.put(array(TIMESTAMP), ImmutableList.of())
.put(array(BYTES), ImmutableList.of())
.build()
)
.put(ImmutableList.of(true, false), ImmutableMap.<SqlType, Object>builder()
Expand Down Expand Up @@ -432,6 +440,10 @@ public static class LaxValueCoercionTest {
struct().field("def", DATE).build(),
createStruct(struct().field("def", DATE).build())
)
.put(
struct().field("ghi", BYTES).build(),
createStruct(struct().field("ghi", BYTES).build())
)
.build()
)
.put(createStruct(SqlTypes.struct()
Expand Down Expand Up @@ -1018,7 +1030,8 @@ public void shouldFailIfNewSqlBaseTypeAdded() {
is(ImmutableSet.of(
SqlBaseType.BOOLEAN, SqlBaseType.INTEGER, SqlBaseType.BIGINT, SqlBaseType.DECIMAL,
SqlBaseType.DOUBLE, SqlBaseType.STRING, SqlBaseType.ARRAY, SqlBaseType.MAP,
SqlBaseType.STRUCT, SqlBaseType.TIME, SqlBaseType.DATE, SqlBaseType.TIMESTAMP
SqlBaseType.STRUCT, SqlBaseType.TIME, SqlBaseType.DATE, SqlBaseType.TIMESTAMP,
SqlBaseType.BYTES
))
);
}
Expand Down Expand Up @@ -1092,6 +1105,7 @@ private static final class TypeInstances {
.put(SqlBaseType.TIME, SqlTypes.TIME)
.put(SqlBaseType.DATE, SqlTypes.DATE)
.put(SqlBaseType.TIMESTAMP, SqlTypes.TIMESTAMP)
.put(SqlBaseType.BYTES, BYTES)
.build();

static SqlType typeInstanceFor(final SqlBaseType baseType) {
Expand All @@ -1118,6 +1132,7 @@ private static final class InstanceInstances {
.put(SqlBaseType.TIME, new Time(1000L))
.put(SqlBaseType.DATE, new Date(636451200000L))
.put(SqlBaseType.TIMESTAMP, new Timestamp(1535792475000L))
.put(SqlBaseType.BYTES, ByteBuffer.wrap(new byte[] {88, 34, 120}))
.build();

@SuppressWarnings("fallthrough")
Expand Down Expand Up @@ -1225,6 +1240,9 @@ private static final class SupportedCoercions {
.put(SqlBaseType.TIMESTAMP, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.TIMESTAMP)
.build())
.put(SqlBaseType.BYTES, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.BYTES)
.build())
.build();

private static final ImmutableMap<SqlBaseType, ImmutableSet<SqlBaseType>> LAX_ADDITIONAL =
Expand Down Expand Up @@ -1266,6 +1284,9 @@ private static final class SupportedCoercions {
.put(SqlBaseType.DATE, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.STRING)
.build())
.put(SqlBaseType.BYTES, ImmutableSet.<SqlBaseType>builder()
.add(SqlBaseType.BYTES)
.build())
.build();

private static boolean supported(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.BetweenPredicate;
import io.confluent.ksql.execution.expression.tree.BooleanLiteral;
import io.confluent.ksql.execution.expression.tree.BytesLiteral;
import io.confluent.ksql.execution.expression.tree.Cast;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.CreateArrayExpression;
Expand Down Expand Up @@ -542,5 +543,12 @@ public Expression visitTimestampLiteral(
final C context) {
return plugin.apply(node, new Context<>(context, this)).orElse(node);
}

@Override
public Expression visitBytesLiteral(
final BytesLiteral node,
final C context) {
return plugin.apply(node, new Context<>(context, this)).orElse(node);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
Expand Down Expand Up @@ -65,6 +66,7 @@ class UdafTypes {
.add(Function.class)
.add(BiFunction.class)
.add(TriFunction.class)
.add(ByteBuffer.class)
.build();

private final Type inputType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.BetweenPredicate;
import io.confluent.ksql.execution.expression.tree.BooleanLiteral;
import io.confluent.ksql.execution.expression.tree.BytesLiteral;
import io.confluent.ksql.execution.expression.tree.Cast;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.CreateArrayExpression;
Expand Down Expand Up @@ -333,6 +334,16 @@ public Pair<String, SqlType> visitInListExpression(
return visitUnsupported(inListExpression);
}

@Override
public Pair<String, SqlType> visitBytesLiteral(
final BytesLiteral node, final Context context
) {
return new Pair<>(
node.toString(),
SqlTypes.BYTES
);
}

@Override
public Pair<String, SqlType> visitTimestampLiteral(
final TimestampLiteral node, final Context context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public String visitTimestamp(final SqlPrimitiveType type) {
return "SqlTypes.TIMESTAMP";
}

@Override
public String visitBytes(final SqlPrimitiveType type) {
return "SqlTypes.BYTES";
}

@Override
public String visitDecimal(final SqlDecimal type) {
return "SqlTypes.decimal(" + type.getPrecision() + "," + type.getScale() + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.BetweenPredicate;
import io.confluent.ksql.execution.expression.tree.BooleanLiteral;
import io.confluent.ksql.execution.expression.tree.BytesLiteral;
import io.confluent.ksql.execution.expression.tree.Cast;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.CreateArrayExpression;
Expand Down Expand Up @@ -66,6 +67,7 @@
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public final class ExpressionFormatter {

Expand Down Expand Up @@ -190,6 +192,12 @@ public String visitDecimalLiteral(final DecimalLiteral node, final Context conte
return node.getValue().toString();
}

@Override
public String visitBytesLiteral(final BytesLiteral bytesLiteral, final Context context) {
return "ByteBuffer.wrap(new byte[]{"
+ StringUtils.join(bytesLiteral.getByteArray(), ',') + "})";
}

@Override
public String visitTimeLiteral(final TimeLiteral node, final Context context) {
return SqlTimeTypes.formatTime(node.getValue());
Expand Down
Loading

0 comments on commit 06657ba

Please sign in to comment.