Skip to content

Commit

Permalink
[sqlserver] Merge high and low watermark data during snapshot reading…
Browse files Browse the repository at this point in the history
… process
  • Loading branch information
fuyun2024 authored and GOODBOY008 committed Jun 15, 2023
1 parent 1344ca5 commit 621ca21
Show file tree
Hide file tree
Showing 6 changed files with 484 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.base.source.reader.external;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -202,6 +203,11 @@ public void close() {
}
}

@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}

private void assertLowWatermark(SourceRecord lowWatermark) {
checkState(
isLowWatermarkEvent(lowWatermark),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public LsnOffset(Lsn lsn) {
}

public Lsn getLcn() {
return Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY));
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.ArrayList;
import java.util.Map;

import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.buildSplitScanQuery;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.currentLsn;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.readTableSplitDataStatement;
Expand Down Expand Up @@ -90,6 +89,7 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getDatabaseSchema(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getSnapshotReceiver(),
split);
SnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SnapshotSplitChangeEventSourceContext();
Expand All @@ -114,11 +114,15 @@ public void execute(Context context) throws Exception {
}
// execute stream read task
if (snapshotResult.isCompletedOrSkipped()) {
final SqlServerOffsetContext.Loader loader =
new SqlServerOffsetContext.Loader(sourceFetchContext.getDbzConnectorConfig());
final SqlServerOffsetContext streamOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());

final LsnSplitReadTask backfillBinlogReadTask =
createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
sourceFetchContext.getOffsetContext());
new SnapshotBinlogSplitChangeEventSourceContext(), streamOffsetContext);
} else {
taskRunning = false;
throw new IllegalStateException(
Expand Down Expand Up @@ -163,8 +167,8 @@ private LsnSplitReadTask createBackFillLsnSplitReadTask(
.build();
// task to read binlog and backfill for current split
return new LsnSplitReadTask(
new SqlServerConnectorConfig(dezConf),
createSqlServerConnection(context.getSourceConfig().getDbzConfiguration()),
context.getDbzConnectorConfig(),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
context.getErrorHandler(),
Expand Down Expand Up @@ -199,6 +203,7 @@ public static class SqlServerSnapshotSplitReadTask extends AbstractSnapshotChang
private final SnapshotSplit snapshotSplit;
private final SqlServerOffsetContext offsetContext;
private final SnapshotProgressListener snapshotProgressListener;
private final EventDispatcher.SnapshotReceiver snapshotReceiver;

public SqlServerSnapshotSplitReadTask(
SqlServerConnectorConfig connectorConfig,
Expand All @@ -207,6 +212,7 @@ public SqlServerSnapshotSplitReadTask(
SqlServerDatabaseSchema databaseSchema,
SqlServerConnection jdbcConnection,
JdbcSourceEventDispatcher dispatcher,
EventDispatcher.SnapshotReceiver snapshotReceiver,
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
this.offsetContext = previousOffset;
Expand All @@ -217,6 +223,7 @@ public SqlServerSnapshotSplitReadTask(
this.clock = Clock.SYSTEM;
this.snapshotSplit = snapshotSplit;
this.snapshotProgressListener = snapshotProgressListener;
this.snapshotReceiver = snapshotReceiver;
}

@Override
Expand Down Expand Up @@ -269,7 +276,7 @@ protected SnapshotResult doExecute(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
return SnapshotResult.completed(ctx.offset);
Expand All @@ -290,8 +297,6 @@ private void createDataEvents(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
TableId tableId)
throws Exception {
EventDispatcher.SnapshotReceiver snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
LOG.debug("Snapshotting table {}", tableId);
createDataEventsForTable(
snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.debezium.data.Envelope.FieldName;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private ChangeEventQueue<DataChangeEvent> queue;
private SqlServerTaskContext taskContext;
private TopicSelector<TableId> topicSelector;
private EventDispatcher.SnapshotReceiver snapshotReceiver;
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
private StreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;

Expand Down Expand Up @@ -143,6 +145,8 @@ public void configure(SourceSplitBase sourceSplitBase) {
metadataProvider,
schemaNameAdjuster);

this.snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();

final DefaultChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
new DefaultChangeEventSourceMetricsFactory();
this.snapshotChangeEventSourceMetrics =
Expand All @@ -163,7 +167,6 @@ private SqlServerOffsetContext loadStartingOffsetState(
: sourceSplitBase.asStreamSplit().getStartingOffset();

SqlServerOffsetContext sqlServerOffsetContext = loader.load(offset.getOffset());
sqlServerOffsetContext.preSnapshotCompletion();
return sqlServerOffsetContext;
}

Expand Down Expand Up @@ -218,6 +221,10 @@ public JdbcSourceEventDispatcher getDispatcher() {
return dispatcher;
}

public EventDispatcher.SnapshotReceiver getSnapshotReceiver() {
return snapshotReceiver;
}

@Override
public SqlServerOffsetContext getOffsetContext() {
return offsetContext;
Expand Down
Loading

0 comments on commit 621ca21

Please sign in to comment.