Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sqlserver] Fix backfill stream task hang #2374

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on SqlServer server
// may sleep awhile to avoid DDOS on SqlServer server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ public LsnOffset(Lsn scn, Lsn commitScn, Long eventSerialNo) {
this.offset = offsetMap;
}

public LsnOffset(Lsn lsn) {
this(lsn, null, null);
}

public Lsn getLcn() {
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
public LsnOffset(Lsn changeLsn) {
this(changeLsn, null, null);
}

@Override
Expand All @@ -69,8 +65,8 @@ public int compareTo(Offset offset) {
return -1;
}

Lsn lsn = this.getLcn();
Lsn targetLsn = that.getLcn();
Lsn lsn = Lsn.valueOf(this.offset.get(SourceInfo.COMMIT_LSN_KEY));
Lsn targetLsn = Lsn.valueOf(that.offset.get(SourceInfo.COMMIT_LSN_KEY));
if (targetLsn.isAvailable()) {
if (lsn.isAvailable()) {
return lsn.compareTo(targetLsn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.LsnSplitReadTask;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.StreamSplitReadTask;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnection;
Expand Down Expand Up @@ -118,7 +118,7 @@ public void execute(Context context) throws Exception {
final SqlServerOffsetContext streamOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());

final LsnSplitReadTask backfillBinlogReadTask =
final StreamSplitReadTask backfillBinlogReadTask =
createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
Expand Down Expand Up @@ -154,21 +154,27 @@ private void dispatchLsnEndEvent(
WatermarkKind.END);
}

private LsnSplitReadTask createBackFillLsnSplitReadTask(
private StreamSplitReadTask createBackFillLsnSplitReadTask(
StreamSplit backfillBinlogSplit, SqlServerSourceFetchTaskContext context) {
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
context.getSourceConfig()
.getDbzConfiguration()
context.getDbzConnectorConfig()
.getConfig()
.edit()
.with("table.include.list", split.getTableId().toString())
// table.include.list is schema.table format
.with(
"table.include.list",
new TableId(
null,
split.getTableId().schema(),
split.getTableId().table()))
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
// task to read binlog and backfill for current split
return new LsnSplitReadTask(
context.getDbzConnectorConfig(),
return new StreamSplitReadTask(
new SqlServerConnectorConfig(dezConf),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
* A separate connection for retrieving details of the schema changes; without it, adaptive
* buffering will not work.
*
* @link
* https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering
* <p>For more details, please refer to <a
* href="https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering">guidelines-for-using-adaptive-buffering</a>
*/
private final SqlServerConnection metaDataConnection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;

import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import io.debezium.DebeziumException;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
Expand All @@ -36,14 +38,13 @@
import org.slf4j.LoggerFactory;

import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition;

/** The task to work for fetching data of SqlServer table stream split . */
public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {

private final StreamSplit split;
private volatile boolean taskRunning = false;
private LsnSplitReadTask redoLogSplitReadTask;
private StreamSplitReadTask redoLogSplitReadTask;

public SqlServerStreamFetchTask(StreamSplit split) {
this.split = split;
Expand All @@ -56,7 +57,7 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getOffsetContext().preSnapshotCompletion();
taskRunning = true;
redoLogSplitReadTask =
new LsnSplitReadTask(
new StreamSplitReadTask(
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getConnection(),
sourceFetchContext.getMetaDataConnection(),
Expand Down Expand Up @@ -91,15 +92,15 @@ public void close() {
* A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark
* to highWatermark) binlog.
*/
public static class LsnSplitReadTask extends SqlServerStreamingChangeEventSource {
public static class StreamSplitReadTask extends SqlServerStreamingChangeEventSource {

private static final Logger LOG = LoggerFactory.getLogger(LsnSplitReadTask.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
private final StreamSplit lsnSplit;
private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;

public LsnSplitReadTask(
public StreamSplitReadTask(
SqlServerConnectorConfig connectorConfig,
SqlServerConnection connection,
SqlServerConnection metadataConnection,
Expand All @@ -121,26 +122,25 @@ public LsnSplitReadTask(
}

@Override
public void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// check do we need to stop for fetch binlog for snapshot split.
if (isBoundedRead()) {
final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset());
// reach the high watermark, the binlog fetcher should be finished
if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) {
// send binlog end event
LsnOffset currentLsnOffset = new LsnOffset(null, toLsn, null);
Offset endingOffset = lsnSplit.getEndingOffset();
if (currentLsnOffset.isAtOrAfter(endingOffset)) {
// send streaming end event
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
lsnSplit,
currentRedoLogOffset,
currentLsnOffset,
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell fetcher the binlog task finished
// tell fetcher the streaming task finished
((SqlServerScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
.finished();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@

/**
* Copied from Debezium project(1.9.7.final) to add method {@link
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
* Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
* SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
* Server change data capture functionality. A main loop polls database DDL change and change data
* tables and turns them into change events.
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
* {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
* SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
* capture functionality. A main loop polls database DDL change and change data tables and turns
* them into change events.
*
* <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
* monitors source table and write changes from the table into the change table.
Expand Down Expand Up @@ -198,7 +198,6 @@ public boolean executeIteration(

if (context.isRunning()) {
commitTransaction();
afterHandleLsn(partition, offsetContext);
final Lsn toLsn =
getToLsn(
dataConnection,
Expand Down Expand Up @@ -449,6 +448,8 @@ public boolean executeIteration(
TxLogPosition.valueOf(toLsn));
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
// Determine whether to continue streaming in sqlserver cdc snapshot phase
afterHandleLsn(partition, toLsn);
} catch (SQLException e) {
tablesSlot.set(
processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
Expand Down Expand Up @@ -625,8 +626,7 @@ private Lsn getToLsn(
}

/** expose control to the user to stop the connector. */
protected void afterHandleLsn(
SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
// do nothing
}
}