From 5e206e4f7e7b62c1379f8f9b8945fd1eeb150fcf Mon Sep 17 00:00:00 2001 From: agapple Date: Sun, 10 Dec 2017 23:05:28 +0800 Subject: [PATCH] fixed issue #440 , optimize find position --- .../parse/inbound/mysql/MysqlEventParser.java | 61 +++++++++++-------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index a398ea45c..38c92a6f1 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -374,7 +374,8 @@ protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection return findAsPerTimestampInSpecificLogFile(mysqlConnection, startTimestamp, endPosition, - endPosition.getJournalName()); + endPosition.getJournalName(), + true); } else { return endPosition; } @@ -388,7 +389,8 @@ protected EntryPosition findPositionWithMasterIdAndTimestamp(MysqlConnection con EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection, startTimestamp, fixedPosition, - fixedPosition.getJournalName()); + fixedPosition.getJournalName(), + true); if (entryPosition == null) { throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position" + fixedPosition.getJournalName() + ":" + fixedPosition.getPosition()); @@ -446,7 +448,8 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) { specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection, entryPosition.getTimestamp(), endPosition, - entryPosition.getJournalName()); + entryPosition.getJournalName(), + true); } } @@ -590,7 +593,8 @@ private EntryPosition findByStartTimeStamp(MysqlConnection mysqlConnection, Long EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection, startTimestamp, endPosition, - startSearchBinlogFile); + startSearchBinlogFile, + false); if (entryPosition == null) { if (StringUtils.equalsIgnoreCase(minBinlogFileName, startSearchBinlogFile)) { // 已经找到最早的一个binlog,没必要往前找了 @@ -738,7 +742,8 @@ private SlaveEntryPosition findSlavePosition(MysqlConnection mysqlConnection) { private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection, final Long startTimestamp, final EntryPosition endPosition, - final String searchBinlogFile) { + final String searchBinlogFile, + final Boolean justForPositionTimestamp) { final LogPosition logPosition = new LogPosition(); try { @@ -752,6 +757,15 @@ public boolean sink(LogEvent event) { EntryPosition entryPosition = null; try { CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true); + if (justForPositionTimestamp && logPosition.getPostion() == null && event.getWhen() > 0) { + // 初始位点 + entryPosition = new EntryPosition(searchBinlogFile, + event.getLogPos(), + event.getWhen() * 1000, + event.getServerId()); + logPosition.setPostion(entryPosition); + } + if (entry == null) { return true; } @@ -761,21 +775,16 @@ public boolean sink(LogEvent event) { Long logposTimestamp = entry.getHeader().getExecuteTime(); Long serverId = entry.getHeader().getServerId(); - if (logger.isDebugEnabled()) { - logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] { - logfilename, logfileoffset, logposTimestamp, startTimestamp }); - } - // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出 - if (logposTimestamp >= startTimestamp) { - return false; - } - - if (entryPosition == null) { - // 如果啥都找不到,就返回第一条Format Event的位点,一般是116位点 - entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId); - logger.debug("set {} to be pending start position before finding another proper one...", - entryPosition); - logPosition.setPostion(entryPosition); + if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) + || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) { + if (logger.isDebugEnabled()) { + logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] { + logfilename, logfileoffset, logposTimestamp, startTimestamp }); + } + // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出 + if (logposTimestamp >= startTimestamp) { + return false; + } } if (StringUtils.equals(endPosition.getJournalName(), logfilename) @@ -788,14 +797,18 @@ public boolean sink(LogEvent event) { // data.length,代表该事务的下一条offest,避免多余的事务重复 if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) { entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId); - logger.debug("set {} to be pending start position before finding another proper one...", - entryPosition); + if (logger.isDebugEnabled()) { + logger.debug("set {} to be pending start position before finding another proper one...", + entryPosition); + } logPosition.setPostion(entryPosition); } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) { // 当前事务开始位点 entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId); - logger.debug("set {} to be pending start position before finding another proper one...", - entryPosition); + if (logger.isDebugEnabled()) { + logger.debug("set {} to be pending start position before finding another proper one...", + entryPosition); + } logPosition.setPostion(entryPosition); }