Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode #5722

Merged
merged 15 commits into from
Jan 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.apache.rocketmq.remoting.rpc.RpcRequest;
import org.apache.rocketmq.remoting.rpc.RpcResponse;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
Expand Down Expand Up @@ -2276,7 +2277,7 @@ private HashMap<String, String> prepareRuntimeInfo() {
}
MessageStore messageStore = this.brokerController.getMessageStore();
runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(messageStore.remainTransientStoreBufferNumbs()));
if (this.brokerController.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore && ((DefaultMessageStore) this.brokerController.getMessageStore()).isTransientStorePoolEnable()) {
runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToCommit(), false));
}
runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToFlush(), false));
Expand Down Expand Up @@ -2606,7 +2607,6 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC
return response;
}


private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) {

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
Expand Down Expand Up @@ -171,7 +171,7 @@ private boolean mmapOperation() {
long beginTime = System.currentTimeMillis();

MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
Expand Down
50 changes: 22 additions & 28 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,

return dispatchRequest;
} catch (Exception e) {
log.error("Check message and return size error", e);
}

return new DispatchRequest(-1, false /* success */);
Expand Down Expand Up @@ -1821,24 +1820,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer

}

interface FlushManager {
void start();

void shutdown();

void wakeUpFlush();

void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);

CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
}

class DefaultFlushManager implements FlushManager {

private final FlushCommitLogService flushCommitLogService;

// If TransientStorePool is enabled, we must flush messages to the FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
Expand All @@ -1847,15 +1835,14 @@ public DefaultFlushManager() {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();
}

this.commitLogService = new CommitLog.CommitRealTimeService();
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override
public void start() {
@Override public void start() {
this.flushCommitLogService.start();

if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

Expand All @@ -1870,14 +1857,12 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
Expand All @@ -1886,10 +1871,10 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
commitRealTimeService.wakeup();
}
}
}
Expand All @@ -1911,10 +1896,10 @@ public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult r
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
Expand All @@ -1926,10 +1911,16 @@ public void wakeUpFlush() {
flushCommitLogService.wakeup();
}

/**
 * Wakes up the commit service held by this flush manager.
 */
@Override
public void wakeUpCommit() {
    // Nudge the commit service so it runs without waiting for its next scheduled pass.
    this.commitRealTimeService.wakeup();
}

@Override
public void shutdown() {
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.shutdown();
}

this.flushCommitLogService.shutdown();
Expand Down Expand Up @@ -1963,4 +1954,7 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
}

/**
 * Exposes the flush manager used by this commit log.
 *
 * @return the current {@code flushManager} instance
 */
public FlushManager getFlushManager() {
    return this.flushManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br

this.reputMessageService = new ReputMessageService();

this.transientStorePool = new TransientStorePool(messageStoreConfig);
this.transientStorePool = new TransientStorePool(this);

this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
Expand Down Expand Up @@ -338,7 +338,7 @@ public void start() throws Exception {
this.haService.init(this);
}

if (messageStoreConfig.isTransientStorePoolEnable()) {
if (this.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}

Expand Down Expand Up @@ -1067,6 +1067,7 @@ public long getMaxPhyOffset() {
return this.commitLog.getMaxOffset();
}


@Override
public long getMinPhyOffset() {
return this.commitLog.getMinOffset();
Expand Down Expand Up @@ -2745,4 +2746,15 @@ public List<Pair<InstrumentSelector, View>> getMetricsView() {
public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
}

/**
 * Tells whether the transient commitLog store pool is effectively enabled for this store.
 * <p>
 * The pool is enabled only when {@code transientStorePoolEnable} is set in the message store
 * config AND at least one of the following holds:
 * <ul>
 *   <li>controller mode is enabled in the broker config, or</li>
 *   <li>the configured broker role is not {@code SLAVE}.</li>
 * </ul>
 *
 * @return <tt>true</tt> if the pool is enabled, <tt>false</tt> otherwise
 */
public boolean isTransientStorePoolEnable() {
    // The config switch is a hard prerequisite; nothing else matters when it is off.
    if (!this.messageStoreConfig.isTransientStorePoolEnable()) {
        return false;
    }
    // In controller mode the pool stays enabled regardless of the configured role.
    if (this.brokerConfig.isEnableControllerMode()) {
        return true;
    }
    return BrokerRole.SLAVE != this.messageStoreConfig.getBrokerRole();
}
}
36 changes: 36 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/FlushManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.store;

import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Contract for the component that manages flushing (and committing) of commit-log data.
 * <p>
 * NOTE(review): the behavior notes below are taken from the {@code DefaultFlushManager}
 * implementation inside {@code CommitLog}; other implementations may differ.
 */
public interface FlushManager {

/**
 * Starts the manager. The default implementation starts its flush service and, when the
 * transient store pool is enabled, its commit service as well.
 */
void start();

/**
 * Stops the manager. The default implementation shuts down the commit service (only when the
 * transient store pool is enabled) and then the flush service.
 */
void shutdown();

/**
 * Wakes up the flush service.
 */
void wakeUpFlush();

/**
 * Wakes up the commit service.
 */
void wakeUpCommit();

/**
 * Handles the disk flush for an appended message and records the outcome on
 * {@code putMessageResult}; the default implementation sets
 * {@code PutMessageStatus.FLUSH_DISK_TIMEOUT} there when waiting for the flush does not end in
 * {@code PUT_OK}.
 *
 * @param result           result of the append operation
 * @param putMessageResult result object whose status may be updated
 * @param messageExt       the message that was appended
 */
void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);

/**
 * Handles the disk flush for an appended message and reports the outcome through a future.
 * In the default implementation's asynchronous-flush branch the returned future is already
 * completed with {@code PutMessageStatus.PUT_OK}.
 *
 * @param result     result of the append operation
 * @param messageExt the message that was appended
 * @return a future carrying the resulting {@code PutMessageStatus}
 */
CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;

Expand All @@ -34,12 +33,13 @@ public class TransientStorePool {
private final int poolSize;
private final int fileSize;
private final Deque<ByteBuffer> availableBuffers;
private final MessageStoreConfig storeConfig;
private final DefaultMessageStore messageStore;
private volatile boolean isRealCommit;

public TransientStorePool(final MessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
this.poolSize = storeConfig.getTransientStorePoolSize();
this.fileSize = storeConfig.getMappedFileSizeCommitLog();
public TransientStorePool(final DefaultMessageStore messageStore) {
this.messageStore = messageStore;
this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
this.availableBuffers = new ConcurrentLinkedDeque<>();
}

Expand Down Expand Up @@ -81,9 +81,17 @@ public ByteBuffer borrowBuffer() {
}

public int availableBufferNums() {
if (storeConfig.isTransientStorePoolEnable()) {
if (messageStore.isTransientStorePoolEnable()) {
return availableBuffers.size();
}
return Integer.MAX_VALUE;
}

/**
 * Reads the current value of the volatile {@code isRealCommit} flag.
 *
 * @return the flag's current value
 */
public boolean isRealCommit() {
    return this.isRealCommit;
}

/**
 * Updates the volatile {@code isRealCommit} flag.
 *
 * @param realCommit the new value of the flag
 */
public void setRealCommit(boolean realCommit) {
    this.isRealCommit = realCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,8 @@ public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
this.defaultQueryMaxNum = defaultQueryMaxNum;
}

/**
* Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
return transientStorePoolEnable && BrokerRole.SLAVE != getBrokerRole();
return transientStorePoolEnable;
}

public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public boolean changeToMaster(int masterEpoch) {
}
}

if (defaultMessageStore.isTransientStorePoolEnable()) {
waitingForAllCommit();
defaultMessageStore.getTransientStorePool().setRealCommit(true);
}

LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());

this.defaultMessageStore.recoverTopicQueueTable();
Expand All @@ -162,6 +167,12 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
this.haClient.updateMasterAddress(newMasterAddr);
this.haClient.updateHaMasterAddress(null);
this.haClient.start();

if (defaultMessageStore.isTransientStorePoolEnable()) {
waitingForAllCommit();
defaultMessageStore.getTransientStorePool().setRealCommit(false);
}

LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
} catch (final Exception e) {
Expand All @@ -170,6 +181,17 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
}
}

/**
 * Blocks the calling thread until the message store reports that no data remains to be
 * committed ({@code remainHowManyDataToCommit() <= 0}).
 * <p>
 * On every iteration the commit service is woken up through the commit log's flush manager and
 * the thread then sleeps for 100 ms before checking again.
 * <p>
 * The wait is deliberately not cut short by an interrupt, because callers rely on all data being
 * committed when this method returns. If an interrupt arrives while waiting, it is remembered
 * and the thread's interrupt status is restored before returning, so the signal is not lost.
 */
public void waitingForAllCommit() {
    boolean interrupted = false;
    try {
        while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) {
            getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Keep waiting, but remember the interrupt so it can be re-asserted afterwards.
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

@Override
public HAClient getHAClient() {
return this.haClient;
Expand Down
Loading