From e1dfc0db5347059edf31237c0e62889644c0dff9 Mon Sep 17 00:00:00 2001 From: rongtong Date: Sun, 1 Jan 2023 13:02:53 +0800 Subject: [PATCH] [ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722) * Fix the issue that the slave role does not initialize the transientPool in controller mode * Format the checkstyle * Remove the useless import * Fix the HA transmission disconnection issue when transientStorePoolEnable is true * just test * just test * just test * just test * just test * just test * just test * just test * just test * Format the check style * Format the check style --- .../processor/AdminBrokerProcessor.java | 4 +- .../store/AllocateMappedFileService.java | 4 +- .../org/apache/rocketmq/store/CommitLog.java | 50 ++++++++----------- .../rocketmq/store/DefaultMessageStore.java | 16 +++++- .../apache/rocketmq/store/FlushManager.java | 36 +++++++++++++ .../rocketmq/store/TransientStorePool.java | 22 +++++--- .../store/config/MessageStoreConfig.java | 6 +-- .../ha/autoswitch/AutoSwitchHAService.java | 22 ++++++++ .../store/logfile/DefaultMappedFile.java | 48 ++++++++++-------- 9 files changed, 140 insertions(+), 68 deletions(-) create mode 100644 store/src/main/java/org/apache/rocketmq/store/FlushManager.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 12eab475b8e..24162022c90 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -174,6 +174,7 @@ import org.apache.rocketmq.remoting.rpc.RpcRequest; import org.apache.rocketmq.remoting.rpc.RpcResponse; import org.apache.rocketmq.store.ConsumeQueueExt; +import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; @@ -2276,7 +2277,7 @@ private HashMap prepareRuntimeInfo() { } MessageStore messageStore = this.brokerController.getMessageStore(); runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(messageStore.remainTransientStoreBufferNumbs())); - if (this.brokerController.getMessageStoreConfig().isTransientStorePoolEnable()) { + if (this.brokerController.getMessageStore() instanceof DefaultMessageStore && ((DefaultMessageStore) this.brokerController.getMessageStore()).isTransientStorePoolEnable()) { runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToCommit(), false)); } runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToFlush(), false)); @@ -2606,7 +2607,6 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC return response; } - private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 4d2fc51683b..dca7d53258d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -52,7 +52,7 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) { public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) { int canSubmitRequests = 2; - if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + if (this.messageStore.isTransientStorePoolEnable()) { if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size(); @@ -171,7 +171,7 @@ private boolean mmapOperation() { long beginTime = System.currentTimeMillis(); MappedFile mappedFile; - if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + if (messageStore.isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index c38c2168e63..d7e141d31c5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -531,7 +531,6 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, return dispatchRequest; } catch (Exception e) { - log.error("Check message and return size error", e); } return new DispatchRequest(-1, false /* success */); @@ -1821,24 +1820,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer } - interface FlushManager { - void start(); - - void shutdown(); - - void wakeUpFlush(); - - void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt); - - CompletableFuture handleDiskFlush(AppendMessageResult result, MessageExt messageExt); - } class DefaultFlushManager implements FlushManager { private final FlushCommitLogService flushCommitLogService; //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods - private final FlushCommitLogService commitLogService; + private final FlushCommitLogService commitRealTimeService; public DefaultFlushManager() { if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -1847,15 +1835,14 @@ public DefaultFlushManager() { this.flushCommitLogService = new CommitLog.FlushRealTimeService(); } - this.commitLogService = new CommitLog.CommitRealTimeService(); + this.commitRealTimeService = new CommitLog.CommitRealTimeService(); } - @Override - public void start() { + @Override public void start() { this.flushCommitLogService.start(); - if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { - this.commitLogService.start(); + if (defaultMessageStore.isTransientStorePoolEnable()) { + this.commitRealTimeService.start(); } } @@ -1870,14 +1857,12 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess CompletableFuture flushOkFuture = request.future(); PutMessageStatus flushStatus = null; try { - flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), - TimeUnit.MILLISECONDS); + flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { //flushOK=false; } if (flushStatus != PutMessageStatus.PUT_OK) { - log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() - + " client address: " + messageExt.getBornHostString()); + log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { @@ -1886,10 +1871,10 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess } // Asynchronous flush else { - if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { - commitLogService.wakeup(); + commitRealTimeService.wakeup(); } } } @@ -1911,10 +1896,10 @@ public CompletableFuture handleDiskFlush(AppendMessageResult r } // Asynchronous flush else { - if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { - commitLogService.wakeup(); + commitRealTimeService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } @@ -1926,10 +1911,16 @@ public void wakeUpFlush() { flushCommitLogService.wakeup(); } + @Override + public void wakeUpCommit() { + // now wake up commit log thread. + commitRealTimeService.wakeup(); + } + @Override public void shutdown() { - if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { - this.commitLogService.shutdown(); + if (defaultMessageStore.isTransientStorePoolEnable()) { + this.commitRealTimeService.shutdown(); } this.flushCommitLogService.shutdown(); @@ -1963,4 +1954,7 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) { this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs); } + public FlushManager getFlushManager() { + return flushManager; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index f42960c425e..3cf8efdfa44 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -223,7 +223,7 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br this.reputMessageService = new ReputMessageService(); - this.transientStorePool = new TransientStorePool(messageStoreConfig); + this.transientStorePool = new TransientStorePool(this); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); @@ -338,7 +338,7 @@ public void start() throws Exception { this.haService.init(this); } - if (messageStoreConfig.isTransientStorePoolEnable()) { + if (this.isTransientStorePoolEnable()) { this.transientStorePool.init(); } @@ -1067,6 +1067,7 @@ public long getMaxPhyOffset() { return this.commitLog.getMaxOffset(); } + @Override public long getMinPhyOffset() { return this.commitLog.getMinOffset(); @@ -2745,4 +2746,15 @@ public List> getMetricsView() { public void initMetrics(Meter meter, Supplier attributesBuilderSupplier) { DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this); } + + /** + * Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE or + * enableControllerMode is true + * + * @return true or false + */ + public boolean isTransientStorePoolEnable() { + return this.messageStoreConfig.isTransientStorePoolEnable() && + (this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/FlushManager.java b/store/src/main/java/org/apache/rocketmq/store/FlushManager.java new file mode 100644 index 00000000000..fe3951ae7f7 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/FlushManager.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.message.MessageExt; + +public interface FlushManager { + + void start(); + + void shutdown(); + + void wakeUpFlush(); + + void wakeUpCommit(); + + void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt); + + CompletableFuture handleDiskFlush(AppendMessageResult result, MessageExt messageExt); +} diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java index a873fe05b76..cab7f931faf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java +++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java @@ -24,7 +24,6 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.util.LibC; import sun.nio.ch.DirectBuffer; @@ -34,12 +33,13 @@ public class TransientStorePool { private final int poolSize; private final int fileSize; private final Deque availableBuffers; - private final MessageStoreConfig storeConfig; + private final DefaultMessageStore messageStore; + private volatile boolean isRealCommit; - public TransientStorePool(final MessageStoreConfig storeConfig) { - this.storeConfig = storeConfig; - this.poolSize = storeConfig.getTransientStorePoolSize(); - this.fileSize = storeConfig.getMappedFileSizeCommitLog(); + public TransientStorePool(final DefaultMessageStore messageStore) { + this.messageStore = messageStore; + this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize(); + this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); this.availableBuffers = new ConcurrentLinkedDeque<>(); } @@ -81,9 +81,17 @@ public ByteBuffer borrowBuffer() { } public int availableBufferNums() { - if (storeConfig.isTransientStorePoolEnable()) { + if (messageStore.isTransientStorePoolEnable()) { return availableBuffers.size(); } return Integer.MAX_VALUE; } + + public boolean isRealCommit() { + return isRealCommit; + } + + public void setRealCommit(boolean realCommit) { + isRealCommit = realCommit; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 91663558ebb..e29fdc2b06a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -965,12 +965,8 @@ public void setDefaultQueryMaxNum(int defaultQueryMaxNum) { this.defaultQueryMaxNum = defaultQueryMaxNum; } - /** - * Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE - * @return true or false - */ public boolean isTransientStorePoolEnable() { - return transientStorePoolEnable && BrokerRole.SLAVE != getBrokerRole(); + return transientStorePoolEnable; } public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) { diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index c4a9aeb812a..f2b421ecd20 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -136,6 +136,11 @@ public boolean changeToMaster(int masterEpoch) { } } + if (defaultMessageStore.isTransientStorePoolEnable()) { + waitingForAllCommit(); + defaultMessageStore.getTransientStorePool().setRealCommit(true); + } + LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset()); this.defaultMessageStore.recoverTopicQueueTable(); @@ -162,6 +167,12 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav this.haClient.updateMasterAddress(newMasterAddr); this.haClient.updateHaMasterAddress(null); this.haClient.start(); + + if (defaultMessageStore.isTransientStorePoolEnable()) { + waitingForAllCommit(); + defaultMessageStore.getTransientStorePool().setRealCommit(false); + } + LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch); return true; } catch (final Exception e) { @@ -170,6 +181,17 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav } } + public void waitingForAllCommit() { + while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) { + getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit(); + try { + Thread.sleep(100); + } catch (Exception e) { + + } + } + } + @Override public HAClient getHAClient() { return this.haClient; diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index 76ba89eba57..7b56150f64a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -102,7 +102,7 @@ public DefaultMappedFile(final String fileName, final int fileSize) throws IOExc } public DefaultMappedFile(final String fileName, final int fileSize, - final TransientStorePool transientStorePool) throws IOException { + final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize, transientStorePool); } @@ -116,7 +116,7 @@ public static long getTotalMappedVirtualMemory() { @Override public void init(final String fileName, final int fileSize, - final TransientStorePool transientStorePool) throws IOException { + final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); this.writeBuffer = transientStorePool.borrowBuffer(); this.transientStorePool = transientStorePool; @@ -186,11 +186,11 @@ public boolean getData(int pos, int size, ByteBuffer byteBuffer) { } } else { log.debug("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " - + this.fileFromOffset); + + this.fileFromOffset); } } else { log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size - + ", fileFromOffset: " + this.fileFromOffset); + + ", fileFromOffset: " + this.fileFromOffset); } return false; @@ -225,18 +225,18 @@ public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final C @Override public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb, - PutMessageContext putMessageContext) { + PutMessageContext putMessageContext) { return appendMessagesInner(msg, cb, putMessageContext); } @Override public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb, - PutMessageContext putMessageContext) { + PutMessageContext putMessageContext) { return appendMessagesInner(messageExtBatch, cb, putMessageContext); } public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, - PutMessageContext putMessageContext) { + PutMessageContext putMessageContext) { assert messageExt != null; assert cb != null; @@ -249,11 +249,11 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) { // traditional batch message result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, - (MessageExtBatch) messageExt, putMessageContext); + (MessageExtBatch) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBrokerInner) { // traditional single message or newly introduced inner-batch message result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, - (MessageExtBrokerInner) messageExt, putMessageContext); + (MessageExtBrokerInner) messageExt, putMessageContext); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } @@ -364,7 +364,11 @@ public int commit(final int commitLeastPages) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. return WROTE_POSITION_UPDATER.get(this); } - if (this.isAbleToCommit(commitLeastPages)) { + + //no need to commit data to file channel, so just set committedPosition to wrotePosition. + if (transientStorePool != null && !transientStorePool.isRealCommit()) { + COMMITTED_POSITION_UPDATER.set(this, WROTE_POSITION_UPDATER.get(this)); + } else if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(); this.release(); @@ -459,11 +463,11 @@ public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } else { log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " - + this.fileFromOffset); + + this.fileFromOffset); } } else { log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size - + ", fileFromOffset: " + this.fileFromOffset); + + ", fileFromOffset: " + this.fileFromOffset); } return null; @@ -491,13 +495,13 @@ public SelectMappedBufferResult selectMappedBuffer(int pos) { public boolean cleanup(final long currentRef) { if (this.isAvailable()) { log.error("this file[REF:" + currentRef + "] " + this.fileName - + " have not shutdown, stop unmapping."); + + " have not shutdown, stop unmapping."); return false; } if (this.isCleanupOver()) { log.error("this file[REF:" + currentRef + "] " + this.fileName - + " have cleanup, do not do it again."); + + " have cleanup, do not do it again."); return true; } @@ -523,10 +527,10 @@ public boolean destroy(final long intervalForcibly) { long beginTime = System.currentTimeMillis(); boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName - + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" - + this.getFlushedPosition() + ", " - + UtilAll.computeElapsedTimeMilliseconds(beginTime) - + "," + (System.currentTimeMillis() - lastModified)); + + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + + this.getFlushedPosition() + ", " + + UtilAll.computeElapsedTimeMilliseconds(beginTime) + + "," + (System.currentTimeMillis() - lastModified)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } @@ -534,7 +538,7 @@ public boolean destroy(final long intervalForcibly) { return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName - + " Failed. cleanupOver: " + this.cleanupOver); + + " Failed. cleanupOver: " + this.cleanupOver); } return false; @@ -555,7 +559,7 @@ public void setWrotePosition(int pos) { */ @Override public int getReadPosition() { - return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this); + return transientStorePool == null || !transientStorePool.isRealCommit() ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this); } @Override @@ -596,11 +600,11 @@ public void warmMappedFile(FlushDiskType type, int pages) { // force flush when prepare load finished if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}", - this.getFileName(), System.currentTimeMillis() - beginTime); + this.getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(), - System.currentTimeMillis() - beginTime); + System.currentTimeMillis() - beginTime); this.mlock(); }