Skip to content

Commit

Permalink
[ISSUE apache#6324] improving compact topic stability
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyou001 committed Mar 13, 2023
1 parent d0df051 commit f7fb512
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MessageStoreConfig {

private int maxOffsetMapSize = 100 * 1024 * 1024;

private int compactionThreadNum = 0;
private int compactionThreadNum = 6;

private boolean enableCompaction = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.store.kv;

import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -64,11 +67,8 @@ public CompactionStore(MessageStore defaultMessageStore) {
this.compactionLogPath = Paths.get(compactionPath, COMPACTION_LOG_DIR).toString();
this.compactionCqPath = Paths.get(compactionPath, COMPACTION_CQ_DIR).toString();
this.positionMgr = new CompactionPositionMgr(compactionPath);
if (config.getCompactionThreadNum() <= 0) {
this.compactionThreadNum = Runtime.getRuntime().availableProcessors();
} else {
this.compactionThreadNum = config.getCompactionThreadNum();
}
this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), config.getCompactionThreadNum());

this.compactionSchedule = Executors.newScheduledThreadPool(this.compactionThreadNum,
new ThreadFactoryImpl("compactionSchedule_"));
this.offsetMapSize = config.getMaxOffsetMapSize() / compactionThreadNum;
Expand Down Expand Up @@ -99,7 +99,8 @@ public void load(boolean exitOk) throws Exception {
CompactionLog log = new CompactionLog(defaultMessageStore, this, topic, queueId);
log.load(exitOk);
compactionLogTable.put(topic + "_" + queueId, log);
compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
int randomDelay = 1000 + new Random(System.currentTimeMillis()).nextInt(compactionInterval);
compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval + randomDelay, compactionInterval + randomDelay, TimeUnit.MILLISECONDS);
} else {
log.error("{}:{} compactionLog mismatch with compactionCq", topic, queueId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
bb.rewind();
}

MessageExt messageExt = MessageDecoder.decode(bb, false, false);
if (messageExt == null) {
break;
} else {
current += size;
System.out.printf(messageExt + "\n");
try {
MessageExt messageExt = MessageDecoder.decode(bb, false, false);
if (messageExt == null) {
break;
} else {
current += size;
System.out.printf(messageExt + "\n");
}
} catch (Exception e) {
e.printStackTrace();
}
}

Expand Down

0 comments on commit f7fb512

Please sign in to comment.