Skip to content

Commit

Permalink
fix: standardize KSQL up-casting (#3516)
Browse files Browse the repository at this point in the history
* fix: standardize KSQL up-casting

Logic for controlling what types can be up-cast to another, e.g. up-casting an INT to a BIGINT was spread across different parts of the code base, had no tests to ensure all parts were consistent and hence was inconsistent.

Up-casting is primarily used during arithmetic binary operations, e.g. adding two numbers together, and when trying to coerce user supplied values in `INSERT VALUE` statements to the required SQL types.

Numeric Up-casting rules for KSQL are:

- `INT` can be up-cast to `BIGINT`
- `BIGINT` can be up-cast to `DECIMAL`
- `DECIMAL` can be up-cast to `DOUBLE`.

In the existing code:

- `SqlBaseType` has a `canUpCast` method, but it doesn't take `DECIMAL`s into account.
- `SqlBaseType` has an `isNumber` method, but it doesn't treat `DECIMAL` as a numeric type.
- `SchemaUtil` has the logic on how to resolve the resulting (connect) schema given an `Operation` and two input types.
- `DefaultSqlValueCoercer`, allowed coercion of `DOUBLE` -> `DECIMAL`, which is against out up-casting rules.

This PR looks to make the code a bit more object orientated and hopefully better structured.

With this change:
- `SqlBaseType`'s `canUpCast` and `isNumber` methodss correctly handle `DECIMAL`.
- Any type can be up-cast to itself. Only numeric types can be up-cast to other types and the rules are encapsulated in `SqlBaseType.canUpCast`.
- The logic on how to resolve the resulting SQL type given an `Operation` and two input types now lives in `Operation`, making use of the decimal specific handling which now lives in `SqlDecimal`.
- The `SqlDecimal` type an `INT` or `BIGINT` is up-cast to is now stored in `SqlTypes`.

However, the main benefit of this commit is the addition of tests in `DefaultSqlValueCoercer` and `OperatorTest` to ensure that these two classes follow the up-casting rules.
  • Loading branch information
big-andy-coates authored Oct 11, 2019
1 parent ed94895 commit 7fe8772
Show file tree
Hide file tree
Showing 27 changed files with 864 additions and 598 deletions.
78 changes: 71 additions & 7 deletions ksql-common/src/main/java/io/confluent/ksql/schema/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,84 @@

package io.confluent.ksql.schema;

import static java.util.Objects.requireNonNull;

import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlException;
import java.util.function.BinaryOperator;

public enum Operator {
ADD("+"),
SUBTRACT("-"),
MULTIPLY("*"),
DIVIDE("/"),
MODULUS("%");
ADD("+", SqlDecimal::add) {
@Override
public SqlType resultType(final SqlType left, final SqlType right) {
if (left.baseType() == SqlBaseType.STRING && right.baseType() == SqlBaseType.STRING) {
return SqlTypes.STRING;
}

return super.resultType(left, right);
}
},
SUBTRACT("-", SqlDecimal::subtract),
MULTIPLY("*", SqlDecimal::multiply),
DIVIDE("/", SqlDecimal::divide),
MODULUS("%", SqlDecimal::modulus);

private final String symbol;
private final BinaryOperator<SqlDecimal> binaryResolver;

Operator(final String symbol) {
this.symbol = symbol;
Operator(final String symbol, final BinaryOperator<SqlDecimal> binaryResolver) {
this.symbol = requireNonNull(symbol, "symbol");
this.binaryResolver = requireNonNull(binaryResolver, "binaryResolver");
}

public String getSymbol() {
return symbol;
}

/**
* Determine the result type for the given parameters.
*
* @param left the left side of the operation.
* @param right the right side of the operation.
* @return the result schema.
*/
public SqlType resultType(final SqlType left, final SqlType right) {
if (left.baseType().isNumber() && right.baseType().isNumber()) {
if (left.baseType().canUpCast(right.baseType())) {
if (right.baseType() != SqlBaseType.DECIMAL) {
return right;
}

return binaryResolver.apply(toDecimal(left), (SqlDecimal) right);
}

if (right.baseType().canUpCast(left.baseType())) {
if (left.baseType() != SqlBaseType.DECIMAL) {
return left;
}

return binaryResolver.apply((SqlDecimal) left, toDecimal(right));
}
}

throw new KsqlException(
"Unsupported arithmetic types. " + left.baseType() + " " + right.baseType());
}

private static SqlDecimal toDecimal(final SqlType type) {
switch (type.baseType()) {
case DECIMAL:
return (SqlDecimal) type;
case INTEGER:
return SqlTypes.INT_UPCAST_TO_DECIMAL;
case BIGINT:
return SqlTypes.BIGINT_UPCAST_TO_DECIMAL;
default:
throw new KsqlException(
"Cannot convert " + type.baseType() + " to " + SqlBaseType.DECIMAL + ".");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,28 @@
* The SQL types supported by KSQL.
*/
public enum SqlBaseType {
BOOLEAN, INTEGER, BIGINT, DOUBLE, DECIMAL, STRING, ARRAY, MAP, STRUCT;
BOOLEAN, INTEGER, BIGINT, DECIMAL, DOUBLE, STRING, ARRAY, MAP, STRUCT;

/**
* @return {@code true} if numeric type.
*/
public boolean isNumber() {
// for now, conversions between DECIMAL and other numeric types is not supported
return this == INTEGER || this == BIGINT || this == DOUBLE;
return this == INTEGER || this == BIGINT || this == DECIMAL || this == DOUBLE;
}

/**
* Test to see if this type can be up-cast to another.
*
* <p>This defines if KSQL supports <i>implicitly</i> converting one numeric type to another.
*
* <p>Types can always be upcast to themselves. Only numeric types can be upcast to different
* numeric types. Note: STRING to DECIMAL handling is not seen as up-casting, it's parsing.
*
* @param to the target type.
* @return true if this type can be upcast to the supplied type.
*/
public boolean canUpCast(final SqlBaseType to) {
return isNumber() && to.isNumber() && this.ordinal() <= to.ordinal();
return this.equals(to)
|| (isNumber() && to.isNumber() && this.ordinal() <= to.ordinal());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,75 @@ public String toString() {
public String toString(final FormatOptions formatOptions) {
return toString();
}

/**
* Determine the decimal type should two decimals be added together.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal add(final SqlDecimal left, final SqlDecimal right) {
final int precision = Math.max(left.scale, right.scale)
+ Math.max(left.precision - left.scale, right.precision - right.scale)
+ 1;

final int scale = Math.max(left.scale, right.scale);
return SqlDecimal.of(precision, scale);
}

/**
* Determine the decimal type should one decimal be subtracted from another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal subtract(final SqlDecimal left, final SqlDecimal right) {
return add(left, right);
}

/**
* Determine the decimal type should one decimal be multiplied by another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal multiply(final SqlDecimal left, final SqlDecimal right) {
final int precision = left.precision + right.precision + 1;
final int scale = left.scale + right.scale;
return SqlDecimal.of(precision, scale);
}

/**
* Determine the decimal type should one decimal be divided by another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal divide(final SqlDecimal left, final SqlDecimal right) {
final int precision = left.precision - left.scale + right.scale
+ Math.max(6, left.scale + right.precision + 1);

final int scale = Math.max(6, left.scale + right.precision + 1);
return SqlDecimal.of(precision, scale);
}

/**
* Determine the decimal result type when calculating the remainder of dividing one decimal by
* another.
*
* @param left the left side decimal.
* @param right the right side decimal.
* @return the resulting decimal type.
*/
public static SqlDecimal modulus(final SqlDecimal left, final SqlDecimal right) {
final int precision = Math.min(left.precision - left.scale, right.precision - right.scale)
+ Math.max(left.scale, right.scale);

final int scale = Math.max(left.scale, right.scale);
return SqlDecimal.of(precision, scale);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,14 @@ public static SqlMap map(final SqlType valueType) {
public static SqlStruct.Builder struct() {
return SqlStruct.builder();
}

/**
* Schema of an INT up-cast to a DECIMAL
*/
public static final SqlDecimal INT_UPCAST_TO_DECIMAL = SqlDecimal.of(10, 0);

/**
* Schema of an BIGINT up-cast to a DECIMAL
*/
public static final SqlDecimal BIGINT_UPCAST_TO_DECIMAL = SqlDecimal.of(19, 0);
}
27 changes: 3 additions & 24 deletions ksql-common/src/main/java/io/confluent/ksql/util/DecimalUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util;

import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
Expand Down Expand Up @@ -154,28 +155,6 @@ public static BigDecimal ensureFit(final BigDecimal value, final Schema schema)
}
}

/**
* Converts a schema to a decimal schema with set precision/scale without losing
* scale or precision.
*
* @param schema the schema
* @return the decimal schema
* @throws KsqlException if the schema cannot safely be converted to decimal
*/
public static Schema toDecimal(final Schema schema) {
switch (schema.type()) {
case BYTES:
requireDecimal(schema);
return schema;
case INT32:
return builder(10, 0).build();
case INT64:
return builder(19, 0).build();
default:
throw new KsqlException("Cannot convert schema of type " + schema.type() + " to decimal.");
}
}

/**
* Converts a schema to a sql decimal with set precision/scale without losing
* scale or precision.
Expand All @@ -190,9 +169,9 @@ public static SqlDecimal toSqlDecimal(final Schema schema) {
requireDecimal(schema);
return SqlDecimal.of(precision(schema), scale(schema));
case INT32:
return SqlDecimal.of(10, 0);
return SqlTypes.INT_UPCAST_TO_DECIMAL;
case INT64:
return SqlDecimal.of(19, 0);
return SqlTypes.BIGINT_UPCAST_TO_DECIMAL;
default:
throw new KsqlException("Cannot convert schema of type " + schema.type() + " to decimal.");
}
Expand Down
Loading

0 comments on commit 7fe8772

Please sign in to comment.