From 65bdbfd7943b816a48a1222a71d95b476dbc53d2 Mon Sep 17 00:00:00 2001 From: zhaomin Date: Mon, 24 Jul 2023 16:34:41 +0800 Subject: [PATCH] [docs][oracle] Add an example of Incremental Snapshot based DataStream for Oracle CDC Connector This closes #2325. Co-authored-by: skylines <34996528+gtk96@users.noreply.github.com> --- docs/content/connectors/oracle-cdc.md | 58 ++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/docs/content/connectors/oracle-cdc.md b/docs/content/connectors/oracle-cdc.md index 405c558c7c..9e576e242d 100644 --- a/docs/content/connectors/oracle-cdc.md +++ b/docs/content/connectors/oracle-cdc.md @@ -469,7 +469,63 @@ The Oracle CDC source can't work in parallel reading, because there is only one ### DataStream Source -The Oracle CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows: +The Oracle CDC connector can also be a DataStream source. There are two modes for the DataStream source: + +- incremental snapshot based, which allows parallel reading +- SourceFunction based, which only supports single thread reading + +#### Incremental Snapshot based DataStream (Experimental) + +```java +import com.ververica.cdc.connectors.base.options.StartupOptions; +import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Properties; + +public class OracleParallelSourceExample { + + public static void main(String[] args) throws Exception { + Properties debeziumProperties = new Properties(); + debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); + debeziumProperties.setProperty("log.mining.continuous.mine", "true"); + + JdbcIncrementalSource oracleChangeEventSource = + new OracleSourceBuilder() + .hostname("host") + .port(1521) + .databaseList("XE") + .schemaList("DEBEZIUM") + .tableList("DEBEZIUM.PRODUCTS") + .username("username") + .password("password") + .deserializer(new JsonDebeziumDeserializationSchema()) + .includeSchemaChanges(true) // output the schema changes as well + .startupOptions(StartupOptions.initial()) + .debeziumProperties(debeziumProperties) + .splitSize(2) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // enable checkpoint + env.enableCheckpointing(3000L); + // set the source parallelism to 4 + env.fromSource( + oracleChangeEventSource, + WatermarkStrategy.noWatermarks(), + "OracleParallelSource") + .setParallelism(4) + .print() + .setParallelism(1); + env.execute("Print Oracle Snapshot + RedoLog"); + } +} +``` + +#### SourceFunction-based DataStream ```java import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;