[oracle] Correct the naming error (#2405)
* [oracle] Correct the naming error

* Fix comments

* Fix name error

Oracle exposes change data through redo logs read via LogMiner, not through a MySQL-style binlog; this commit renames the leftover "binlog" identifiers and comments in the Oracle CDC connector to "redo log".
e-mhui authored Aug 26, 2023
1 parent 639ba31 commit 8feb51b
Showing 6 changed files with 55 additions and 51 deletions.
Changed file 1 of 6 (OracleScanFetchTask.java):

@@ -110,34 +110,34 @@ public void execute(Context context) throws Exception {
                            sourceFetchContext.getPartition(),
                            sourceFetchContext.getOffsetContext());

-            final StreamSplit backfillBinlogSplit =
+            final StreamSplit backfillRedoLogSplit =
                     createBackfillRedoLogSplit(changeEventSourceContext);
-            // optimization that skip the binlog read when the low watermark equals high
+            // optimization that skip the redo log read when the low watermark equals high
             // watermark
-            final boolean binlogBackfillRequired =
-                    backfillBinlogSplit
+            final boolean redoLogBackfillRequired =
+                    backfillRedoLogSplit
                             .getEndingOffset()
-                            .isAfter(backfillBinlogSplit.getStartingOffset());
-            if (!binlogBackfillRequired) {
-                dispatchBinlogEndEvent(
-                        backfillBinlogSplit,
+                            .isAfter(backfillRedoLogSplit.getStartingOffset());
+            if (!redoLogBackfillRequired) {
+                dispatchRedoLogEndEvent(
+                        backfillRedoLogSplit,
                         sourceFetchContext.getPartition().getSourcePartition(),
                         ((OracleSourceFetchTaskContext) context).getDispatcher());
                 taskRunning = false;
                 return;
             }
             // execute redoLog read task
             if (snapshotResult.isCompletedOrSkipped()) {
-                final RedoLogSplitReadTask backfillBinlogReadTask =
-                        createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext);
+                final RedoLogSplitReadTask backfillRedoLogReadTask =
+                        createBackfillRedoLogReadTask(backfillRedoLogSplit, sourceFetchContext);

                 final LogMinerOracleOffsetContextLoader loader =
                         new LogMinerOracleOffsetContextLoader(
                                 ((OracleSourceFetchTaskContext) context).getDbzConnectorConfig());
                 final OracleOffsetContext oracleOffsetContext =
-                        loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
-                backfillBinlogReadTask.execute(
-                        new SnapshotBinlogSplitChangeEventSourceContext(),
+                        loader.load(backfillRedoLogSplit.getStartingOffset().getOffset());
+                backfillRedoLogReadTask.execute(
+                        new SnapshotRedoLogSplitChangeEventSourceContext(),
                         sourceFetchContext.getPartition(),
                         oracleOffsetContext);
                 taskRunning = false;

@@ -160,7 +160,7 @@ private StreamSplit createBackfillRedoLogSplit(
    }

    private RedoLogSplitReadTask createBackfillRedoLogReadTask(
-            StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
+            StreamSplit backfillRedoLogSplit, OracleSourceFetchTaskContext context) {
        // we should only capture events for the current table,
        // otherwise, we may can't find corresponding schema
        Configuration dezConf =

@@ -171,7 +171,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask(
                        // Disable heartbeat event in snapshot split fetcher
                        .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                        .build();
-        // task to read binlog and backfill for current split
+        // task to read redo log and backfill for current split
        return new RedoLogSplitReadTask(
                new OracleConnectorConfig(dezConf),
                context.getConnection(),

@@ -180,18 +180,18 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask(
                context.getDatabaseSchema(),
                context.getSourceConfig().getOriginDbzConnectorConfig(),
                context.getStreamingChangeEventSourceMetrics(),
-                backfillBinlogSplit);
+                backfillRedoLogSplit);
    }

-    private void dispatchBinlogEndEvent(
-            StreamSplit backFillBinlogSplit,
+    private void dispatchRedoLogEndEvent(
+            StreamSplit backFillRedoLogSplit,
            Map<String, ?> sourcePartition,
            JdbcSourceEventDispatcher<OraclePartition> eventDispatcher)
            throws InterruptedException {
        eventDispatcher.dispatchWatermarkEvent(
                sourcePartition,
-                backFillBinlogSplit,
-                backFillBinlogSplit.getEndingOffset(),
+                backFillRedoLogSplit,
+                backFillRedoLogSplit.getEndingOffset(),
                WatermarkKind.END);
    }

@@ -446,10 +446,10 @@ public boolean isRunning() {
    }

    /**
-     * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded binlog task
-     * of a snapshot split task.
+     * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded redo log
+     * task of a snapshot split task.
     */
-    public class SnapshotBinlogSplitChangeEventSourceContext
+    public class SnapshotRedoLogSplitChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {

        public void finished() {
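The first hunk above is the core of the snapshot backfill path: the split's starting offset is the low watermark taken before the snapshot, its ending offset is the high watermark taken after it, and the redo log read is skipped entirely when the two are equal, since nothing can have changed in between. Below is a minimal, self-contained sketch of that decision; the simplified Offset and StreamSplit types and the print statements are illustrative stand-ins, not the connector's real API.

import java.util.Objects;

/** A minimal sketch of the redo-log backfill decision; types are simplified stand-ins. */
public class BackfillDecisionSketch {

    /** Simplified offset, ordered by SCN (system change number). */
    static final class Offset {
        final long scn;
        Offset(long scn) { this.scn = scn; }
        boolean isAfter(Offset other) { return this.scn > other.scn; }
    }

    /** Simplified backfill split bounded by low/high watermark offsets. */
    static final class StreamSplit {
        final Offset startingOffset; // low watermark, taken before the snapshot
        final Offset endingOffset;   // high watermark, taken after the snapshot
        StreamSplit(Offset start, Offset end) {
            this.startingOffset = Objects.requireNonNull(start);
            this.endingOffset = Objects.requireNonNull(end);
        }
    }

    static void execute(StreamSplit backfillRedoLogSplit) {
        // Skip the redo log read when the low watermark equals the high watermark:
        // no changes happened while the snapshot was taken, so there is nothing to backfill.
        boolean redoLogBackfillRequired =
                backfillRedoLogSplit.endingOffset.isAfter(backfillRedoLogSplit.startingOffset);
        if (!redoLogBackfillRequired) {
            System.out.println("dispatch END watermark and skip backfill");
            return;
        }
        // Otherwise run a bounded redo log read from the start to the end watermark.
        System.out.println("run bounded redo log read task");
    }

    public static void main(String[] args) {
        execute(new StreamSplit(new Offset(100), new Offset(100))); // skip
        execute(new StreamSplit(new Offset(100), new Offset(105))); // backfill
    }
}

In the real task the skip branch dispatches a WatermarkKind.END event, and the backfill branch runs a RedoLogSplitReadTask bounded by the two watermarks.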
Changed file 2 of 6 (contains RedoLogSplitReadTask):

@@ -91,8 +91,8 @@ public void close() {
    }

    /**
-     * A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark
-     * to highWatermark) binlog.
+     * A wrapped task to read all redo log for table and also supports read bounded (from
+     * lowWatermark to highWatermark) redo log.
     */
    public static class RedoLogSplitReadTask extends LogMinerStreamingChangeEventSource {

@@ -138,13 +138,13 @@ public void execute(
        protected void afterHandleScn(
                OraclePartition partition, OracleOffsetContext offsetContext) {
            super.afterHandleScn(partition, offsetContext);
-            // check do we need to stop for fetch binlog for snapshot split.
+            // check do we need to stop for fetch redo log for snapshot split.
            if (isBoundedRead()) {
                final RedoLogOffset currentRedoLogOffset =
                        getCurrentRedoLogOffset(offsetContext.getOffset());
-                // reach the high watermark, the binlog fetcher should be finished
+                // reach the high watermark, the redo log fetcher should be finished
                if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
-                    // send binlog end event
+                    // send redo log end event
                    try {
                        dispatcher.dispatchWatermarkEvent(
                                partition.getSourcePartition(),

@@ -154,10 +154,10 @@ protected void afterHandleScn(
                    } catch (InterruptedException e) {
                        LOG.error("Send signal event error.", e);
                        errorHandler.setProducerThrowable(
-                                new DebeziumException("Error processing binlog signal event", e));
+                                new DebeziumException("Error processing redo log signal event", e));
                    }
-                    // tell fetcher the binlog task finished
-                    ((OracleScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
+                    // tell fetcher the redo log task finished
+                    ((OracleScanFetchTask.SnapshotRedoLogSplitChangeEventSourceContext) context)
                            .finished();
                }
            }

@@ -179,7 +179,8 @@ public static RedoLogOffset getCurrentRedoLogOffset(Map<String, ?> offset) {
    }

    /**
-     * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for binlog split task.
+     * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for redo log split
+     * task.
     */
    private class RedoLogSplitChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {
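The afterHandleScn hook above is what turns an unbounded LogMiner stream into a bounded backfill: after each batch of SCNs is handled, the current offset is compared with the split's high watermark, and once it is at or after that watermark the task emits the END watermark event and signals the fetcher through the change-event-source context. A minimal sketch of that termination check follows; the simplified RedoLogOffset type and the one-method context interface are illustrative stand-ins, not the connector's real classes.

/** A minimal sketch of bounded redo-log read termination; types are simplified stand-ins. */
public class BoundedReadSketch {

    /** Simplified offset, ordered by SCN. */
    static final class RedoLogOffset {
        final long scn;
        RedoLogOffset(long scn) { this.scn = scn; }
        boolean isAtOrAfter(RedoLogOffset other) { return this.scn >= other.scn; }
    }

    /** Simplified stand-in for the change-event-source context the fetcher passes in. */
    interface ChangeEventSourceContext {
        void finished();
    }

    static void afterHandleScn(
            RedoLogOffset currentOffset,
            RedoLogOffset endingOffset,
            ChangeEventSourceContext context) {
        // Bounded read: once the reader reaches the split's high watermark, emit the
        // END watermark event and tell the fetcher this split is finished.
        if (currentOffset.isAtOrAfter(endingOffset)) {
            System.out.println("dispatch END watermark for split");
            context.finished();
        }
    }

    public static void main(String[] args) {
        afterHandleScn(
                new RedoLogOffset(205),
                new RedoLogOffset(200),
                () -> System.out.println("split marked finished"));
    }
}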
Changed file 3 of 6 (OracleTableSource.java):

@@ -51,7 +51,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
- * A {@link DynamicTableSource} that describes how to create a Oracle binlog from a logical
+ * A {@link DynamicTableSource} that describes how to create a Oracle redo log from a logical
 * description.
 */
public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
Changed file 4 of 6 (test):

@@ -271,7 +271,7 @@ public void go() throws Exception {
        String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
        assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));

-        // execute 2 more DMLs to have more binlog
+        // execute 2 more DMLs to have more redo log
        statement.execute(
                "INSERT INTO debezium.products VALUES (1001,'roy','old robot',1234.56)"); // 1001
        statement.execute("UPDATE debezium.products SET weight=1345.67 WHERE id=1001");

@@ -300,7 +300,7 @@ public void go() throws Exception {
                };
        runThread3.start();

-        // consume the unconsumed binlog
+        // consume the unconsumed redo log
        List<SourceRecord> records = drain(sourceContext3, 2);
        assertInsert(records.get(0), "ID", 1001);
        assertUpdate(records.get(1), "ID", 1001);
Changed file 5 of 6 (parallel source IT case):

@@ -79,8 +79,9 @@ public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
    }

    @Test
-    public void testTaskManagerFailoverInBinlogPhase() throws Exception {
-        testOracleParallelSource(FailoverType.TM, FailoverPhase.BINLOG, new String[] {"CUSTOMERS"});
+    public void testTaskManagerFailoverInRedoLogPhase() throws Exception {
+        testOracleParallelSource(
+                FailoverType.TM, FailoverPhase.REDO_LOG, new String[] {"CUSTOMERS"});
    }

    @Test

@@ -90,8 +91,9 @@ public void testJobManagerFailoverInSnapshotPhase() throws Exception {
    }

    @Test
-    public void testJobManagerFailoverInBinlogPhase() throws Exception {
-        testOracleParallelSource(FailoverType.JM, FailoverPhase.BINLOG, new String[] {"CUSTOMERS"});
+    public void testJobManagerFailoverInRedoLogPhase() throws Exception {
+        testOracleParallelSource(
+                FailoverType.JM, FailoverPhase.REDO_LOG, new String[] {"CUSTOMERS"});
    }

    @Test

@@ -202,19 +204,19 @@ private void testOracleParallelSource(
        assertEqualsInAnyOrder(
                expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));

-        // second step: check the binlog data
+        // second step: check the redo log data
        for (String tableId : captureCustomerTables) {
-            makeFirstPartBinlogEvents(ORACLE_SCHEMA + '.' + tableId);
+            makeFirstPartRedoLogEvents(ORACLE_SCHEMA + '.' + tableId);
        }
-        if (failoverPhase == FailoverPhase.BINLOG) {
+        if (failoverPhase == FailoverPhase.REDO_LOG) {
            triggerFailover(
                    failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
        }
        for (String tableId : captureCustomerTables) {
-            makeSecondPartBinlogEvents(ORACLE_SCHEMA + '.' + tableId);
+            makeSecondPartRedoLogEvents(ORACLE_SCHEMA + '.' + tableId);
        }

-        String[] binlogForSingleTable =
+        String[] redoLogForSingleTable =
                new String[] {
                    "-U[103, user_3, Shanghai, 123567891234]",
                    "+U[103, user_3, Hangzhou, 123567891234]",

@@ -228,22 +230,23 @@ private void testOracleParallelSource(
                    "+I[2002, user_23, Shanghai, 123567891234]",
                    "+I[2003, user_24, Shanghai, 123567891234]"
                };
-        List<String> expectedBinlogData = new ArrayList<>();
+        List<String> expectedRedoLogData = new ArrayList<>();
        for (int i = 0; i < captureCustomerTables.length; i++) {
-            expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
+            expectedRedoLogData.addAll(Arrays.asList(redoLogForSingleTable));
        }
-        assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
+        assertEqualsInAnyOrder(
+                expectedRedoLogData, fetchRows(iterator, expectedRedoLogData.size()));
        tableResult.getJobClient().get().cancel().get();
    }

-    private void makeFirstPartBinlogEvents(String tableId) throws Exception {
+    private void makeFirstPartRedoLogEvents(String tableId) throws Exception {
        executeSql("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103");
        executeSql("DELETE FROM " + tableId + " where id = 102");
        executeSql("INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')");
        executeSql("UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
    }

-    private void makeSecondPartBinlogEvents(String tableId) throws Exception {
+    private void makeSecondPartRedoLogEvents(String tableId) throws Exception {
        executeSql("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010");
        executeSql("INSERT INTO " + tableId + " VALUES(2001, 'user_22','Shanghai','123567891234')");
        executeSql("INSERT INTO " + tableId + " VALUES(2002, 'user_23','Shanghai','123567891234')");

@@ -334,7 +337,7 @@ private enum FailoverType {
    /** The phase of failover. */
    private enum FailoverPhase {
        SNAPSHOT,
-        BINLOG,
+        REDO_LOG,
        NEVER
    }
Changed file 6 of 6 (OracleConnectorITCase.java):

@@ -65,7 +65,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

-/** Integration tests for Oracle binlog SQL source. */
+/** Integration tests for Oracle redo log SQL source. */
@RunWith(Parameterized.class)
public class OracleConnectorITCase extends AbstractTestBase {
    private static final int RECORDS_COUNT = 10_000;
