Skip to content

Commit

Permalink
[sqlserver] Fix backfill stream task hang
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Aug 15, 2023
1 parent f7df47e commit c987d29
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on SqlServer server
// may sleep awhile to avoid DDOS on SqlServer server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,22 @@ private LsnSplitReadTask createBackFillLsnSplitReadTask(
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
context.getSourceConfig()
.getDbzConfiguration()
context.getDbzConnectorConfig()
.getConfig()
.edit()
.with("table.include.list", split.getTableId().toString())
// table.include.list is schema.table format
.with(
"table.include.list",
new TableId(
null,
split.getTableId().schema(),
split.getTableId().table()))
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
// task to read binlog and backfill for current split
return new LsnSplitReadTask(
context.getDbzConnectorConfig(),
new SqlServerConnectorConfig(dezConf),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;

import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import io.debezium.DebeziumException;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
Expand All @@ -36,7 +38,6 @@
import org.slf4j.LoggerFactory;

import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition;

/** The task to work for fetching data of SqlServer table stream split . */
public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
Expand Down Expand Up @@ -121,19 +122,17 @@ public LsnSplitReadTask(
}

@Override
public void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// check do we need to stop for fetch binlog for snapshot split.
if (isBoundedRead()) {
final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset());
// reach the high watermark, the binlog fetcher should be finished
if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) {
Offset endingOffset = lsnSplit.getEndingOffset();
if (toLsn.compareTo(((LsnOffset) endingOffset).getLcn()) >= 0) {
// send binlog end event
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
lsnSplit,
currentRedoLogOffset,
endingOffset,
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@

/**
* Copied from Debezium project(1.9.7.final) to add method {@link
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
* Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
* SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
* Server change data capture functionality. A main loop polls database DDL change and change data
* tables and turns them into change events.
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
* {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
* SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
* capture functionality. A main loop polls database DDL change and change data tables and turns
* them into change events.
*
* <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
* monitors source table and write changes from the table into the change table.
Expand Down Expand Up @@ -198,7 +198,6 @@ public boolean executeIteration(

if (context.isRunning()) {
commitTransaction();
afterHandleLsn(partition, offsetContext);
final Lsn toLsn =
getToLsn(
dataConnection,
Expand Down Expand Up @@ -444,6 +443,8 @@ public boolean executeIteration(
clock));
tableWithSmallestLsn.next();
}
// call after handle to judge whether to complete the stream
afterHandleLsn(partition, toLsn);
});
streamingExecutionContext.setLastProcessedPosition(
TxLogPosition.valueOf(toLsn));
Expand Down Expand Up @@ -625,8 +626,7 @@ private Lsn getToLsn(
}

/** expose control to the user to stop the connector. */
protected void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// do nothing
}
}

0 comments on commit c987d29

Please sign in to comment.