Skip to content

Commit

Permalink
[cdc-connector][sqlserver] sqlserver support use specific chunk colum…
Browse files Browse the repository at this point in the history
…n as a split key (apache#2972)

This closes apache#2971.
  • Loading branch information
gong authored and joyCurry30 committed Mar 22, 2024
1 parent ddecf56 commit 8a5c9bb
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 24 deletions.
8 changes: 8 additions & 0 deletions docs/content/connectors/sqlserver-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ Connector Options
If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.chunk.key-column</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table.
By default, the chunk key is the first column of the primary key. This column must be a column of the primary key.</td>
</tr>
</tbody>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.options.StartupOptions;
Expand Down Expand Up @@ -60,6 +61,7 @@
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertTrue;

/** IT tests for {@link OracleSourceBuilder.OracleIncrementalSource}. */
public class OracleSourceITCase extends OracleSourceTestBase {
Expand Down Expand Up @@ -127,7 +129,9 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {"CUSTOMERS"},
true);
true,
RestartStrategies.fixedDelayRestart(1, 0),
null);
}

@Test
Expand Down Expand Up @@ -302,6 +306,29 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
assertEqualsInAnyOrder(expectedRecords, records);
}

@Test
public void testTableWithChunkColumnOfNoPrimaryKey() {
String chunkColumn = "NAME";
try {
testOracleParallelSource(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[] {"CUSTOMERS"},
false,
RestartStrategies.noRestart(),
chunkColumn);
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkColumn, "ID", "customer.DEBEZIUM.CUSTOMERS"))
.isPresent());
}
}

private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
createAndInitialize("customer.sql");
Expand Down Expand Up @@ -400,23 +427,31 @@ private void testOracleParallelSource(
String[] captureCustomerTables)
throws Exception {
testOracleParallelSource(
parallelism, failoverType, failoverPhase, captureCustomerTables, false);
parallelism,
failoverType,
failoverPhase,
captureCustomerTables,
false,
RestartStrategies.fixedDelayRestart(1, 0),
null);
}

private void testOracleParallelSource(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables,
boolean skipSnapshotBackfill)
boolean skipSnapshotBackfill,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
String chunkColumn)
throws Exception {
createAndInitialize("customer.sql");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
env.setRestartStrategy(restartStrategyConfiguration);

String sourceDDL =
format(
Expand All @@ -439,6 +474,7 @@ private void testOracleParallelSource(
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ "%s"
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
Expand All @@ -447,7 +483,12 @@ private void testOracleParallelSource(
ORACLE_DATABASE,
ORACLE_SCHEMA,
getTableNameRegex(captureCustomerTables), // (customer|customer_1)
skipSnapshotBackfill);
skipSnapshotBackfill,
chunkColumn == null
? ""
: ",'scan.incremental.snapshot.chunk.key-column'='"
+ chunkColumn
+ "'");

// first step: check the snapshot data
String[] snapshotForSingleTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public void testConsumingTableWithoutPrimaryKey() throws Exception {
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart(),
false);
false,
null);
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
Expand All @@ -256,7 +257,8 @@ public void testConsumingTableWithoutPrimaryKey() throws Exception {
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart(),
false);
false,
null);
}
}

Expand All @@ -269,7 +271,8 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc
FailoverPhase.SNAPSHOT,
new String[] {"customers"},
RestartStrategies.fixedDelayRestart(1, 0),
true);
true,
null);
}

@Test
Expand Down Expand Up @@ -621,6 +624,33 @@ public void testNewLsnCommittedWhenCheckpoint() throws Exception {
Thread.sleep(1000L);
}

@Test
public void testTableWithChunkColumnOfNoPrimaryKey() {
if (!DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
return;
}
String chunkColumn = "name";
try {
testPostgresParallelSource(
1,
scanStartupMode,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[] {"customers"},
RestartStrategies.noRestart(),
false,
chunkColumn);
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkColumn, "id", "customer.customers"))
.isPresent());
}
}

private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,
Expand Down Expand Up @@ -721,7 +751,8 @@ private void testPostgresParallelSource(
failoverPhase,
captureCustomerTables,
RestartStrategies.fixedDelayRestart(1, 0),
false);
false,
null);
}

private void testPostgresParallelSource(
Expand All @@ -731,7 +762,8 @@ private void testPostgresParallelSource(
FailoverPhase failoverPhase,
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
boolean skipSnapshotBackfill)
boolean skipSnapshotBackfill,
String chunkColumn)
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Expand Down Expand Up @@ -761,6 +793,7 @@ private void testPostgresParallelSource(
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ""
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
Expand All @@ -771,7 +804,12 @@ private void testPostgresParallelSource(
getTableNameRegex(captureCustomerTables),
scanStartupMode,
slotName,
skipSnapshotBackfill);
skipSnapshotBackfill,
chunkColumn == null
? ""
: ",'scan.incremental.snapshot.chunk.key-column'='"
+ chunkColumn
+ "'");
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
long start = System.currentTimeMillis();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = SqlServerUtils.getSplitColumn(table);
Column splitColumn =
SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
Expand Down Expand Up @@ -205,7 +206,8 @@ public SqlServerDatabaseSchema getDatabaseSchema() {

@Override
public RowType getSplitType(Table table) {
return SqlServerUtils.getSplitType(table);
Column splitColumn = SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
return SqlServerUtils.getSplitType(splitColumn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.source.SourceRecord;

import javax.annotation.Nullable;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand Down Expand Up @@ -158,7 +160,7 @@ public static Object queryNextChunkMax(
});
}

public static Column getSplitColumn(Table table) {
public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
Expand All @@ -168,15 +170,27 @@ public static Column getSplitColumn(Table table) {
table.id()));
}

if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
primaryKeys.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
return targetPkColumn.get();
}
throw new ValidationException(
String.format(
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkKeyColumn,
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
table.id()));
}

// use first field in primary key as the split key
return primaryKeys.get(0);
}

public static RowType getSplitType(Table table) {
return getSplitType(getSplitColumn(table));
}

private static RowType getSplitType(Column splitColumn) {
public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), SqlServerTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
Expand Down
Loading

0 comments on commit 8a5c9bb

Please sign in to comment.