Skip to content

Commit

Permalink
[ISSUE #5714] Fix the issue that broker can't work normally when tran…
Browse files Browse the repository at this point in the history
…sientStorePool=true in controller mode (#5722)

* Fix the issue that the slave role does not initialize the transientPool in controller mode

* Format the checkstyle

* Remove the useless import

* Fix the HA transmission disconnection issue when transientStorePoolEnable is true

* just test

* just test

* just test

* just test

* just test

* just test

* just test

* just test

* just test

* Format the check style

* Format the check style
  • Loading branch information
RongtongJin authored Jan 1, 2023
1 parent 56490de commit e1dfc0d
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.apache.rocketmq.remoting.rpc.RpcRequest;
import org.apache.rocketmq.remoting.rpc.RpcResponse;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
Expand Down Expand Up @@ -2276,7 +2277,7 @@ private HashMap<String, String> prepareRuntimeInfo() {
}
MessageStore messageStore = this.brokerController.getMessageStore();
runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(messageStore.remainTransientStoreBufferNumbs()));
if (this.brokerController.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore && ((DefaultMessageStore) this.brokerController.getMessageStore()).isTransientStorePoolEnable()) {
runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToCommit(), false));
}
runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToFlush(), false));
Expand Down Expand Up @@ -2606,7 +2607,6 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC
return response;
}


private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) {

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
Expand Down Expand Up @@ -171,7 +171,7 @@ private boolean mmapOperation() {
long beginTime = System.currentTimeMillis();

MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
Expand Down
50 changes: 22 additions & 28 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,

return dispatchRequest;
} catch (Exception e) {
log.error("Check message and return size error", e);
}

return new DispatchRequest(-1, false /* success */);
Expand Down Expand Up @@ -1821,24 +1820,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer

}

interface FlushManager {
void start();

void shutdown();

void wakeUpFlush();

void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);

CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
}

class DefaultFlushManager implements FlushManager {

private final FlushCommitLogService flushCommitLogService;

//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
Expand All @@ -1847,15 +1835,14 @@ public DefaultFlushManager() {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();
}

this.commitLogService = new CommitLog.CommitRealTimeService();
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override
public void start() {
@Override public void start() {
this.flushCommitLogService.start();

if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

Expand All @@ -1870,14 +1857,12 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
Expand All @@ -1886,10 +1871,10 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
commitRealTimeService.wakeup();
}
}
}
Expand All @@ -1911,10 +1896,10 @@ public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult r
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
Expand All @@ -1926,10 +1911,16 @@ public void wakeUpFlush() {
flushCommitLogService.wakeup();
}

@Override
public void wakeUpCommit() {
// now wake up commit log thread.
commitRealTimeService.wakeup();
}

@Override
public void shutdown() {
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.shutdown();
}

this.flushCommitLogService.shutdown();
Expand Down Expand Up @@ -1963,4 +1954,7 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
}

public FlushManager getFlushManager() {
return flushManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br

this.reputMessageService = new ReputMessageService();

this.transientStorePool = new TransientStorePool(messageStoreConfig);
this.transientStorePool = new TransientStorePool(this);

this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
Expand Down Expand Up @@ -338,7 +338,7 @@ public void start() throws Exception {
this.haService.init(this);
}

if (messageStoreConfig.isTransientStorePoolEnable()) {
if (this.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}

Expand Down Expand Up @@ -1067,6 +1067,7 @@ public long getMaxPhyOffset() {
return this.commitLog.getMaxOffset();
}


@Override
public long getMinPhyOffset() {
return this.commitLog.getMinOffset();
Expand Down Expand Up @@ -2745,4 +2746,15 @@ public List<Pair<InstrumentSelector, View>> getMetricsView() {
public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
}

/**
* Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE or
* enableControllerMode is true
*
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
return this.messageStoreConfig.isTransientStorePoolEnable() &&
(this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
}
}
36 changes: 36 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/FlushManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.store;

import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt;

public interface FlushManager {

void start();

void shutdown();

void wakeUpFlush();

void wakeUpCommit();

void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);

CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;

Expand All @@ -34,12 +33,13 @@ public class TransientStorePool {
private final int poolSize;
private final int fileSize;
private final Deque<ByteBuffer> availableBuffers;
private final MessageStoreConfig storeConfig;
private final DefaultMessageStore messageStore;
private volatile boolean isRealCommit;

public TransientStorePool(final MessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
this.poolSize = storeConfig.getTransientStorePoolSize();
this.fileSize = storeConfig.getMappedFileSizeCommitLog();
public TransientStorePool(final DefaultMessageStore messageStore) {
this.messageStore = messageStore;
this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
this.availableBuffers = new ConcurrentLinkedDeque<>();
}

Expand Down Expand Up @@ -81,9 +81,17 @@ public ByteBuffer borrowBuffer() {
}

public int availableBufferNums() {
if (storeConfig.isTransientStorePoolEnable()) {
if (messageStore.isTransientStorePoolEnable()) {
return availableBuffers.size();
}
return Integer.MAX_VALUE;
}

public boolean isRealCommit() {
return isRealCommit;
}

public void setRealCommit(boolean realCommit) {
isRealCommit = realCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,8 @@ public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
this.defaultQueryMaxNum = defaultQueryMaxNum;
}

/**
* Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
return transientStorePoolEnable && BrokerRole.SLAVE != getBrokerRole();
return transientStorePoolEnable;
}

public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public boolean changeToMaster(int masterEpoch) {
}
}

if (defaultMessageStore.isTransientStorePoolEnable()) {
waitingForAllCommit();
defaultMessageStore.getTransientStorePool().setRealCommit(true);
}

LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());

this.defaultMessageStore.recoverTopicQueueTable();
Expand All @@ -162,6 +167,12 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
this.haClient.updateMasterAddress(newMasterAddr);
this.haClient.updateHaMasterAddress(null);
this.haClient.start();

if (defaultMessageStore.isTransientStorePoolEnable()) {
waitingForAllCommit();
defaultMessageStore.getTransientStorePool().setRealCommit(false);
}

LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
} catch (final Exception e) {
Expand All @@ -170,6 +181,17 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
}
}

public void waitingForAllCommit() {
while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) {
getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
try {
Thread.sleep(100);
} catch (Exception e) {

}
}
}

@Override
public HAClient getHAClient() {
return this.haClient;
Expand Down
Loading

0 comments on commit e1dfc0d

Please sign in to comment.