diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java index 2dbcc55..0c79c00 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java @@ -29,8 +29,14 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import com.clickhouse.data.value.UnsignedByte; +import com.clickhouse.data.value.UnsignedInteger; +import com.clickhouse.data.value.UnsignedLong; +import com.clickhouse.data.value.UnsignedShort; + import java.math.BigDecimal; import java.math.BigInteger; +import java.net.InetAddress; import java.sql.Array; import java.sql.Date; import java.sql.SQLException; @@ -39,8 +45,10 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone; import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp; @@ -129,35 +137,51 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept case DOUBLE: case INTERVAL_YEAR_MONTH: case INTERVAL_DAY_TIME: - case INTEGER: - case BIGINT: + case TINYINT: case BINARY: case VARBINARY: return value; - case TINYINT: - return ((Integer) value).byteValue(); case SMALLINT: - return value instanceof Integer ? ((Integer) value).shortValue() : value; + return value instanceof UnsignedByte ? ((UnsignedByte) value).shortValue() : value; + case INTEGER: + return value instanceof UnsignedShort ? ((UnsignedShort) value).intValue() : value; + case BIGINT: + return value instanceof UnsignedInteger + ? ((UnsignedInteger) value).longValue() + : value; case DECIMAL: final int precision = ((DecimalType) type).getPrecision(); final int scale = ((DecimalType) type).getScale(); - return value instanceof BigInteger - ? DecimalData.fromBigDecimal( - new BigDecimal((BigInteger) value, 0), precision, scale) - : DecimalData.fromBigDecimal((BigDecimal) value, precision, scale); + BigDecimal decimalValue = + value instanceof BigDecimal + ? (BigDecimal) value + : new BigDecimal( + value instanceof UnsignedLong + ? ((UnsignedLong) value).bigIntegerValue() + : (BigInteger) value); + return DecimalData.fromBigDecimal(decimalValue, precision, scale); case DATE: - return (int) (((Date) value).toLocalDate().toEpochDay()); + return (int) (((LocalDate) value).toEpochDay()); case TIME_WITHOUT_TIME_ZONE: return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L); case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - return TimestampData.fromLocalDateTime((LocalDateTime) value); + return TimestampData.fromLocalDateTime( + value instanceof OffsetDateTime + ? ((OffsetDateTime) value).toLocalDateTime() + : (LocalDateTime) value); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return TimestampData.fromInstant( ((LocalDateTime) value).atZone(getFlinkTimeZone().toZoneId()).toInstant()); case CHAR: case VARCHAR: - return StringData.fromString((String) value); + if (value instanceof UUID) { + return StringData.fromString(value.toString()); + } else if (value instanceof InetAddress) { + return StringData.fromString(((InetAddress) value).getHostAddress()); + } else { + return StringData.fromString((String) value); + } case ARRAY: LogicalType elementType = type.getChildren().stream() diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java index a0efbf6..32a98b0 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java @@ -31,12 +31,17 @@ import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.util.Preconditions; +import com.clickhouse.data.value.UnsignedByte; +import com.clickhouse.data.value.UnsignedInteger; +import com.clickhouse.data.value.UnsignedLong; +import com.clickhouse.data.value.UnsignedShort; import com.clickhouse.jdbc.ClickHousePreparedStatement; import com.clickhouse.jdbc.ClickHouseResultSet; import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; +import java.net.InetAddress; import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; @@ -45,6 +50,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; import java.util.UUID; import static org.apache.flink.connector.clickhouse.internal.converter.ClickHouseConverterUtils.BOOL_TRUE; @@ -109,30 +115,41 @@ private DeserializationConverter createToInternalConverter(LogicalType type) { case DOUBLE: case INTERVAL_YEAR_MONTH: case INTERVAL_DAY_TIME: - case INTEGER: - case BIGINT: + case TINYINT: case BINARY: case VARBINARY: return val -> val; - case TINYINT: - return val -> ((Integer) val).byteValue(); case SMALLINT: - return val -> val instanceof Integer ? ((Integer) val).shortValue() : val; + return val -> val instanceof UnsignedByte ? ((UnsignedByte) val).shortValue() : val; + case INTEGER: + return val -> val instanceof UnsignedShort ? ((UnsignedShort) val).intValue() : val; + case BIGINT: + return val -> + val instanceof UnsignedInteger ? ((UnsignedInteger) val).longValue() : val; case DECIMAL: final int precision = ((DecimalType) type).getPrecision(); final int scale = ((DecimalType) type).getScale(); - return val -> - val instanceof BigInteger - ? DecimalData.fromBigDecimal( - new BigDecimal((BigInteger) val, 0), precision, scale) - : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + return val -> { + BigDecimal decimalValue = + val instanceof BigDecimal + ? (BigDecimal) val + : new BigDecimal( + val instanceof UnsignedLong + ? ((UnsignedLong) val).bigIntegerValue() + : (BigInteger) val); + return DecimalData.fromBigDecimal(decimalValue, precision, scale); + }; case DATE: - return val -> (int) ((Date) val).toLocalDate().toEpochDay(); + return val -> (int) ((LocalDate) val).toEpochDay(); case TIME_WITHOUT_TIME_ZONE: return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L); case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - return val -> TimestampData.fromLocalDateTime((LocalDateTime) val); + return val -> + TimestampData.fromLocalDateTime( + val instanceof OffsetDateTime + ? ((OffsetDateTime) val).toLocalDateTime() + : (LocalDateTime) val); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return val -> TimestampData.fromInstant( @@ -141,10 +158,15 @@ private DeserializationConverter createToInternalConverter(LogicalType type) { .toInstant()); case CHAR: case VARCHAR: - return val -> - val instanceof UUID - ? StringData.fromString(val.toString()) - : StringData.fromString((String) val); + return val -> { + if (val instanceof UUID) { + return StringData.fromString(val.toString()); + } else if (val instanceof InetAddress) { + return StringData.fromString(((InetAddress) val).getHostAddress()); + } else { + return StringData.fromString((String) val); + } + }; case ARRAY: case MAP: return val -> ClickHouseConverterUtils.toInternal(val, type); @@ -242,6 +264,7 @@ private SerializationConverter createToExternalConverter(LogicalType type) { @FunctionalInterface interface SerializationConverter extends Serializable { + /** * Convert an internal field to java object and fill into the {@link * ClickHousePreparedStatement}. @@ -252,6 +275,7 @@ void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement) @FunctionalInterface interface DeserializationConverter extends Serializable { + /** * Convert an object of {@link ClickHouseResultSet} to the internal data structure object. */ diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java index ea0de3d..57dc58d 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java @@ -41,9 +41,9 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) { switch (clickHouseColumnInfo.getDataType()) { case Int8: return DataTypes.TINYINT(); - case Int16: case Bool: return DataTypes.BOOLEAN(); + case Int16: case UInt8: return DataTypes.SMALLINT(); case Int32: @@ -92,6 +92,7 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) { case UUID: return DataTypes.VARCHAR(clickHouseColumnInfo.getPrecision()); case Date: + case Date32: return DataTypes.DATE(); case DateTime: case DateTime32: