diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java index 9a32c009f0..48754b2478 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java @@ -101,6 +101,14 @@ private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) .ifPresent( config -> sinkConfig.set(StarRocksSinkOptions.SINK_IO_THREAD_COUNT, config)); + cdcConfig + .getOptional(SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD) + .ifPresent( + config -> + sinkConfig.set( + StarRocksSinkOptions + .SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD, + config)); cdcConfig .getOptional(SINK_METRIC_HISTOGRAM_WINDOW_SIZE) .ifPresent( @@ -112,6 +120,8 @@ private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) sinkConfig.set(StarRocksSinkOptions.DATABASE_NAME, "*"); sinkConfig.set(StarRocksSinkOptions.TABLE_NAME, "*"); sinkConfig.set(StarRocksSinkOptions.SINK_USE_NEW_SINK_API, true); + // currently cdc framework only supports at-least-once + sinkConfig.set(StarRocksSinkOptions.SINK_SEMANTIC, "at-least-once"); Map streamProperties = getPrefixConfigs(cdcConfig.toMap(), SINK_PROPERTIES_PREFIX);