Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-32068] connector jdbc support clickhouse #49

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d9039c4
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y Jun 5, 2023
21d15e0
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y May 18, 2023
223a9cb
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y May 22, 2023
b0e7b37
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y Jun 6, 2023
d1f96df
[FLINK-32068] connector jdbc support clickhouse ,support map types
WenDing-Y Jul 12, 2023
036a67f
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y May 24, 2023
c8decbe
[FLINK-32068] jdbc support clickhouse
May 13, 2023
6d4a0d5
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y Jun 5, 2023
8b1df04
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y Jun 5, 2023
9c75447
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y May 18, 2023
4975492
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y May 22, 2023
9ee3d1f
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y May 24, 2023
924c107
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y May 25, 2023
a56e098
[FLINK-32068] jdbc support clickhouse
May 13, 2023
7caac6f
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y Jun 5, 2023
8f13540
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y Jun 5, 2023
cd9ba3f
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y May 18, 2023
1a43dfe
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y May 24, 2023
afce802
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y May 25, 2023
ef91923
[FLINK-32068] jdbc support clickhouse
May 13, 2023
4d0aa96
[FLINK-32068]connector jdbc support clickhouse
WenDing-Y Jun 5, 2023
b597ed3
[FLINK-32068] connector jdbc support clickhouse
WenDing-Y Jun 6, 2023
6ce9bf9
[FLINK-32068] connector jdbc support clickhouse ,support map types
WenDing-Y Jun 7, 2023
4facf4c
[FLINK-32068] connector jdbc support clickhouse ,fix test fail
WenDing-Y Jul 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions flink-connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,21 @@ under the License.
<scope>test</scope>
</dependency>

<!-- clickhouse tests -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.6</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<scope>test</scope>
</dependency>


<!-- ArchUnit test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** JDBC dialect for ClickHouse. */
public class ClickHouseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // DateTime64 sub-second precision ranges from 0 to 9 digits, see
    // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
    private static final int MIN_TIMESTAMP_PRECISION = 0;
    private static final int MAX_TIMESTAMP_PRECISION = 9;

    // Decimal precision ranges from 1 to 76 digits, see
    // https://clickhouse.com/docs/en/sql-reference/data-types/decimal
    private static final int MIN_DECIMAL_PRECISION = 1;
    private static final int MAX_DECIMAL_PRECISION = 76;

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public AbstractJdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickHouseRowConvert(rowType);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // Backtick quoting, accepted by the ClickHouse SQL parser.
        return "`" + identifier + "`";
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // No upsert SQL is generated for ClickHouse.
        return Optional.empty();
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        // NOTE(review): TIMESTAMP_WITH_TIME_ZONE is listed here while the row converter
        // handles TIMESTAMP_WITH_LOCAL_TIME_ZONE — confirm which one is intended.
        return EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.MAP,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

/** Factory that creates a {@link ClickHouseDialect} for JDBC URLs with the ClickHouse scheme. */
@Internal
public class ClickHouseDialectFactory implements JdbcDialectFactory {

    private static final String CLICKHOUSE_URL_PREFIX = "jdbc:clickhouse:";

    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith(CLICKHOUSE_URL_PREFIX);
    }

    @Override
    public JdbcDialect create() {
        return new ClickHouseDialect();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import com.clickhouse.data.value.UnsignedByte;
import com.clickhouse.data.value.UnsignedInteger;
import com.clickhouse.data.value.UnsignedShort;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Map;

/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* the clickhouse data types range link
* https://clickhouse.com/docs/en/sql-reference/data-types/int-uint .
*/
public class ClickHouseRowConvert extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return "ClickHouse";
}

public ClickHouseRowConvert(RowType rowType) {
super(rowType);
}

@Override
protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return null;
case BOOLEAN:
case FLOAT:
case DOUBLE:
return val -> val;
case TINYINT:
return val -> ((Byte) val).byteValue();
case SMALLINT:
return val ->
val instanceof UnsignedByte
? ((UnsignedByte) val).shortValue()
: ((Short) val).shortValue();
case INTEGER:
return val ->
val instanceof UnsignedShort
? ((UnsignedShort) val).intValue()
: ((Integer) val).intValue();
case BIGINT:
return jdbcField -> {
if (jdbcField instanceof UnsignedInteger) {
return ((UnsignedInteger) jdbcField).longValue();
} else if (jdbcField instanceof Long) {
return ((Long) jdbcField).longValue();
}
// UINT64 is not supported,the uint64 range exceeds the long range
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use BigInteger for UINT64?

Copy link
Author

@WenDing-Y WenDing-Y Jun 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the clickhouse data types range
UInt8 — [0 : 255]
UInt16 — [0 : 65535]
UInt32 — [0 : 4294967295]
UInt64 — [0 : 18446744073709551615]
UInt128 — [0 : 340282366920938463463374607431768211455]
UInt256 — [0 : 115792089237316195423570985008687907853269984665640564039457584007913129639935]

Copy link
Contributor

@snuyanzin snuyanzin Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i still don't see the issue why for UInt64, UInt128, UInt256 we can not return BigInteger ?

throw new UnsupportedOperationException("Unsupported type:" + type);
};
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case DATE:
return val -> Long.valueOf(((LocalDate) val).toEpochDay()).intValue();
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
case MAP:
return val -> new GenericMapData((Map<?, ?>) val);
default:
return super.createInternalConverter(type);
}
}

