From a61a2d778f5b2761e7f8e165122563f3a0a653ca Mon Sep 17 00:00:00 2001 From: wudi <> Date: Wed, 22 Nov 2023 11:23:55 +0800 Subject: [PATCH] [pipeline-connector][doris] add doris pipeline connector. --- .../common/data/LocalZonedTimestampData.java | 5 +- .../ververica/cdc/common/types/DataTypes.java | 18 +- .../cdc/common/types/utils/DataTypeUtils.java | 67 ++++++ .../pom.xml | 74 +++++++ .../doris/factory/DorisDataSinkFactory.java | 160 ++++++++++++++ .../connectors/doris/sink/DorisDataSink.java | 78 +++++++ .../doris/sink/DorisDataSinkOptions.java | 166 +++++++++++++++ .../doris/sink/DorisEventSerializer.java | 132 ++++++++++++ .../doris/sink/DorisMetadataApplier.java | 195 ++++++++++++++++++ .../doris/sink/DorisRowConverter.java | 177 ++++++++++++++++ ...com.ververica.cdc.common.factories.Factory | 17 ++ .../doris/sink/DorisRowConverterTest.java | 90 ++++++++ .../flink-cdc-pipeline-connectors/pom.xml | 1 + pom.xml | 1 + 14 files changed, 1179 insertions(+), 2 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/factory/DorisDataSinkFactory.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSink.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisEventSerializer.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverter.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverterTest.java diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java index 1a3b8f361d..217ffb0b13 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java @@ -24,6 +24,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; /** * An internal data structure representing data of {@link LocalZonedTimestampType}. @@ -74,7 +75,9 @@ public Instant toInstant() { milliOfSecond += 1000; } long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond; - return Instant.ofEpochSecond(epochSecond, nanoAdjustment); + return Instant.ofEpochSecond(epochSecond, nanoAdjustment) + .atZone(ZoneId.of("UTC")) + .toInstant(); } @Override diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java index a7818998e7..cfdedd3d3a 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java @@ -426,8 +426,12 @@ public static OptionalInt getLength(DataType dataType) { return dataType.accept(LENGTH_EXTRACTOR); } - private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor(); + public static OptionalInt getScale(DataType dataType) { + return dataType.accept(SCALE_EXTRACTOR); + } + private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor(); + private static final ScaleExtractor SCALE_EXTRACTOR = new ScaleExtractor(); private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor(); private static class PrecisionExtractor extends DataTypeDefaultVisitor { @@ -463,6 +467,18 @@ protected OptionalInt defaultMethod(DataType dataType) { } } + private static class ScaleExtractor extends DataTypeDefaultVisitor { + @Override + public OptionalInt visit(DecimalType decimalType) { + return OptionalInt.of(decimalType.getScale()); + } + + @Override + protected OptionalInt defaultMethod(DataType dataType) { + return OptionalInt.empty(); + } + } + private static class LengthExtractor extends DataTypeDefaultVisitor { @Override diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/utils/DataTypeUtils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/utils/DataTypeUtils.java index 6145ab23bf..8388e64d71 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/utils/DataTypeUtils.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/utils/DataTypeUtils.java @@ -16,6 +16,8 @@ package com.ververica.cdc.common.types.utils; +import org.apache.flink.util.CollectionUtil; + import com.ververica.cdc.common.data.ArrayData; import com.ververica.cdc.common.data.DecimalData; import com.ververica.cdc.common.data.MapData; @@ -24,6 +26,10 @@ import com.ververica.cdc.common.data.TimestampData; import com.ververica.cdc.common.data.ZonedTimestampData; import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DataTypes; +import com.ververica.cdc.common.utils.Preconditions; + +import java.util.List; /** Utilities for handling {@link DataType}s. */ public class DataTypeUtils { @@ -73,4 +79,65 @@ public static Class toInternalConversionClass(DataType type) { throw new IllegalArgumentException("Illegal type: " + type); } } + + /** + * Convert CDC's {@link DataType} to Flink's internal {@link + * org.apache.flink.table.types.DataType}. + */ + public static org.apache.flink.table.types.DataType toFlinkDataType(DataType type) { + // ordered by type root definition + List children = type.getChildren(); + int length = DataTypes.getLength(type).orElse(0); + int precision = DataTypes.getPrecision(type).orElse(0); + int scale = DataTypes.getScale(type).orElse(0); + switch (type.getTypeRoot()) { + case CHAR: + return org.apache.flink.table.api.DataTypes.CHAR(length); + case VARCHAR: + return org.apache.flink.table.api.DataTypes.VARCHAR(length); + case BOOLEAN: + return org.apache.flink.table.api.DataTypes.BOOLEAN(); + case BINARY: + return org.apache.flink.table.api.DataTypes.BINARY(length); + case VARBINARY: + return org.apache.flink.table.api.DataTypes.VARBINARY(length); + case DECIMAL: + return org.apache.flink.table.api.DataTypes.DECIMAL(precision, scale); + case TINYINT: + return org.apache.flink.table.api.DataTypes.TINYINT(); + case SMALLINT: + return org.apache.flink.table.api.DataTypes.SMALLINT(); + case INTEGER: + return org.apache.flink.table.api.DataTypes.INT(); + case DATE: + return org.apache.flink.table.api.DataTypes.DATE(); + case TIME_WITHOUT_TIME_ZONE: + return org.apache.flink.table.api.DataTypes.TIME(length); + case BIGINT: + return org.apache.flink.table.api.DataTypes.BIGINT(); + case FLOAT: + return org.apache.flink.table.api.DataTypes.FLOAT(); + case DOUBLE: + return org.apache.flink.table.api.DataTypes.DOUBLE(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE(length); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(length); + case TIMESTAMP_WITH_TIME_ZONE: + return org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE(length); + case ARRAY: + Preconditions.checkState(children != null && children.size() > 0); + return org.apache.flink.table.api.DataTypes.ARRAY(toFlinkDataType(children.get(0))); + case MAP: + Preconditions.checkState(children != null && children.size() > 1); + return org.apache.flink.table.api.DataTypes.MAP( + toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1))); + case ROW: + Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children)); + return org.apache.flink.table.api.DataTypes.ROW( + children.toArray(new org.apache.flink.table.types.DataType[] {})); + default: + throw new IllegalArgumentException("Illegal type: " + type); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml new file mode 100644 index 0000000000..f6240e3e3a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml @@ -0,0 +1,74 @@ + + + + 4.0.0 + + com.ververica + flink-cdc-pipeline-connectors + ${revision} + + flink-cdc-pipeline-connector-doris + flink-cdc-pipeline-connector-doris + + + + com.ververica + flink-cdc-composer + ${revision} + test + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + org.apache.doris + flink-doris-connector-${flink.major.version} + 1.5.0 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + false + + + org.apache.doris:* + + + + + + + + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/factory/DorisDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/factory/DorisDataSinkFactory.java new file mode 100644 index 0000000000..6b803eb8ad --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/factory/DorisDataSinkFactory.java @@ -0,0 +1,160 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.factory; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.configuration.ConfigOption; +import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.factories.DataSinkFactory; +import com.ververica.cdc.common.pipeline.PipelineOptions; +import com.ververica.cdc.common.sink.DataSink; +import com.ververica.cdc.connectors.doris.sink.DorisDataSink; +import com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; + +import java.time.ZoneId; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_COUNT; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_FLUSH_MAX_BYTES; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_BUFFER_SIZE; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_CHECK_INTERVAL; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_2PC; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_FLUSH_QUEUE_SIZE; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_IGNORE_UPDATE_BEFORE; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_LABEL_PREFIX; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX; +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; + +/** A dummy {@link DataSinkFactory} to create {@link DorisDataSink}. */ +@Internal +public class DorisDataSinkFactory implements DataSinkFactory { + @Override + public DataSink createDataSink(Context context) { + Configuration config = context.getFactoryConfiguration(); + DorisOptions.Builder optionsBuilder = DorisOptions.builder(); + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + config.getOptional(FENODES).ifPresent(optionsBuilder::setFenodes); + config.getOptional(BENODES).ifPresent(optionsBuilder::setBenodes); + config.getOptional(USERNAME).ifPresent(optionsBuilder::setUsername); + config.getOptional(PASSWORD).ifPresent(optionsBuilder::setPassword); + config.getOptional(JDBC_URL).ifPresent(optionsBuilder::setJdbcUrl); + config.getOptional(AUTO_REDIRECT).ifPresent(optionsBuilder::setAutoRedirect); + + config.getOptional(SINK_CHECK_INTERVAL).ifPresent(executionBuilder::setCheckInterval); + config.getOptional(SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries); + config.getOptional(SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable); + config.getOptional(SINK_LABEL_PREFIX).ifPresent(executionBuilder::setLabelPrefix); + config.getOptional(SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize); + config.getOptional(SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount); + config.getOptional(SINK_BUFFER_FLUSH_MAX_ROWS) + .ifPresent(executionBuilder::setBufferFlushMaxRows); + config.getOptional(SINK_BUFFER_FLUSH_MAX_BYTES) + .ifPresent(executionBuilder::setBufferFlushMaxBytes); + config.getOptional(SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize); + config.getOptional(SINK_IGNORE_UPDATE_BEFORE) + .ifPresent(executionBuilder::setIgnoreUpdateBefore); + config.getOptional(SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache); + config.getOptional(SINK_BUFFER_FLUSH_INTERVAL) + .ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis())); + config.getOptional(SINK_ENABLE_2PC) + .ifPresent( + b -> { + if (b) { + executionBuilder.enable2PC(); + } else { + executionBuilder.disable2PC(); + } + }); + // default batch mode + executionBuilder.setBatchMode(config.get(SINK_ENABLE_BATCH_MODE)); + + // set streamload properties + Properties properties = DorisExecutionOptions.defaultsProperties(); + Map streamLoadProp = + DorisDataSinkOptions.getPropertiesByPrefix(config, STREAM_LOAD_PROP_PREFIX); + properties.putAll(streamLoadProp); + executionBuilder.setStreamLoadProp(properties); + + return new DorisDataSink( + optionsBuilder.build(), + DorisReadOptions.builder().build(), + executionBuilder.build(), + config, + ZoneId.of( + context.getPipelineConfiguration() + .get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE))); + } + + @Override + public String identifier() { + return "doris"; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(FENODES); + options.add(USERNAME); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(FENODES); + options.add(BENODES); + options.add(USERNAME); + options.add(PASSWORD); + options.add(JDBC_URL); + options.add(AUTO_REDIRECT); + + options.add(SINK_CHECK_INTERVAL); + options.add(SINK_ENABLE_2PC); + options.add(SINK_MAX_RETRIES); + options.add(SINK_ENABLE_DELETE); + options.add(SINK_LABEL_PREFIX); + options.add(SINK_BUFFER_SIZE); + options.add(SINK_BUFFER_COUNT); + + options.add(SINK_ENABLE_BATCH_MODE); + options.add(SINK_BUFFER_FLUSH_MAX_ROWS); + options.add(SINK_BUFFER_FLUSH_MAX_BYTES); + options.add(SINK_FLUSH_QUEUE_SIZE); + options.add(SINK_BUFFER_FLUSH_INTERVAL); + options.add(SINK_IGNORE_UPDATE_BEFORE); + options.add(SINK_USE_CACHE); + + return options; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSink.java new file mode 100644 index 0000000000..ff77eed3aa --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSink.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.sink; + +import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.sink.DataSink; +import com.ververica.cdc.common.sink.EventSinkProvider; +import com.ververica.cdc.common.sink.FlinkSinkProvider; +import com.ververica.cdc.common.sink.MetadataApplier; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.batch.DorisBatchSink; + +import java.io.Serializable; +import java.time.ZoneId; + +/** A {@link DataSink} for "Doris" connector. */ +public class DorisDataSink implements DataSink, Serializable { + + private final DorisOptions dorisOptions; + private final DorisReadOptions readOptions; + private final DorisExecutionOptions executionOptions; + private Configuration configuration; + private final ZoneId zoneId; + + public DorisDataSink( + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions dorisExecutionOptions, + Configuration configuration, + ZoneId zoneId) { + this.dorisOptions = dorisOptions; + this.readOptions = dorisReadOptions; + this.executionOptions = dorisExecutionOptions; + this.configuration = configuration; + this.zoneId = zoneId; + } + + @Override + public EventSinkProvider getEventSinkProvider() { + if (!executionOptions.enableBatchMode()) { + return FlinkSinkProvider.of( + new DorisSink<>( + dorisOptions, + readOptions, + executionOptions, + new DorisEventSerializer(zoneId))); + } else { + return FlinkSinkProvider.of( + new DorisBatchSink<>( + dorisOptions, + readOptions, + executionOptions, + new DorisEventSerializer(zoneId))); + } + } + + @Override + public MetadataApplier getMetadataApplier() { + return new DorisMetadataApplier(dorisOptions, configuration); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java new file mode 100644 index 0000000000..12733b076a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisDataSinkOptions.java @@ -0,0 +1,166 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.sink; + +import com.ververica.cdc.common.configuration.ConfigOption; +import com.ververica.cdc.common.configuration.ConfigOptions; +import com.ververica.cdc.common.configuration.Configuration; +import org.apache.doris.flink.table.DorisConfigOptions; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +/** DorisDataSink Options reference {@link DorisConfigOptions}. */ +public class DorisDataSinkOptions { + public static final ConfigOption FENODES = + ConfigOptions.key("fenodes") + .stringType() + .noDefaultValue() + .withDescription("doris fe http address."); + public static final ConfigOption BENODES = + ConfigOptions.key("benodes") + .stringType() + .noDefaultValue() + .withDescription("doris be http address."); + public static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("the doris user name."); + public static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("the doris password."); + public static final ConfigOption JDBC_URL = + ConfigOptions.key("jdbc-url") + .stringType() + .noDefaultValue() + .withDescription("doris jdbc url address."); + public static final ConfigOption AUTO_REDIRECT = + ConfigOptions.key("auto-redirect") + .booleanType() + .defaultValue(false) + .withDescription( + "Use automatic redirection of fe without explicitly obtaining the be list"); + + // Streaming Sink options + public static final ConfigOption SINK_ENABLE_2PC = + ConfigOptions.key("sink.enable-2pc") + .booleanType() + .defaultValue(false) + .withDescription("enable 2PC while loading"); + + public static final ConfigOption SINK_CHECK_INTERVAL = + ConfigOptions.key("sink.check-interval") + .intType() + .defaultValue(10000) + .withDescription("check exception with the interval while loading"); + public static final ConfigOption SINK_MAX_RETRIES = + ConfigOptions.key("sink.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if writing records to database failed."); + public static final ConfigOption SINK_BUFFER_SIZE = + ConfigOptions.key("sink.buffer-size") + .intType() + .defaultValue(1024 * 1024) + .withDescription("the buffer size to cache data for stream load."); + public static final ConfigOption SINK_BUFFER_COUNT = + ConfigOptions.key("sink.buffer-count") + .intType() + .defaultValue(3) + .withDescription("the buffer count to cache data for stream load."); + public static final ConfigOption SINK_LABEL_PREFIX = + ConfigOptions.key("sink.label-prefix") + .stringType() + .defaultValue("") + .withDescription("the unique label prefix."); + public static final ConfigOption SINK_ENABLE_DELETE = + ConfigOptions.key("sink.enable-delete") + .booleanType() + .defaultValue(true) + .withDescription("whether to enable the delete function"); + + // batch sink options + public static final ConfigOption SINK_ENABLE_BATCH_MODE = + ConfigOptions.key("sink.enable.batch-mode") + .booleanType() + .defaultValue(true) + .withDescription("Whether to enable batch write mode"); + + public static final ConfigOption SINK_FLUSH_QUEUE_SIZE = + ConfigOptions.key("sink.flush.queue-size") + .intType() + .defaultValue(2) + .withDescription("Queue length for async stream load, default is 2"); + + public static final ConfigOption SINK_BUFFER_FLUSH_MAX_ROWS = + ConfigOptions.key("sink.buffer-flush.max-rows") + .intType() + .defaultValue(50000) + .withDescription( + "The maximum number of flush items in each batch, the default is 5w"); + + public static final ConfigOption SINK_BUFFER_FLUSH_MAX_BYTES = + ConfigOptions.key("sink.buffer-flush.max-bytes") + .intType() + .defaultValue(10 * 1024 * 1024) + .withDescription( + "The maximum number of bytes flushed in each batch, the default is 10MB"); + + public static final ConfigOption SINK_BUFFER_FLUSH_INTERVAL = + ConfigOptions.key("sink.buffer-flush.interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 10s."); + + public static final ConfigOption SINK_IGNORE_UPDATE_BEFORE = + ConfigOptions.key("sink.ignore.update-before") + .booleanType() + .defaultValue(true) + .withDescription( + "In the CDC scenario, when the primary key of the upstream is inconsistent with that of the downstream, the update-before data needs to be passed to the downstream as deleted data, otherwise the data cannot be deleted.\n" + + "The default is to ignore, that is, perform upsert semantics."); + + public static final ConfigOption SINK_USE_CACHE = + ConfigOptions.key("sink.use-cache") + .booleanType() + .defaultValue(false) + .withDescription("Whether to use buffer cache for breakpoint resume"); + + // Prefix for Doris StreamLoad specific properties. + public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; + // Prefix for Doris Create table. + public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties."; + + public static Map getPropertiesByPrefix( + Configuration tableOptions, String prefix) { + final Map props = new HashMap<>(); + + for (Map.Entry entry : tableOptions.toMap().entrySet()) { + if (entry.getKey().startsWith(prefix)) { + String subKey = entry.getKey().substring(prefix.length()); + props.put(subKey, entry.getValue()); + } + } + return props; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisEventSerializer.java new file mode 100644 index 0000000000..1c47b082e5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisEventSerializer.java @@ -0,0 +1,132 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.sink; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.event.CreateTableEvent; +import com.ververica.cdc.common.event.DataChangeEvent; +import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.OperationType; +import com.ververica.cdc.common.event.SchemaChangeEvent; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.schema.Column; +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.common.utils.Preconditions; +import com.ververica.cdc.common.utils.SchemaUtils; +import org.apache.doris.flink.sink.writer.serializer.DorisRecord; +import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign; + +/** A serializer for Event to DorisRecord. */ +public class DorisEventSerializer implements DorisRecordSerializer { + private ObjectMapper objectMapper = new ObjectMapper(); + private Map schemaMaps = new HashMap<>(); + + /** Format DATE type data. */ + public static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy-MM-dd"); + + /** Format timestamp-related type data. */ + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** ZoneId from pipeline config to support timestamp with local time zone. */ + public final ZoneId pipelineZoneId; + + public DorisEventSerializer(ZoneId zoneId) { + pipelineZoneId = zoneId; + } + + @Override + public DorisRecord serialize(Event event) throws IOException { + if (event instanceof DataChangeEvent) { + return applyDataChangeEvent((DataChangeEvent) event); + } else if (event instanceof SchemaChangeEvent) { + SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; + TableId tableId = schemaChangeEvent.tableId(); + if (event instanceof CreateTableEvent) { + schemaMaps.put(tableId, ((CreateTableEvent) event).getSchema()); + } else { + if (!schemaMaps.containsKey(tableId)) { + throw new RuntimeException("schema of " + tableId + " is not existed."); + } + schemaMaps.put( + tableId, + SchemaUtils.applySchemaChangeEvent( + schemaMaps.get(tableId), schemaChangeEvent)); + } + } + return null; + } + + private DorisRecord applyDataChangeEvent(DataChangeEvent event) throws JsonProcessingException { + TableId tableId = event.tableId(); + Schema schema = schemaMaps.get(tableId); + Preconditions.checkNotNull(schema, event.tableId() + " is not existed"); + Map valueMap; + OperationType op = event.op(); + switch (op) { + case INSERT: + case UPDATE: + case REPLACE: + valueMap = serializerRecord(event.after(), schema); + addDeleteSign(valueMap, false); + break; + case DELETE: + valueMap = serializerRecord(event.before(), schema); + addDeleteSign(valueMap, true); + break; + default: + throw new UnsupportedOperationException("Unsupport Operation " + op); + } + + return DorisRecord.of( + tableId.getSchemaName(), + tableId.getTableName(), + objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8)); + } + + /** serializer RecordData to Doris Value. */ + public Map serializerRecord(RecordData recordData, Schema schema) { + List columns = schema.getColumns(); + Map record = new HashMap<>(); + Preconditions.checkState( + columns.size() == recordData.getArity(), + "Column size does not match the data size"); + + for (int i = 0; i < recordData.getArity(); i++) { + DorisRowConverter.SerializationConverter converter = + DorisRowConverter.createNullableExternalConverter( + columns.get(i).getType(), pipelineZoneId); + Object field = converter.serialize(i, recordData); + record.put(columns.get(i).getName(), field); + } + return record; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java new file mode 100644 index 0000000000..7ad1235f61 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -0,0 +1,195 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.sink; + +import org.apache.flink.util.CollectionUtil; + +import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.event.AddColumnEvent; +import com.ververica.cdc.common.event.AlterColumnTypeEvent; +import com.ververica.cdc.common.event.CreateTableEvent; +import com.ververica.cdc.common.event.DropColumnEvent; +import com.ververica.cdc.common.event.RenameColumnEvent; +import com.ververica.cdc.common.event.SchemaChangeEvent; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.schema.Column; +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.common.sink.MetadataApplier; +import com.ververica.cdc.common.types.DataTypeChecks; +import com.ververica.cdc.common.types.LocalZonedTimestampType; +import com.ververica.cdc.common.types.TimestampType; +import com.ververica.cdc.common.types.ZonedTimestampType; +import com.ververica.cdc.common.types.utils.DataTypeUtils; +import org.apache.doris.flink.catalog.DorisTypeMapper; +import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static com.ververica.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX; + +/** Supports {@link DorisDataSink} to schema evolution. */ +public class DorisMetadataApplier implements MetadataApplier { + private static final Logger LOG = LoggerFactory.getLogger(DorisMetadataApplier.class); + private DorisOptions dorisOptions; + private SchemaChangeManager schemaChangeManager; + private Configuration config; + + public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) { + this.dorisOptions = dorisOptions; + this.schemaChangeManager = new SchemaChangeManager(dorisOptions); + this.config = config; + } + + @Override + public void applySchemaChange(SchemaChangeEvent event) { + try { + // send schema change op to doris + if (event instanceof CreateTableEvent) { + applyCreateTableEvent((CreateTableEvent) event); + } else if (event instanceof AddColumnEvent) { + applyAddColumnEvent((AddColumnEvent) event); + } else if (event instanceof DropColumnEvent) { + applyDropColumnEvent((DropColumnEvent) event); + } else if (event instanceof RenameColumnEvent) { + applyRenameColumnEvent((RenameColumnEvent) event); + } else if (event instanceof AlterColumnTypeEvent) { + throw new RuntimeException("Unsupport schema change event, " + event); + } + } catch (Exception ex) { + throw new RuntimeException( + "Failed to schema change, " + event + ", reason: " + ex.getMessage()); + } + } + + private void applyCreateTableEvent(CreateTableEvent event) + throws IOException, IllegalArgumentException { + Schema schema = event.getSchema(); + TableId tableId = event.tableId(); + TableSchema tableSchema = new TableSchema(); + tableSchema.setTable(tableId.getTableName()); + tableSchema.setDatabase(tableId.getSchemaName()); + tableSchema.setFields(buildFields(schema)); + tableSchema.setDistributeKeys(buildDistributeKeys(schema)); + + if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) { + tableSchema.setModel(DataModel.DUPLICATE); + } else { + tableSchema.setKeys(schema.primaryKeys()); + tableSchema.setModel(DataModel.UNIQUE); + } + + Map tableProperties = + DorisDataSinkOptions.getPropertiesByPrefix(config, TABLE_CREATE_PROPERTIES_PREFIX); + tableSchema.setProperties(tableProperties); + schemaChangeManager.createTable(tableSchema); + } + + private Map buildFields(Schema schema) { + // Guaranteed the order of column + Map fieldSchemaMap = new LinkedHashMap<>(); + List columnNameList = schema.getColumnNames(); + for (String columnName : columnNameList) { + Column column = schema.getColumn(columnName).get(); + String typeString; + if (column.getType() instanceof LocalZonedTimestampType + || column.getType() instanceof TimestampType + || column.getType() instanceof ZonedTimestampType) { + int precision = DataTypeChecks.getPrecision(column.getType()); + typeString = + String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6)); + } else { + typeString = + DorisTypeMapper.toDorisType( + DataTypeUtils.toFlinkDataType(column.getType())); + } + fieldSchemaMap.put( + column.getName(), + new FieldSchema(column.getName(), typeString, column.getComment())); + } + return fieldSchemaMap; + } + + private List buildDistributeKeys(Schema schema) { + if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) { + return schema.primaryKeys(); + } + if (!CollectionUtil.isNullOrEmpty(schema.getColumnNames())) { + return Collections.singletonList(schema.getColumnNames().get(0)); + } + return new ArrayList<>(); + } + + private void applyAddColumnEvent(AddColumnEvent event) + throws IOException, IllegalArgumentException { + TableId tableId = event.tableId(); + List addedColumns = event.getAddedColumns(); + for (AddColumnEvent.ColumnWithPosition col : addedColumns) { + Column column = col.getAddColumn(); + String typeString; + if (column.getType() instanceof LocalZonedTimestampType + || column.getType() instanceof TimestampType + || column.getType() instanceof ZonedTimestampType) { + int precision = DataTypeChecks.getPrecision(column.getType()); + typeString = + String.format("%s(%s)", "DATETIMEV2", Math.min(Math.max(precision, 0), 6)); + } else { + typeString = + DorisTypeMapper.toDorisType( + DataTypeUtils.toFlinkDataType(column.getType())); + } + FieldSchema addFieldSchema = + new FieldSchema(column.getName(), typeString, column.getComment()); + schemaChangeManager.addColumn( + tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); + } + } + + private void applyDropColumnEvent(DropColumnEvent event) + throws IOException, IllegalArgumentException { + TableId tableId = event.tableId(); + List droppedColumns = event.getDroppedColumns(); + for (Column col : droppedColumns) { + schemaChangeManager.dropColumn( + tableId.getSchemaName(), tableId.getTableName(), col.getName()); + } + } + + private void applyRenameColumnEvent(RenameColumnEvent event) + throws IOException, IllegalArgumentException { + TableId tableId = event.tableId(); + Map nameMapping = event.getNameMapping(); + for (Map.Entry entry : nameMapping.entrySet()) { + schemaChangeManager.renameColumn( + tableId.getSchemaName(), + tableId.getTableName(), + entry.getKey(), + entry.getValue()); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverter.java new file mode 100644 index 0000000000..bfef35ded7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverter.java @@ -0,0 +1,177 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.sink; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import com.ververica.cdc.common.data.ArrayData; +import com.ververica.cdc.common.data.GenericArrayData; +import com.ververica.cdc.common.data.GenericMapData; +import com.ververica.cdc.common.data.MapData; +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.types.DataField; +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DataTypeChecks; +import com.ververica.cdc.common.types.DecimalType; +import com.ververica.cdc.common.types.RowType; +import com.ververica.cdc.common.types.ZonedTimestampType; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Date; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.DATE_FORMATTER; +import static com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.DATE_TIME_FORMATTER; + +/** converter {@link RecordData} type object to doris field. */ +public class DorisRowConverter implements Serializable { + private static final long serialVersionUID = 1L; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** Runtime converter to convert {@link RecordData} type object to doris field. */ + @FunctionalInterface + interface SerializationConverter extends Serializable { + Object serialize(int index, RecordData field); + } + + static SerializationConverter createNullableExternalConverter( + DataType type, ZoneId pipelineZoneId) { + return wrapIntoNullableExternalConverter(createExternalConverter(type, pipelineZoneId)); + } + + static SerializationConverter wrapIntoNullableExternalConverter( + SerializationConverter serializationConverter) { + return (index, val) -> { + if (val == null || val.isNullAt(index)) { + return null; + } else { + return serializationConverter.serialize(index, val); + } + }; + } + + static SerializationConverter createExternalConverter(DataType type, ZoneId pipelineZoneId) { + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + return (index, val) -> val.getString(index).toString(); + case BOOLEAN: + return (index, val) -> val.getBoolean(index); + case BINARY: + case VARBINARY: + return (index, val) -> val.getBinary(index); + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + return (index, val) -> + val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal(); + case TINYINT: + return (index, val) -> val.getByte(index); + case SMALLINT: + return (index, val) -> val.getShort(index); + case INTEGER: + return (index, val) -> val.getInt(index); + case BIGINT: + return (index, val) -> val.getLong(index); + case FLOAT: + return (index, val) -> val.getFloat(index); + case DOUBLE: + return (index, val) -> val.getDouble(index); + case DATE: + return (index, val) -> + DATE_FORMATTER.format( + Date.valueOf(LocalDate.ofEpochDay(val.getInt(index)))); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (index, val) -> + val.getTimestamp(index, DataTypeChecks.getPrecision(type)) + .toLocalDateTime() + .format(DATE_TIME_FORMATTER); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return (index, val) -> + ZonedDateTime.ofInstant( + val.getLocalZonedTimestampData( + index, DataTypeChecks.getPrecision(type)) + .toInstant(), + pipelineZoneId) + .toLocalDateTime() + .format(DATE_TIME_FORMATTER); + case TIMESTAMP_WITH_TIME_ZONE: + final int zonedP = ((ZonedTimestampType) type).getPrecision(); + return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp(); + case ARRAY: + return (index, val) -> convertArrayData(val.getArray(index), type); + case MAP: + return (index, val) -> writeValueAsString(convertMapData(val.getMap(index), type)); + case ROW: + return (index, val) -> + writeValueAsString(convertRowData(val, index, type, pipelineZoneId)); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + private static List convertArrayData(ArrayData array, DataType type) { + if (array instanceof GenericArrayData) { + return Arrays.asList(((GenericArrayData) array).toObjectArray()); + } + throw new UnsupportedOperationException("Unsupported array data: " + array.getClass()); + } + + private static Object convertMapData(MapData map, DataType type) { + Map result = new HashMap<>(); + if (map instanceof GenericMapData) { + GenericMapData gMap = (GenericMapData) map; + for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) { + result.put(key, gMap.get(key)); + } + return result; + } + throw new UnsupportedOperationException("Unsupported map data: " + map.getClass()); + } + + private static Object convertRowData( + RecordData val, int index, DataType type, ZoneId pipelineZoneId) { + RowType rowType = (RowType) type; + Map value = new HashMap<>(); + RecordData row = val.getRow(index, rowType.getFieldCount()); + + List fields = rowType.getFields(); + for (int i = 0; i < fields.size(); i++) { + DataField rowField = fields.get(i); + SerializationConverter converter = + createNullableExternalConverter(rowField.getType(), pipelineZoneId); + Object valTmp = converter.serialize(i, row); + value.put(rowField.getName(), valTmp.toString()); + } + return value; + } + + private static String writeValueAsString(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory new file mode 100644 index 0000000000..0d01c867de --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.ververica.cdc.connectors.doris.factory.DorisDataSinkFactory + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverterTest.java new file mode 100644 index 0000000000..3a56bfdd84 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/com/ververica/cdc/connectors/doris/sink/DorisRowConverterTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.doris.sink; + +import com.ververica.cdc.common.data.TimestampData; +import com.ververica.cdc.common.data.binary.BinaryRecordData; +import com.ververica.cdc.common.data.binary.BinaryStringData; +import com.ververica.cdc.common.schema.Column; +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DataTypes; +import com.ververica.cdc.common.types.RowType; +import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.junit.Assert; +import org.junit.Test; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** A test for {@link DorisRowConverter} . */ +public class DorisRowConverterTest { + + @Test + public void testExternalConvert() { + List columns = + Arrays.asList( + Column.physicalColumn("f2", DataTypes.BOOLEAN()), + Column.physicalColumn("f3", DataTypes.FLOAT()), + Column.physicalColumn("f4", DataTypes.DOUBLE()), + Column.physicalColumn("f7", DataTypes.TINYINT()), + Column.physicalColumn("f8", DataTypes.SMALLINT()), + Column.physicalColumn("f9", DataTypes.INT()), + Column.physicalColumn("f10", DataTypes.BIGINT()), + Column.physicalColumn("f12", DataTypes.TIMESTAMP()), + Column.physicalColumn("f14", DataTypes.DATE()), + Column.physicalColumn("f15", DataTypes.CHAR(1)), + Column.physicalColumn("f16", DataTypes.VARCHAR(256))); + + List dataTypes = + columns.stream().map(v -> v.getType()).collect(Collectors.toList()); + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + LocalDate date1 = LocalDate.of(2021, 1, 1); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator(RowType.of(dataTypes.toArray(new DataType[] {}))); + BinaryRecordData recordData = + generator.generate( + new Object[] { + true, + 1.2F, + 1.2345D, + (byte) 1, + (short) 32, + 64, + 128L, + TimestampData.fromLocalDateTime(time1), + (int) date1.toEpochDay(), + BinaryStringData.fromString("a"), + BinaryStringData.fromString("doris") + }); + List row = new ArrayList(); + for (int i = 0; i < recordData.getArity(); i++) { + DorisRowConverter.SerializationConverter converter = + DorisRowConverter.createNullableExternalConverter( + columns.get(i).getType(), ZoneId.systemDefault()); + row.add(converter.serialize(i, recordData)); + } + Assert.assertEquals( + "[true, 1.2, 1.2345, 1, 32, 64, 128, 2021-01-01 08:00:00, 2021-01-01, a, doris]", + row.toString()); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml index b68c609767..59305fe725 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml @@ -28,6 +28,7 @@ under the License. flink-cdc-pipeline-connector-values flink-cdc-pipeline-connector-mysql + flink-cdc-pipeline-connector-doris diff --git a/pom.xml b/pom.xml index e270e21e72..191b90a135 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,7 @@ under the License. 1.18.0 + 1.18 1.9.7.Final 3.2.0 2.2.0