[postgres] Do not drop replication slot for stream split (apache#2436)
loserwang1024 authored Sep 26, 2023
Parent: 82e3053 | Commit: 928ccf1
Showing 4 changed files with 136 additions and 22 deletions.
File: JdbcSourceFetchTaskContext.java
@@ -46,7 +46,7 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

     protected final JdbcSourceConfig sourceConfig;
     protected final JdbcDataSourceDialect dataSourceDialect;
-    protected final CommonConnectorConfig dbzConnectorConfig;
+    protected CommonConnectorConfig dbzConnectorConfig;
     protected final SchemaNameAdjuster schemaNameAdjuster;

     public JdbcSourceFetchTaskContext(
@@ -156,6 +156,10 @@ public CommonConnectorConfig getDbzConnectorConfig() {
         return dbzConnectorConfig;
     }

+    public void setDbzConnectorConfig(CommonConnectorConfig dbzConnectorConfig) {
+        this.dbzConnectorConfig = dbzConnectorConfig;
+    }
+
     public SchemaNameAdjuster getSchemaNameAdjuster() {
         return SchemaNameAdjuster.create();
     }
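The field is no longer final so that a dialect-specific context can install an adjusted Debezium config after construction. A minimal sketch of the intended call pattern (the helper name and the override value are illustrative, not from this commit):

    import io.debezium.connector.postgresql.PostgresConnectorConfig;

    // Illustrative helper: rebuild the connector config with an override,
    // then install it on the fetch-task context via the new setter.
    static void overrideDropSlotOnStop(JdbcSourceFetchTaskContext taskContext) {
        PostgresConnectorConfig adjusted =
                new PostgresConnectorConfig(
                        taskContext.getDbzConnectorConfig()
                                .getConfig()
                                .edit()
                                .with("slot.drop.on.stop", false)
                                .build());
        taskContext.setDbzConnectorConfig(adjusted);
    }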
File: PostgresScanFetchTask.java
@@ -29,15 +29,13 @@
 import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
 import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
 import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
-import io.debezium.config.Configuration;
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.PostgresPartition;
 import io.debezium.connector.postgresql.PostgresSchema;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
 import io.debezium.connector.postgresql.spi.SlotState;
-import io.debezium.heartbeat.Heartbeat;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
 import io.debezium.pipeline.source.spi.ChangeEventSource;
@@ -60,8 +58,6 @@
 import java.util.ArrayList;
 import java.util.Objects;

-import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP;
-import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
 import static io.debezium.connector.postgresql.Utils.currentOffset;
 import static io.debezium.connector.postgresql.Utils.refreshSchema;
@@ -164,24 +160,9 @@ private void executeBackfillTask(
                 PostgresOffsetUtils.getPostgresOffsetContext(
                         loader, backfillSplit.getStartingOffset());

-        // we should only capture events for the current table,
-        // otherwise, we may not find corresponding schema
-        PostgresSourceConfig config = (PostgresSourceConfig) ctx.getSourceConfig();
-        Configuration dbzConf =
-                ctx.getDbzConnectorConfig()
-                        .getConfig()
-                        .edit()
-                        .with("table.include.list", split.getTableId().toString())
-                        .with(SLOT_NAME.name(), config.getSlotNameForBackfillTask())
-                        // drop slot for backfill stream split
-                        .with(DROP_SLOT_ON_STOP.name(), true)
-                        // Disable heartbeat event in snapshot split fetcher
-                        .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
-                        .build();
-
         final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask =
                 new PostgresStreamFetchTask.StreamSplitReadTask(
-                        new PostgresConnectorConfig(dbzConf),
+                        ctx.getDbzConnectorConfig(),
                         ctx.getSnapShotter(),
                         ctx.getConnection(),
                         ctx.getDispatcher(),
@@ -195,7 +176,7 @@ private void executeBackfillTask(
         LOG.info(
                 "Execute backfillReadTask for split {} with slot name {}",
                 split,
-                dbzConf.getString(SLOT_NAME.name()));
+                ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask());
         backfillReadTask.execute(
                 new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
     }
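The per-split Debezium overrides that used to be rebuilt here now live in PostgresSourceFetchTaskContext#configure (next file), so the backfill task simply reuses the context's already-adjusted config. For readers unfamiliar with the API involved: Debezium's Configuration is immutable, and overrides are produced by copying it through edit(). A minimal, self-contained sketch (property values are illustrative):

    import io.debezium.config.Configuration;

    public class ConfigEditSketch {
        public static void main(String[] args) {
            Configuration base =
                    Configuration.create()
                            .with("slot.name", "flink")
                            .with("slot.drop.on.stop", "true")
                            .build();

            // edit() copies the properties into a fresh Builder; base is untouched.
            Configuration streamSplit =
                    base.edit().with("slot.drop.on.stop", "false").build();

            System.out.println(base.getString("slot.drop.on.stop"));        // true
            System.out.println(streamSplit.getString("slot.drop.on.stop")); // false
        }
    }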
File: PostgresSourceFetchTaskContext.java
@@ -127,6 +127,11 @@ public void configure(SourceSplitBase sourceSplitBase) {
                             dbzConfig
                                     .getConfig()
                                     .edit()
+                                    .with(
+                                            "table.include.list",
+                                            ((SnapshotSplit) sourceSplitBase)
+                                                    .getTableId()
+                                                    .toString())
                                     .with(
                                             SLOT_NAME.name(),
                                             ((PostgresSourceConfig) sourceConfig)
@@ -136,8 +141,19 @@ public void configure(SourceSplitBase sourceSplitBase) {
                                     // Disable heartbeat event in snapshot split fetcher
                                     .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                                     .build());
+        } else {
+            dbzConfig =
+                    new PostgresConnectorConfig(
+                            dbzConfig
+                                    .getConfig()
+                                    .edit()
+                                    // never drop slot for stream split, which is also global split
+                                    .with(DROP_SLOT_ON_STOP.name(), false)
+                                    .build());
         }

+        LOG.info("PostgresConnectorConfig is {}", dbzConfig.getConfig().asProperties().toString());
+        setDbzConnectorConfig(dbzConfig);
         PostgresConnectorConfig.SnapshotMode snapshotMode =
                 PostgresConnectorConfig.SnapshotMode.parse(
                         dbzConfig.getConfig().getString(SNAPSHOT_MODE));
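Taken together with the snapshot branch above (parts of which are elided by the hunk), the configure(...) logic reduces to the following simplified restatement (the method name is hypothetical; the types and property keys are the ones used in the diff):

    // Snapshot (backfill) split: scope capture to the split's table, use a
    // dedicated backfill slot, drop it when the task stops, mute heartbeats.
    // Stream (global) split: force slot.drop.on.stop=false so the long-lived
    // slot survives restarts and failover, whatever the user configured.
    private PostgresConnectorConfig adjustForSplit(
            PostgresConnectorConfig dbzConfig,
            SourceSplitBase split,
            PostgresSourceConfig sourceConfig) {
        if (split.isSnapshotSplit()) {
            return new PostgresConnectorConfig(
                    dbzConfig.getConfig().edit()
                            .with("table.include.list",
                                    ((SnapshotSplit) split).getTableId().toString())
                            .with("slot.name", sourceConfig.getSlotNameForBackfillTask())
                            .with("slot.drop.on.stop", true)
                            .with("heartbeat.interval.ms", 0)
                            .build());
        }
        return new PostgresConnectorConfig(
                dbzConfig.getConfig().edit()
                        .with("slot.drop.on.stop", false)
                        .build());
    }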
File: PostgresSourceITCase.java
@@ -198,6 +198,64 @@ public void testConsumingTableWithoutPrimaryKey() {
         }
     }

+    @Test
+    public void testDebeziumSlotDropOnStop() throws Exception {
+        String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE;
+        customDatabase.createAndInitialize();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(2);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        String sourceDDL =
+                format(
+                        "CREATE TABLE customers ("
+                                + " id BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.startup.mode' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                                + " 'slot.name' = '%s', "
+                                + " 'debezium.slot.drop.on.stop' = 'true'"
+                                + ")",
+                        customDatabase.getHost(),
+                        customDatabase.getDatabasePort(),
+                        customDatabase.getUsername(),
+                        customDatabase.getPassword(),
+                        customDatabase.getDatabaseName(),
+                        SCHEMA_NAME,
+                        "customers",
+                        scanStartupMode,
+                        getSlotName());
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from customers");
+
+        // first step: check the snapshot data
+        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+            checkSnapshotData(
+                    tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
+        }
+
+        // second step: check the stream data
+        checkStreamDataWithDDLDuringFailover(
+                tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
+
+        tableResult.getJobClient().get().cancel().get();
+    }
+
     private void testPostgresParallelSource(
             FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
             throws Exception {
@@ -371,6 +429,61 @@ private void checkStreamData(
         assertTrue(!hasNextData(iterator));
     }

+    private void checkStreamDataWithDDLDuringFailover(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        waitUntilJobRunning(tableResult);
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
+        for (String tableId : captureCustomerTables) {
+            makeFirstPartStreamEvents(
+                    getConnection(),
+                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        }
+
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        if (failoverPhase == FailoverPhase.STREAM) {
+            triggerFailover(
+                    failoverType,
+                    jobId,
+                    miniClusterResource.getMiniCluster(),
+                    () -> {
+                        for (String tableId : captureCustomerTables) {
+                            try {
+                                makeSecondPartStreamEvents(
+                                        getConnection(),
+                                        customDatabase.getDatabaseName()
+                                                + '.'
+                                                + SCHEMA_NAME
+                                                + '.'
+                                                + tableId);
+                            } catch (SQLException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        sleepMs(200);
+                    });
+            waitUntilJobRunning(tableResult);
+        }
+
+        List<String> expectedStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedStreamData.addAll(firstPartStreamEvents);
+            expectedStreamData.addAll(secondPartStreamEvents);
+        }
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
+        assertTrue(!hasNextData(iterator));
+    }
+
     private void sleepMs(long millis) {
         try {
             Thread.sleep(millis);
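The behavior under test is observable directly in Postgres: even with 'debezium.slot.drop.on.stop' = 'true', the backfill slots may come and go, but the slot serving the stream split must still exist after a failover. A minimal, self-contained way to check this by hand (connection values are placeholders, not from this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SlotCheck {
        public static void main(String[] args) throws Exception {
            // Requires the Postgres JDBC driver on the classpath.
            try (Connection conn =
                            DriverManager.getConnection(
                                    "jdbc:postgresql://localhost:5432/postgres",
                                    "postgres",
                                    "postgres");
                    Statement stmt = conn.createStatement();
                    ResultSet rs =
                            stmt.executeQuery(
                                    "SELECT slot_name, active FROM pg_replication_slots")) {
                while (rs.next()) {
                    // The stream split's slot should still be listed here
                    // across restarts and failover.
                    System.out.println(
                            rs.getString("slot_name") + " active=" + rs.getBoolean("active"));
                }
            }
        }
    }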
