Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc-base] Abstract naming for variables of incremental framework. #1646

Merged
merged 1 commit into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public JdbcSourceConfigFactory password(String password) {

/**
* The session time zone in database server, e.g. "America/Los_Angeles". It controls how the
* TIMESTAMP type in MYSQL converted to STRING. See more
* TIMESTAMP type converted to STRING. See more
* https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
*/
public JdbcSourceConfigFactory serverTimeZone(String timeZone) {
Expand Down Expand Up @@ -153,7 +153,7 @@ public JdbcSourceConfigFactory fetchSize(int fetchSize) {
}

/**
* The maximum time that the connector should wait after trying to connect to the MySQL database
* The maximum time that the connector should wait after trying to connect to the database
* server before timing out.
*/
public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
Expand All @@ -179,7 +179,7 @@ public JdbcSourceConfigFactory includeSchemaChanges(boolean includeSchemaChanges
return this;
}

/** The Debezium MySQL connector properties. For example, "snapshot.mode". */
/** The Debezium connector properties. For example, "snapshot.mode". */
public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
this.dbzProperties = properties;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,38 @@ public class JdbcSourceOptions extends SourceOptions {
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the MySQL database server.");
.withDescription("IP address or hostname of the database server.");

public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(3306)
.withDescription("Integer port number of the MySQL database server.");
.withDescription("Integer port number of the database server.");

public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the MySQL database to use when connecting to the MySQL database server.");
"Name of the user to use when connecting to the database server.");

public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
"Password to use when connecting to the MySQL database server.");
.withDescription("Password to use when connecting to the database server.");

public static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of the MySQL server to monitor.");
.withDescription("Name of the database to monitor.");

public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of the MySQL database to monitor.");
.withDescription("Table name of the database to monitor.");

public static final ConfigOption<String> SERVER_TIME_ZONE =
ConfigOptions.key("server-time-zone")
Expand All @@ -89,7 +88,7 @@ public class JdbcSourceOptions extends SourceOptions {
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.");
"The maximum time that the connector should wait after trying to connect to the database server before timing out.");

public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
ConfigOptions.key("connection.pool.size")
Expand All @@ -102,5 +101,5 @@ public class JdbcSourceOptions extends SourceOptions {
.intType()
.defaultValue(3)
.withDescription(
"The max retry times that the connector should retry to build MySQL database server connection.");
"The max retry times that the connector should retry to build the database server connection.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class SourceOptions {
.stringType()
.defaultValue("initial")
.withDescription(
"Optional startup mode for MySQL CDC consumer, valid enumerations are "
"Optional startup mode for CDC consumer, valid enumerations are "
+ "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n"
+ "or \"specific-offset\"");

Expand Down Expand Up @@ -97,7 +97,7 @@ public class SourceOptions {
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distributed or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " and the query for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
Expand All @@ -109,6 +109,6 @@ public class SourceOptions {
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distributed or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " and the query for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,32 @@ public final class StartupOptions implements Serializable {

/**
* Performs an initial snapshot on the monitored database tables upon first startup, and
* continue to read the latest binlog.
* continue to read the latest change log.
*/
public static StartupOptions initial() {
return new StartupOptions(StartupMode.INITIAL, null, null, null);
}

/**
* Never to perform snapshot on the monitored database tables upon first startup, just read from
* the beginning of the binlog. This should be used with care, as it is only valid when the
* binlog is guaranteed to contain the entire history of the database.
* the beginning of the change log. This should be used with care, as it is only valid when the
* change log is guaranteed to contain the entire history of the database.
*/
public static StartupOptions earliest() {
return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
}

/**
* Never to perform snapshot on the monitored database tables upon first startup, just read from
* the end of the binlog which means only have the changes since the connector was started.
* the end of the change log, which means it only has the changes since the connector was started.
*/
public static StartupOptions latest() {
return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
}

/**
* Never to perform snapshot on the monitored database tables upon first startup, and directly
* read binlog from the specified offset.
* read change log from the specified offset.
*/
public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
return new StartupOptions(
Expand All @@ -66,10 +66,10 @@ public static StartupOptions specificOffset(String specificOffsetFile, int speci

/**
* Never to perform snapshot on the monitored database tables upon first startup, and directly
* read binlog from the specified timestamp.
* read change log from the specified timestamp.
*
* <p>The consumer will traverse the binlog from the beginning and ignore change events whose
* timestamp is smaller than the specified timestamp.
* <p>The consumer will traverse the change log from the beginning and ignore change events
* whose timestamp is smaller than the specified timestamp.
*
* @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,15 @@ private Struct signalRecordValue(String splitId, WatermarkKind watermarkKind) {
public enum WatermarkKind {
LOW,
HIGH,
BINLOG_END;
END;

public WatermarkKind fromString(String kindString) {
if (LOW.name().equalsIgnoreCase(kindString)) {
return LOW;
} else if (HIGH.name().equalsIgnoreCase(kindString)) {
return HIGH;
} else {
return BINLOG_END;
return END;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
/**
* The {@link RecordEmitter} implementation for {@link JdbcIncrementalSourceReader}.
*
* <p>The {@link RecordEmitter} buffers the snapshot records of split and call the binlog reader to
* <p>The {@link RecordEmitter} buffers the snapshot records of split and call the stream reader to
* emit records rather than emit the records directly.
*/
public class JdbcSourceRecordEmitter<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public class HybridSplitAssigner implements SplitAssigner {

private static final Logger LOG = LoggerFactory.getLogger(HybridSplitAssigner.class);
private static final String BINLOG_SPLIT_ID = "binlog-split";
private static final String STREAM_SPLIT_ID = "stream-split";

private final int splitMetaGroupSize;

Expand Down Expand Up @@ -107,18 +107,18 @@ public void open() {
@Override
public Optional<SourceSplitBase> getNext() {
if (snapshotSplitAssigner.noMoreSplits()) {
// binlog split assigning
// stream split assigning
if (isStreamSplitAssigned) {
// no more splits for the assigner
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) {
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// assigning the stream split. Otherwise, records emitted from stream split
// might be out-of-order in terms of same primary key with snapshot splits.
isStreamSplitAssigned = true;
return Optional.of(createStreamSplit());
} else {
// binlog split is not ready by now
// stream split is not ready by now
return Optional.empty();
}
} else {
Expand Down Expand Up @@ -149,7 +149,7 @@ public void addSplits(Collection<SourceSplitBase> splits) {
if (split.isSnapshotSplit()) {
snapshotSplits.add(split);
} else {
// we don't store the split, but will re-create binlog split later
// we don't store the split, but will re-create stream split later
isStreamSplitAssigned = false;
}
}
Expand Down Expand Up @@ -183,20 +183,20 @@ public StreamSplit createStreamSplit() {
Map<String, Offset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

Offset minBinlogOffset = null;
Offset minOffset = null;
for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
Offset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
minBinlogOffset = binlogOffset;
// find the min offset of change log
Offset changeLogOffset = splitFinishedOffsets.get(split.splitId());
if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
minOffset = changeLogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset,
changeLogOffset,
offsetFactory));
}

Expand All @@ -205,8 +205,8 @@ public StreamSplit createStreamSplit() {

boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
return new StreamSplit(
BINLOG_SPLIT_ID,
minBinlogOffset == null ? offsetFactory.createInitialOffset() : minBinlogOffset,
STREAM_SPLIT_ID,
minOffset == null ? offsetFactory.createInitialOffset() : minOffset,
offsetFactory.createNoStoppingOffset(),
divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
new HashMap<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
.collect(Collectors.toList());
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
Offset binlogOffset = splitFinishedOffsets.get(split.splitId());
Offset finishedOffset = splitFinishedOffsets.get(split.splitId());
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset,
finishedOffset,
offsetFactory));
}
return finishedSnapshotSplitInfos;
Expand All @@ -231,7 +231,7 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSplitsFinished()) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
// to care about the global output data order of snapshot splits and stream split.
if (currentParallelism == 1) {
assignerFinished = true;
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ public interface SplitAssigner {
boolean waitingForFinishedSplits();

/**
* Gets the finished splits information. This is useful meta data to generate a binlog split
* Gets the finished splits information. This is useful meta data to generate a stream split
* that considers finished snapshot splits.
*/
List<FinishedSnapshotSplitInfo> getFinishedSplitInfos();

/**
* Callback to handle the finished splits with finished binlog offset. This is useful for
* determine when to generate binlog split and what binlog split to generate.
* Callback to handle the finished splits with finished change log offset. This is useful for
* determining when to generate stream split and what stream split to generate.
*/
void onFinishedSplits(Map<String, Offset> splitFinishedOffsets);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class StreamSplitAssigner implements SplitAssigner {

private static final Logger LOG = LoggerFactory.getLogger(StreamSplitAssigner.class);
private static final String BINLOG_SPLIT_ID = "binlog-split";
private static final String STREAM_SPLIT_ID = "stream-split";

private final SourceConfig sourceConfig;

Expand Down Expand Up @@ -103,7 +103,7 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {

@Override
public void addSplits(Collection<SourceSplitBase> splits) {
// we don't store the split, but will re-create binlog split later
// we don't store the split, but will re-create stream split later
isStreamSplitAssigned = false;
}

Expand All @@ -125,7 +125,7 @@ public void close() {}
public StreamSplit createStreamSplit() {

return new StreamSplit(
BINLOG_SPLIT_ID,
STREAM_SPLIT_ID,
dialect.displayCurrentOffset(sourceConfig),
offsetFactory.createInitialOffset(),
new ArrayList<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.Objects;

/** A {@link PendingSplitsState} for pending hybrid (snapshot & binlog) splits. */
/** A {@link PendingSplitsState} for pending hybrid (snapshot & stream) splits. */
public class HybridPendingSplitsState extends PendingSplitsState {
private final SnapshotPendingSplitsState snapshotPendingSplits;
private final boolean isStreamSplitAssigned;
Expand Down Expand Up @@ -60,7 +60,7 @@ public String toString() {
return "HybridPendingSplitsState{"
+ "snapshotPendingSplits="
+ snapshotPendingSplits
+ ", isBinlogSplitAssigned="
+ ", isStreamSplitAssigned="
+ isStreamSplitAssigned
+ '}';
}
Expand Down
Loading