Skip to content

Commit

Permalink
fixed issue #440 , optimize find position
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Dec 10, 2017
1 parent 448fc8b commit 5e206e4
Showing 1 changed file with 37 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection
return findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
endPosition,
endPosition.getJournalName());
endPosition.getJournalName(),
true);
} else {
return endPosition;
}
Expand All @@ -388,7 +389,8 @@ protected EntryPosition findPositionWithMasterIdAndTimestamp(MysqlConnection con
EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
fixedPosition,
fixedPosition.getJournalName());
fixedPosition.getJournalName(),
true);
if (entryPosition == null) {
throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position"
+ fixedPosition.getJournalName() + ":" + fixedPosition.getPosition());
Expand Down Expand Up @@ -446,7 +448,8 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
entryPosition.getTimestamp(),
endPosition,
entryPosition.getJournalName());
entryPosition.getJournalName(),
true);
}
}

Expand Down Expand Up @@ -590,7 +593,8 @@ private EntryPosition findByStartTimeStamp(MysqlConnection mysqlConnection, Long
EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
endPosition,
startSearchBinlogFile);
startSearchBinlogFile,
false);
if (entryPosition == null) {
if (StringUtils.equalsIgnoreCase(minBinlogFileName, startSearchBinlogFile)) {
// 已经找到最早的一个binlog,没必要往前找了
Expand Down Expand Up @@ -738,7 +742,8 @@ private SlaveEntryPosition findSlavePosition(MysqlConnection mysqlConnection) {
private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
final Long startTimestamp,
final EntryPosition endPosition,
final String searchBinlogFile) {
final String searchBinlogFile,
final Boolean justForPositionTimestamp) {

final LogPosition logPosition = new LogPosition();
try {
Expand All @@ -752,6 +757,15 @@ public boolean sink(LogEvent event) {
EntryPosition entryPosition = null;
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
if (justForPositionTimestamp && logPosition.getPostion() == null && event.getWhen() > 0) {
// 初始位点
entryPosition = new EntryPosition(searchBinlogFile,
event.getLogPos(),
event.getWhen() * 1000,
event.getServerId());
logPosition.setPostion(entryPosition);
}

if (entry == null) {
return true;
}
Expand All @@ -761,21 +775,16 @@ public boolean sink(LogEvent event) {
Long logposTimestamp = entry.getHeader().getExecuteTime();
Long serverId = entry.getHeader().getServerId();

if (logger.isDebugEnabled()) {
logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
logfilename, logfileoffset, logposTimestamp, startTimestamp });
}
// 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
if (logposTimestamp >= startTimestamp) {
return false;
}

if (entryPosition == null) {
// 如果啥都找不到,就返回第一条Format Event的位点,一般是116位点
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
|| CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
if (logger.isDebugEnabled()) {
logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
logfilename, logfileoffset, logposTimestamp, startTimestamp });
}
// 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
if (logposTimestamp >= startTimestamp) {
return false;
}
}

if (StringUtils.equals(endPosition.getJournalName(), logfilename)
Expand All @@ -788,14 +797,18 @@ public boolean sink(LogEvent event) {
// data.length,代表该事务的下一条offest,避免多余的事务重复
if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
if (logger.isDebugEnabled()) {
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
}
logPosition.setPostion(entryPosition);
} else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
// 当前事务开始位点
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
if (logger.isDebugEnabled()) {
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
}
logPosition.setPostion(entryPosition);
}

Expand Down

0 comments on commit 5e206e4

Please sign in to comment.