diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
new file mode 100644
index 0000000000..06c9fd8de0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2023 Ververica Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-cdc-pipeline-connectors</artifactId>
+        <groupId>com.ververica</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
+
+    <properties>
+        <starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.starrocks</groupId>
+            <artifactId>flink-connector-starrocks</artifactId>
+            <version>${starrocks.connector.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.21</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-composer</artifactId>
+            <version>${revision}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadeTestJar>false</shadeTestJar>
+                            <artifactSet>
+                                <includes>
+                                    <include>com.starrocks:*</include>
+                                    <include>org.apache.commons:commons-compress</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.commons.compress</pattern>
+                                    <shadedPattern>com.starrocks.shade.org.apache.commons.compress</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
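Note on the shade configuration above: the bundled commons-compress classes are relocated under the `com.starrocks.shade` prefix so the connector jar cannot clash with other commons-compress versions on the classpath. A quick, hedged way to sanity-check the relocation (assuming the shaded connector jar is on the classpath; `IOUtils` is one known commons-compress class):

```java
// Illustrative check only: with the shaded jar present, the relocated
// package should be loadable under the com.starrocks.shade prefix.
public class ShadeCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> relocated =
                Class.forName("com.starrocks.shade.org.apache.commons.compress.utils.IOUtils");
        System.out.println("Relocated commons-compress found: " + relocated.getName());
    }
}
```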
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java
new file mode 100644
index 0000000000..4a1b4a9798
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData;
+import com.starrocks.connector.flink.table.data.StarRocksRowData;
+import com.starrocks.connector.flink.table.sink.v2.RecordSerializationSchema;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSinkContext;
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import com.ververica.cdc.common.data.RecordData;
+import com.ververica.cdc.common.event.CreateTableEvent;
+import com.ververica.cdc.common.event.DataChangeEvent;
+import com.ververica.cdc.common.event.Event;
+import com.ververica.cdc.common.event.SchemaChangeEvent;
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.utils.Preconditions;
+import com.ververica.cdc.common.utils.SchemaUtils;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.createFieldGetter;
+
+/** Serializer for the input {@link Event}, which serializes a row to a JSON string. */
+public class EventRecordSerializationSchema implements RecordSerializationSchema<Event> {
+
+ private static final long serialVersionUID = 1L;
+
+    /**
+     * The local time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE.
+     */
+ private final ZoneId zoneId;
+
+    /** Keeps the mapping from {@link TableId} to table information. */
+    private transient Map<TableId, TableInfo> tableInfoMap;
+
+ private transient DefaultStarRocksRowData reusableRowData;
+ private transient JsonWrapper jsonWrapper;
+
+ public EventRecordSerializationSchema(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ }
+
+ @Override
+ public void open(
+ SerializationSchema.InitializationContext context, StarRocksSinkContext sinkContext) {
+ this.tableInfoMap = new HashMap<>();
+ this.reusableRowData = new DefaultStarRocksRowData();
+ this.jsonWrapper = new JsonWrapper();
+ }
+
+ @Override
+ public StarRocksRowData serialize(Event record) {
+ if (record instanceof SchemaChangeEvent) {
+ applySchemaChangeEvent((SchemaChangeEvent) record);
+ return null;
+ } else if (record instanceof DataChangeEvent) {
+ return applyDataChangeEvent((DataChangeEvent) record);
+ } else {
+            throw new UnsupportedOperationException("Unsupported event " + record);
+ }
+ }
+
+ private void applySchemaChangeEvent(SchemaChangeEvent event) {
+ TableId tableId = event.tableId();
+ Schema newSchema;
+ if (event instanceof CreateTableEvent) {
+ newSchema = ((CreateTableEvent) event).getSchema();
+ } else {
+ TableInfo tableInfo = tableInfoMap.get(tableId);
+ if (tableInfo == null) {
+                throw new RuntimeException("Schema of " + tableId + " does not exist.");
+ }
+ newSchema = SchemaUtils.applySchemaChangeEvent(tableInfo.schema, event);
+ }
+ TableInfo tableInfo = new TableInfo();
+ tableInfo.schema = newSchema;
+ tableInfo.fieldGetters = new RecordData.FieldGetter[newSchema.getColumnCount()];
+ for (int i = 0; i < newSchema.getColumnCount(); i++) {
+ tableInfo.fieldGetters[i] =
+ createFieldGetter(newSchema.getColumns().get(i).getType(), i, zoneId);
+ }
+ tableInfoMap.put(tableId, tableInfo);
+ }
+
+ private StarRocksRowData applyDataChangeEvent(DataChangeEvent event) {
+ TableInfo tableInfo = tableInfoMap.get(event.tableId());
+        Preconditions.checkNotNull(tableInfo, event.tableId() + " does not exist");
+ reusableRowData.setDatabase(event.tableId().getSchemaName());
+ reusableRowData.setTable(event.tableId().getTableName());
+ String value;
+ switch (event.op()) {
+ case INSERT:
+ case UPDATE:
+ case REPLACE:
+ value = serializeRecord(tableInfo, event.after(), false);
+ break;
+ case DELETE:
+ value = serializeRecord(tableInfo, event.before(), true);
+ break;
+ default:
+                throw new UnsupportedOperationException(
+                        "Unsupported operation type " + event.op());
+ }
+ reusableRowData.setRow(value);
+ return reusableRowData;
+ }
+
+ private String serializeRecord(TableInfo tableInfo, RecordData record, boolean isDelete) {
+        List<Column> columns = tableInfo.schema.getColumns();
+        Preconditions.checkArgument(columns.size() == record.getArity());
+        Map<String, Object> rowMap = new HashMap<>(record.getArity() + 1);
+ for (int i = 0; i < record.getArity(); i++) {
+ rowMap.put(columns.get(i).getName(), tableInfo.fieldGetters[i].getFieldOrNull(record));
+ }
+ rowMap.put("__op", isDelete ? 1 : 0);
+ return jsonWrapper.toJSONString(rowMap);
+ }
+
+ @Override
+ public void close() {}
+
+ /** Table information. */
+ private static class TableInfo {
+ Schema schema;
+ RecordData.FieldGetter[] fieldGetters;
+ }
+}
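For reference, the serializer above flattens every data change event into a JSON object whose keys are column names, plus the `__op` marker that StarRocks stream load uses to distinguish upserts (0) from deletes (1). A minimal stand-alone sketch of the payload shape (the column names are hypothetical, and plain map printing stands in for the connector's `JsonWrapper`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OpFieldSketch {
    public static void main(String[] args) {
        // Mirrors the row map built in serializeRecord() above.
        Map<String, Object> rowMap = new LinkedHashMap<>();
        rowMap.put("id", 1);         // hypothetical column
        rowMap.put("name", "alice"); // hypothetical column
        rowMap.put("__op", 0);       // 0 = upsert; a delete row would carry 1
        System.out.println(rowMap);  // {id=1, name=alice, __op=0}
    }
}
```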
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/SchemaChangeConfig.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/SchemaChangeConfig.java
new file mode 100644
index 0000000000..820ac2faef
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/SchemaChangeConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.ververica.cdc.common.configuration.Configuration;
+import com.ververica.cdc.common.utils.Preconditions;
+
+import java.io.Serializable;
+
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT;
+
+/** Configurations for schema change. */
+public class SchemaChangeConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Timeout for a schema change on StarRocks side. */
+ private final long timeoutSecond;
+
+ public SchemaChangeConfig(long timeoutSecond) {
+ Preconditions.checkArgument(
+ timeoutSecond > 0, "Timeout must be positive, but actually is %s", timeoutSecond);
+ this.timeoutSecond = timeoutSecond;
+ }
+
+ public long getTimeoutSecond() {
+ return timeoutSecond;
+ }
+
+ public static SchemaChangeConfig from(Configuration config) {
+ long timeoutSecond = Math.max(1, config.get(TABLE_SCHEMA_CHANGE_TIMEOUT).getSeconds());
+ return new SchemaChangeConfig(timeoutSecond);
+ }
+}
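One subtlety in `SchemaChangeConfig.from()` above: `Duration.getSeconds()` truncates, so a configured timeout below one second would collapse to zero and then trip the positive-value precondition; the `Math.max(1, ...)` clamp guarantees a one-second floor. A tiny sketch of that behavior:

```java
import java.time.Duration;

public class TimeoutClampSketch {
    public static void main(String[] args) {
        // Duration.ofMillis(500).getSeconds() truncates to 0, so the clamp kicks in.
        long timeoutSecond = Math.max(1, Duration.ofMillis(500).getSeconds());
        System.out.println(timeoutSecond); // 1
    }
}
```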
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java
new file mode 100644
index 0000000000..bf130c4e4c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.starrocks.connector.flink.catalog.StarRocksCatalog;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
+import com.ververica.cdc.common.event.Event;
+import com.ververica.cdc.common.sink.DataSink;
+import com.ververica.cdc.common.sink.EventSinkProvider;
+import com.ververica.cdc.common.sink.FlinkSinkProvider;
+import com.ververica.cdc.common.sink.MetadataApplier;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+
+/** A {@link DataSink} for StarRocks connector that supports schema evolution. */
+public class StarRocksDataSink implements DataSink, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Configurations for sink connector. */
+ private final StarRocksSinkOptions sinkOptions;
+
+ /** Configurations for creating a StarRocks table. */
+ private final TableCreateConfig tableCreateConfig;
+
+ /** Configurations for schema change. */
+ private final SchemaChangeConfig schemaChangeConfig;
+
+    /**
+     * The local time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE.
+     */
+ private final ZoneId zoneId;
+
+ public StarRocksDataSink(
+ StarRocksSinkOptions sinkOptions,
+ TableCreateConfig tableCreateConfig,
+ SchemaChangeConfig schemaChangeConfig,
+ ZoneId zoneId) {
+ this.sinkOptions = sinkOptions;
+ this.tableCreateConfig = tableCreateConfig;
+ this.schemaChangeConfig = schemaChangeConfig;
+ this.zoneId = zoneId;
+ }
+
+ @Override
+ public EventSinkProvider getEventSinkProvider() {
+        StarRocksSink<Event> starRocksSink =
+ SinkFunctionFactory.createSink(
+ sinkOptions, new EventRecordSerializationSchema(zoneId));
+ return FlinkSinkProvider.of(starRocksSink);
+ }
+
+ @Override
+ public MetadataApplier getMetadataApplier() {
+ StarRocksCatalog catalog =
+ new StarRocksCatalog(
+ sinkOptions.getJdbcUrl(),
+ sinkOptions.getUsername(),
+ sinkOptions.getPassword());
+ return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
new file mode 100644
index 0000000000..eac903277c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.ververica.cdc.common.configuration.ConfigOption;
+import com.ververica.cdc.common.configuration.Configuration;
+import com.ververica.cdc.common.factories.DataSinkFactory;
+import com.ververica.cdc.common.sink.DataSink;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_CONNECT_TIMEOUT;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_IO_THREAD_COUNT;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_LABEL_PREFIX;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_PROPERTIES_PREFIX;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_SCAN_FREQUENCY;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** A {@link DataSinkFactory} to create {@link StarRocksDataSink}. */
+public class StarRocksDataSinkFactory implements DataSinkFactory {
+
+ public static final String IDENTIFIER = "starrocks";
+
+ @Override
+ public DataSink createDataSink(Context context) {
+ StarRocksSinkOptions sinkOptions =
+ buildSinkConnectorOptions(context.getFactoryConfiguration());
+ TableCreateConfig tableCreateConfig =
+ TableCreateConfig.from(context.getFactoryConfiguration());
+ SchemaChangeConfig schemaChangeConfig =
+ SchemaChangeConfig.from(context.getFactoryConfiguration());
+ String zoneStr = context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
+ ZoneId zoneId =
+ PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr)
+ ? ZoneId.systemDefault()
+ : ZoneId.of(zoneStr);
+ return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig, zoneId);
+ }
+
+ private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
+ org.apache.flink.configuration.Configuration sinkConfig =
+ new org.apache.flink.configuration.Configuration();
+ // required sink configurations
+ sinkConfig.set(StarRocksSinkOptions.JDBC_URL, cdcConfig.get(JDBC_URL));
+ sinkConfig.set(StarRocksSinkOptions.LOAD_URL, cdcConfig.get(LOAD_URL));
+ sinkConfig.set(StarRocksSinkOptions.USERNAME, cdcConfig.get(USERNAME));
+ sinkConfig.set(StarRocksSinkOptions.PASSWORD, cdcConfig.get(PASSWORD));
+ // optional sink configurations
+ cdcConfig
+ .getOptional(SINK_LABEL_PREFIX)
+ .ifPresent(
+ config -> sinkConfig.set(StarRocksSinkOptions.SINK_LABEL_PREFIX, config));
+ cdcConfig
+ .getOptional(SINK_CONNECT_TIMEOUT)
+ .ifPresent(
+ config ->
+ sinkConfig.set(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT, config));
+ cdcConfig
+ .getOptional(SINK_WAIT_FOR_CONTINUE_TIMEOUT)
+ .ifPresent(
+ config ->
+ sinkConfig.set(
+ StarRocksSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT,
+ config));
+ cdcConfig
+ .getOptional(SINK_BATCH_MAX_SIZE)
+ .ifPresent(
+ config -> sinkConfig.set(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE, config));
+ cdcConfig
+ .getOptional(SINK_BATCH_FLUSH_INTERVAL)
+ .ifPresent(
+ config ->
+ sinkConfig.set(
+ StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL, config));
+ cdcConfig
+ .getOptional(SINK_SCAN_FREQUENCY)
+ .ifPresent(
+ config -> sinkConfig.set(StarRocksSinkOptions.SINK_SCAN_FREQUENCY, config));
+ cdcConfig
+ .getOptional(SINK_IO_THREAD_COUNT)
+ .ifPresent(
+ config ->
+ sinkConfig.set(StarRocksSinkOptions.SINK_IO_THREAD_COUNT, config));
+ cdcConfig
+ .getOptional(SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD)
+ .ifPresent(
+ config ->
+ sinkConfig.set(
+ StarRocksSinkOptions
+ .SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD,
+ config));
+ cdcConfig
+ .getOptional(SINK_METRIC_HISTOGRAM_WINDOW_SIZE)
+ .ifPresent(
+ config ->
+ sinkConfig.set(
+ StarRocksSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE,
+ config));
+ // specified sink configurations for cdc scenario
+ sinkConfig.set(StarRocksSinkOptions.DATABASE_NAME, "*");
+ sinkConfig.set(StarRocksSinkOptions.TABLE_NAME, "*");
+ sinkConfig.set(StarRocksSinkOptions.SINK_USE_NEW_SINK_API, true);
+ // currently cdc framework only supports at-least-once
+ sinkConfig.set(StarRocksSinkOptions.SINK_SEMANTIC, "at-least-once");
+
+        Map<String, String> streamProperties =
+                getPrefixConfigs(cdcConfig.toMap(), SINK_PROPERTIES_PREFIX);
+        // Force the JSON format for stream load to simplify the configuration; for
+        // example, there is no need to reconfigure the "columns" property after a
+        // schema change. The CSV format can be supported in the future if needed.
+ streamProperties.put("sink.properties.format", "json");
+ streamProperties.put("sink.properties.strip_outer_array", "true");
+ streamProperties.put("sink.properties.ignore_json_size", "true");
+
+ return new StarRocksSinkOptions(sinkConfig, streamProperties);
+ }
+
+    private Map<String, String> getPrefixConfigs(Map<String, String> config, String prefix) {
+ return config.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(JDBC_URL);
+ requiredOptions.add(LOAD_URL);
+ requiredOptions.add(USERNAME);
+ requiredOptions.add(PASSWORD);
+ return requiredOptions;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(SINK_LABEL_PREFIX);
+ optionalOptions.add(SINK_CONNECT_TIMEOUT);
+ optionalOptions.add(SINK_WAIT_FOR_CONTINUE_TIMEOUT);
+ optionalOptions.add(SINK_BATCH_MAX_SIZE);
+ optionalOptions.add(SINK_BATCH_FLUSH_INTERVAL);
+ optionalOptions.add(SINK_SCAN_FREQUENCY);
+ optionalOptions.add(SINK_IO_THREAD_COUNT);
+ optionalOptions.add(SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD);
+ optionalOptions.add(SINK_METRIC_HISTOGRAM_WINDOW_SIZE);
+ optionalOptions.add(TABLE_CREATE_NUM_BUCKETS);
+ optionalOptions.add(TABLE_SCHEMA_CHANGE_TIMEOUT);
+ return optionalOptions;
+ }
+}
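The factory above forwards every `sink.properties.*` key verbatim to stream load via `getPrefixConfigs()`, then pins the format-related properties to JSON. A self-contained sketch of that filtering (the keys are examples):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PrefixConfigSketch {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("sink.properties.timeout", "600");   // forwarded to stream load
        config.put("jdbc-url", "jdbc:mysql://fe:9030"); // not a stream load property

        // Same filtering as getPrefixConfigs(config, "sink.properties.").
        Map<String, String> streamProperties =
                config.entrySet().stream()
                        .filter(e -> e.getKey().startsWith("sink.properties."))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println(streamProperties); // {sink.properties.timeout=600}
    }
}
```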
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
new file mode 100644
index 0000000000..71dd7b6ee0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.ververica.cdc.common.configuration.ConfigOption;
+import com.ververica.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+import java.util.List;
+
+/** Options for {@link StarRocksDataSink}. */
+public class StarRocksDataSinkOptions {
+
+ // ------------------------------------------------------------------------------------------
+ // Options for sink connector
+ // ------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "JDBC URL used to connect to StarRocks, e.g. `jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...`.");
+
+    public static final ConfigOption<List<String>> LOAD_URL =
+            ConfigOptions.key("load-url")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "URLs for stream load. If the http/https prefix is not specified, http is used by default, "
+                                    + "e.g. `fe_ip1:http_port;http://fe_ip2:http_port;https://fe_nlb`.");
+
+    public static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("StarRocks user name.");
+
+    public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("StarRocks user password.");
+
+    public static final ConfigOption<String> SINK_LABEL_PREFIX =
+            ConfigOptions.key("sink.label-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The prefix of the stream load label. Available characters are within [-_A-Za-z0-9].");
+
+    public static final ConfigOption<Integer> SINK_CONNECT_TIMEOUT =
+            ConfigOptions.key("sink.connect.timeout-ms")
+                    .intType()
+                    .defaultValue(30000)
+                    .withDescription("Timeout in milliseconds for connecting to the `load-url`.");
+
+    public static final ConfigOption<Integer> SINK_WAIT_FOR_CONTINUE_TIMEOUT =
+            ConfigOptions.key("sink.wait-for-continue.timeout-ms")
+                    .intType()
+                    .defaultValue(30000)
+                    .withDescription(
+                            "Timeout in milliseconds to wait for the 100-continue response from the FE http server.");
+
+    public static final ConfigOption<Long> SINK_BATCH_MAX_SIZE =
+            ConfigOptions.key("sink.buffer-flush.max-bytes")
+                    .longType()
+                    .defaultValue(150L * 1024 * 1024)
+                    .withDescription("Maximum number of data bytes per flush.");
+
+    public static final ConfigOption<Long> SINK_BATCH_FLUSH_INTERVAL =
+            ConfigOptions.key("sink.buffer-flush.interval-ms")
+                    .longType()
+                    .defaultValue(300000L)
+                    .withDescription("Flush interval of the row batch in milliseconds.");
+
+    public static final ConfigOption<Long> SINK_SCAN_FREQUENCY =
+ ConfigOptions.key("sink.scan-frequency.ms")
+ .longType()
+ .defaultValue(50L)
+ .withDescription(
+ "Scan frequency in milliseconds to check whether the buffer reaches the flush interval.");
+
+    public static final ConfigOption<Integer> SINK_IO_THREAD_COUNT =
+ ConfigOptions.key("sink.io.thread-count")
+ .intType()
+ .defaultValue(2)
+ .withDescription(
+ "Number of threads used for concurrent stream loads among different tables.");
+
+    public static final ConfigOption<Boolean> SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD =
+ ConfigOptions.key("sink.at-least-once.use-transaction-stream-load")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to use transaction stream load for at-least-once when it's available.");
+
+    public static final ConfigOption<Integer> SINK_METRIC_HISTOGRAM_WINDOW_SIZE =
+ ConfigOptions.key("sink.metric.histogram-window-size")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Window size of histogram metrics.");
+
+ /** The prefix for stream load properties, such as sink.properties.timeout. */
+ public static final String SINK_PROPERTIES_PREFIX = StarRocksSinkOptions.SINK_PROPERTIES_PREFIX;
+
+ // ------------------------------------------------------------------------------------------
+    // Options for table creation and schema change
+ // ------------------------------------------------------------------------------------------
+
+ /**
+ * The prefix for properties used for creating a table. You can refer to StarRocks documentation
+ * for the DDL.
+ * https://docs.starrocks.io/docs/table_design/table_types/primary_key_table/#create-a-table
+ */
+ public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties.";
+
+    public static final ConfigOption<Integer> TABLE_CREATE_NUM_BUCKETS =
+ ConfigOptions.key("table.create.num-buckets")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Number of buckets for creating a StarRocks table. If not set, StarRocks will "
+ + "automatically choose the number of buckets.");
+
+    public static final ConfigOption<Duration> TABLE_SCHEMA_CHANGE_TIMEOUT =
+            ConfigOptions.key("table.schema-change.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1800))
+                    .withDescription(
+                            "Timeout for a schema change on the StarRocks side. It must be an integral multiple "
+                                    + "of seconds. StarRocks will cancel the schema change after the timeout, "
+                                    + "which will cause the sink to fail.");
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
new file mode 100644
index 0000000000..afb083ba23
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.starrocks.connector.flink.catalog.StarRocksCatalog;
+import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
+import com.ververica.cdc.common.event.AddColumnEvent;
+import com.ververica.cdc.common.event.AlterColumnTypeEvent;
+import com.ververica.cdc.common.event.CreateTableEvent;
+import com.ververica.cdc.common.event.DropColumnEvent;
+import com.ververica.cdc.common.event.RenameColumnEvent;
+import com.ververica.cdc.common.event.SchemaChangeEvent;
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.common.sink.MetadataApplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;
+
+/** A {@code MetadataApplier} that applies metadata changes to StarRocks. */
+public class StarRocksMetadataApplier implements MetadataApplier {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(StarRocksMetadataApplier.class);
+
+ private final StarRocksCatalog catalog;
+ private final TableCreateConfig tableCreateConfig;
+ private final SchemaChangeConfig schemaChangeConfig;
+ private boolean isOpened;
+
+ public StarRocksMetadataApplier(
+ StarRocksCatalog catalog,
+ TableCreateConfig tableCreateConfig,
+ SchemaChangeConfig schemaChangeConfig) {
+ this.catalog = catalog;
+ this.tableCreateConfig = tableCreateConfig;
+ this.schemaChangeConfig = schemaChangeConfig;
+ this.isOpened = false;
+ }
+
+ @Override
+ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+ if (!isOpened) {
+ isOpened = true;
+ catalog.open();
+ }
+
+ if (schemaChangeEvent instanceof CreateTableEvent) {
+ applyCreateTable((CreateTableEvent) schemaChangeEvent);
+ } else if (schemaChangeEvent instanceof AddColumnEvent) {
+ applyAddColumn((AddColumnEvent) schemaChangeEvent);
+ } else if (schemaChangeEvent instanceof DropColumnEvent) {
+ applyDropColumn((DropColumnEvent) schemaChangeEvent);
+ } else if (schemaChangeEvent instanceof RenameColumnEvent) {
+ applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
+ } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
+ applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
+ } else {
+ throw new UnsupportedOperationException(
+ "StarRocksDataSink doesn't support schema change event " + schemaChangeEvent);
+ }
+ }
+
+ private void applyCreateTable(CreateTableEvent createTableEvent) {
+ StarRocksTable starRocksTable =
+ StarRocksUtils.toStarRocksTable(
+ createTableEvent.tableId(),
+ createTableEvent.getSchema(),
+ tableCreateConfig);
+ if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
+ catalog.createDatabase(starRocksTable.getDatabaseName(), true);
+ }
+
+ try {
+ catalog.createTable(starRocksTable, true);
+            LOG.info("Successfully created table, event: {}", createTableEvent);
+        } catch (StarRocksCatalogException e) {
+            LOG.error("Failed to create table {}", createTableEvent.tableId(), e);
+ throw new RuntimeException("Failed to create table, event: " + createTableEvent, e);
+ }
+ }
+
+ private void applyAddColumn(AddColumnEvent addColumnEvent) {
+        List<StarRocksColumn> addColumns = new ArrayList<>();
+ for (AddColumnEvent.ColumnWithPosition columnWithPosition :
+ addColumnEvent.getAddedColumns()) {
+            // We ignore the position information and always append the column at the end.
+            // The reason is that the column order of the source table and the StarRocks
+            // table may be inconsistent because of limitations on the StarRocks side, so
+            // the position may be meaningless. For example, primary key columns of a
+            // StarRocks table must come first, but MySQL does not have this limitation,
+            // so the order may differ; also, the FIRST position is not allowed for a
+            // StarRocks primary key table.
+ Column column = columnWithPosition.getAddColumn();
+ StarRocksColumn.Builder builder =
+ new StarRocksColumn.Builder()
+ .setColumnName(column.getName())
+ .setOrdinalPosition(-1)
+ .setColumnComment(column.getComment());
+ toStarRocksDataType(column, builder);
+ addColumns.add(builder.build());
+ }
+
+ TableId tableId = addColumnEvent.tableId();
+ StarRocksCatalogException alterException = null;
+ try {
+ catalog.alterAddColumns(
+ tableId.getSchemaName(),
+ tableId.getTableName(),
+ addColumns,
+ schemaChangeConfig.getTimeoutSecond());
+ } catch (StarRocksCatalogException e) {
+ alterException = e;
+ }
+
+        // Check whether the columns have actually been added to the table. This is
+        // useful for a duplicate schema change after failover: adding the same columns
+        // again will fail on the StarRocks side, but it should still count as
+        // successful on the CDC side.
+ StarRocksTable table = null;
+ try {
+ table = catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ } catch (StarRocksCatalogException e) {
+ LOG.warn("Failed to get table {}", tableId, e);
+ }
+ boolean allAdded = true;
+ if (table != null) {
+ for (StarRocksColumn column : addColumns) {
+ if (table.getColumn(column.getColumnName()) == null) {
+ allAdded = false;
+ break;
+ }
+ }
+ }
+
+ if (allAdded) {
+ if (alterException == null) {
+                LOG.info("Successfully applied add column, event: {}", addColumnEvent);
+            } else {
+                LOG.info(
+                        "Successfully applied add column, event: {}, ignoring the alter exception",
+                        addColumnEvent,
+                        alterException);
+ }
+ return;
+ }
+
+ if (alterException != null) {
+ LOG.error(
+ "Failed to apply add column because of alter exception, event: {}",
+ addColumnEvent,
+ alterException);
+ throw new RuntimeException(
+ "Failed to apply add column because of alter exception, event: "
+ + addColumnEvent,
+ alterException);
+ } else {
+ String errorMsg =
+ String.format(
+ "Failed to apply add column because of validation failure, event: %s, table: %s",
+ addColumnEvent, table);
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+
+ private void applyDropColumn(DropColumnEvent dropColumnEvent) {
+        List<String> dropColumns =
+ dropColumnEvent.getDroppedColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList());
+ TableId tableId = dropColumnEvent.tableId();
+ StarRocksCatalogException alterException = null;
+ try {
+ catalog.alterDropColumns(
+ dropColumnEvent.tableId().getSchemaName(),
+ dropColumnEvent.tableId().getTableName(),
+ dropColumns,
+ schemaChangeConfig.getTimeoutSecond());
+ } catch (StarRocksCatalogException e) {
+ alterException = e;
+ }
+
+        // Check whether the columns have actually been dropped from the table. This is
+        // useful for a duplicate schema change after failover: dropping non-existent
+        // columns will fail on the StarRocks side, but it should still count as
+        // successful on the CDC side.
+ StarRocksTable table = null;
+ try {
+ table = catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ } catch (StarRocksCatalogException ie) {
+ LOG.warn("Failed to get table {}", tableId, ie);
+ }
+
+ boolean allDrop = true;
+ if (table != null) {
+ for (String columnName : dropColumns) {
+ if (table.getColumn(columnName) != null) {
+ allDrop = false;
+ break;
+ }
+ }
+ }
+
+ if (allDrop) {
+ if (alterException == null) {
+                LOG.info("Successfully applied drop column, event: {}", dropColumnEvent);
+            } else {
+                LOG.info(
+                        "Successfully applied drop column, event: {}, ignoring the alter exception",
+                        dropColumnEvent,
+                        alterException);
+ }
+ return;
+ }
+
+ if (alterException != null) {
+ LOG.error(
+ "Failed to apply drop column because of alter exception, event: {}",
+ dropColumnEvent,
+ alterException);
+ throw new RuntimeException(
+ "Failed to apply drop column because of alter exception, event: "
+ + dropColumnEvent,
+ alterException);
+ } else {
+ String errorMsg =
+ String.format(
+ "Failed to apply drop column because of validation failure, event: %s, table: %s",
+ dropColumnEvent, table);
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+
+ private void applyRenameColumn(RenameColumnEvent renameColumnEvent) {
+        // TODO: StarRocks plans to support column rename in version 3.3, which has not
+        // been released yet. Support it later.
+ throw new UnsupportedOperationException("Rename column is not supported currently");
+ }
+
+ private void applyAlterColumn(AlterColumnTypeEvent alterColumnTypeEvent) {
+        // TODO: There are limitations on data type conversions. We need to know the data
+        // types both before and after the change in order to validate the conversion, but
+        // the event only contains the types after the change. One option is for the
+        // framework to deliver the old schema. We can support this alter after a discussion.
+ throw new UnsupportedOperationException("Alter column is not supported currently");
+ }
+}
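The add and drop paths above share an "alter, then verify" pattern that makes schema changes idempotent across failover: the ALTER error is held, the table is re-read, and the error only surfaces if the desired end state was not reached. A distilled, self-contained sketch of the pattern (the interfaces are simplified stand-ins, not the connector's API):

```java
public class IdempotentAlterSketch {
    interface MiniCatalog {
        void alterAddColumn(String table, String column) throws Exception;
        boolean hasColumn(String table, String column);
    }

    static void addColumnIdempotently(MiniCatalog catalog, String table, String column) {
        Exception alterException = null;
        try {
            catalog.alterAddColumn(table, column);
        } catch (Exception e) {
            alterException = e; // judge by the table state, not by the error alone
        }
        if (catalog.hasColumn(table, column)) {
            return; // desired state reached, e.g. a duplicate ALTER replayed after failover
        }
        throw new RuntimeException("Failed to add column " + column, alterException);
    }
}
```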
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java
new file mode 100644
index 0000000000..4f1394caae
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
+import com.ververica.cdc.common.data.RecordData;
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.BigIntType;
+import com.ververica.cdc.common.types.BooleanType;
+import com.ververica.cdc.common.types.CharType;
+import com.ververica.cdc.common.types.DataType;
+import com.ververica.cdc.common.types.DataTypeDefaultVisitor;
+import com.ververica.cdc.common.types.DateType;
+import com.ververica.cdc.common.types.DecimalType;
+import com.ververica.cdc.common.types.DoubleType;
+import com.ververica.cdc.common.types.FloatType;
+import com.ververica.cdc.common.types.IntType;
+import com.ververica.cdc.common.types.LocalZonedTimestampType;
+import com.ververica.cdc.common.types.SmallIntType;
+import com.ververica.cdc.common.types.TimestampType;
+import com.ververica.cdc.common.types.TinyIntType;
+import com.ververica.cdc.common.types.VarCharType;
+
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.ververica.cdc.common.types.DataTypeChecks.getPrecision;
+import static com.ververica.cdc.common.types.DataTypeChecks.getScale;
+
+/** Utilities for conversion from source table to StarRocks table. */
+public class StarRocksUtils {
+
+ /** Convert a source table to {@link StarRocksTable}. */
+ public static StarRocksTable toStarRocksTable(
+ TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) {
+ if (schema.primaryKeys().isEmpty()) {
+ throw new RuntimeException(
+ String.format(
+                            "Only StarRocks primary key tables are supported, but source table %s has no primary keys",
+ tableId));
+ }
+
+        // For StarRocks primary key table DDL, primary key columns must be defined before
+        // the other columns, so reorder the columns in the source schema to put the
+        // primary key columns at the front.
+        List<Column> orderedColumns = new ArrayList<>();
+ for (String primaryKey : schema.primaryKeys()) {
+ orderedColumns.add(schema.getColumn(primaryKey).get());
+ }
+ for (Column column : schema.getColumns()) {
+ if (!schema.primaryKeys().contains(column.getName())) {
+ orderedColumns.add(column);
+ }
+ }
+
+        List<StarRocksColumn> starRocksColumns = new ArrayList<>();
+ for (int i = 0; i < orderedColumns.size(); i++) {
+ Column column = orderedColumns.get(i);
+ StarRocksColumn.Builder builder =
+ new StarRocksColumn.Builder()
+ .setColumnName(column.getName())
+ .setOrdinalPosition(i)
+ .setColumnComment(column.getComment());
+ toStarRocksDataType(column, builder);
+ starRocksColumns.add(builder.build());
+ }
+
+ StarRocksTable.Builder tableBuilder =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(starRocksColumns)
+ .setTableKeys(schema.primaryKeys())
+ // use primary keys as distribution keys by default
+ .setDistributionKeys(schema.primaryKeys())
+ .setComment(schema.comment());
+ if (tableCreateConfig.getNumBuckets().isPresent()) {
+ tableBuilder.setNumBuckets(tableCreateConfig.getNumBuckets().get());
+ }
+ tableBuilder.setTableProperties(tableCreateConfig.getProperties());
+ return tableBuilder.build();
+ }
+
+ /** Convert CDC data type to StarRocks data type. */
+ public static void toStarRocksDataType(Column cdcColumn, StarRocksColumn.Builder builder) {
+ CdcDataTypeTransformer dataTypeTransformer = new CdcDataTypeTransformer(builder);
+ cdcColumn.getType().accept(dataTypeTransformer);
+ }
+
+ /** Format DATE type data. */
+ private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy-MM-dd");
+
+ /** Format timestamp-related type data. */
+ private static final DateTimeFormatter DATETIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ /**
+ * Creates an accessor for getting elements in an internal RecordData structure at the given
+ * position.
+ *
+ * @param fieldType the element type of the RecordData
+ * @param fieldPos the element position of the RecordData
+     * @param zoneId the time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE
+     */
+ public static RecordData.FieldGetter createFieldGetter(
+ DataType fieldType, int fieldPos, ZoneId zoneId) {
+ final RecordData.FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ fieldGetter = record -> record.getBoolean(fieldPos);
+ break;
+ case TINYINT:
+ fieldGetter = record -> record.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = record -> record.getShort(fieldPos);
+ break;
+ case INTEGER:
+ fieldGetter = record -> record.getInt(fieldPos);
+ break;
+ case BIGINT:
+ fieldGetter = record -> record.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = record -> record.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = record -> record.getDouble(fieldPos);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldGetter =
+ record ->
+ record.getDecimal(fieldPos, decimalPrecision, decimalScale)
+ .toBigDecimal();
+ break;
+ case CHAR:
+ case VARCHAR:
+ fieldGetter = record -> record.getString(fieldPos).toString();
+ break;
+ case DATE:
+ fieldGetter =
+ record ->
+ DATE_FORMATTER.format(
+ Date.valueOf(
+ LocalDate.ofEpochDay(record.getInt(fieldPos))));
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ record ->
+ record.getTimestamp(fieldPos, getPrecision(fieldType))
+ .toLocalDateTime()
+ .format(DATETIME_FORMATTER);
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ fieldGetter =
+ record ->
+ ZonedDateTime.ofInstant(
+ record.getLocalZonedTimestampData(
+ fieldPos, getPrecision(fieldType))
+ .toInstant(),
+ zoneId)
+ .toLocalDateTime()
+ .format(DATETIME_FORMATTER);
+ break;
+ default:
+                throw new UnsupportedOperationException(
+                        "Unsupported data type " + fieldType.getTypeRoot());
+ }
+ if (!fieldType.isNullable()) {
+ return fieldGetter;
+ }
+ return row -> {
+ if (row.isNullAt(fieldPos)) {
+ return null;
+ }
+ return fieldGetter.getFieldOrNull(row);
+ };
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // StarRocks data types
+ // ------------------------------------------------------------------------------------------
+
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String LARGEINT = "BIGINT UNSIGNED";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String STRING = "STRING";
+ public static final String DATE = "DATE";
+ public static final String DATETIME = "DATETIME";
+ public static final String JSON = "JSON";
+
+ /** Max size of varchar type of StarRocks. */
+ public static final int MAX_VARCHAR_SIZE = 1048576;
+
+ /** Transforms CDC {@link DataType} to StarRocks data type. */
+    private static class CdcDataTypeTransformer
+            extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {
+
+ private final StarRocksColumn.Builder builder;
+
+ public CdcDataTypeTransformer(StarRocksColumn.Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(BooleanType booleanType) {
+ builder.setDataType(BOOLEAN);
+ builder.setNullable(booleanType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(TinyIntType tinyIntType) {
+ builder.setDataType(TINYINT);
+ builder.setNullable(tinyIntType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(SmallIntType smallIntType) {
+ builder.setDataType(SMALLINT);
+ builder.setNullable(smallIntType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(IntType intType) {
+ builder.setDataType(INT);
+ builder.setNullable(intType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(BigIntType bigIntType) {
+ builder.setDataType(BIGINT);
+ builder.setNullable(bigIntType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(FloatType floatType) {
+ builder.setDataType(FLOAT);
+ builder.setNullable(floatType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(DoubleType doubleType) {
+ builder.setDataType(DOUBLE);
+ builder.setNullable(doubleType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(DecimalType decimalType) {
+ builder.setDataType(DECIMAL);
+ builder.setNullable(decimalType.isNullable());
+ builder.setColumnSize(decimalType.getPrecision());
+ builder.setDecimalDigits(decimalType.getScale());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(CharType charType) {
+ builder.setDataType(CHAR);
+ builder.setNullable(charType.isNullable());
+ builder.setColumnSize(charType.getLength());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(VarCharType varCharType) {
+ builder.setDataType(VARCHAR);
+ builder.setNullable(varCharType.isNullable());
+ builder.setColumnSize(Math.min(varCharType.getLength(), MAX_VARCHAR_SIZE));
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(DateType dateType) {
+ builder.setDataType(DATE);
+ builder.setNullable(dateType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(TimestampType timestampType) {
+ builder.setDataType(DATETIME);
+ builder.setNullable(timestampType.isNullable());
+ return builder;
+ }
+
+ @Override
+ public StarRocksColumn.Builder visit(LocalZonedTimestampType localZonedTimestampType) {
+ builder.setDataType(DATETIME);
+ builder.setNullable(localZonedTimestampType.isNullable());
+ return builder;
+ }
+
+ @Override
+ protected StarRocksColumn.Builder defaultMethod(DataType dataType) {
+ throw new UnsupportedOperationException("Unsupported CDC data type " + dataType);
+ }
+ }
+}
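`toStarRocksTable()` above reorders columns so that primary key columns come first, because StarRocks primary key DDL requires key columns before value columns. The reordering logic, distilled into a runnable sketch (column names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyFirstOrderingSketch {
    public static void main(String[] args) {
        List<String> sourceOrder = Arrays.asList("name", "id", "score");
        List<String> primaryKeys = Arrays.asList("id");

        // Key columns first, then the remaining columns in source order.
        List<String> ordered = new ArrayList<>(primaryKeys);
        for (String col : sourceOrder) {
            if (!primaryKeys.contains(col)) {
                ordered.add(col);
            }
        }
        System.out.println(ordered); // [id, name, score]
    }
}
```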
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/TableCreateConfig.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/TableCreateConfig.java
new file mode 100644
index 0000000000..d797c057e9
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/TableCreateConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.ververica.cdc.common.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
+
+/**
+ * Configurations for creating a StarRocks table. See StarRocks Documentation for how to create a StarRocks
+ * primary key table.
+ */
+public class TableCreateConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Number of buckets for the table. null if not set. */
+ @Nullable private final Integer numBuckets;
+
+ /** Properties for the table. */
+    private final Map<String, String> properties;
+
+    public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
+ this.numBuckets = numBuckets;
+ this.properties = new HashMap<>(properties);
+ }
+
+    public Optional<Integer> getNumBuckets() {
+ return numBuckets == null ? Optional.empty() : Optional.of(numBuckets);
+ }
+
+    public Map<String, String> getProperties() {
+ return Collections.unmodifiableMap(properties);
+ }
+
+ public static TableCreateConfig from(Configuration config) {
+ Integer numBuckets = config.get(TABLE_CREATE_NUM_BUCKETS);
+        Map<String, String> tableProperties =
+ config.toMap().entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(TABLE_CREATE_PROPERTIES_PREFIX))
+ .collect(
+ Collectors.toMap(
+ entry ->
+ entry.getKey()
+ .substring(
+ TABLE_CREATE_PROPERTIES_PREFIX
+ .length())
+ .toLowerCase(),
+ Map.Entry::getValue));
+ return new TableCreateConfig(numBuckets, tableProperties);
+ }
+}
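Note how `TableCreateConfig.from()` normalizes property keys: the `table.create.properties.` prefix is stripped and the remainder lowercased before it reaches the CREATE TABLE statement. A one-key sketch (the property name is an example):

```java
public class TablePropertyKeySketch {
    public static void main(String[] args) {
        String prefix = "table.create.properties.";
        String key = "table.create.properties.Replication_Num"; // example property
        // Same normalization as TableCreateConfig.from().
        System.out.println(key.substring(prefix.length()).toLowerCase()); // replication_num
    }
}
```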
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory
new file mode 100644
index 0000000000..9d3c827d61
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory
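The service file above is what lets the CDC framework discover this connector at runtime: factories are registered under `META-INF/services` and matched by `identifier()` ("starrocks"). A hedged sketch of that lookup, presumably via the standard Java ServiceLoader mechanism (assuming the `Factory` interface exposes `identifier()`, as the factory implementation above suggests):

```java
import java.util.ServiceLoader;

import com.ververica.cdc.common.factories.Factory;

public class FactoryDiscoverySketch {
    public static void main(String[] args) {
        // Lists every Factory registered via META-INF/services on the classpath;
        // with this connector jar present, "starrocks" should be among them.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.identifier());
        }
    }
}
```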
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
new file mode 100644
index 0000000000..0616f64155
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
@@ -0,0 +1,356 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import com.starrocks.connector.flink.table.data.StarRocksRowData;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.v2.DefaultStarRocksSinkContext;
+import com.ververica.cdc.common.data.DecimalData;
+import com.ververica.cdc.common.data.LocalZonedTimestampData;
+import com.ververica.cdc.common.data.TimestampData;
+import com.ververica.cdc.common.data.binary.BinaryStringData;
+import com.ververica.cdc.common.event.AddColumnEvent;
+import com.ververica.cdc.common.event.CreateTableEvent;
+import com.ververica.cdc.common.event.DataChangeEvent;
+import com.ververica.cdc.common.event.DropColumnEvent;
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.BooleanType;
+import com.ververica.cdc.common.types.DataType;
+import com.ververica.cdc.common.types.DateType;
+import com.ververica.cdc.common.types.DecimalType;
+import com.ververica.cdc.common.types.FloatType;
+import com.ververica.cdc.common.types.IntType;
+import com.ververica.cdc.common.types.LocalZonedTimestampType;
+import com.ververica.cdc.common.types.SmallIntType;
+import com.ververica.cdc.common.types.TimestampType;
+import com.ververica.cdc.common.types.VarCharType;
+import com.ververica.cdc.common.utils.SchemaUtils;
+import com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema;
+import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.OptionalLong;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests for {@link EventRecordSerializationSchema}. */
+public class EventRecordSerializationSchemaTest {
+
+ private EventRecordSerializationSchema serializer;
+ private ObjectMapper objectMapper;
+
+ @Before
+ public void setup() {
+ this.serializer = new EventRecordSerializationSchema(ZoneId.of("+08"));
+ this.serializer.open(
+ new MockInitializationContext(),
+ new DefaultStarRocksSinkContext(
+ new MockInitContext(),
+ new StarRocksSinkOptions(new Configuration(), new HashMap<>())));
+ this.objectMapper = new ObjectMapper();
+ }
+
+ @After
+ public void teardown() {
+ this.serializer.close();
+ }
+
+ @Test
+ public void testMixedSchemaAndDataChanges() throws Exception {
+ // 1. create table1, and insert/delete/update data
+ TableId table1 = TableId.parse("test.tbl1");
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("col1", new IntType())
+ .physicalColumn("col2", new BooleanType())
+ .physicalColumn("col3", new TimestampType())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent1 = new CreateTableEvent(table1, schema1);
+ assertNull(serializer.serialize(createTableEvent1));
+
+ BinaryRecordDataGenerator generator1 =
+ new BinaryRecordDataGenerator(
+ schema1.getColumnDataTypes().toArray(new DataType[0]));
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ table1,
+ generator1.generate(
+ new Object[] {
+ 1,
+ true,
+ TimestampData.fromTimestamp(
+ Timestamp.valueOf("2023-11-27 18:00:00"))
+ }));
+ verifySerializeResult(
+ table1,
+ "{\"col1\":1,\"col2\":true,\"col3\":\"2023-11-27 18:00:00\",\"__op\":0}",
+ serializer.serialize(insertEvent1));
+
+ DataChangeEvent deleteEvent1 =
+ DataChangeEvent.deleteEvent(
+ table1,
+ generator1.generate(
+ new Object[] {
+ 2,
+ false,
+ TimestampData.fromTimestamp(
+ Timestamp.valueOf("2023-11-27 19:00:00"))
+ }));
+ verifySerializeResult(
+ table1,
+ "{\"col1\":2,\"col2\":false,\"col3\":\"2023-11-27 19:00:00\",\"__op\":1}",
+ serializer.serialize(deleteEvent1));
+
+ DataChangeEvent updateEvent1 =
+ DataChangeEvent.updateEvent(
+ table1,
+ generator1.generate(
+ new Object[] {
+ 3,
+ false,
+ TimestampData.fromTimestamp(
+ Timestamp.valueOf("2023-11-27 20:00:00"))
+ }),
+ generator1.generate(
+ new Object[] {
+ 3,
+ true,
+ TimestampData.fromTimestamp(
+ Timestamp.valueOf("2023-11-27 21:00:00"))
+ }));
+ verifySerializeResult(
+ table1,
+ "{\"col1\":3,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"__op\":0}",
+ serializer.serialize(updateEvent1));
+
+ // 2. create table2, and insert data
+ TableId table2 = TableId.parse("test.tbl2");
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("col1", new DateType())
+ .physicalColumn("col2", new FloatType())
+ .physicalColumn("col3", new VarCharType(20))
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent2 = new CreateTableEvent(table2, schema2);
+ assertNull(serializer.serialize(createTableEvent2));
+
+ BinaryRecordDataGenerator generator2 =
+ new BinaryRecordDataGenerator(
+ schema2.getColumnDataTypes().toArray(new DataType[0]));
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ table2,
+ generator2.generate(
+ new Object[] {
+ (int) LocalDate.of(2023, 11, 27).toEpochDay(),
+ 3.4f,
+ BinaryStringData.fromString("insert table2")
+ }));
+ verifySerializeResult(
+ table2,
+ "{\"col1\":\"2023-11-27\",\"col2\":3.4,\"col3\":\"insert table2\",\"__op\":0}",
+ serializer.serialize(insertEvent2));
+
+ // 3. add columns to table1, and delete data
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ table1,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("col4", new DecimalType(20, 5))),
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("col5", new SmallIntType())),
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "col6", new LocalZonedTimestampType()))));
+ Schema newSchema1 = SchemaUtils.applySchemaChangeEvent(schema1, addColumnEvent);
+ BinaryRecordDataGenerator newGenerator1 =
+ new BinaryRecordDataGenerator(
+ newSchema1.getColumnDataTypes().toArray(new DataType[0]));
+ assertNull(serializer.serialize(addColumnEvent));
+
+ DataChangeEvent deleteEvent2 =
+ DataChangeEvent.deleteEvent(
+ table1,
+ newGenerator1.generate(
+ new Object[] {
+ 4,
+ true,
+ TimestampData.fromTimestamp(
+ Timestamp.valueOf("2023-11-27 21:00:00")),
+ DecimalData.fromBigDecimal(new BigDecimal("83.23"), 20, 5),
+ (short) 9,
+ LocalZonedTimestampData.fromInstant(
+ LocalDateTime.of(2023, 11, 27, 21, 0, 0)
+ .toInstant(ZoneOffset.of("+10")))
+ }));
+ verifySerializeResult(
+ table1,
+ "{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"col6\":\"2023-11-27 19:00:00\",\"__op\":1}",
+ serializer.serialize(deleteEvent2));
+
+ // 4. drop columns from table2, and insert data
+ DropColumnEvent dropColumnEvent =
+ new DropColumnEvent(
+ table2,
+ Arrays.asList(
+ Column.physicalColumn("col2", new FloatType()),
+ Column.physicalColumn("col3", new VarCharType(20))));
+ Schema newSchema2 = SchemaUtils.applySchemaChangeEvent(schema2, dropColumnEvent);
+ BinaryRecordDataGenerator newGenerator2 =
+ new BinaryRecordDataGenerator(
+ newSchema2.getColumnDataTypes().toArray(new DataType[0]));
+ assertNull(serializer.serialize(dropColumnEvent));
+
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ table2,
+ newGenerator2.generate(
+ new Object[] {(int) LocalDate.of(2023, 11, 28).toEpochDay()}));
+ verifySerializeResult(
+ table2, "{\"col1\":\"2023-11-28\",\"__op\":0}", serializer.serialize(insertEvent3));
+ }
+
+ private void verifySerializeResult(
+ TableId expectTable, String expectRow, StarRocksRowData actualRowData)
+ throws Exception {
+ assertEquals(expectTable.getSchemaName(), actualRowData.getDatabase());
+ assertEquals(expectTable.getTableName(), actualRowData.getTable());
+ SortedMap<String, Object> expectMap =
+ objectMapper.readValue(expectRow, new TypeReference<SortedMap<String, Object>>() {});
+ SortedMap<String, Object> actualMap =
+ actualRowData.getRow() == null
+ ? null
+ : objectMapper.readValue(
+ actualRowData.getRow(),
+ new TypeReference<SortedMap<String, Object>>() {});
+ assertEquals(expectMap, actualMap);
+ }
+
+ /** A mock context for serialization schema testing. */
+ private static class MockInitializationContext
+ implements SerializationSchema.InitializationContext {
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ MockInitializationContext.class.getClassLoader());
+ }
+ }
+
+ private static class MockInitContext implements Sink.InitContext {
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(MockInitContext.class.getClassLoader());
+ }
+
+ @Override
+ public MailboxExecutor getMailboxExecutor() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessingTimeService getProcessingTimeService() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getSubtaskId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getAttemptNumber() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SinkWriterMetricGroup metricGroup() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OptionalLong getRestoredCheckpointId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SerializationSchema.InitializationContext
+ asSerializationSchemaInitializationContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isObjectReuseEnabled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <IN> TypeSerializer<IN> createInputSerializer() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JobID getJobId() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
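
The test above pins down the serializer's contract: schema events (create table, add or drop column) only update internal state and serialize to null, while data change events produce a Stream Load JSON payload carrying an __op marker (0 for upsert, 1 for delete). Below is a minimal sketch of that flow outside the test harness; the table and column names are made up, and the serializer is assumed to have been open()ed as in setup():

    // Sketch only; reuses the imports of the test above, with hypothetical names.
    TableId table = TableId.parse("demo.orders");
    Schema schema =
            Schema.newBuilder()
                    .physicalColumn("id", new IntType())
                    .physicalColumn("name", new VarCharType(32))
                    .primaryKey("id")
                    .build();
    serializer.serialize(new CreateTableEvent(table, schema)); // returns null

    BinaryRecordDataGenerator generator =
            new BinaryRecordDataGenerator(
                    schema.getColumnDataTypes().toArray(new DataType[0]));
    DataChangeEvent insert =
            DataChangeEvent.insertEvent(
                    table,
                    generator.generate(
                            new Object[] {1, BinaryStringData.fromString("alice")}));
    StarRocksRowData row = serializer.serialize(insert);
    // row.getRow() is {"id":1,"name":"alice","__op":0}; a delete would set __op to 1.
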
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java
new file mode 100644
index 0000000000..7d152755e2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import com.starrocks.connector.flink.catalog.StarRocksCatalog;
+import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Mock {@link StarRocksCatalog} for testing. */
+public class MockStarRocksCatalog extends StarRocksCatalog {
+
+ /** database name -> table name -> table. */
+ private final Map<String, Map<String, StarRocksTable>> tables;
+
+ public MockStarRocksCatalog() {
+ super("jdbc:mysql://127.0.0.1:9030", "root", "");
+ this.tables = new HashMap<>();
+ }
+
+ @Override
+ public void open() throws StarRocksCatalogException {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws StarRocksCatalogException {
+ // do nothing
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws StarRocksCatalogException {
+ return tables.containsKey(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String databaseName, boolean ignoreIfExists)
+ throws StarRocksCatalogException {
+ if (!tables.containsKey(databaseName)) {
+ tables.put(databaseName, new HashMap<>());
+ } else if (!ignoreIfExists) {
+ throw new StarRocksCatalogException(
+ String.format("database %s already exists", databaseName));
+ }
+ }
+
+ @Override
+ public Optional<StarRocksTable> getTable(String databaseName, String tableName)
+ throws StarRocksCatalogException {
+ if (!tables.containsKey(databaseName)) {
+ return Optional.empty();
+ }
+
+ StarRocksTable table = tables.get(databaseName).get(tableName);
+ return Optional.ofNullable(table);
+ }
+
+ @Override
+ public void createTable(StarRocksTable table, boolean ignoreIfExists)
+ throws StarRocksCatalogException {
+ String databaseName = table.getDatabaseName();
+ String tableName = table.getTableName();
+ Map<String, StarRocksTable> dbTables = tables.get(databaseName);
+ if (dbTables == null) {
+ throw new StarRocksCatalogException(
+ String.format("database %s does not exist", databaseName));
+ }
+
+ StarRocksTable oldTable = dbTables.get(tableName);
+ if (oldTable == null) {
+ dbTables.put(tableName, table);
+ } else if (!ignoreIfExists) {
+ throw new StarRocksCatalogException(
+ String.format("table %s.%s already exists", databaseName, tableName));
+ }
+ }
+
+ @Override
+ public void alterAddColumns(
+ String databaseName,
+ String tableName,
+ List<StarRocksColumn> addColumns,
+ long timeoutSecond)
+ throws StarRocksCatalogException {
+ Map<String, StarRocksTable> dbTables = tables.get(databaseName);
+ if (dbTables == null) {
+ throw new StarRocksCatalogException(
+ String.format("database %s does not exist", databaseName));
+ }
+
+ StarRocksTable oldTable = dbTables.get(tableName);
+ if (oldTable == null) {
+ throw new StarRocksCatalogException(
+ String.format("table %s.%s does not exist", databaseName, tableName));
+ }
+
+ List<StarRocksColumn> newColumns = new ArrayList<>(oldTable.getColumns());
+ for (StarRocksColumn column : addColumns) {
+ StarRocksColumn newColumn =
+ new StarRocksColumn.Builder()
+ .setColumnName(column.getColumnName())
+ .setOrdinalPosition(newColumns.size())
+ .setDataType(column.getDataType())
+ .setNullable(column.isNullable())
+ .setDefaultValue(column.getDefaultValue().orElse(null))
+ .setColumnSize(column.getColumnSize().orElse(null))
+ .setDecimalDigits(column.getDecimalDigits().orElse(null))
+ .setColumnComment(column.getColumnComment().orElse(null))
+ .build();
+ newColumns.add(newColumn);
+ }
+
+ StarRocksTable newTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(oldTable.getDatabaseName())
+ .setTableName(oldTable.getTableName())
+ .setTableType(oldTable.getTableType())
+ .setColumns(newColumns)
+ .setTableKeys(oldTable.getTableKeys().orElse(null))
+ .setDistributionKeys(oldTable.getDistributionKeys().orElse(null))
+ .setNumBuckets(oldTable.getNumBuckets().orElse(null))
+ .setComment(oldTable.getComment().orElse(null))
+ .setTableProperties(oldTable.getProperties())
+ .build();
+ dbTables.put(tableName, newTable);
+ }
+
+ @Override
+ public void alterDropColumns(
+ String databaseName, String tableName, List<String> dropColumns, long timeoutSecond)
+ throws StarRocksCatalogException {
+ Map<String, StarRocksTable> dbTables = tables.get(databaseName);
+ if (dbTables == null) {
+ throw new StarRocksCatalogException(
+ String.format("database %s does not exist", databaseName));
+ }
+
+ StarRocksTable oldTable = dbTables.get(tableName);
+ if (oldTable == null) {
+ throw new StarRocksCatalogException(
+ String.format("table %s.%s does not exist", databaseName, tableName));
+ }
+
+ List<StarRocksColumn> newColumns = new ArrayList<>();
+ for (StarRocksColumn column : oldTable.getColumns()) {
+ if (dropColumns.contains(column.getColumnName())) {
+ continue;
+ }
+ StarRocksColumn newColumn =
+ new StarRocksColumn.Builder()
+ .setColumnName(column.getColumnName())
+ .setOrdinalPosition(newColumns.size())
+ .setDataType(column.getDataType())
+ .setNullable(column.isNullable())
+ .setDefaultValue(column.getDefaultValue().orElse(null))
+ .setColumnSize(column.getColumnSize().orElse(null))
+ .setDecimalDigits(column.getDecimalDigits().orElse(null))
+ .setColumnComment(column.getColumnComment().orElse(null))
+ .build();
+ newColumns.add(newColumn);
+ }
+
+ StarRocksTable newTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(oldTable.getDatabaseName())
+ .setTableName(oldTable.getTableName())
+ .setTableType(oldTable.getTableType())
+ .setColumns(newColumns)
+ .setTableKeys(oldTable.getTableKeys().orElse(null))
+ .setDistributionKeys(oldTable.getDistributionKeys().orElse(null))
+ .setNumBuckets(oldTable.getNumBuckets().orElse(null))
+ .setComment(oldTable.getComment().orElse(null))
+ .setTableProperties(oldTable.getProperties())
+ .build();
+ dbTables.put(tableName, newTable);
+ }
+}
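
The mock stands in for a StarRocks frontend: tables live in a database-name to table-name map, and every DDL call rebuilds an immutable StarRocksTable. A quick sketch of driving it directly (table and column names are hypothetical, and the builder is assumed to accept this minimal field set):

    // Sketch only; imports StarRocksColumn/StarRocksTable as in the mock above,
    // plus java.util.Collections.
    MockStarRocksCatalog catalog = new MockStarRocksCatalog();
    catalog.createDatabase("demo", false);

    StarRocksColumn id =
            new StarRocksColumn.Builder()
                    .setColumnName("id")
                    .setOrdinalPosition(0)
                    .setDataType("int")
                    .setNullable(true)
                    .build();
    StarRocksTable orders =
            new StarRocksTable.Builder()
                    .setDatabaseName("demo")
                    .setTableName("orders")
                    .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
                    .setColumns(Collections.singletonList(id))
                    .setTableKeys(Collections.singletonList("id"))
                    .build();
    catalog.createTable(orders, false);

    // A second createTable with ignoreIfExists = false now throws
    // StarRocksCatalogException("table demo.orders already exists").
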
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
new file mode 100644
index 0000000000..367e6bac48
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import com.ververica.cdc.common.configuration.Configuration;
+import com.ververica.cdc.common.factories.DataSinkFactory;
+import com.ververica.cdc.common.factories.FactoryHelper;
+import com.ververica.cdc.common.sink.DataSink;
+import com.ververica.cdc.composer.utils.FactoryDiscoveryUtils;
+import com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSink;
+import com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link StarRocksDataSinkFactory}. */
+public class StarRocksDataSinkFactoryTest {
+
+ @Test
+ public void testCreateDataSink() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class);
+ assertTrue(sinkFactory instanceof StarRocksDataSinkFactory);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
+ .put("load-url", "127.0.0.1:8030")
+ .put("username", "root")
+ .put("password", "")
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ assertTrue(dataSink instanceof StarRocksDataSink);
+ }
+}
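
Discovery here rides on plain Java SPI: the services file registered earlier in this patch lists StarRocksDataSinkFactory, and FactoryDiscoveryUtils matches factories by identifier. The lookup is roughly the sketch below; this is an approximation rather than the helper's actual code, and it omits the duplicate-factory checks a real implementation needs:

    // Approximate shape of identifier-based factory lookup via ServiceLoader.
    // Assumes imports of java.util.ServiceLoader and the flink-cdc Factory types.
    static DataSinkFactory findSinkFactory(String identifier) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory instanceof DataSinkFactory
                    && identifier.equals(factory.identifier())) {
                return (DataSinkFactory) factory;
            }
        }
        throw new IllegalStateException("No DataSink factory found for " + identifier);
    }
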
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
new file mode 100644
index 0000000000..8430ce25f8
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
+import com.ververica.cdc.common.configuration.Configuration;
+import com.ververica.cdc.common.event.AddColumnEvent;
+import com.ververica.cdc.common.event.CreateTableEvent;
+import com.ververica.cdc.common.event.DropColumnEvent;
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Column;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.BooleanType;
+import com.ververica.cdc.common.types.DecimalType;
+import com.ververica.cdc.common.types.IntType;
+import com.ververica.cdc.common.types.SmallIntType;
+import com.ververica.cdc.common.types.TimestampType;
+import com.ververica.cdc.connectors.starrocks.sink.SchemaChangeConfig;
+import com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier;
+import com.ververica.cdc.connectors.starrocks.sink.TableCreateConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_NUM_BUCKETS;
+import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Tests for {@link StarRocksMetadataApplier}. */
+public class StarRocksMetadataApplierTest {
+
+ private MockStarRocksCatalog catalog;
+ private StarRocksMetadataApplier metadataApplier;
+
+ @Before
+ public void setup() {
+ Configuration configuration =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(TABLE_SCHEMA_CHANGE_TIMEOUT.key(), "100s")
+ .put(TABLE_CREATE_NUM_BUCKETS.key(), "10")
+ .put("table.create.properties.replication_num", "5")
+ .build());
+ SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(configuration);
+ TableCreateConfig tableCreateConfig = TableCreateConfig.from(configuration);
+ this.catalog = new MockStarRocksCatalog();
+ this.metadataApplier =
+ new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
+ }
+
+ @Test
+ public void testCreateTable() throws Exception {
+ TableId tableId = TableId.parse("test.tbl1");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", new IntType())
+ .physicalColumn("col2", new BooleanType())
+ .physicalColumn("col3", new TimestampType())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ assertNotNull(actualTable);
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col1")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col2")
+ .setOrdinalPosition(1)
+ .setDataType("boolean")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col3")
+ .setOrdinalPosition(2)
+ .setDataType("datetime")
+ .setNullable(true)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ assertEquals(expectTable, actualTable);
+ }
+
+ @Test
+ public void testAddColumn() throws Exception {
+ TableId tableId = TableId.parse("test.tbl2");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", new IntType())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("col2", new DecimalType(20, 5))),
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("col3", new SmallIntType()))));
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ assertNotNull(actualTable);
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col1")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col2")
+ .setOrdinalPosition(1)
+ .setDataType("decimal")
+ .setColumnSize(20)
+ .setDecimalDigits(5)
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col3")
+ .setOrdinalPosition(2)
+ .setDataType("smallint")
+ .setNullable(true)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ assertEquals(expectTable, actualTable);
+ }
+
+ @Test
+ public void testDropColumn() throws Exception {
+ TableId tableId = TableId.parse("test.tbl3");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", new IntType())
+ .physicalColumn("col2", new BooleanType())
+ .physicalColumn("col3", new TimestampType())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ DropColumnEvent dropColumnEvent =
+ new DropColumnEvent(
+ tableId,
+ Arrays.asList(
+ Column.physicalColumn("col2", new BooleanType()),
+ Column.physicalColumn("col3", new TimestampType())));
+ metadataApplier.applySchemaChange(dropColumnEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ assertNotNull(actualTable);
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("col1")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ assertEquals(expectTable, actualTable);
+ }
+}
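
The expected tables encode the type mapping these tests rely on: IntType maps to "int", BooleanType to "boolean", SmallIntType to "smallint", TimestampType to "datetime", and DecimalType(20, 5) to "decimal" with columnSize 20 and decimalDigits 5. A condensed sketch of that mapping, reconstructed from the assertions alone (the connector's real conversion lives elsewhere and covers many more types):

    // Partial mapping inferred from the expected tables above; not exhaustive.
    static String toStarRocksType(DataType type) {
        if (type instanceof IntType) {
            return "int";
        } else if (type instanceof BooleanType) {
            return "boolean";
        } else if (type instanceof SmallIntType) {
            return "smallint";
        } else if (type instanceof TimestampType) {
            return "datetime"; // StarRocks stores timestamps as DATETIME
        } else if (type instanceof DecimalType) {
            return "decimal"; // precision/scale travel as columnSize/decimalDigits
        }
        throw new UnsupportedOperationException("Unmapped type: " + type);
    }
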
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000..532eeec6e2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java
index 0c1be06217..3bfc3654bf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java
@@ -332,13 +332,13 @@ private void applyAddColumnEvent(AddColumnEvent event) {
case BEFORE:
{
int index = columns.indexOf(columnWithPosition.getExistingColumn());
- columns.add(index, columnWithPosition.getExistingColumn());
+ columns.add(index, columnWithPosition.getAddColumn());
break;
}
case AFTER:
{
int index = columns.indexOf(columnWithPosition.getExistingColumn());
- columns.add(index + 1, columnWithPosition.getExistingColumn());
+ columns.add(index + 1, columnWithPosition.getAddColumn());
break;
}
}
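
The two one-line fixes above close a real bug: for BEFORE and AFTER positions, the old code re-inserted the reference column instead of the newly added one, so the new column was silently dropped and the reference column duplicated. A toy illustration with strings standing in for Column objects:

    // Uses java.util.{ArrayList, Arrays, List}; names are made up.
    List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2"));
    int index = columns.indexOf("col1");  // the existing (reference) column
    columns.add(index + 1, "col9");       // fixed: insert the added column -> [col1, col9, col2]
    // The old code re-added "col1" here, yielding [col1, col1, col2] and losing "col9".
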
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/resources/log4j2-test.properties
index a9d045e0ef..532eeec6e2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/resources/log4j2-test.properties
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/resources/log4j2-test.properties
@@ -16,7 +16,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level=INFO
+rootLogger.level=OFF
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
index c3ef405ad5..7eb2b21d40 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
@@ -27,6 +27,7 @@ under the License.
<packaging>pom</packaging>

<modules>
<module>flink-cdc-pipeline-connector-values</module>
+ <module>flink-cdc-pipeline-connector-starrocks</module>
diff --git a/pom.xml b/pom.xml
index e270e21e72..191b90a135 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@ under the License.
<flink.version>1.18.0</flink.version>
+ <flink.major.version>1.18</flink.major.version>
<debezium.version>1.9.7.Final</debezium.version>
<tikv.version>3.2.0</tikv.version>
<geometry.version>2.2.0</geometry.version>