From ac9687eba117221f4cbeb9187daa956de6d63b16 Mon Sep 17 00:00:00 2001 From: e-mhui Date: Thu, 17 Aug 2023 21:05:17 +0800 Subject: [PATCH 1/3] [oracle] Correct the naming error --- .../reader/fetch/OracleScanFetchTask.java | 44 +++++++++---------- .../reader/fetch/OracleStreamFetchTask.java | 18 ++++---- .../oracle/table/OracleTableSource.java | 2 +- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 68a71d584f..18a6b31129 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -110,17 +110,17 @@ public void execute(Context context) throws Exception { sourceFetchContext.getPartition(), sourceFetchContext.getOffsetContext()); - final StreamSplit backfillBinlogSplit = + final StreamSplit backfillRedoLogSplit = createBackfillRedoLogSplit(changeEventSourceContext); - // optimization that skip the binlog read when the low watermark equals high + // optimization that skip the redo log read when the low watermark equals high // watermark - final boolean binlogBackfillRequired = - backfillBinlogSplit + final boolean redoLogBackfillRequired = + backfillRedoLogSplit .getEndingOffset() - .isAfter(backfillBinlogSplit.getStartingOffset()); - if (!binlogBackfillRequired) { - dispatchBinlogEndEvent( - backfillBinlogSplit, + .isAfter(backfillRedoLogSplit.getStartingOffset()); + if (!redoLogBackfillRequired) { + dispatchRedoLogEndEvent( + backfillRedoLogSplit, sourceFetchContext.getPartition().getSourcePartition(), ((OracleSourceFetchTaskContext) context).getDispatcher()); taskRunning = false; @@ -128,16 +128,16 @@ public void execute(Context context) throws Exception { } // execute redoLog read task if (snapshotResult.isCompletedOrSkipped()) { - final RedoLogSplitReadTask backfillBinlogReadTask = - createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext); + final RedoLogSplitReadTask backfillRedoLogReadTask = + createBackfillRedoLogReadTask(backfillRedoLogSplit, sourceFetchContext); final LogMinerOracleOffsetContextLoader loader = new LogMinerOracleOffsetContextLoader( ((OracleSourceFetchTaskContext) context).getDbzConnectorConfig()); final OracleOffsetContext oracleOffsetContext = - loader.load(backfillBinlogSplit.getStartingOffset().getOffset()); - backfillBinlogReadTask.execute( - new SnapshotBinlogSplitChangeEventSourceContext(), + loader.load(backfillRedoLogSplit.getStartingOffset().getOffset()); + backfillRedoLogReadTask.execute( + new SnapshotRedoLogSplitChangeEventSourceContext(), sourceFetchContext.getPartition(), oracleOffsetContext); taskRunning = false; @@ -160,7 +160,7 @@ private StreamSplit createBackfillRedoLogSplit( } private RedoLogSplitReadTask createBackfillRedoLogReadTask( - StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) { + StreamSplit backfillRedoLogSplit, OracleSourceFetchTaskContext context) { // we should only capture events for the current table, // otherwise, we may can't find corresponding schema Configuration dezConf = @@ -171,7 +171,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask( // Disable heartbeat event in snapshot split fetcher 
.with(Heartbeat.HEARTBEAT_INTERVAL, 0) .build(); - // task to read binlog and backfill for current split + // task to read redo log and backfill for current split return new RedoLogSplitReadTask( new OracleConnectorConfig(dezConf), context.getConnection(), @@ -180,18 +180,18 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask( context.getDatabaseSchema(), context.getSourceConfig().getOriginDbzConnectorConfig(), context.getStreamingChangeEventSourceMetrics(), - backfillBinlogSplit); + backfillRedoLogSplit); } - private void dispatchBinlogEndEvent( - StreamSplit backFillBinlogSplit, + private void dispatchRedoLogEndEvent( + StreamSplit backFillRedoLogSplit, Map sourcePartition, JdbcSourceEventDispatcher eventDispatcher) throws InterruptedException { eventDispatcher.dispatchWatermarkEvent( sourcePartition, - backFillBinlogSplit, - backFillBinlogSplit.getEndingOffset(), + backFillRedoLogSplit, + backFillRedoLogSplit.getEndingOffset(), WatermarkKind.END); } @@ -446,10 +446,10 @@ public boolean isRunning() { } /** - * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded binlog task + * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded redo log task * of a snapshot split task. */ - public class SnapshotBinlogSplitChangeEventSourceContext + public class SnapshotRedoLogSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { public void finished() { diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java index a0e6d2305a..13d0759b3e 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java @@ -91,8 +91,8 @@ public void close() { } /** - * A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark - * to highWatermark) binlog. + * A wrapped task to read all redo log for table and also supports read bounded (from lowWatermark + * to highWatermark) redo log. */ public static class RedoLogSplitReadTask extends LogMinerStreamingChangeEventSource { @@ -138,13 +138,13 @@ public void execute( protected void afterHandleScn( OraclePartition partition, OracleOffsetContext offsetContext) { super.afterHandleScn(partition, offsetContext); - // check do we need to stop for fetch binlog for snapshot split. + // check do we need to stop for fetch redo log for snapshot split. 
if (isBoundedRead()) { final RedoLogOffset currentRedoLogOffset = getCurrentRedoLogOffset(offsetContext.getOffset()); - // reach the high watermark, the binlog fetcher should be finished + // reach the high watermark, the redo log fetcher should be finished if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) { - // send binlog end event + // send redo log end event try { dispatcher.dispatchWatermarkEvent( partition.getSourcePartition(), @@ -154,10 +154,10 @@ protected void afterHandleScn( } catch (InterruptedException e) { LOG.error("Send signal event error.", e); errorHandler.setProducerThrowable( - new DebeziumException("Error processing binlog signal event", e)); + new DebeziumException("Error processing redo log signal event", e)); } - // tell fetcher the binlog task finished - ((OracleScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context) + // tell fetcher the redo log task finished + ((OracleScanFetchTask.SnapshotRedoLogSplitChangeEventSourceContext) context) .finished(); } } @@ -179,7 +179,7 @@ public static RedoLogOffset getCurrentRedoLogOffset(Map offset) { } /** - * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for binlog split task. + * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for redo log split task. */ private class RedoLogSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java index 45cce35e49..4167e36b47 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link DynamicTableSource} that describes how to create a Oracle binlog from a logical + * A {@link DynamicTableSource} that describes how to create a Oracle redo log from a logical * description. */ public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata { From c137225e059c32c025f4ad679a8909d8e3ffb452 Mon Sep 17 00:00:00 2001 From: e-mhui Date: Thu, 24 Aug 2023 15:29:06 +0800 Subject: [PATCH 2/3] Fix comments --- .../oracle/source/reader/fetch/OracleScanFetchTask.java | 4 ++-- .../oracle/source/reader/fetch/OracleStreamFetchTask.java | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 18a6b31129..6778d6e9f6 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -446,8 +446,8 @@ public boolean isRunning() { } /** - * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded redo log task - * of a snapshot split task. + * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded redo log + * task of a snapshot split task. 
*/ public class SnapshotRedoLogSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java index 13d0759b3e..32c0f1734f 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java @@ -91,8 +91,8 @@ public void close() { } /** - * A wrapped task to read all redo log for table and also supports read bounded (from lowWatermark - * to highWatermark) redo log. + * A wrapped task to read all redo log for table and also supports read bounded (from + * lowWatermark to highWatermark) redo log. */ public static class RedoLogSplitReadTask extends LogMinerStreamingChangeEventSource { @@ -179,7 +179,8 @@ public static RedoLogOffset getCurrentRedoLogOffset(Map offset) { } /** - * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for redo log split task. + * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for redo log split + * task. */ private class RedoLogSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { From 0505c1a148374b8863698a791f64431c2ef817f2 Mon Sep 17 00:00:00 2001 From: e-mhui Date: Fri, 25 Aug 2023 11:27:56 +0800 Subject: [PATCH 3/3] fix name error --- .../connectors/oracle/OracleSourceTest.java | 4 +-- .../oracle/source/OracleSourceITCase.java | 33 ++++++++++--------- .../oracle/table/OracleConnectorITCase.java | 2 +- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java index b4a63a8c48..bb07578927 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java @@ -271,7 +271,7 @@ public void go() throws Exception { String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server")); - // execute 2 more DMLs to have more binlog + // execute 2 more DMLs to have more redo log statement.execute( "INSERT INTO debezium.products VALUES (1001,'roy','old robot',1234.56)"); // 1001 statement.execute("UPDATE debezium.products SET weight=1345.67 WHERE id=1001"); @@ -300,7 +300,7 @@ public void go() throws Exception { }; runThread3.start(); - // consume the unconsumed binlog + // consume the unconsumed redo log List records = drain(sourceContext3, 2); assertInsert(records.get(0), "ID", 1001); assertUpdate(records.get(1), "ID", 1001); diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java index dad150f68c..ff083009e9 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java +++ 
b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java @@ -79,8 +79,9 @@ public void testTaskManagerFailoverInSnapshotPhase() throws Exception { } @Test - public void testTaskManagerFailoverInBinlogPhase() throws Exception { - testOracleParallelSource(FailoverType.TM, FailoverPhase.BINLOG, new String[] {"CUSTOMERS"}); + public void testTaskManagerFailoverInRedoLogPhase() throws Exception { + testOracleParallelSource( + FailoverType.TM, FailoverPhase.REDO_LOG, new String[] {"CUSTOMERS"}); } @Test @@ -90,8 +91,9 @@ public void testJobManagerFailoverInSnapshotPhase() throws Exception { } @Test - public void testJobManagerFailoverInBinlogPhase() throws Exception { - testOracleParallelSource(FailoverType.JM, FailoverPhase.BINLOG, new String[] {"CUSTOMERS"}); + public void testJobManagerFailoverInRedoLogPhase() throws Exception { + testOracleParallelSource( + FailoverType.JM, FailoverPhase.REDO_LOG, new String[] {"CUSTOMERS"}); } @Test @@ -202,19 +204,19 @@ private void testOracleParallelSource( assertEqualsInAnyOrder( expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); - // second step: check the binlog data + // second step: check the redo log data for (String tableId : captureCustomerTables) { - makeFirstPartBinlogEvents(ORACLE_SCHEMA + '.' + tableId); + makeFirstPartRedoLogEvents(ORACLE_SCHEMA + '.' + tableId); } - if (failoverPhase == FailoverPhase.BINLOG) { + if (failoverPhase == FailoverPhase.REDO_LOG) { triggerFailover( failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200)); } for (String tableId : captureCustomerTables) { - makeSecondPartBinlogEvents(ORACLE_SCHEMA + '.' + tableId); + makeSecondPartRedoLogEvents(ORACLE_SCHEMA + '.' + tableId); } - String[] binlogForSingleTable = + String[] redoLogForSingleTable = new String[] { "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", @@ -228,22 +230,23 @@ private void testOracleParallelSource( "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]" }; - List expectedBinlogData = new ArrayList<>(); + List expectedRedoLogData = new ArrayList<>(); for (int i = 0; i < captureCustomerTables.length; i++) { - expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable)); + expectedRedoLogData.addAll(Arrays.asList(redoLogForSingleTable)); } - assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size())); + assertEqualsInAnyOrder( + expectedRedoLogData, fetchRows(iterator, expectedRedoLogData.size())); tableResult.getJobClient().get().cancel().get(); } - private void makeFirstPartBinlogEvents(String tableId) throws Exception { + private void makeFirstPartRedoLogEvents(String tableId) throws Exception { executeSql("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103"); executeSql("DELETE FROM " + tableId + " where id = 102"); executeSql("INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')"); executeSql("UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"); } - private void makeSecondPartBinlogEvents(String tableId) throws Exception { + private void makeSecondPartRedoLogEvents(String tableId) throws Exception { executeSql("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"); executeSql("INSERT INTO " + tableId + " VALUES(2001, 'user_22','Shanghai','123567891234')"); executeSql("INSERT INTO " + tableId + " VALUES(2002, 'user_23','Shanghai','123567891234')"); @@ -334,7 +337,7 @@ private enum 
FailoverType { /** The phase of failover. */ private enum FailoverPhase { SNAPSHOT, - BINLOG, + REDO_LOG, NEVER } diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java index 55e8dcc139..51ac1b3294 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java @@ -65,7 +65,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -/** Integration tests for Oracle binlog SQL source. */ +/** Integration tests for Oracle redo log SQL source. */ @RunWith(Parameterized.class) public class OracleConnectorITCase extends AbstractTestBase { private static final int RECORDS_COUNT = 10_000;
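
The backfill flow that these renames touch is easiest to see in isolation: the snapshot fetch task creates a redo-log split bounded by a low and a high watermark, skips the redo-log read entirely when the two watermarks coincide, and otherwise streams redo-log entries until the current position reaches the high watermark, at which point it dispatches an END watermark event and signals the fetcher to finish. The following self-contained Java sketch restates that control flow under simplifying assumptions; every type and method in it (RedoLogOffset, BackfillSplit, run) is a hypothetical stand-in, not one of the connector's actual classes, and modelling the offset as a bare SCN is an illustration-only shortcut.

import java.util.ArrayList;
import java.util.List;

/**
 * A minimal sketch of the bounded redo-log backfill pattern used for
 * snapshot splits. All names here are hypothetical stand-ins, not the
 * connector's real classes.
 */
public class BoundedBackfillSketch {

    /** Hypothetical offset keyed by a single Oracle SCN. */
    static final class RedoLogOffset {
        final long scn;

        RedoLogOffset(long scn) {
            this.scn = scn;
        }

        boolean isAfter(RedoLogOffset other) {
            return scn > other.scn;
        }

        boolean isAtOrAfter(RedoLogOffset other) {
            return scn >= other.scn;
        }

        @Override
        public String toString() {
            return "SCN=" + scn;
        }
    }

    /** Hypothetical backfill split bounded by low/high watermarks. */
    static final class BackfillSplit {
        final RedoLogOffset startingOffset; // low watermark
        final RedoLogOffset endingOffset; // high watermark

        BackfillSplit(RedoLogOffset start, RedoLogOffset end) {
            this.startingOffset = start;
            this.endingOffset = end;
        }
    }

    /**
     * Mirrors the patched control flow: if the high watermark does not lie
     * strictly after the low watermark, dispatch END immediately; otherwise
     * emit changes until the high watermark is reached, then dispatch END.
     * The redoLogScns argument stands in for redo-log entries positioned
     * after the low watermark.
     */
    static List<String> run(BackfillSplit split, List<Long> redoLogScns) {
        List<String> emitted = new ArrayList<>();
        // Optimization from the patch: no redo-log read is needed when the
        // low watermark already equals the high watermark.
        if (!split.endingOffset.isAfter(split.startingOffset)) {
            emitted.add("WATERMARK(END) at " + split.endingOffset);
            return emitted;
        }
        for (long scn : redoLogScns) {
            RedoLogOffset current = new RedoLogOffset(scn);
            if (current.isAtOrAfter(split.endingOffset)) {
                // Reached the high watermark: the bounded fetcher is done.
                emitted.add("WATERMARK(END) at " + split.endingOffset);
                return emitted;
            }
            emitted.add("CHANGE at " + current);
        }
        // Redo log exhausted before the high watermark; the real task would
        // keep polling, so the sketch just reports that state.
        emitted.add("waiting for " + split.endingOffset);
        return emitted;
    }

    public static void main(String[] args) {
        BackfillSplit split =
                new BackfillSplit(new RedoLogOffset(100), new RedoLogOffset(103));
        run(split, List.of(101L, 102L, 103L)).forEach(System.out::println);

        // Degenerate split: low == high watermark, the read is skipped.
        BackfillSplit empty =
                new BackfillSplit(new RedoLogOffset(100), new RedoLogOffset(100));
        run(empty, List.of()).forEach(System.out::println);
    }
}

Note the asymmetry the sketch preserves from the patch: the skip check uses isAfter (strictly greater), while the termination check uses isAtOrAfter, so a split whose watermarks coincide never enters the read loop, and a read that lands exactly on the high watermark still terminates.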