Skip to content

Commit

Permalink
just test
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin committed Dec 22, 2022
1 parent 18ff2c6 commit 773a66e
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 25 deletions.
60 changes: 40 additions & 20 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,

return dispatchRequest;
} catch (Exception e) {
log.error("Check message and return size error", e);
}

return new DispatchRequest(-1, false /* success */);
Expand Down Expand Up @@ -1215,6 +1214,8 @@ class CommitRealTimeService extends FlushCommitLogService {

private long lastCommitTimestamp = 0;

private volatile boolean shouldRunningCommit = false;

@Override public String getServiceName() {
if (CommitLog.this.defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
return CommitLog.this.defaultMessageStore.getBrokerIdentity().getIdentifier() + CommitRealTimeService.class.getSimpleName();
Expand All @@ -1226,6 +1227,11 @@ class CommitRealTimeService extends FlushCommitLogService {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {

if (!shouldRunningCommit) {
waitForRunning(5 * 1000);
continue;
}

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
Expand All @@ -1239,7 +1245,7 @@ class CommitRealTimeService extends FlushCommitLogService {
}

try {
boolean result = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
Expand All @@ -1262,6 +1268,14 @@ class CommitRealTimeService extends FlushCommitLogService {
}
CommitLog.log.info(this.getServiceName() + " service end");
}

public void setShouldRunningCommit(boolean shouldRunningCommit) {
this.shouldRunningCommit = shouldRunningCommit;
}

public boolean isShouldRunningCommit() {
return shouldRunningCommit;
}
}

class FlushRealTimeService extends FlushCommitLogService {
Expand Down Expand Up @@ -1759,27 +1773,15 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer

return result;
}

}

interface FlushManager {
void start();

void shutdown();

void wakeUpFlush();

void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);

CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
}

class DefaultFlushManager implements FlushManager {

private final FlushCommitLogService flushCommitLogService;

//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
private final CommitRealTimeService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
Expand All @@ -1788,14 +1790,14 @@ public DefaultFlushManager() {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();
}

this.commitLogService = new CommitLog.CommitRealTimeService();
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
this.flushCommitLogService.start();

if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitLogService.start();
this.commitRealTimeService.start();
}
}

Expand Down Expand Up @@ -1827,7 +1829,7 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
commitRealTimeService.wakeup();
}
}
}
Expand All @@ -1852,7 +1854,7 @@ public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult r
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
Expand All @@ -1863,9 +1865,24 @@ public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult r
flushCommitLogService.wakeup();
}

@Override
public void wakeUpCommit() {
// now wake up commit log thread.
commitRealTimeService.wakeup();
}

@Override
public void setShouldRunningCommit(boolean shouldRunningCommit) {
commitRealTimeService.setShouldRunningCommit(shouldRunningCommit);
}

@Override public boolean isRunningCommit() {
return commitRealTimeService.isShouldRunningCommit();
}

@Override public void shutdown() {
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
this.commitRealTimeService.shutdown();
}

this.flushCommitLogService.shutdown();
Expand Down Expand Up @@ -1897,4 +1914,7 @@ public boolean isMappedFilesEmpty() {
this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
}

public FlushManager getFlushManager() {
return flushManager;
}
}
40 changes: 40 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/FlushManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.store;

import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt;

public interface FlushManager {

void start();

void shutdown();

void wakeUpFlush();

void wakeUpCommit();

void setShouldRunningCommit(boolean shouldRunningCommit);

boolean isRunningCommit();

void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);

CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ public int availableBufferNums() {
public DefaultMessageStore getMessageStore() {
return messageStore;
}

public boolean isRunningCommit() {
return messageStore.getCommitLog().getFlushManager().isRunningCommit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public boolean changeToMaster(int masterEpoch) {
}
}

if (defaultMessageStore.isTransientStorePoolEnable()) {
defaultMessageStore.getCommitLog().resetOffset(defaultMessageStore.getMaxPhyOffset());
defaultMessageStore.getCommitLog().getFlushManager().setShouldRunningCommit(true);
}

LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());

this.defaultMessageStore.recoverTopicQueueTable();
Expand All @@ -162,6 +167,19 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
this.haClient.updateMasterAddress(newMasterAddr);
this.haClient.updateHaMasterAddress(null);
this.haClient.start();

if (defaultMessageStore.isTransientStorePoolEnable()) {
while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) {
getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
try {
Thread.sleep(100);
} catch (Exception e) {

}
}
defaultMessageStore.getCommitLog().getFlushManager().setShouldRunningCommit(false);
}

LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
} catch (final Exception e) {
Expand Down Expand Up @@ -199,8 +217,8 @@ public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
}

/**
* Check and maybe shrink the inSyncStateSet.
* A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
* Check and maybe shrink the inSyncStateSet. A slave will be removed from inSyncStateSet if (curTime -
* HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
*/
public Set<String> maybeShrinkInSyncStateSet() {
final Set<String> newSyncStateSet = getSyncStateSet();
Expand Down Expand Up @@ -385,7 +403,8 @@ public long truncateInvalidMsg() {
} finally {
result.release();
}
} while (reputFromOffset < this.defaultMessageStore.getMaxPhyOffset() && doNext);
}
while (reputFromOffset < this.defaultMessageStore.getMaxPhyOffset() && doNext);

LOGGER.info("Truncate commitLog to {}", reputFromOffset);
this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.rocketmq.store.PutMessageContext;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;
Expand Down Expand Up @@ -550,7 +549,7 @@ public int getReadPosition() {
// } else {
// return COMMITTED_POSITION_UPDATER.get(this);
// }
return transientStorePool == null || transientStorePool.getMessageStore().getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
return transientStorePool == null || !transientStorePool.isRunningCommit() ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
}

@Override
Expand Down

0 comments on commit 773a66e

Please sign in to comment.