diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java index de5e1bcd53..3800e0cb41 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java @@ -274,7 +274,7 @@ private List splitUnevenlySizedChunks( while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) splits.add(ChunkRange.of(chunkStart, chunkEnd)); - // may sleep a while to avoid DDOS on SqlServer server + // may sleep awhile to avoid DDOS on SqlServer server maySleep(count++, tableId); chunkStart = chunkEnd; chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java index 24430084c6..7013eec260 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java @@ -159,16 +159,22 @@ private LsnSplitReadTask createBackFillLsnSplitReadTask( // we should only capture events for the current table, // otherwise, we may can't find corresponding schema Configuration dezConf = - context.getSourceConfig() - .getDbzConfiguration() + context.getDbzConnectorConfig() + .getConfig() .edit() - .with("table.include.list", split.getTableId().toString()) + // table.include.list is schema.table format + .with( + "table.include.list", + new TableId( + null, + split.getTableId().schema(), + split.getTableId().table())) // Disable heartbeat event in snapshot split fetcher .with(Heartbeat.HEARTBEAT_INTERVAL, 0) .build(); // task to read binlog and backfill for current split return new LsnSplitReadTask( - context.getDbzConnectorConfig(), + new SqlServerConnectorConfig(dezConf), context.getConnection(), context.getMetaDataConnection(), context.getDispatcher(), diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java index bc47d0b359..0e70c81722 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java @@ -17,12 +17,14 @@ package com.ververica.cdc.connectors.sqlserver.source.reader.fetch; import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset; import io.debezium.DebeziumException; +import io.debezium.connector.sqlserver.Lsn; import io.debezium.connector.sqlserver.SqlServerConnection; import io.debezium.connector.sqlserver.SqlServerConnectorConfig; import io.debezium.connector.sqlserver.SqlServerDatabaseSchema; @@ -36,7 +38,6 @@ import org.slf4j.LoggerFactory; import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET; -import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition; /** The task to work for fetching data of SqlServer table stream split . */ public class SqlServerStreamFetchTask implements FetchTask { @@ -121,19 +122,17 @@ public LsnSplitReadTask( } @Override - public void afterHandleLsn( - SqlServerPartition partition, SqlServerOffsetContext offsetContext) { + public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) { // check do we need to stop for fetch binlog for snapshot split. if (isBoundedRead()) { - final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset()); - // reach the high watermark, the binlog fetcher should be finished - if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) { + Offset endingOffset = lsnSplit.getEndingOffset(); + if (toLsn.compareTo(((LsnOffset) endingOffset).getLcn()) >= 0) { // send binlog end event try { dispatcher.dispatchWatermarkEvent( partition.getSourcePartition(), lsnSplit, - currentRedoLogOffset, + endingOffset, WatermarkKind.END); } catch (InterruptedException e) { LOG.error("Send signal event error.", e); diff --git a/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 9073b2844a..fd9805ee0f 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -38,11 +38,11 @@ /** * Copied from Debezium project(1.9.7.final) to add method {@link - * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}. - * Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, - * SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL - * Server change data capture functionality. A main loop polls database DDL change and change data - * tables and turns them into change events. + * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented + * {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition, + * SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data + * capture functionality. A main loop polls database DDL change and change data tables and turns + * them into change events. * *

The connector uses CDC functionality of SQL Server that is implemented as as a process that * monitors source table and write changes from the table into the change table. @@ -198,7 +198,6 @@ public boolean executeIteration( if (context.isRunning()) { commitTransaction(); - afterHandleLsn(partition, offsetContext); final Lsn toLsn = getToLsn( dataConnection, @@ -444,6 +443,8 @@ public boolean executeIteration( clock)); tableWithSmallestLsn.next(); } + // call after handle to judge whether to complete the stream + afterHandleLsn(partition, toLsn); }); streamingExecutionContext.setLastProcessedPosition( TxLogPosition.valueOf(toLsn)); @@ -625,8 +626,7 @@ private Lsn getToLsn( } /** expose control to the user to stop the connector. */ - protected void afterHandleLsn( - SqlServerPartition partition, SqlServerOffsetContext offsetContext) { + protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) { // do nothing } }