diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 8cfd08c37f..2fa10e1150 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -102,7 +102,7 @@ public JdbcSourceConfigFactory password(String password) {
 
     /**
      * The session time zone in database server, e.g. "America/Los_Angeles". It controls how the
-     * TIMESTAMP type in MYSQL converted to STRING. See more
+     * TIMESTAMP type is converted to STRING. See more
      * https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
      */
     public JdbcSourceConfigFactory serverTimeZone(String timeZone) {
@@ -153,7 +153,7 @@ public JdbcSourceConfigFactory fetchSize(int fetchSize) {
     }
 
     /**
-     * The maximum time that the connector should wait after trying to connect to the MySQL database
+     * The maximum time that the connector should wait after trying to connect to the database
      * server before timing out.
      */
     public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
@@ -179,7 +179,7 @@ public JdbcSourceConfigFactory includeSchemaChanges(boolean includeSchemaChanges
         return this;
    }
 
-    /** The Debezium MySQL connector properties. For example, "snapshot.mode". */
+    /** The Debezium connector properties. For example, "snapshot.mode". */
     public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
         this.dbzProperties = properties;
         return this;
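
With the MySQL-specific wording removed, JdbcSourceConfigFactory reads as a database-agnostic builder. A minimal usage sketch: the concrete subclass name is hypothetical, and only setters visible in this patch are used (each returns the factory, so calls chain):

    import java.time.Duration;
    import java.util.Properties;

    Properties dbzProps = new Properties();
    dbzProps.setProperty("snapshot.mode", "initial");

    JdbcSourceConfigFactory configFactory =
            new MyDialectSourceConfigFactory() // hypothetical concrete subclass
                    .serverTimeZone("America/Los_Angeles")
                    .connectTimeout(Duration.ofSeconds(30))
                    .fetchSize(1024)
                    .includeSchemaChanges(true)
                    .debeziumProperties(dbzProps);
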
.withDescription("Table name of the MySQL database to monitor."); + .withDescription("Table name of the database to monitor."); public static final ConfigOption SERVER_TIME_ZONE = ConfigOptions.key("server-time-zone") @@ -89,7 +88,7 @@ public class JdbcSourceOptions extends SourceOptions { .durationType() .defaultValue(Duration.ofSeconds(30)) .withDescription( - "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out."); + "The maximum time that the connector should wait after trying to connect to the database server before timing out."); public static final ConfigOption CONNECTION_POOL_SIZE = ConfigOptions.key("connection.pool.size") @@ -102,5 +101,5 @@ public class JdbcSourceOptions extends SourceOptions { .intType() .defaultValue(3) .withDescription( - "The max retry times that the connector should retry to build MySQL database server connection."); + "The max retry times that the connector should retry to build database server connection."); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java index dba621777d..264368e5d4 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java @@ -56,7 +56,7 @@ public class SourceOptions { .stringType() .defaultValue("initial") .withDescription( - "Optional startup mode for MySQL CDC consumer, valid enumerations are " + "Optional startup mode for CDC consumer, valid enumerations are " + "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n" + "or \"specific-offset\""); @@ -97,7 +97,7 @@ public class SourceOptions { "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," - + " and the query MySQL for splitting would happen when it is uneven." + + " and the query for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = @@ -109,6 +109,6 @@ public class SourceOptions { "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," - + " and the query MySQL for splitting would happen when it is uneven." + + " and the query for splitting would happen when it is uneven." 
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java index 602ab47262..f306f22160 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java @@ -32,7 +32,7 @@ public final class StartupOptions implements Serializable { /** * Performs an initial snapshot on the monitored database tables upon first startup, and - * continue to read the latest binlog. + * continue to read the latest change log. */ public static StartupOptions initial() { return new StartupOptions(StartupMode.INITIAL, null, null, null); @@ -40,8 +40,8 @@ public static StartupOptions initial() { /** * Never to perform snapshot on the monitored database tables upon first startup, just read from - * the beginning of the binlog. This should be used with care, as it is only valid when the - * binlog is guaranteed to contain the entire history of the database. + * the beginning of the change log. This should be used with care, as it is only valid when the + * change log is guaranteed to contain the entire history of the database. */ public static StartupOptions earliest() { return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null); @@ -49,7 +49,7 @@ public static StartupOptions earliest() { /** * Never to perform snapshot on the monitored database tables upon first startup, just read from - * the end of the binlog which means only have the changes since the connector was started. + * the end of the change log which means only have the changes since the connector was started. */ public static StartupOptions latest() { return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null); @@ -57,7 +57,7 @@ public static StartupOptions latest() { /** * Never to perform snapshot on the monitored database tables upon first startup, and directly - * read binlog from the specified offset. + * read change log from the specified offset. */ public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) { return new StartupOptions( @@ -66,10 +66,10 @@ public static StartupOptions specificOffset(String specificOffsetFile, int speci /** * Never to perform snapshot on the monitored database tables upon first startup, and directly - * read binlog from the specified timestamp. + * read change log from the specified timestamp. * - *
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java
index 602ab47262..f306f22160 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java
@@ -32,7 +32,7 @@ public final class StartupOptions implements Serializable {
 
     /**
      * Performs an initial snapshot on the monitored database tables upon first startup, and
-     * continue to read the latest binlog.
+     * continues to read the latest change log.
      */
     public static StartupOptions initial() {
         return new StartupOptions(StartupMode.INITIAL, null, null, null);
@@ -40,8 +40,8 @@ public static StartupOptions initial() {
 
     /**
      * Never to perform snapshot on the monitored database tables upon first startup, just read from
-     * the beginning of the binlog. This should be used with care, as it is only valid when the
-     * binlog is guaranteed to contain the entire history of the database.
+     * the beginning of the change log. This should be used with care, as it is only valid when the
+     * change log is guaranteed to contain the entire history of the database.
      */
     public static StartupOptions earliest() {
         return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
@@ -49,7 +49,7 @@ public static StartupOptions earliest() {
 
     /**
      * Never to perform snapshot on the monitored database tables upon first startup, just read from
-     * the end of the binlog which means only have the changes since the connector was started.
+     * the end of the change log, which means only the changes since the connector was started.
      */
     public static StartupOptions latest() {
         return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
@@ -57,7 +57,7 @@ public static StartupOptions latest() {
 
     /**
      * Never to perform snapshot on the monitored database tables upon first startup, and directly
-     * read binlog from the specified offset.
+     * read the change log from the specified offset.
      */
     public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
         return new StartupOptions(
@@ -66,10 +66,10 @@ public static StartupOptions specificOffset(String specificOffsetFile, int speci
 
     /**
      * Never to perform snapshot on the monitored database tables upon first startup, and directly
-     * read binlog from the specified timestamp.
+     * read the change log from the specified timestamp.
      *
-     * <p>The consumer will traverse the binlog from the beginning and ignore change events whose
-     * timestamp is smaller than the specified timestamp.
+     * <p>The consumer will traverse the change log from the beginning and ignore change events
+     * whose timestamp is smaller than the specified timestamp.
      *
      * @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
      */
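
The renamed factory methods map one-to-one onto the startup modes listed under SCAN_STARTUP_MODE. A usage sketch; the offset file name and position are illustrative values:

    // Snapshot the tables first, then follow the change log.
    StartupOptions initial = StartupOptions.initial();

    // Skip the snapshot and read only changes arriving after startup.
    StartupOptions latest = StartupOptions.latest();

    // Skip the snapshot and resume from a known change log position
    // (file name and position here are purely illustrative).
    StartupOptions fromOffset = StartupOptions.specificOffset("mysql-bin.000003", 4);
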
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
index 58244cdf38..33ad9d0c4a 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
@@ -288,7 +288,7 @@ private Struct signalRecordValue(String splitId, WatermarkKind watermarkKind) {
     public enum WatermarkKind {
         LOW,
         HIGH,
-        BINLOG_END;
+        END;
 
         public WatermarkKind fromString(String kindString) {
             if (LOW.name().equalsIgnoreCase(kindString)) {
@@ -296,7 +296,7 @@ public WatermarkKind fromString(String kindString) {
             } else if (HIGH.name().equalsIgnoreCase(kindString)) {
                 return HIGH;
             } else {
-                return BINLOG_END;
+                return END;
             }
         }
     }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java
index ac98927c99..71766e5816 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceRecordEmitter.java
@@ -50,7 +50,7 @@
 /**
  * The {@link RecordEmitter} implementation for {@link JdbcIncrementalSourceReader}.
  *
- * <p>The {@link RecordEmitter} buffers the snapshot records of split and call the binlog reader to
+ * <p>The {@link RecordEmitter} buffers the snapshot records of a split and calls the stream reader to
  * emit records rather than emit the records directly.
  */
 public class JdbcSourceRecordEmitter
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
index 7d70e86fbd..f128fb0be8 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
@@ -43,7 +43,7 @@ public class HybridSplitAssigner implements SplitAssigner {
 
     private static final Logger LOG = LoggerFactory.getLogger(HybridSplitAssigner.class);
 
-    private static final String BINLOG_SPLIT_ID = "binlog-split";
+    private static final String STREAM_SPLIT_ID = "stream-split";
 
     private final int splitMetaGroupSize;
 
@@ -107,18 +107,18 @@ public void open() {
     @Override
     public Optional<SourceSplitBase> getNext() {
         if (snapshotSplitAssigner.noMoreSplits()) {
-            // binlog split assigning
+            // stream split assigning
             if (isStreamSplitAssigned) {
                 // no more splits for the assigner
                 return Optional.empty();
             } else if (snapshotSplitAssigner.isFinished()) {
                 // we need to wait snapshot-assigner to be finished before
-                // assigning the binlog split. Otherwise, records emitted from binlog split
+                // assigning the stream split. Otherwise, records emitted from stream split
                 // might be out-of-order in terms of same primary key with snapshot splits.
                 isStreamSplitAssigned = true;
                 return Optional.of(createStreamSplit());
             } else {
-                // binlog split is not ready by now
+                // stream split is not ready by now
                 return Optional.empty();
             }
         } else {
@@ -149,7 +149,7 @@ public void addSplits(Collection<SourceSplitBase> splits) {
             if (split.isSnapshotSplit()) {
                 snapshotSplits.add(split);
             } else {
-                // we don't store the split, but will re-create binlog split later
+                // we don't store the split, but will re-create stream split later
                 isStreamSplitAssigned = false;
             }
         }
@@ -183,12 +183,12 @@ public StreamSplit createStreamSplit() {
         Map<String, Offset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
         final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
 
-        Offset minBinlogOffset = null;
+        Offset minOffset = null;
         for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
-            // find the min binlog offset
-            Offset binlogOffset = splitFinishedOffsets.get(split.splitId());
-            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
-                minBinlogOffset = binlogOffset;
+            // find the min offset of change log
+            Offset changeLogOffset = splitFinishedOffsets.get(split.splitId());
+            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
+                minOffset = changeLogOffset;
             }
             finishedSnapshotSplitInfos.add(
                     new FinishedSnapshotSplitInfo(
@@ -196,7 +196,7 @@ public StreamSplit createStreamSplit() {
                             split.splitId(),
                             split.getSplitStart(),
                             split.getSplitEnd(),
-                            binlogOffset,
+                            changeLogOffset,
                             offsetFactory));
         }
 
@@ -205,8 +205,8 @@ public StreamSplit createStreamSplit() {
         boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
         return new StreamSplit(
-                BINLOG_SPLIT_ID,
-                minBinlogOffset == null ? offsetFactory.createInitialOffset() : minBinlogOffset,
+                STREAM_SPLIT_ID,
+                minOffset == null ? offsetFactory.createInitialOffset() : minOffset,
                 offsetFactory.createNoStoppingOffset(),
                 divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                 new HashMap<>(),
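
The stream split created above starts from the minimum finished offset across all snapshot splits, so nothing between any chunk's high watermark and the stream start can be missed; overlap with already-emitted snapshot rows is filtered out later by the stream fetcher. A toy illustration of the selection, using plain longs in place of the Offset/isBefore pair:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative finish offsets of three snapshot splits.
    Map<String, Long> splitFinishedOffsets = new HashMap<>();
    splitFinishedOffsets.put("split-0", 120L);
    splitFinishedOffsets.put("split-1", 95L); // earliest finish position
    splitFinishedOffsets.put("split-2", 143L);

    Long minOffset = null;
    for (Long offset : splitFinishedOffsets.values()) {
        if (minOffset == null || offset < minOffset) { // Offset.isBefore analogue
            minOffset = offset;
        }
    }
    // The stream split starts at 95 and re-reads the overlap safely.
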
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index f9bfc0c82e..b0dffe5e98 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -213,14 +213,14 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
                         .collect(Collectors.toList());
         List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
         for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
-            Offset binlogOffset = splitFinishedOffsets.get(split.splitId());
+            Offset finishedOffset = splitFinishedOffsets.get(split.splitId());
             finishedSnapshotSplitInfos.add(
                     new FinishedSnapshotSplitInfo(
                             split.getTableId(),
                             split.splitId(),
                             split.getSplitStart(),
                             split.getSplitEnd(),
-                            binlogOffset,
+                            finishedOffset,
                             offsetFactory));
         }
         return finishedSnapshotSplitInfos;
@@ -231,7 +231,7 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
         this.splitFinishedOffsets.putAll(splitFinishedOffsets);
         if (allSplitsFinished()) {
             // Skip the waiting checkpoint when current parallelism is 1 which means we do not need
-            // to care about the global output data order of snapshot splits and binlog split.
+            // to care about the global output data order of snapshot splits and stream split.
             if (currentParallelism == 1) {
                 assignerFinished = true;
                 LOG.info(
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java
index eacdd87b38..f4f9a63f8a 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java
@@ -56,14 +56,14 @@ public interface SplitAssigner {
     boolean waitingForFinishedSplits();
 
     /**
-     * Gets the finished splits information. This is useful meta data to generate a binlog split
+     * Gets the finished splits information. This is useful meta data to generate a stream split
      * that considering finished snapshot splits.
      */
     List<FinishedSnapshotSplitInfo> getFinishedSplitInfos();
 
     /**
-     * Callback to handle the finished splits with finished binlog offset. This is useful for
-     * determine when to generate binlog split and what binlog split to generate.
+     * Callback to handle the finished splits with their finished change log offsets. This is
+     * useful for determining when to generate the stream split and what stream split to generate.
     */
     void onFinishedSplits(Map<String, Offset> splitFinishedOffsets);
 
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index 33594f397f..959f613bd6 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -40,7 +40,7 @@ public class StreamSplitAssigner implements SplitAssigner {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamSplitAssigner.class);
 
-    private static final String BINLOG_SPLIT_ID = "binlog-split";
+    private static final String STREAM_SPLIT_ID = "stream-split";
 
     private final SourceConfig sourceConfig;
 
@@ -103,7 +103,7 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
 
     @Override
     public void addSplits(Collection<SourceSplitBase> splits) {
-        // we don't store the split, but will re-create binlog split later
+        // we don't store the split, but will re-create stream split later
         isStreamSplitAssigned = false;
     }
 
@@ -125,7 +125,7 @@ public void close() {}
 
     public StreamSplit createStreamSplit() {
         return new StreamSplit(
-                BINLOG_SPLIT_ID,
+                STREAM_SPLIT_ID,
                 dialect.displayCurrentOffset(sourceConfig),
                 offsetFactory.createInitialOffset(),
                 new ArrayList<>(),
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/HybridPendingSplitsState.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/HybridPendingSplitsState.java
index 5479527507..fb9295ad6b 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/HybridPendingSplitsState.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/HybridPendingSplitsState.java
@@ -18,7 +18,7 @@
 
 import java.util.Objects;
 
-/** A {@link PendingSplitsState} for pending hybrid (snapshot & binlog) splits. */
+/** A {@link PendingSplitsState} for pending hybrid (snapshot & stream) splits. */
 public class HybridPendingSplitsState extends PendingSplitsState {
     private final SnapshotPendingSplitsState snapshotPendingSplits;
     private final boolean isStreamSplitAssigned;
@@ -60,7 +60,7 @@ public String toString() {
         return "HybridPendingSplitsState{"
                 + "snapshotPendingSplits="
                 + snapshotPendingSplits
-                + ", isBinlogSplitAssigned="
+                + ", isStreamSplitAssigned="
                 + isStreamSplitAssigned
                 + '}';
     }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
index cf08e2c0ac..7acd3ea3b2 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
@@ -38,10 +38,7 @@
 import static com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer.readTableSchemas;
 import static com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer.writeTableSchemas;
 
-/**
- * The {@link SimpleVersionedSerializer Serializer} for the {@link PendingSplitsState} of MySQL CDC
- * source.
- */
+/** The {@link SimpleVersionedSerializer Serializer} for the {@link PendingSplitsState}. */
 public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
 
     private static final int VERSION = 4;
@@ -49,7 +46,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState>
                     ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
     private static final int SNAPSHOT_PENDING_SPLITS_STATE_FLAG = 1;
-    private static final int BINLOG_PENDING_SPLITS_STATE_FLAG = 2;
+    private static final int STREAM_PENDING_SPLITS_STATE_FLAG = 2;
     private static final int HYBRID_PENDING_SPLITS_STATE_FLAG = 3;
 
     private final SourceSplitSerializer splitSerializer;
@@ -76,8 +73,8 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
             out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
             serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
         } else if (state instanceof StreamPendingSplitsState) {
-            out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
-            serializeBinlogPendingSplitsState((StreamPendingSplitsState) state, out);
+            out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
+            serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
         } else if (state instanceof HybridPendingSplitsState) {
             out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
             serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
@@ -118,8 +115,8 @@ public PendingSplitsState deserializeLegacyPendingSplitsState(byte[] serialized)
             return deserializeLegacySnapshotPendingSplitsState(splitVersion, in);
         } else if (stateFlag == HYBRID_PENDING_SPLITS_STATE_FLAG) {
             return deserializeLegacyHybridPendingSplitsState(splitVersion, in);
-        } else if (stateFlag == BINLOG_PENDING_SPLITS_STATE_FLAG) {
-            return deserializeBinlogPendingSplitsState(in);
+        } else if (stateFlag == STREAM_PENDING_SPLITS_STATE_FLAG) {
+            return deserializeStreamPendingSplitsState(in);
         } else {
             throw new IOException(
                     "Unsupported to deserialize PendingSplitsState flag: " + stateFlag);
@@ -135,8 +132,8 @@ public PendingSplitsState deserializePendingSplitsState(int version, byte[] seri
             return deserializeSnapshotPendingSplitsState(version, splitVersion, in);
         } else if (stateFlag == HYBRID_PENDING_SPLITS_STATE_FLAG) {
             return deserializeHybridPendingSplitsState(version, splitVersion, in);
-        } else if (stateFlag == BINLOG_PENDING_SPLITS_STATE_FLAG) {
-            return deserializeBinlogPendingSplitsState(in);
+        } else if (stateFlag == STREAM_PENDING_SPLITS_STATE_FLAG) {
+            return deserializeStreamPendingSplitsState(in);
         } else {
             throw new IOException(
                     "Unsupported to deserialize PendingSplitsState flag: " + stateFlag);
@@ -165,7 +162,7 @@ private void serializeHybridPendingSplitsState(
         out.writeBoolean(state.isStreamSplitAssigned());
     }
 
-    private void serializeBinlogPendingSplitsState(
+    private void serializeStreamPendingSplitsState(
             StreamPendingSplitsState state, DataOutputSerializer out) throws IOException {
         out.writeBoolean(state.isStreamSplitAssigned());
     }
@@ -177,7 +174,7 @@ private void serializeBinlogPendingSplitsState(
     private SnapshotPendingSplitsState deserializeLegacySnapshotPendingSplitsState(
             int splitVersion, DataInputDeserializer in) throws IOException {
         List<TableId> alreadyProcessedTables = readTableIds(in);
-        List<SnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
+        List<SnapshotSplit> remainingSplits = readSnapshotSplits(splitVersion, in);
 
         Map<String, SnapshotSplit> assignedSnapshotSplits =
                 readAssignedSnapshotSplits(splitVersion, in);
@@ -216,14 +213,14 @@ private HybridPendingSplitsState deserializeLegacyHybridPendingSplitsState(
             int splitVersion, DataInputDeserializer in) throws IOException {
         SnapshotPendingSplitsState snapshotPendingSplitsState =
                 deserializeLegacySnapshotPendingSplitsState(splitVersion, in);
-        boolean isBinlogSplitAssigned = in.readBoolean();
-        return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
+        boolean isStreamSplitAssigned = in.readBoolean();
+        return new HybridPendingSplitsState(snapshotPendingSplitsState, isStreamSplitAssigned);
     }
 
     private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
             int version, int splitVersion, DataInputDeserializer in) throws IOException {
         List<TableId> alreadyProcessedTables = readTableIds(in);
-        List<SnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
+        List<SnapshotSplit> remainingSplits = readSnapshotSplits(splitVersion, in);
         Map<String, SnapshotSplit> assignedSnapshotSplits =
                 readAssignedSnapshotSplits(splitVersion, in);
         Map<String, Offset> finishedOffsets = readFinishedOffsets(splitVersion, in);
@@ -266,11 +263,11 @@ private HybridPendingSplitsState deserializeHybridPendingSplitsState(
             int version, int splitVersion, DataInputDeserializer in) throws IOException {
         SnapshotPendingSplitsState snapshotPendingSplitsState =
                 deserializeSnapshotPendingSplitsState(version, splitVersion, in);
-        boolean isBinlogSplitAssigned = in.readBoolean();
-        return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
+        boolean isStreamSplitAssigned = in.readBoolean();
+        return new HybridPendingSplitsState(snapshotPendingSplitsState, isStreamSplitAssigned);
     }
 
-    private StreamPendingSplitsState deserializeBinlogPendingSplitsState(DataInputDeserializer in)
+    private StreamPendingSplitsState deserializeStreamPendingSplitsState(DataInputDeserializer in)
             throws IOException {
         return new StreamPendingSplitsState(in.readBoolean());
     }
@@ -295,9 +292,8 @@ private Map<String, Offset> readFinishedOffsets(int offsetVersion, DataInputDese
         final int size = in.readInt();
         for (int i = 0; i < size; i++) {
             String splitId = in.readUTF();
-            Offset binlogOffset = splitSerializer.readOffsetPosition(offsetVersion, in);
-            // Offset binlogOffset = readBinlogPosition(offsetVersion, in);
-            splitsInfo.put(splitId, binlogOffset);
+            Offset offsetPosition = splitSerializer.readOffsetPosition(offsetVersion, in);
+            splitsInfo.put(splitId, offsetPosition);
         }
         return splitsInfo;
     }
@@ -321,35 +317,35 @@ private Map<String, SnapshotSplit> readAssignedSnapshotSplits(
         final int size = in.readInt();
         for (int i = 0; i < size; i++) {
             String splitId = in.readUTF();
-            SnapshotSplit mySqlSplit = readMySqlSplit(splitVersion, in).asSnapshotSplit();
-            assignedSplits.put(splitId, mySqlSplit);
+            SnapshotSplit snapshotSplit = readSnapshotSplit(splitVersion, in).asSnapshotSplit();
+            assignedSplits.put(splitId, snapshotSplit);
         }
         return assignedSplits;
     }
 
     private void writeRemainingSplits(
-            Collection<SourceSplitBase> mySqlSplits, DataOutputSerializer out) throws IOException {
-        final int size = mySqlSplits.size();
+            Collection<SourceSplitBase> remainingSplits, DataOutputSerializer out) throws IOException {
+        final int size = remainingSplits.size();
         out.writeInt(size);
-        for (SourceSplitBase split : mySqlSplits) {
+        for (SourceSplitBase split : remainingSplits) {
             byte[] splitBytes = splitSerializer.serialize(split);
             out.writeInt(splitBytes.length);
             out.write(splitBytes);
         }
     }
 
-    private List<SnapshotSplit> readMySqlSnapshotSplits(int splitVersion, DataInputDeserializer in)
+    private List<SnapshotSplit> readSnapshotSplits(int splitVersion, DataInputDeserializer in)
             throws IOException {
-        List<SnapshotSplit> mySqlSplits = new ArrayList<>();
+        List<SnapshotSplit> snapshotSplits = new ArrayList<>();
         final int size = in.readInt();
         for (int i = 0; i < size; i++) {
-            SnapshotSplit mySqlSplit = readMySqlSplit(splitVersion, in).asSnapshotSplit();
-            mySqlSplits.add(mySqlSplit);
+            SnapshotSplit snapshotSplit = readSnapshotSplit(splitVersion, in).asSnapshotSplit();
+            snapshotSplits.add(snapshotSplit);
         }
-        return mySqlSplits;
+        return snapshotSplits;
     }
 
-    private SourceSplitBase readMySqlSplit(int splitVersion, DataInputDeserializer in)
+    private SourceSplitBase readSnapshotSplit(int splitVersion, DataInputDeserializer in)
             throws IOException {
         int splitBytesLen = in.readInt();
         byte[] splitBytes = new byte[splitBytesLen];
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/StreamPendingSplitsState.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/StreamPendingSplitsState.java
index 4c1e4cc30e..cadabc88a7 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/StreamPendingSplitsState.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/state/StreamPendingSplitsState.java
@@ -18,7 +18,7 @@
 
 import java.util.Objects;
 
-/** A {@link PendingSplitsState} for pending binlog splits. */
+/** A {@link PendingSplitsState} for pending stream splits. */
 public class StreamPendingSplitsState extends PendingSplitsState {
 
     private final boolean isStreamSplitAssigned;
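
The serializer frames every checkpointed state as [int flag][payload], with 1 = snapshot, 2 = stream, 3 = hybrid, and deserializePendingSplitsState dispatches on that flag. A minimal round-trip sketch for the smallest state, using Flink's serializer pair and the flag value taken from this patch:

    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    DataOutputSerializer out = new DataOutputSerializer(64);
    out.writeInt(2);        // STREAM_PENDING_SPLITS_STATE_FLAG
    out.writeBoolean(true); // StreamPendingSplitsState.isStreamSplitAssigned

    DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
    if (in.readInt() == 2) {
        boolean isStreamSplitAssigned = in.readBoolean(); // true
    }
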
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index 9d28d90be7..c8cf10c9fb 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -49,8 +49,8 @@
 import java.util.stream.Collectors;
 
 /**
- * A CDC source enumerator that enumerates receive the split request and assign the split to source
- * readers.
+ * Incremental source enumerator that receives split requests and assigns splits to source
+ * readers.
 */
 @Experimental
 public class IncrementalSourceEnumerator
@@ -62,9 +62,9 @@ public class IncrementalSourceEnumerator
     private final SourceConfig sourceConfig;
     private final SplitAssigner splitAssigner;
 
-    // using TreeSet to prefer assigning binlog split to task-0 for easier debug
+    // using TreeSet to prefer assigning stream split to task-0 for easier debug
     private final TreeSet<Integer> readersAwaitingSplit;
-    private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;
+    private List<List<FinishedSnapshotSplitInfo>> finishedSnapshotSplitMeta;
 
     public IncrementalSourceEnumerator(
             SplitEnumeratorContext<SourceSplitBase> context,
@@ -99,7 +99,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
 
     @Override
     public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
-        LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);
+        LOG.debug("Incremental Source Enumerator adds splits back: {}", splits);
         splitAssigner.addSplits(splits);
     }
 
@@ -125,9 +125,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
             context.sendEventToSourceReader(subtaskId, ackEvent);
         } else if (sourceEvent instanceof StreamSplitMetaRequestEvent) {
             LOG.debug(
-                    "The enumerator receives request for binlog split meta from subtask {}.",
+                    "The enumerator receives request for stream split meta from subtask {}.",
                     subtaskId);
-            sendBinlogMeta(subtaskId, (StreamSplitMetaRequestEvent) sourceEvent);
+            sendStreamMetaRequestEvent(subtaskId, (StreamSplitMetaRequestEvent) sourceEvent);
         }
     }
 
@@ -139,7 +139,7 @@ public PendingSplitsState snapshotState(long checkpointId) {
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
         splitAssigner.notifyCheckpointComplete(checkpointId);
-        // binlog split may be available after checkpoint complete
+        // stream split may be available after checkpoint complete
         assignSplits();
     }
 
@@ -165,10 +165,10 @@ private void assignSplits() {
 
             Optional<SourceSplitBase> split = splitAssigner.getNext();
             if (split.isPresent()) {
-                final SourceSplitBase mySqlSplit = split.get();
-                context.assignSplit(mySqlSplit, nextAwaiting);
+                final SourceSplitBase sourceSplit = split.get();
+                context.assignSplit(sourceSplit, nextAwaiting);
                 awaitingReader.remove();
-                LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
+                LOG.info("Assign split {} to subtask {}", sourceSplit, nextAwaiting);
             } else {
                 // there is no available splits by now, skip assigning
                 break;
@@ -199,9 +199,9 @@ private void syncWithReaders(int[] subtaskIds, Throwable t) {
         }
     }
 
-    private void sendBinlogMeta(int subTask, StreamSplitMetaRequestEvent requestEvent) {
+    private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent requestEvent) {
         // initialize once
-        if (binlogSplitMeta == null) {
+        if (finishedSnapshotSplitMeta == null) {
             final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
                     splitAssigner.getFinishedSplitInfos();
             if (finishedSnapshotSplitInfos.isEmpty()) {
@@ -210,14 +210,15 @@ private void sendBinlogMeta(int subTask, StreamSplitMetaRequestEven
                 throw new FlinkRuntimeException(
                         "The assigner offer empty finished split information, this should not happen");
             }
-            binlogSplitMeta =
+            finishedSnapshotSplitMeta =
                     Lists.partition(
                             finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
         }
         final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
 
-        if (binlogSplitMeta.size() > requestMetaGroupId) {
-            List metaToSend = binlogSplitMeta.get(requestMetaGroupId);
+        if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
+            List<FinishedSnapshotSplitInfo> metaToSend =
+                    finishedSnapshotSplitMeta.get(requestMetaGroupId);
             StreamSplitMetaEvent metadataEvent =
                     new StreamSplitMetaEvent(
                             requestEvent.getSplitId(),
@@ -230,7 +231,7 @@ private void sendBinlogMeta(int subTask, StreamSplitMetaRequestEven
             LOG.error(
                     "Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",
                     requestMetaGroupId,
-                    binlogSplitMeta.size() - 1);
+                    finishedSnapshotSplitMeta.size() - 1);
         }
     }
 }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/FinishedSnapshotSplitsReportEvent.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/FinishedSnapshotSplitsReportEvent.java
index fb074be3b1..65a8eb8601 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/FinishedSnapshotSplitsReportEvent.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/FinishedSnapshotSplitsReportEvent.java
@@ -27,7 +27,7 @@
 /**
  * The {@link SourceEvent} that {@link JdbcIncrementalSourceReader} sends to {@link
  * IncrementalSourceEnumerator} to notify the snapshot split has read finished with the consistent
- * binlog position.
+ * change log position.
  */
 public class FinishedSnapshotSplitsReportEvent implements SourceEvent {
 
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
index 306917898e..1428a7383f 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
@@ -26,7 +26,7 @@
 /**
  * The {@link SourceEvent} that {@link IncrementalSourceEnumerator} sends to {@link
- * JdbcIncrementalSourceReader} to pass binlog metadata, i.e. {@link FinishedSnapshotSplitInfo}.
+ * JdbcIncrementalSourceReader} to pass change log metadata, i.e. {@link FinishedSnapshotSplitInfo}.
  */
 public class StreamSplitMetaEvent implements SourceEvent {
 
@@ -34,10 +34,10 @@ public class StreamSplitMetaEvent implements SourceEvent {
 
     private final String splitId;
 
-    /** The metadata of binlog split is divided to multiple groups. */
+    /** The metadata of stream split is divided to multiple groups. */
     private final int metaGroupId;
 
     /**
-     * The serialized metadata of binlog split, it's serialized/deserialized by {@link
+     * The serialized metadata of stream split, it's serialized/deserialized by {@link
      * FinishedSnapshotSplitInfo#serialize()} and {@link
      * FinishedSnapshotSplitInfo#deserialize(byte[])}.
      */
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
index a1b4c77d32..2326705238 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
@@ -23,7 +23,8 @@
 /**
  * The {@link SourceEvent} that {@link JdbcIncrementalSourceReader} sends to {@link
- * IncrementalSourceEnumerator} to pull binlog metadata, i.e. sending {@link StreamSplitMetaEvent}.
+ * IncrementalSourceEnumerator} to pull change log metadata, i.e. requesting {@link
+ * StreamSplitMetaEvent}.
 */
 public class StreamSplitMetaRequestEvent implements SourceEvent {
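
Together, the two events implement a paged transfer of finished-split metadata: the reader asks for group N, and the enumerator answers with that slice of the Lists.partition result. A worked example with assumed sizes:

    // Assumed: 2_500 finished snapshot splits, splitMetaGroupSize = 1_000.
    // Lists.partition(...) then yields 3 groups of sizes 1_000 / 1_000 / 500.
    int finishedSplits = 2_500;
    int splitMetaGroupSize = 1_000;
    int groups = (finishedSplits + splitMetaGroupSize - 1) / splitMetaGroupSize; // 3

    // The reader requests group 0, appends its infos to the stream split,
    // then group 1, then group 2; a request with id >= 3 hits the
    // "invalid request meta group id" branch in the enumerator above.
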
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/Offset.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/Offset.java
index b01b762dc9..7b0944c250 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/Offset.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/Offset.java
@@ -25,10 +25,9 @@
 import java.util.Objects;
 
 /**
- * A structure describes a fine-grained offset in a binlog event including binlog position and gtid
- * set etc.
+ * A structure that describes a fine-grained offset in a change event, including the change log position.
  *
- * <p>This structure can also be used to deal the binlog event in transaction, a transaction may
+ * <p>This structure can also be used to deal with change events in a transaction; a transaction may
  * contain multiple change events, and each change event may contain multiple rows. When restart
  * from a specific {@link Offset}, we need to skip the processed change events and the processed
 * rows.
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetDeserializerSerializer.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetDeserializerSerializer.java
index 9859a82fb7..c05837f6a0 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetDeserializerSerializer.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetDeserializerSerializer.java
@@ -56,11 +56,11 @@ default Offset readOffsetPosition(int offsetVersion, DataInputDeserializer in)
     default Offset readOffsetPosition(DataInputDeserializer in) throws IOException {
         boolean offsetNonNull = in.readBoolean();
         if (offsetNonNull) {
-            int binlogOffsetBytesLength = in.readInt();
-            byte[] binlogOffsetBytes = new byte[binlogOffsetBytesLength];
-            in.readFully(binlogOffsetBytes);
+            int offsetBytesLength = in.readInt();
+            byte[] offsetBytes = new byte[offsetBytesLength];
+            in.readFully(offsetBytes);
             OffsetDeserializer offsetDeserializer = createOffsetDeserializer();
-            return offsetDeserializer.deserialize(binlogOffsetBytes);
+            return offsetDeserializer.deserialize(offsetBytes);
         } else {
             return null;
         }
@@ -69,9 +69,9 @@ default Offset readOffsetPosition(DataInputDeserializer in) throws IOException {
     default void writeOffsetPosition(Offset offset, DataOutputSerializer out) throws IOException {
         out.writeBoolean(offset != null);
         if (offset != null) {
-            byte[] binlogOffsetBytes = OffsetSerializer.INSTANCE.serialize(offset);
-            out.writeInt(binlogOffsetBytes.length);
-            out.write(binlogOffsetBytes);
+            byte[] offsetBytes = OffsetSerializer.INSTANCE.serialize(offset);
+            out.writeInt(offsetBytes.length);
+            out.write(offsetBytes);
         }
     }
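
writeOffsetPosition and readOffsetPosition agree on a tiny nullable, length-prefixed frame; the payload itself stays opaque and is handled by the pluggable OffsetSerializer/OffsetDeserializer. A sketch of the layout:

    // Frame written by writeOffsetPosition, read back by readOffsetPosition:
    //   [boolean nonNull]      false -> nothing follows, offset is null
    //   [int length]           present only when nonNull is true
    //   [byte[length] payload] OffsetSerializer.INSTANCE.serialize(offset)
    // Each dialect's OffsetDeserializer decodes the payload back into its own
    // Offset subclass, e.g. a file/position pair for MySQL-style change logs.
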
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SnapshotSplit.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SnapshotSplit.java
index e344aa4650..c99c81270f 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SnapshotSplit.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SnapshotSplit.java
@@ -29,7 +29,7 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-/** The split to describe a split of a MySql table snapshot. */
+/** The split to describe a split of a database table snapshot. */
 public class SnapshotSplit extends SourceSplitBase {
 
     private final TableId tableId;
@@ -132,7 +132,7 @@ public String toString() {
                 splitKeyType.getFields().stream()
                         .map(RowType.RowField::asSummaryString)
                         .collect(Collectors.joining(",", "[", "]"));
-        return "MySqlSnapshotSplit{"
+        return "SnapshotSplit{"
                 + "tableId="
                 + tableId
                 + ", splitId='"
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
index c72174533a..36b5cc166c 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
@@ -49,7 +49,7 @@ public abstract class SourceSplitSerializer
             ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
     private static final int SNAPSHOT_SPLIT_FLAG = 1;
-    private static final int BINLOG_SPLIT_FLAG = 2;
+    private static final int STREAM_SPLIT_FLAG = 2;
 
     @Override
     public int getVersion() {
@@ -85,25 +85,25 @@ public byte[] serialize(SourceSplitBase split) throws IOException {
             snapshotSplit.serializedFormCache = result;
             return result;
         } else {
-            final StreamSplit binlogSplit = split.asStreamSplit();
+            final StreamSplit streamSplit = split.asStreamSplit();
             // optimization: the splits lazily cache their own serialized form
-            if (binlogSplit.serializedFormCache != null) {
-                return binlogSplit.serializedFormCache;
+            if (streamSplit.serializedFormCache != null) {
+                return streamSplit.serializedFormCache;
             }
             final DataOutputSerializer out = SERIALIZER_CACHE.get();
-            out.writeInt(BINLOG_SPLIT_FLAG);
-            out.writeUTF(binlogSplit.splitId());
+            out.writeInt(STREAM_SPLIT_FLAG);
+            out.writeUTF(streamSplit.splitId());
             out.writeUTF("");
-            writeOffsetPosition(binlogSplit.getStartingOffset(), out);
-            writeOffsetPosition(binlogSplit.getEndingOffset(), out);
-            writeFinishedSplitsInfo(binlogSplit.getFinishedSnapshotSplitInfos(), out);
-            writeTableSchemas(binlogSplit.getTableSchemas(), out);
-            out.writeInt(binlogSplit.getTotalFinishedSplitSize());
+            writeOffsetPosition(streamSplit.getStartingOffset(), out);
+            writeOffsetPosition(streamSplit.getEndingOffset(), out);
+            writeFinishedSplitsInfo(streamSplit.getFinishedSnapshotSplitInfos(), out);
+            writeTableSchemas(streamSplit.getTableSchemas(), out);
+            out.writeInt(streamSplit.getTotalFinishedSplitSize());
             final byte[] result = out.getCopyOfBuffer();
             out.clear();
             // optimization: cache the serialized from, so we avoid the byte work during repeated
             // serialization
-            binlogSplit.serializedFormCache = result;
+            streamSplit.serializedFormCache = result;
             return result;
         }
     }
@@ -141,7 +141,7 @@ public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws I
                     splitBoundaryEnd,
                     highWatermark,
                     tableSchemas);
-        } else if (splitKind == BINLOG_SPLIT_FLAG) {
+        } else if (splitKind == STREAM_SPLIT_FLAG) {
             String splitId = in.readUTF();
             // skip split Key Type
             in.readUTF();
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitState.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitState.java
index 3549f1d916..25d0fc9404 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitState.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitState.java
@@ -30,7 +30,7 @@
         return getClass() == SnapshotSplitState.class;
     }
 
-    /** Checks whether this split state is a binlog split state. */
+    /** Checks whether this split state is a stream split state. */
     public final boolean isStreamSplitState() {
         return getClass() == StreamSplitState.class;
     }
@@ -45,6 +45,6 @@ public final StreamSplitState asStreamSplitState() {
         return (StreamSplitState) this;
     }
 
-    /** Use the current split state to create a new MySqlSplit. */
+    /** Use the current split state to create a new SourceSplit. */
     public abstract SourceSplitBase toSourceSplit();
 }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplit.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplit.java
index 809a02c98c..cbaa6d3555 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplit.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplit.java
@@ -26,7 +26,7 @@
 import java.util.Map;
 import java.util.Objects;
 
-/** The split to describe the binlog of MySql table(s). */
+/** The split to describe the change log of database table(s). */
 public class StreamSplit extends SourceSplitBase {
 
     private final Offset startingOffset;
@@ -108,7 +108,7 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return "MySqlBinlogSplit{"
+        return "StreamSplit{"
                 + "splitId='"
                 + splitId
                 + '\''
@@ -123,26 +123,26 @@ public String toString() {
     // factory utils to build new StreamSplit instance
     // -------------------------------------------------------------------
     public static StreamSplit appendFinishedSplitInfos(
-            StreamSplit binlogSplit, List<FinishedSnapshotSplitInfo> splitInfos) {
-        splitInfos.addAll(binlogSplit.getFinishedSnapshotSplitInfos());
+            StreamSplit streamSplit, List<FinishedSnapshotSplitInfo> splitInfos) {
+        splitInfos.addAll(streamSplit.getFinishedSnapshotSplitInfos());
         return new StreamSplit(
-                binlogSplit.splitId,
-                binlogSplit.getStartingOffset(),
-                binlogSplit.getEndingOffset(),
+                streamSplit.splitId,
+                streamSplit.getStartingOffset(),
+                streamSplit.getEndingOffset(),
                 splitInfos,
-                binlogSplit.getTableSchemas(),
-                binlogSplit.getTotalFinishedSplitSize());
+                streamSplit.getTableSchemas(),
+                streamSplit.getTotalFinishedSplitSize());
     }
 
     public static StreamSplit fillTableSchemas(
-            StreamSplit binlogSplit, Map<TableId, TableChange> tableSchemas) {
-        tableSchemas.putAll(binlogSplit.getTableSchemas());
+            StreamSplit streamSplit, Map<TableId, TableChange> tableSchemas) {
+        tableSchemas.putAll(streamSplit.getTableSchemas());
         return new StreamSplit(
-                binlogSplit.splitId,
-                binlogSplit.getStartingOffset(),
-                binlogSplit.getEndingOffset(),
-                binlogSplit.getFinishedSnapshotSplitInfos(),
+                streamSplit.splitId,
+                streamSplit.getStartingOffset(),
+                streamSplit.getEndingOffset(),
+                streamSplit.getFinishedSnapshotSplitInfos(),
                 tableSchemas,
-                binlogSplit.getTotalFinishedSplitSize());
+                streamSplit.getTotalFinishedSplitSize());
     }
 }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplitState.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplitState.java
index 1a19b05d6a..8bf770b5b6 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplitState.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/StreamSplitState.java
@@ -24,7 +24,7 @@
 
 import java.util.Map;
 
-/** The state of split to describe the binlog of table(s). */
+/** The state of split to describe the change log of table(s). */
 public class StreamSplitState extends SourceSplitState {
 
     @Nullable private Offset startingOffset;
@@ -66,14 +66,14 @@ public void recordSchema(TableId tableId, TableChange latestTableChange) {
 
     @Override
     public StreamSplit toSourceSplit() {
-        final StreamSplit binlogSplit = split.asStreamSplit();
+        final StreamSplit streamSplit = split.asStreamSplit();
         return new StreamSplit(
-                binlogSplit.splitId(),
+                streamSplit.splitId(),
                 getStartingOffset(),
                 getEndingOffset(),
-                binlogSplit.asStreamSplit().getFinishedSnapshotSplitInfos(),
+                streamSplit.asStreamSplit().getFinishedSnapshotSplitInfos(),
                 getTableSchemas(),
-                binlogSplit.getTotalFinishedSplitSize());
+                streamSplit.getTotalFinishedSplitSize());
     }
 
     @Override
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java
index d0c106e950..de77b2a083 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcIncrementalSourceReader.java
@@ -73,7 +73,7 @@ public class JdbcIncrementalSourceReader
     private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
 
     private final Map<String, SnapshotSplit> finishedUnackedSplits;
-    private final Map<String, StreamSplit> uncompletedBinlogSplits;
+    private final Map<String, StreamSplit> uncompletedStreamSplits;
     private final int subtaskId;
     private final SourceSplitSerializer sourceSplitSerializer;
     private final JdbcSourceConfig sourceConfig;
@@ -96,7 +96,7 @@ public JdbcIncrementalSourceReader(
                 context);
         this.sourceConfig = sourceConfig;
         this.finishedUnackedSplits = new HashMap<>();
-        this.uncompletedBinlogSplits = new HashMap<>();
+        this.uncompletedStreamSplits = new HashMap<>();
         this.subtaskId = context.getIndexOfSubtask();
         this.sourceSplitSerializer = checkNotNull(sourceSplitSerializer);
         this.dialect = dialect;
@@ -126,8 +126,8 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {
         // add finished snapshot splits that didn't receive ack yet
         stateSplits.addAll(finishedUnackedSplits.values());
 
-        // add binlog splits who are uncompleted
-        stateSplits.addAll(uncompletedBinlogSplits.values());
+        // add stream splits that are uncompleted
+        stateSplits.addAll(uncompletedStreamSplits.values());
 
         return stateSplits;
     }
@@ -135,13 +135,13 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {
     @Override
     protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
         for (SourceSplitState splitState : finishedSplitIds.values()) {
-            SourceSplitBase mySqlSplit = splitState.toSourceSplit();
+            SourceSplitBase sourceSplit = splitState.toSourceSplit();
             checkState(
-                    mySqlSplit.isSnapshotSplit(),
+                    sourceSplit.isSnapshotSplit(),
                     String.format(
-                            "Only snapshot split could finish, but the actual split is binlog split %s",
-                            mySqlSplit));
-            finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
+                            "Only snapshot split could finish, but the actual split is stream split %s",
+                            sourceSplit));
+            finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
         }
         reportFinishedSnapshotSplitsIfNeed();
         context.sendSplitRequest();
@@ -160,31 +160,31 @@ public void addSplits(List<SourceSplitBase> splits) {
                 unfinishedSplits.add(split);
             }
         } else {
-            // the binlog split is uncompleted
+            // the stream split is uncompleted
             if (!split.asStreamSplit().isCompletedSplit()) {
-                uncompletedBinlogSplits.put(split.splitId(), split.asStreamSplit());
-                requestBinlogSplitMetaIfNeeded(split.asStreamSplit());
+                uncompletedStreamSplits.put(split.splitId(), split.asStreamSplit());
+                requestStreamSplitMetaIfNeeded(split.asStreamSplit());
             } else {
-                uncompletedBinlogSplits.remove(split.splitId());
-                StreamSplit mySqlBinlogSplit =
-                        discoverTableSchemasForBinlogSplit(split.asStreamSplit());
-                unfinishedSplits.add(mySqlBinlogSplit);
+                uncompletedStreamSplits.remove(split.splitId());
+                StreamSplit streamSplit =
+                        discoverTableSchemasForStreamSplit(split.asStreamSplit());
+                unfinishedSplits.add(streamSplit);
             }
         }
     }
     // notify split enumerator again about the finished unacked snapshot splits
     reportFinishedSnapshotSplitsIfNeed();
-    // add all un-finished splits (including binlog split) to SourceReaderBase
+    // add all un-finished splits (including stream split) to SourceReaderBase
     super.addSplits(unfinishedSplits);
 }
 
-    private StreamSplit discoverTableSchemasForBinlogSplit(StreamSplit split) {
+    private StreamSplit discoverTableSchemasForStreamSplit(StreamSplit split) {
         final String splitId = split.splitId();
         if (split.getTableSchemas().isEmpty()) {
             try {
                 Map<TableId, TableChange> tableSchemas =
                         dialect.discoverDataCollectionSchemas(sourceConfig);
-                LOG.info("The table schema discovery for binlog split {} success", splitId);
+                LOG.info("The table schema discovery for stream split {} succeeded", splitId);
                 return StreamSplit.fillTableSchemas(split, tableSchemas);
             } catch (Exception e) {
                 LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
@@ -192,7 +192,7 @@ private StreamSplit discoverTableSchemasForBinlogSplit(StreamSplit split) {
             }
         } else {
             LOG.warn(
-                    "The binlog split {} has table schemas yet, skip the table schema discovery",
+                    "The stream split {} already has table schemas, skip the table schema discovery",
                     split);
             return split;
         }
    }
@@ -217,61 +217,61 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
             reportFinishedSnapshotSplitsIfNeed();
         } else if (sourceEvent instanceof StreamSplitMetaEvent) {
             LOG.debug(
-                    "The subtask {} receives binlog meta with group id {}.",
+                    "The subtask {} receives stream meta with group id {}.",
                     subtaskId,
                     ((StreamSplitMetaEvent) sourceEvent).getMetaGroupId());
-            fillMetaDataForBinlogSplit((StreamSplitMetaEvent) sourceEvent);
+            fillMetaDataForStreamSplit((StreamSplitMetaEvent) sourceEvent);
         } else {
             super.handleSourceEvents(sourceEvent);
         }
     }
 
-    private void fillMetaDataForBinlogSplit(StreamSplitMetaEvent metadataEvent) {
-        StreamSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
-        if (binlogSplit != null) {
+    private void fillMetaDataForStreamSplit(StreamSplitMetaEvent metadataEvent) {
+        StreamSplit streamSplit = uncompletedStreamSplits.get(metadataEvent.getSplitId());
+        if (streamSplit != null) {
             final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
             final int expectedMetaGroupId =
                     getNextMetaGroupId(
-                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
+                            streamSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
             if (receivedMetaGroupId == expectedMetaGroupId) {
                 List<FinishedSnapshotSplitInfo> metaDataGroup =
                         metadataEvent.getMetaGroup().stream()
                                 .map(bytes -> sourceSplitSerializer.deserialize(bytes))
                                 .collect(Collectors.toList());
-                uncompletedBinlogSplits.put(
-                        binlogSplit.splitId(),
-                        StreamSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
+                uncompletedStreamSplits.put(
+                        streamSplit.splitId(),
+                        StreamSplit.appendFinishedSplitInfos(streamSplit, metaDataGroup));
 
-                LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
+                LOG.info("Fill meta data of group {} to stream split", metaDataGroup.size());
             } else {
                 LOG.warn(
-                        "Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",
+                        "Received out of order metadata event for split {}, the received meta group id is {}, but expected is {}, ignore it",
                         metadataEvent.getSplitId(),
                         receivedMetaGroupId,
                         expectedMetaGroupId);
             }
-            requestBinlogSplitMetaIfNeeded(binlogSplit);
+            requestStreamSplitMetaIfNeeded(streamSplit);
         } else {
             LOG.warn(
-                    "Received binlog meta event for split {}, but the uncompleted split map does not contain it",
+                    "Received metadata event for split {}, but the uncompleted split map does not contain it",
                     metadataEvent.getSplitId());
         }
     }
 
-    private void requestBinlogSplitMetaIfNeeded(StreamSplit binlogSplit) {
-        final String splitId = binlogSplit.splitId();
-        if (!binlogSplit.isCompletedSplit()) {
+    private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
+        final String splitId = streamSplit.splitId();
+        if (!streamSplit.isCompletedSplit()) {
             final int nextMetaGroupId =
                     getNextMetaGroupId(
-                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
+                            streamSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
             StreamSplitMetaRequestEvent splitMetaRequestEvent =
                     new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId);
             context.sendSourceEventToCoordinator(splitMetaRequestEvent);
         } else {
-            LOG.info("The meta of binlog split {} has been collected success", splitId);
-            this.addSplits(Collections.singletonList(binlogSplit));
+            LOG.info("The meta of stream split {} has been collected successfully", splitId);
+            this.addSplits(Collections.singletonList(streamSplit));
         }
     }
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java
index 9649e0f5c8..e46f05477a 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java
@@ -104,7 +104,7 @@ public void close() throws Exception {
     }
 
     protected void checkSplitOrStartNext() throws IOException {
-        // the binlog fetcher should keep alive
+        // the stream fetcher should keep alive
         if (currentFetcher instanceof JdbcSourceStreamFetcher) {
             return;
         }
@@ -123,9 +123,9 @@ protected void checkSplitOrStartNext() throws IOException {
                 currentFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId);
             }
         } else {
-            // point from snapshot split to binlog split
+            // point from snapshot split to stream split
             if (currentFetcher != null) {
-                LOG.info("It's turn to read binlog split, close current snapshot fetcher.");
+                LOG.info("It's turn to read stream split, close current snapshot fetcher.");
                 currentFetcher.close();
             }
             final JdbcSourceFetchTaskContext taskContext =
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java
index 9649e0f5c8..e46f05477a 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/JdbcSourceSplitReader.java
@@ -104,7 +104,7 @@ public void close() throws Exception {
     }
 
     protected void checkSplitOrStartNext() throws IOException {
-        // the binlog fetcher should keep alive
+        // the stream fetcher should keep alive
         if (currentFetcher instanceof JdbcSourceStreamFetcher) {
             return;
         }
@@ -123,9 +123,9 @@ protected void checkSplitOrStartNext() throws IOException {
                 currentFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId);
             }
         } else {
-            // point from snapshot split to binlog split
+            // switch from the snapshot split to the stream split
             if (currentFetcher != null) {
-                LOG.info("It's turn to read binlog split, close current snapshot fetcher.");
+                LOG.info("It's time to read the stream split, closing the current snapshot fetcher.");
                 currentFetcher.close();
             }
             final JdbcSourceFetchTaskContext taskContext =
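Reviewer note: the renamed comments in this file encode a lifecycle invariant that is easy to miss in the diff: a stream fetcher, once started, is never replaced, while the scan (snapshot) fetcher is closed the moment the reader switches over to the stream split. A minimal sketch of that decision, with illustrative stand-in types rather than the real fetcher classes:

    // Illustrative model of the fetcher switch in checkSplitOrStartNext.
    interface Fetcher extends AutoCloseable {}

    final class StreamFetcher implements Fetcher {
        @Override public void close() {}
    }

    final class ScanFetcher implements Fetcher {
        @Override public void close() {}
    }

    final class FetcherSwitchSketch {
        private Fetcher currentFetcher;

        void onNextSplit(boolean nextIsSnapshotSplit) throws Exception {
            // The stream fetcher should keep alive: once streaming, never restart it.
            if (currentFetcher instanceof StreamFetcher) {
                return;
            }
            if (nextIsSnapshotSplit) {
                if (currentFetcher == null) {
                    currentFetcher = new ScanFetcher(); // reused across snapshot splits
                }
            } else {
                if (currentFetcher != null) {
                    currentFetcher.close(); // snapshot phase done, release its resources
                }
                currentFetcher = new StreamFetcher();
            }
        }
    }
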
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java
index 7d46b8314b..fca64087cc 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java
@@ -50,8 +50,8 @@
 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isEndWatermarkEvent;
 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isHighWatermarkEvent;
 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isLowWatermarkEvent;
+import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.rewriteOutputBuffer;
 import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.splitKeyRangeContains;
-import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.upsertBinlog;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -123,15 +123,16 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
-            // data input: [low watermark event][snapshot events][high watermark event][binlog
-            // events][binlog-end event]
+            // e.g.:
+            // data input: [low watermark event][snapshot events][high watermark event][change
+            // events][end watermark event]
             // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachBinlogStart = false;
-            boolean reachBinlogEnd = false;
+            boolean reachChangeLogStart = false;
+            boolean reachChangeLogEnd = false;
             SourceRecord lowWatermark = null;
             SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
-            while (!reachBinlogEnd) {
+            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+            while (!reachChangeLogEnd) {
                 checkReadException();
                 List<DataChangeEvent> batch = queue.poll();
                 for (DataChangeEvent event : batch) {
@@ -145,22 +146,22 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
                     if (highWatermark == null && isHighWatermarkEvent(record)) {
                         highWatermark = record;
                         // snapshot events capture end and begin to capture binlog events
-                        reachBinlogStart = true;
+                        reachChangeLogStart = true;
                         continue;
                     }
 
-                    if (reachBinlogStart && isEndWatermarkEvent(record)) {
+                    if (reachChangeLogStart && isEndWatermarkEvent(record)) {
                         // capture to end watermark events, stop the loop
-                        reachBinlogEnd = true;
+                        reachChangeLogEnd = true;
                         break;
                     }
 
-                    if (!reachBinlogStart) {
-                        snapshotRecords.put((Struct) record.key(), record);
+                    if (!reachChangeLogStart) {
+                        outputBuffer.put((Struct) record.key(), record);
                     } else {
-                        if (isRequiredBinlogRecord(record)) {
-                            // upsert binlog events through the record key
-                            upsertBinlog(snapshotRecords, record);
+                        if (isChangeRecordInChunkRange(record)) {
+                            // rewrite overlapping snapshot records through the record key
+                            rewriteOutputBuffer(outputBuffer, record);
                         }
                     }
                 }
@@ -170,7 +171,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
 
             final List<SourceRecord> normalizedRecords = new ArrayList<>();
             normalizedRecords.add(lowWatermark);
-            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+            normalizedRecords.addAll(formatMessageTimestamp(outputBuffer.values()));
             normalizedRecords.add(highWatermark);
 
             final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
@@ -217,7 +218,7 @@ private void assertLowWatermark(SourceRecord lowWatermark) {
                             lowWatermark));
         }
 
-    private boolean isRequiredBinlogRecord(SourceRecord record) {
+    private boolean isChangeRecordInChunkRange(SourceRecord record) {
         if (isDataChangeRecord(record)) {
             Object[] key =
                     getSplitKey(currentSnapshotSplit.getSplitKeyType(), record, nameAdjuster);
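Reviewer note: rewriteOutputBuffer (the renamed upsertBinlog) is the core of the snapshot-normalization pass above: snapshot rows are buffered by record key, then the change events captured between the low and high watermark are replayed onto the buffer, so each key is emitted exactly once with its state as of the high watermark. A self-contained sketch of the merge semantics, using plain string maps instead of Kafka Connect Structs (all names illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Simplified model of chunk normalization: buffer snapshot rows by key,
    // then replay in-range change events on top of the buffer.
    public final class OutputBufferSketch {

        enum Op { CREATE, UPDATE, DELETE }

        static void rewrite(Map<String, String> outputBuffer, Op op, String key, String after) {
            switch (op) {
                case CREATE:
                case UPDATE:
                    outputBuffer.put(key, after); // upsert: the last change in the chunk wins
                    break;
                case DELETE:
                    outputBuffer.remove(key);     // row deleted before the high watermark
                    break;
            }
        }

        public static void main(String[] args) {
            Map<String, String> outputBuffer = new HashMap<>();
            outputBuffer.put("id=1", "v0");                  // snapshot phase
            outputBuffer.put("id=2", "v0");
            rewrite(outputBuffer, Op.UPDATE, "id=1", "v1");  // change events in chunk range
            rewrite(outputBuffer, Op.DELETE, "id=2", null);
            System.out.println(outputBuffer);                // {id=1=v1}
        }
    }
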
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java
index 2800954e6d..04baa41466 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java
@@ -59,7 +59,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
 
-    private final Set<TableId> pureBinlogPhaseTables;
+    private final Set<TableId> pureStreamPhaseTables;
 
     private volatile ChangeEventQueue<DataChangeEvent> queue;
     private volatile Throwable readException;
 
@@ -78,7 +78,7 @@ public JdbcSourceStreamFetcher(JdbcSourceFetchTaskContext taskContext, int subTaskId) {
         ThreadFactory threadFactory =
                 new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
-        this.pureBinlogPhaseTables = new HashSet<>();
+        this.pureStreamPhaseTables = new HashSet<>();
     }
 
     @Override
@@ -95,7 +95,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
                     } catch (Exception e) {
                         LOG.error(
                                 String.format(
-                                        "Execute binlog read task for stream split %s fail",
+                                        "Execute stream read task for stream split %s failed",
                                         currentStreamSplit),
                                 e);
                         readException = e;
@@ -156,23 +156,23 @@ public void close() {
 
     /**
      * Returns the record should emit or not.
      *
-     * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
-     * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
+     * <p>The watermark signal algorithm is that the stream split reader only sends the change event
+     * that belongs to its finished snapshot splits. For each snapshot split, the change event is valid
      * since the offset is after its high watermark.
      *
      * <pre> E.g: the data input is :
      *    snapshot-split-0 info : [0,    1024) highWatermark0
      *    snapshot-split-1 info : [1024, 2048) highWatermark1
      *  the data output is:
-     *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
-     *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
+     *  only the change event that belongs to [0,    1024) with an offset after highWatermark0 should be sent,
+     *  only the change event that belongs to [1024, 2048) with an offset after highWatermark1 should be sent.
      * </pre>
      */
     private boolean shouldEmit(SourceRecord sourceRecord) {
         if (isDataChangeRecord(sourceRecord)) {
             TableId tableId = getTableId(sourceRecord);
             Offset position = taskContext.getStreamOffset(sourceRecord);
-            if (hasEnterPureBinlogPhase(tableId, position)) {
+            if (hasEnterPureStreamPhase(tableId, position)) {
                 return true;
             }
             // only the table who captured snapshot splits need to filter
@@ -198,14 +198,14 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
         return true;
     }
 
-    private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
-        if (pureBinlogPhaseTables.contains(tableId)) {
+    private boolean hasEnterPureStreamPhase(TableId tableId, Offset position) {
+        if (pureStreamPhaseTables.contains(tableId)) {
             return true;
         }
         // the existed tables those have finished snapshot reading
         if (maxSplitHighWatermarkMap.containsKey(tableId)
                 && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
-            pureBinlogPhaseTables.add(tableId);
+            pureStreamPhaseTables.add(tableId);
             return true;
         }
 
@@ -217,11 +217,11 @@ private void configureFilter() {
         List<FinishedSnapshotSplitInfo> finishedSplitInfos =
                 currentStreamSplit.getFinishedSnapshotSplitInfos();
         Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new HashMap<>();
-        Map<TableId, Offset> tableIdBinlogPositionMap = new HashMap<>();
+        Map<TableId, Offset> tableIdOffsetPositionMap = new HashMap<>();
         // latest-offset mode
         if (finishedSplitInfos.isEmpty()) {
             for (TableId tableId : currentStreamSplit.getTableSchemas().keySet()) {
-                tableIdBinlogPositionMap.put(tableId, currentStreamSplit.getStartingOffset());
+                tableIdOffsetPositionMap.put(tableId, currentStreamSplit.getStartingOffset());
             }
         }
         // initial mode
@@ -234,14 +234,14 @@ private void configureFilter() {
                 splitsInfoMap.put(tableId, list);
 
                 Offset highWatermark = finishedSplitInfo.getHighWatermark();
-                Offset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
+                Offset maxHighWatermark = tableIdOffsetPositionMap.get(tableId);
                 if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
-                    tableIdBinlogPositionMap.put(tableId, highWatermark);
+                    tableIdOffsetPositionMap.put(tableId, highWatermark);
                 }
             }
         }
         this.finishedSplitsInfo = splitsInfoMap;
-        this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
-        this.pureBinlogPhaseTables.clear();
+        this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
+        this.pureStreamPhaseTables.clear();
     }
 }
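Reviewer note: the interplay of hasEnterPureStreamPhase and the per-split filter above determines exactly which change events are emitted. A condensed model of the decision, using long offsets and integer chunk keys purely for illustration (the real code compares Offset objects and split key ranges):

    import java.util.List;

    // Condensed model of the stream-phase emit filter.
    public final class EmitFilterSketch {

        // [keyStart, keyEnd) plus the chunk's high watermark, like FinishedSnapshotSplitInfo.
        record FinishedSplit(long keyStart, long keyEnd, long highWatermark) {}

        static boolean shouldEmit(long key, long offset,
                                  long maxTableHighWatermark,
                                  List<FinishedSplit> finishedSplits) {
            if (offset >= maxTableHighWatermark) {
                return true; // the whole table has entered the pure stream phase
            }
            for (FinishedSplit split : finishedSplits) {
                if (key >= split.keyStart()
                        && key < split.keyEnd()
                        && offset > split.highWatermark()) {
                    return true; // in a finished chunk and newer than its watermark
                }
            }
            return false; // would duplicate data already emitted by the snapshot phase
        }

        public static void main(String[] args) {
            List<FinishedSplit> splits =
                    List.of(
                            new FinishedSplit(0, 1024, 100),     // highWatermark0
                            new FinishedSplit(1024, 2048, 200)); // highWatermark1
            System.out.println(shouldEmit(10, 150, 200, splits));   // true
            System.out.println(shouldEmit(10, 50, 200, splits));    // false: before highWatermark0
            System.out.println(shouldEmit(1500, 150, 200, splits)); // false: before highWatermark1
        }
    }
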
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java
index 8118c2edb4..6f8114f4b6 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java
@@ -83,7 +83,7 @@ public static boolean isHighWatermarkEvent(SourceRecord record) {
 
     public static boolean isEndWatermarkEvent(SourceRecord record) {
         Optional<WatermarkKind> watermarkKind = getWatermarkKind(record);
-        return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.BINLOG_END;
+        return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.END;
     }
 
     /**
@@ -236,42 +236,42 @@ private static Optional<WatermarkKind> getWatermarkKind(SourceRecord record) {
         return Optional.empty();
     }
 
-    /** upsert binlog events to snapshot events collection. */
-    public static void upsertBinlog(
-            Map<Struct, SourceRecord> snapshotRecords, SourceRecord binlogRecord) {
-        Struct key = (Struct) binlogRecord.key();
-        Struct value = (Struct) binlogRecord.value();
+    /** Rewrites the output buffer with the given data change event. */
+    public static void rewriteOutputBuffer(
+            Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
+        Struct key = (Struct) changeRecord.key();
+        Struct value = (Struct) changeRecord.value();
         if (value != null) {
             Envelope.Operation operation =
                     Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
             switch (operation) {
                 case CREATE:
                 case UPDATE:
-                    Envelope envelope = Envelope.fromSchema(binlogRecord.valueSchema());
+                    Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema());
                     Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                     Struct after = value.getStruct(Envelope.FieldName.AFTER);
                     Instant fetchTs =
                             Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP));
                     SourceRecord record =
                             new SourceRecord(
-                                    binlogRecord.sourcePartition(),
-                                    binlogRecord.sourceOffset(),
-                                    binlogRecord.topic(),
-                                    binlogRecord.kafkaPartition(),
-                                    binlogRecord.keySchema(),
-                                    binlogRecord.key(),
-                                    binlogRecord.valueSchema(),
+                                    changeRecord.sourcePartition(),
+                                    changeRecord.sourceOffset(),
+                                    changeRecord.topic(),
+                                    changeRecord.kafkaPartition(),
+                                    changeRecord.keySchema(),
+                                    changeRecord.key(),
+                                    changeRecord.valueSchema(),
                                     envelope.read(after, source, fetchTs));
-                    snapshotRecords.put(key, record);
+                    outputBuffer.put(key, record);
                     break;
                 case DELETE:
-                    snapshotRecords.remove(key);
+                    outputBuffer.remove(key);
                     break;
                 case READ:
                     throw new IllegalStateException(
                             String.format(
-                                    "Binlog record shouldn't use READ operation, the the record is %s.",
-                                    binlogRecord));
+                                    "Data change record shouldn't use READ operation, the record is %s.",
+                                    changeRecord));
             }
         }
     }
diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java
index a8e6a6e2ac..2f93acaadd 100644
--- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java
+++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlScanFetchTask.java
@@ -187,7 +187,7 @@ private void dispatchBinlogEndEvent(
                 sourcePartition,
                 backFillBinlogSplit,
                 backFillBinlogSplit.getEndingOffset(),
-                WatermarkKind.BINLOG_END);
+                WatermarkKind.END);
     }
 
     /** A wrapped task to fetch snapshot split of table. */
diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlStreamFetchTask.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlStreamFetchTask.java
index 67fa95b13d..d09757d9db 100644
--- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlStreamFetchTask.java
+++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/fetch/MySqlStreamFetchTask.java
@@ -139,7 +139,7 @@ protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
                     offsetContext.getPartition(),
                     binlogSplit,
                     currentBinlogOffset,
-                    JdbcSourceEventDispatcher.WatermarkKind.BINLOG_END);
+                    JdbcSourceEventDispatcher.WatermarkKind.END);
             } catch (InterruptedException e) {
                 LOG.error("Send signal event error.", e);
                 errorHandler.setProducerThrowable(