@Override
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case MAP:
return (val, index, statement) -> statement.setObject(index, val);
default:
return super.createExternalConverter(type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.SMALLINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
Expand Down Expand Up @@ -107,6 +108,7 @@ public class JdbcTypeUtil {
put(TIME_WITHOUT_TIME_ZONE, Types.TIME);
put(DECIMAL, Types.DECIMAL);
put(ARRAY, Types.ARRAY);
put(MAP, Types.JAVA_OBJECT);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactor
org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
org.apache.flink.connector.jdbc.databases.clickhouse.dialect.ClickHouseDialectFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.flink.connector.jdbc.databases.clickhouse;

import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.databases.clickhouse.ClickHouseDatabase;

import org.junit.jupiter.api.extension.ExtendWith;

/** Base interface that wires the ClickHouse test database into a test class. */
@ExtendWith(ClickHouseDatabase.class)
public interface ClickHouseTestBase extends DatabaseTest {

    /** Returns the metadata of the shared ClickHouse test database. */
    @Override
    default DatabaseMetadata getMetadata() {
        return ClickHouseDatabase.getMetadata();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.apache.flink.connector.jdbc.databases.clickhouse.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;

import java.util.Arrays;
import java.util.List;

/** The Clickhouse params for {@link JdbcDialectTypeTest}. */
public class ClickHouseDialectTypeTest extends JdbcDialectTypeTest {

@Override
protected String testDialect() {
return "clickhouse";
}

@Override
protected List<TestItem> testData() {
return Arrays.asList(
createTestItem("CHAR"),
WenDing-Y marked this conversation as resolved.
Show resolved Hide resolved
createTestItem("VARCHAR"),
createTestItem("BOOLEAN"),
createTestItem("TINYINT"),
createTestItem("SMALLINT"),
createTestItem("INTEGER"),
createTestItem("BIGINT"),
createTestItem("FLOAT"),
createTestItem("DOUBLE"),
createTestItem("DECIMAL(10, 4)"),
createTestItem("DECIMAL(38, 18)"),
createTestItem("DATE"),
createTestItem("TIMESTAMP(3)"),
createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
createTestItem("VARBINARY", "The ClickHouse dialect doesn't support type: BYTES"),

// Not valid data
createTestItem("BINARY", "The ClickHouse dialect doesn't support type: BINARY(1)."),
createTestItem(
"VARBINARY(10)",
"The ClickHouse dialect doesn't support type: VARBINARY(10)."));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.apache.flink.connector.jdbc.databases.clickhouse.table;

import org.apache.flink.connector.jdbc.databases.clickhouse.ClickHouseTestBase;
import org.apache.flink.connector.jdbc.databases.clickhouse.dialect.ClickHouseDialect;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.TimeZone;

import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.ckTableRow;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;

/** The Table Source ITCase for {@link ClickHouseDialect}. */
class ClickHouseTableSourceITCase extends JdbcDynamicTableSourceITCase
        implements ClickHouseTestBase {

    @Override
    protected ClickhouseTableRow createInputTable() {
        return ckTableRow(
                "jdbDynamicTableSource",
                pkField("id", dbType("Int64"), DataTypes.BIGINT().notNull()),
                field("user_id_int8", dbType("Int8"), DataTypes.TINYINT().notNull()),
                field("user_id_int16", dbType("Int16"), DataTypes.SMALLINT().notNull()),
                field("user_id_int32", dbType("Int32"), DataTypes.INT().notNull()),
                field("user_id_int64", dbType("Int64"), DataTypes.BIGINT().notNull()),
                field("price_float", dbType("Float32"), DataTypes.FLOAT()),
                field("price_double", dbType("Float64"), DataTypes.DOUBLE()),
                field("decimal_col", dbType("Decimal64(4)"), DataTypes.DECIMAL(10, 4)),
                field("user_date", dbType("Date"), DataTypes.DATE()),
                field("timestamp6_col", dbType("DateTime(6)"), DataTypes.TIMESTAMP(6)),
                field("decimal_column", dbType("Decimal(3,1)"), DataTypes.DECIMAL(3, 1)),
                field("bool_flag", dbType("Bool"), DataTypes.BOOLEAN()),
                field("message", dbType("String"), DataTypes.VARCHAR(100)),
                field(
                        "test_map",
                        dbType("Map(Int64,Int64)"),
                        DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BIGINT())));
    }

    @Override
    protected List<Row> getTestData() {
        // Pin the JVM default time zone so DateTime values round-trip deterministically.
        // Was "GTM+0" (typo): TimeZone.getTimeZone silently falls back to GMT for
        // unrecognized IDs, so the corrected ID yields the same zone, now intentionally.
        TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
        TimeZone.setDefault(timeZone);
        HashMap<Long, Long> map = new HashMap<>();
        map.put(1L, 2L);
        return Arrays.asList(
                Row.of(
                        1L,
                        (byte) 1,
                        (short) -32768,
                        -2147483648,
                        -9223372036854775808L,
                        -3.4e+38f,
                        -1.7e+308d,
                        BigDecimal.valueOf(100.1234),
                        LocalDate.parse("2023-01-01"),
                        LocalDateTime.parse("2020-01-01T15:35:00.123456"),
                        BigDecimal.valueOf(-99.9),
                        true,
                        "this is a test message",
                        map),
                Row.of(
                        2L,
                        (byte) 2,
                        (short) 32767,
                        2147483647,
                        9223372036854775807L,
                        3.4e+38f,
                        1.7e+308d,
                        BigDecimal.valueOf(101.1234),
                        LocalDate.parse("2023-01-02"),
                        LocalDateTime.parse("2020-01-01T15:36:01.123456"),
                        BigDecimal.valueOf(99.9),
                        false,
                        "this is a test message",
                        map));
    }
}
Loading