Skip to content

Commit

Permalink
[hotfix][sqlserver] Fix backfill stream task hang (apache#2374)
Browse files Browse the repository at this point in the history
* [sqlserver] Fix backfill stream task hang

* [sqlserver] Fix backfill stream task hang
  • Loading branch information
GOODBOY008 authored Aug 21, 2023
1 parent 8e7378d commit 6fa02ef
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on SqlServer server
// may sleep awhile to avoid DDOS on SqlServer server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ public LsnOffset(Lsn scn, Lsn commitScn, Long eventSerialNo) {
this.offset = offsetMap;
}

public LsnOffset(Lsn lsn) {
this(lsn, null, null);
}

public Lsn getLcn() {
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
public LsnOffset(Lsn changeLsn) {
this(changeLsn, null, null);
}

@Override
Expand All @@ -69,8 +65,8 @@ public int compareTo(Offset offset) {
return -1;
}

Lsn lsn = this.getLcn();
Lsn targetLsn = that.getLcn();
Lsn lsn = Lsn.valueOf(this.offset.get(SourceInfo.COMMIT_LSN_KEY));
Lsn targetLsn = Lsn.valueOf(that.offset.get(SourceInfo.COMMIT_LSN_KEY));
if (targetLsn.isAvailable()) {
if (lsn.isAvailable()) {
return lsn.compareTo(targetLsn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.LsnSplitReadTask;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.StreamSplitReadTask;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnection;
Expand Down Expand Up @@ -118,7 +118,7 @@ public void execute(Context context) throws Exception {
final SqlServerOffsetContext streamOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());

final LsnSplitReadTask backfillBinlogReadTask =
final StreamSplitReadTask backfillBinlogReadTask =
createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
Expand Down Expand Up @@ -154,21 +154,27 @@ private void dispatchLsnEndEvent(
WatermarkKind.END);
}

private LsnSplitReadTask createBackFillLsnSplitReadTask(
private StreamSplitReadTask createBackFillLsnSplitReadTask(
StreamSplit backfillBinlogSplit, SqlServerSourceFetchTaskContext context) {
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
context.getSourceConfig()
.getDbzConfiguration()
context.getDbzConnectorConfig()
.getConfig()
.edit()
.with("table.include.list", split.getTableId().toString())
// table.include.list is schema.table format
.with(
"table.include.list",
new TableId(
null,
split.getTableId().schema(),
split.getTableId().table()))
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
// task to read binlog and backfill for current split
return new LsnSplitReadTask(
context.getDbzConnectorConfig(),
return new StreamSplitReadTask(
new SqlServerConnectorConfig(dezConf),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
* A separate connection for retrieving details of the schema changes; without it, adaptive
* buffering will not work.
*
* @link
* https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering
* <p>For more details, please refer to <a
* href="https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering">guidelines-for-using-adaptive-buffering</a>
*/
private final SqlServerConnection metaDataConnection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;

import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import io.debezium.DebeziumException;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
Expand All @@ -36,14 +38,13 @@
import org.slf4j.LoggerFactory;

import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition;

/** The task to work for fetching data of SqlServer table stream split . */
public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {

private final StreamSplit split;
private volatile boolean taskRunning = false;
private LsnSplitReadTask redoLogSplitReadTask;
private StreamSplitReadTask redoLogSplitReadTask;

public SqlServerStreamFetchTask(StreamSplit split) {
this.split = split;
Expand All @@ -56,7 +57,7 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getOffsetContext().preSnapshotCompletion();
taskRunning = true;
redoLogSplitReadTask =
new LsnSplitReadTask(
new StreamSplitReadTask(
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getConnection(),
sourceFetchContext.getMetaDataConnection(),
Expand Down Expand Up @@ -91,15 +92,15 @@ public void close() {
* A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark
* to highWatermark) binlog.
*/
public static class LsnSplitReadTask extends SqlServerStreamingChangeEventSource {
public static class StreamSplitReadTask extends SqlServerStreamingChangeEventSource {

private static final Logger LOG = LoggerFactory.getLogger(LsnSplitReadTask.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
private final StreamSplit lsnSplit;
private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;

public LsnSplitReadTask(
public StreamSplitReadTask(
SqlServerConnectorConfig connectorConfig,
SqlServerConnection connection,
SqlServerConnection metadataConnection,
Expand All @@ -121,26 +122,25 @@ public LsnSplitReadTask(
}

@Override
public void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// check do we need to stop for fetch binlog for snapshot split.
if (isBoundedRead()) {
final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset());
// reach the high watermark, the binlog fetcher should be finished
if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) {
// send binlog end event
LsnOffset currentLsnOffset = new LsnOffset(null, toLsn, null);
Offset endingOffset = lsnSplit.getEndingOffset();
if (currentLsnOffset.isAtOrAfter(endingOffset)) {
// send streaming end event
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
lsnSplit,
currentRedoLogOffset,
currentLsnOffset,
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell fetcher the binlog task finished
// tell fetcher the streaming task finished
((SqlServerScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
.finished();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@

/**
* Copied from Debezium project(1.9.7.final) to add method {@link
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
* Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
* SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
* Server change data capture functionality. A main loop polls database DDL change and change data
* tables and turns them into change events.
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
* {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
* SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
* capture functionality. A main loop polls database DDL change and change data tables and turns
* them into change events.
*
* <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
* monitors source table and write changes from the table into the change table.
Expand Down Expand Up @@ -198,7 +198,6 @@ public boolean executeIteration(

if (context.isRunning()) {
commitTransaction();
afterHandleLsn(partition, offsetContext);
final Lsn toLsn =
getToLsn(
dataConnection,
Expand Down Expand Up @@ -449,6 +448,8 @@ public boolean executeIteration(
TxLogPosition.valueOf(toLsn));
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
// Determine whether to continue streaming in sqlserver cdc snapshot phase
afterHandleLsn(partition, toLsn);
} catch (SQLException e) {
tablesSlot.set(
processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
Expand Down Expand Up @@ -625,8 +626,7 @@ private Lsn getToLsn(
}

/** expose control to the user to stop the connector. */
protected void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// do nothing
}
}

0 comments on commit 6fa02ef

Please sign in to comment.