Skip to content

Commit

Permalink
[docs][oracle] Add an example of Incremental Snapshot based DataStrea…
Browse files Browse the repository at this point in the history
…m for Oracle CDC Connector

This closes apache#2325.

Co-authored-by: skylines <[email protected]>
  • Loading branch information
zhaomin1423 and gtk96 authored Jul 24, 2023
1 parent a8dd4d3 commit 65bdbfd
Showing 1 changed file with 57 additions and 1 deletion.
58 changes: 57 additions & 1 deletion docs/content/connectors/oracle-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,63 @@ The Oracle CDC source can't work in parallel reading, because there is only one
### DataStream Source
The Oracle CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:
The Oracle CDC connector can also be a DataStream source. There are two modes for the DataStream source:
- incremental snapshot based, which allows parallel reading
- SourceFunction based, which only supports single thread reading
#### Incremental Snapshot based DataStream (Experimental)
```java
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class OracleParallelSourceExample {
public static void main(String[] args) throws Exception {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");
JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("XE")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000L);
// set the source parallelism to 4
env.fromSource(
oracleChangeEventSource,
WatermarkStrategy.noWatermarks(),
"OracleParallelSource")
.setParallelism(4)
.print()
.setParallelism(1);
env.execute("Print Oracle Snapshot + RedoLog");
}
}
```
#### SourceFunction-based DataStream
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down

0 comments on commit 65bdbfd

Please sign in to comment.