Skip to content

Commit

Permalink
[ISSUE #8822] Double write cq, reduce unnecessary switches (#8823)
Browse files Browse the repository at this point in the history
* Reduce unnecessary switches
  • Loading branch information
LetLetMe authored Oct 29, 2024
1 parent a96a14f commit 0b24768
Show file tree
Hide file tree
Showing 21 changed files with 246 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,40 @@
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;

private final String filePath;
private final long memTableFlushInterval;
private final CompressionType compressionType;
private DataVersion kvDataVersion = new DataVersion();


public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
this.compressionType = compressionType;
}

public boolean init() {
this.isStop = false;
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, compressionType);
return this.configRocksDBStorage.start();
}
public boolean loadDataVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;
import org.rocksdb.WriteBatch;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
Expand All @@ -41,7 +42,9 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));

}

@Override
Expand All @@ -61,10 +64,6 @@ public boolean loadConsumerOffset() {
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("consumerOffset json file does not exist, so skip merge");
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.RocksIterator;

public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
Expand All @@ -40,7 +41,8 @@ public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {

public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
}

@Override
Expand Down Expand Up @@ -78,10 +80,6 @@ public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {


private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("the switch transferMetadataJsonToRocksdb is off, no merge subGroup operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("subGroup json file does not exist, so skip merge");
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.CompressionType;

public class RocksDBTopicConfigManager extends TopicConfigManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBTopicConfigManager(BrokerController brokerController) {
super(brokerController, false);
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs(),
CompressionType.getCompressionType(brokerController.getMessageStoreConfig().getRocksdbCompressionType()));
}

@Override
Expand All @@ -59,10 +61,6 @@ public boolean loadDataVersion() {
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("the switch transferMetadataJsonToRocksdb is off, no merge topic operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("topic json file does not exist, so skip merge");
return true;
Expand Down
Loading

0 comments on commit 0b24768

Please sign in to comment.