Refactor some code to help introduce DECIMAL types in confluentinc#842


This refactor is needed to avoid disabling CyclomaticComplexity checkstyle
rules.
spena committed Mar 27, 2019
1 parent cf29742 commit aee720e
Showing 4 changed files with 129 additions and 123 deletions.
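To make the intent of the refactor concrete, here is a minimal, hypothetical sketch of the pattern applied across all four files: each per-type switch statement is replaced with an ImmutableMap of handler functions, so the method body collapses to a single lookup and adding a new type (such as DECIMAL) later only means adding one map entry instead of another branch. The DispatchSketch class and Kind enum below are illustrative stand-ins, not code from this commit; they assume only Guava's ImmutableMap, which the project already uses.

import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.function.Function;

final class DispatchSketch {

  // Hypothetical stand-in for Schema.Type, used only for this illustration.
  enum Kind { INT32, INT64, STRING }

  // Before: every supported type adds a branch, raising the method's
  // cyclomatic complexity each time a type (e.g. DECIMAL) is introduced.
  static String describeWithSwitch(final Kind kind) {
    switch (kind) {
      case INT32:
        return "INT";
      case INT64:
        return "BIGINT";
      case STRING:
        return "VARCHAR";
      default:
        throw new IllegalArgumentException("Type is not supported: " + kind);
    }
  }

  // After: the mapping lives in data; the method is a single lookup,
  // so its complexity stays constant as new types are added.
  private static final Map<Kind, Function<Kind, String>> HANDLERS =
      ImmutableMap.<Kind, Function<Kind, String>>builder()
          .put(Kind.INT32, k -> "INT")
          .put(Kind.INT64, k -> "BIGINT")
          .put(Kind.STRING, k -> "VARCHAR")
          .build();

  static String describeWithLookup(final Kind kind) {
    final Function<Kind, String> handler = HANDLERS.get(kind);
    if (handler == null) {
      throw new IllegalArgumentException("Type is not supported: " + kind);
    }
    return handler.apply(kind);
  }

  private DispatchSketch() {
  }
}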
135 changes: 66 additions & 69 deletions ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
@@ -32,6 +32,7 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.SchemaBuilder.FieldAssembler;
@@ -83,30 +84,65 @@ public final class SchemaUtil {
.put(Schema.Type.FLOAT64, Schema.OPTIONAL_FLOAT64_SCHEMA)
.build();

private static final ImmutableMap<String, String> TYPE_MAP =
new ImmutableMap.Builder<String, String>()
.put("STRING", "VARCHAR(STRING)")
.put("INT64", "BIGINT")
.put("INT32", "INTEGER")
.put("FLOAT64", "DOUBLE")
.put("BOOLEAN", "BOOLEAN")
.put("ARRAY", "ARRAY")
.put("MAP", "MAP")
.put("STRUCT", "STRUCT")
.build();

private static final Map<Schema.Type, Function<Schema, Type>> SCHEMA_TYPE_TO_JAVA_TYPE =
ImmutableMap.<Schema.Type, Function<Schema, Type>>builder()
.put(Schema.Type.STRING, s -> String.class)
.put(Schema.Type.BOOLEAN, s -> Boolean.class)
.put(Schema.Type.INT32, s -> Integer.class)
.put(Schema.Type.INT64, s -> Long.class)
.put(Schema.Type.FLOAT64, s -> Double.class)
.put(Schema.Type.ARRAY, s -> List.class)
.put(Schema.Type.MAP, s -> Map.class)
.put(Schema.Type.STRUCT, s -> Struct.class)
.build();

private static final Map<Schema.Type, Function<Schema, String>> SCHEMA_TYPE_TO_SQL_TYPE =
ImmutableMap.<Schema.Type, Function<Schema, String>>builder()
.put(Schema.Type.INT32, s -> "INT")
.put(Schema.Type.INT64, s -> "BIGINT")
.put(Schema.Type.FLOAT32, s -> "DOUBLE")
.put(Schema.Type.FLOAT64, s -> "DOUBLE")
.put(Schema.Type.BOOLEAN, s -> "BOOLEAN")
.put(Schema.Type.STRING, s -> "VARCHAR")
.put(Schema.Type.ARRAY, s ->
"ARRAY<" + getSqlTypeName(s.valueSchema()) + ">")
.put(Schema.Type.MAP, s ->
"MAP<" + getSqlTypeName(s.keySchema()) + "," + getSqlTypeName(s.valueSchema()) + ">")
.put(Schema.Type.STRUCT, s -> getStructString(s))
.build();

private static final ImmutableMap<Schema.Type, String> SCHEMA_TYPE_TO_CAST_STRING =
new ImmutableMap.Builder<Schema.Type, String>()
.put(Schema.Type.INT32, "(Integer)")
.put(Schema.Type.INT64, "(Long)")
.put(Schema.Type.FLOAT64, "(Double)")
.put(Schema.Type.STRING, "(String)")
.put(Schema.Type.BOOLEAN, "(Boolean)")
.build();


private SchemaUtil() {
}

public static Class<?> getJavaType(final Schema schema) {
switch (schema.type()) {
case STRING:
return String.class;
case BOOLEAN:
return Boolean.class;
case INT32:
return Integer.class;
case INT64:
return Long.class;
case FLOAT64:
return Double.class;
case ARRAY:
return List.class;
case MAP:
return Map.class;
case STRUCT:
return Struct.class;
default:
throw new KsqlException("Type is not supported: " + schema.type());
final Function<Schema, Type> handler = SCHEMA_TYPE_TO_JAVA_TYPE.get(schema.type());
if (handler == null) {
throw new KsqlException("Type is not supported: " + schema.type());
}

return (Class) handler.apply(schema);
}

public static Schema getSchemaFromType(final Type type) {
@@ -168,18 +204,6 @@ public static Schema buildSchemaWithAlias(final Schema schema, final String alia
return newSchema.build();
}

private static final ImmutableMap<String, String> TYPE_MAP =
new ImmutableMap.Builder<String, String>()
.put("STRING", "VARCHAR(STRING)")
.put("INT64", "BIGINT")
.put("INT32", "INTEGER")
.put("FLOAT64", "DOUBLE")
.put("BOOLEAN", "BOOLEAN")
.put("ARRAY", "ARRAY")
.put("MAP", "MAP")
.put("STRUCT", "STRUCT")
.build();

public static String getSchemaTypeAsSqlType(final Schema.Type type) {
final String sqlType = TYPE_MAP.get(type.name());
if (sqlType == null) {
@@ -190,21 +214,13 @@ public static String getSchemaTypeAsSqlType(final Schema.Type type) {
}

public static String getJavaCastString(final Schema schema) {
switch (schema.type()) {
case INT32:
return "(Integer)";
case INT64:
return "(Long)";
case FLOAT64:
return "(Double)";
case STRING:
return "(String)";
case BOOLEAN:
return "(Boolean)";
default:
//TODO: Add complex types later!
return "";
final String castString = SCHEMA_TYPE_TO_CAST_STRING.get(schema.type());
if (castString == null) {
//TODO: Add complex or other types later!
return "";
}

return castString;
}

public static Schema addImplicitRowTimeRowKeyToSchema(final Schema schema) {
@@ -252,31 +268,12 @@ public static String getSchemaDefinitionString(final Schema schema) {
}

public static String getSqlTypeName(final Schema schema) {
switch (schema.type()) {
case INT32:
return "INT";
case INT64:
return "BIGINT";
case FLOAT32:
case FLOAT64:
return "DOUBLE";
case BOOLEAN:
return "BOOLEAN";
case STRING:
return "VARCHAR";
case ARRAY:
return "ARRAY<" + getSqlTypeName(schema.valueSchema()) + ">";
case MAP:
return "MAP<"
+ getSqlTypeName(schema.keySchema())
+ ","
+ getSqlTypeName(schema.valueSchema())
+ ">";
case STRUCT:
return getStructString(schema);
default:
throw new KsqlException(String.format("Invalid type in schema: %s.", schema.toString()));
final Function<Schema, String> handler = SCHEMA_TYPE_TO_SQL_TYPE.get(schema.type());
if (handler == null) {
throw new KsqlException(String.format("Invalid type in schema: %s.", schema.toString()));
}

return handler.apply(schema);
}

private static String getStructString(final Schema schema) {
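One detail worth calling out in the new SCHEMA_TYPE_TO_SQL_TYPE map: the handlers take the full Schema rather than just the Schema.Type, because the ARRAY, MAP, and STRUCT entries recurse into getSqlTypeName for their element schemas. A short usage sketch under that assumption (illustrative only; the schema shape is made up, and it relies on Kafka Connect's SchemaBuilder, which this module already uses):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.confluent.ksql.util.SchemaUtil;

public final class SqlTypeNameExample {

  public static void main(final String[] args) {
    // A MAP<VARCHAR, ARRAY<DOUBLE>> schema built with Kafka Connect's SchemaBuilder.
    final Schema schema = SchemaBuilder
        .map(
            Schema.OPTIONAL_STRING_SCHEMA,
            SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build())
        .optional()
        .build();

    // The MAP handler recurses into the key and value schemas,
    // so this is expected to print: MAP<VARCHAR,ARRAY<DOUBLE>>
    System.out.println(SchemaUtil.getSqlTypeName(schema));
  }

  private SqlTypeNameExample() {
  }
}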
@@ -15,15 +15,31 @@

package io.confluent.ksql.util;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public class GenericRowValueTypeEnforcer {

private final List<Field> fields;

private static final Map<Schema.Type, Function<Object, Object>> SCHEMA_TYPE_TO_ENFORCE =
ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
.put(Schema.Type.INT32, v -> enforceInteger(v))
.put(Schema.Type.INT64, v -> enforceLong(v))
.put(Schema.Type.FLOAT64, v -> enforceDouble(v))
.put(Schema.Type.STRING, v -> enforceString(v))
.put(Schema.Type.BOOLEAN, v -> enforceBoolean(v))
.put(Schema.Type.ARRAY, v -> v)
.put(Schema.Type.MAP, v -> v)
.put(Schema.Type.STRUCT, v -> v)
.build();

public GenericRowValueTypeEnforcer(final Schema schema) {
this.fields = schema.fields();
}
@@ -34,28 +50,15 @@ public Object enforceFieldType(final int index, final Object value) {
}

private Object enforceFieldType(final Schema schema, final Object value) {

switch (schema.type()) {
case INT32:
return enforceInteger(value);
case INT64:
return enforceLong(value);
case FLOAT64:
return enforceDouble(value);
case STRING:
return enforceString(value);
case BOOLEAN:
return enforceBoolean(value);
case ARRAY:
case MAP:
case STRUCT:
return value;
default:
throw new KsqlException("Type is not supported: " + schema);
final Function<Object, Object> handler = SCHEMA_TYPE_TO_ENFORCE.get(schema.type());
if (handler == null) {
throw new KsqlException("Type is not supported: " + schema);
}

return handler.apply(value);
}

private Double enforceDouble(final Object value) {
private static Double enforceDouble(final Object value) {
if (value instanceof Double) {
return (Double) value;
} else if (value instanceof Integer) {
@@ -77,7 +80,7 @@ private Double enforceDouble(final Object value) {
}
}

private Long enforceLong(final Object value) {
private static Long enforceLong(final Object value) {
if (value instanceof Long) {
return (Long) value;
} else if (value instanceof Integer) {
@@ -97,7 +100,7 @@ private Long enforceLong(final Object value) {
}
}

private Integer enforceInteger(final Object value) {
private static Integer enforceInteger(final Object value) {

if (value instanceof Integer) {
return (Integer) value;
@@ -118,7 +121,7 @@ private Integer enforceInteger(final Object value) {
}
}

private String enforceString(final Object value) {
private static String enforceString(final Object value) {
if (value instanceof String || value instanceof CharSequence) {
return value.toString();
} else if (value == null) {
@@ -129,7 +132,7 @@ private String enforceString(final Object value) {
}

@SuppressFBWarnings("NP_BOOLEAN_RETURN_NULL")
private Boolean enforceBoolean(final Object value) {
private static Boolean enforceBoolean(final Object value) {
if (value instanceof Boolean) {
return (Boolean) value;
} else if (value instanceof String) {
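As a side effect of the lookup table, the enforce* helpers above become static: the new SCHEMA_TYPE_TO_ENFORCE map is a static field, so its lambdas cannot reference instance methods. A small, hypothetical usage sketch of the enforcer after the change (the column name and value are made up):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.confluent.ksql.util.GenericRowValueTypeEnforcer;

public final class EnforcerExample {

  public static void main(final String[] args) {
    // Hypothetical single-column schema: AMOUNT DOUBLE.
    final Schema schema = SchemaBuilder.struct()
        .field("AMOUNT", Schema.OPTIONAL_FLOAT64_SCHEMA)
        .build();

    final GenericRowValueTypeEnforcer enforcer = new GenericRowValueTypeEnforcer(schema);

    // The FLOAT64 handler dispatches to enforceDouble, which widens an
    // Integer to a Double; expected output: 42.0
    System.out.println(enforcer.enforceFieldType(0, 42));
  }

  private EnforcerExample() {
  }
}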
@@ -24,13 +24,15 @@
@Immutable
public final class PrimitiveType extends Type {

private static final ImmutableMap<SqlType, PrimitiveType> TYPES = ImmutableMap.of(
SqlType.BOOLEAN, new PrimitiveType(SqlType.BOOLEAN),
SqlType.INTEGER, new PrimitiveType(SqlType.INTEGER),
SqlType.BIGINT, new PrimitiveType(SqlType.BIGINT),
SqlType.DOUBLE, new PrimitiveType(SqlType.DOUBLE),
SqlType.STRING, new PrimitiveType(SqlType.STRING)
);
private static final ImmutableMap<SqlType, PrimitiveType> TYPES =
ImmutableMap.<SqlType, PrimitiveType>builder()
.put(SqlType.BOOLEAN, new PrimitiveType(SqlType.BOOLEAN))
.put(SqlType.INTEGER, new PrimitiveType(SqlType.INTEGER))
.put(SqlType.BIGINT, new PrimitiveType(SqlType.BIGINT))
.put(SqlType.DOUBLE, new PrimitiveType(SqlType.DOUBLE))
.put(SqlType.STRING, new PrimitiveType(SqlType.STRING))
.build();


public static PrimitiveType of(final String typeName) {
switch (typeName.toUpperCase()) {
@@ -15,13 +15,30 @@

package io.confluent.ksql.rest.util;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.rest.entity.FieldInfo;
import io.confluent.ksql.rest.entity.SchemaInfo;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;

public final class EntityUtil {
private static final Map<Schema.Type, Function<Schema, SchemaInfo.Type>>
SCHEMA_TYPE_TO_SCHEMA_INFO_TYPE =
ImmutableMap.<Schema.Type, Function<Schema, SchemaInfo.Type>>builder()
.put(Schema.Type.INT32, s -> SchemaInfo.Type.INTEGER)
.put(Schema.Type.INT64, s -> SchemaInfo.Type.BIGINT)
.put(Schema.Type.FLOAT32, s -> SchemaInfo.Type.DOUBLE)
.put(Schema.Type.FLOAT64, s -> SchemaInfo.Type.DOUBLE)
.put(Schema.Type.BOOLEAN, s -> SchemaInfo.Type.BOOLEAN)
.put(Schema.Type.STRING, s -> SchemaInfo.Type.STRING)
.put(Schema.Type.ARRAY, s -> SchemaInfo.Type.ARRAY)
.put(Schema.Type.MAP, s -> SchemaInfo.Type.MAP)
.put(Schema.Type.STRUCT, s -> SchemaInfo.Type.STRUCT)
.build();

private EntityUtil() {
}

@@ -35,13 +52,13 @@ private static SchemaInfo buildSchemaEntity(final Schema schema) {
case ARRAY:
case MAP:
return new SchemaInfo(
getSchemaTypeString(schema.type()),
getSchemaTypeString(schema),
null,
buildSchemaEntity(schema.valueSchema())
);
case STRUCT:
return new SchemaInfo(
getSchemaTypeString(schema.type()),
getSchemaTypeString(schema),
schema.fields()
.stream()
.map(
@@ -50,31 +67,18 @@ private static SchemaInfo buildSchemaEntity(final Schema schema) {
null
);
default:
return new SchemaInfo(getSchemaTypeString(schema.type()), null, null);
return new SchemaInfo(getSchemaTypeString(schema), null, null);
}
}

private static SchemaInfo.Type getSchemaTypeString(final Schema.Type type) {
switch (type) {
case INT32:
return SchemaInfo.Type.INTEGER;
case INT64:
return SchemaInfo.Type.BIGINT;
case FLOAT32:
case FLOAT64:
return SchemaInfo.Type.DOUBLE;
case BOOLEAN:
return SchemaInfo.Type.BOOLEAN;
case STRING:
return SchemaInfo.Type.STRING;
case ARRAY:
return SchemaInfo.Type.ARRAY;
case MAP:
return SchemaInfo.Type.MAP;
case STRUCT:
return SchemaInfo.Type.STRUCT;
default:
throw new RuntimeException(String.format("Invalid type in schema: %s.", type.getName()));
private static SchemaInfo.Type getSchemaTypeString(final Schema schema) {
final Function<Schema, SchemaInfo.Type> handler =
SCHEMA_TYPE_TO_SCHEMA_INFO_TYPE.get(schema.type());
if (handler == null) {
throw new RuntimeException(String.format("Invalid type in schema: %s.",
schema.type().getName()));
}

return handler.apply(schema);
}
}
