Skip to content

Commit

Permalink
[ISSUE #6545] Remove getTopicConfigs method in interface MessageStore (
Browse files Browse the repository at this point in the history
…#6531)

* change map -> lambda

* f

* fix unit test

* remove getTopicConfig function

* Update MultiDispatchTest.java

* Update CompactionStore.java

* update

* update test

* update test

* Update BatchConsumeMessageTest.java

* Update BrokerController.java

* Update BrokerController.java

* check

* Update BrokerController.java

* Update BatchConsumeMessageTest.java
  • Loading branch information
joeCarf authored Apr 15, 2023
1 parent d1b14b0 commit 3fe81bf
Show file tree
Hide file tree
Showing 25 changed files with 101 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,7 @@ public boolean initialize() throws CloneNotSupportedException {

if (result) {
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, defaultMessageStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
, brokerConfig, new ConcurrentHashMap<>());

master.getDispatcherList().addFirst(new CommitLogDispatcher() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void setUp() throws Exception {

brokerConfig = new BrokerConfig();
BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat());
messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig());
messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

assertThat(messageStore.load()).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,20 @@ public class DefaultMessageStore implements MessageStore {

private long stateMachineVersion = 0L;

// this is a unmodifiableMap
private ConcurrentMap<String, TopicConfig> topicConfigTable;

private final ScheduledExecutorService scheduledCleanQueueExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.aliveReplicasNum = messageStoreConfig.getTotalReplicas();
this.brokerStatsManager = brokerStatsManager;
this.topicConfigTable = topicConfigTable;
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
Expand Down Expand Up @@ -2047,18 +2051,16 @@ public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
}
}

@Override
public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
return this.consumeQueueStore.getTopicConfigs();
return this.topicConfigTable;
}

@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return this.consumeQueueStore.getTopicConfig(topic);
}
if (this.topicConfigTable == null) {
return Optional.empty();
}

public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
return Optional.ofNullable(this.topicConfigTable.get(topic));
}

public BrokerIdentity getBrokerIdentity() {
Expand Down
18 changes: 0 additions & 18 deletions store/src/main/java/org/apache/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
Expand Down Expand Up @@ -736,21 +733,6 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma
*/
void assignOffset(MessageExtBrokerInner msg, short messageNum);

/**
* get all topic config
*
* @return all topic config info
*/
Map<String, TopicConfig> getTopicConfigs();

/**
* get topic config
*
* @param topic topic name
* @return topic config info
*/
Optional<TopicConfig> getTopicConfig(String topic);

/**
* Get master broker message store in process in broker container
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;

Expand All @@ -53,7 +53,7 @@ public class CompactionStore {
private final String compactionPath;
private final String compactionLogPath;
private final String compactionCqPath;
private final MessageStore defaultMessageStore;
private final DefaultMessageStore defaultMessageStore;
private final CompactionPositionMgr positionMgr;
private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
private final ScheduledExecutorService compactionSchedule;
Expand All @@ -65,7 +65,7 @@ public class CompactionStore {

private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

public CompactionStore(MessageStore defaultMessageStore) {
public CompactionStore(DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.compactionLogTable = new ConcurrentHashMap<>();
MessageStoreConfig config = defaultMessageStore.getMessageStoreConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
Expand Down Expand Up @@ -595,16 +592,6 @@ public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
next.assignOffset(msg, messageNum);
}

@Override
public Map<String, TopicConfig> getTopicConfigs() {
return next.getTopicConfigs();
}

@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return next.getTopicConfig(topic);
}

@Override
public List<PutMessageHook> getPutMessageHookList() {
return next.getPutMessageHookList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,12 @@ public class ConsumeQueueStore {
protected final QueueOffsetAssigner queueOffsetAssigner = new QueueOffsetAssigner();
protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;

// Should be careful, do not change the topic config
// TopicConfigManager is more suitable here.
private ConcurrentMap<String, TopicConfig> topicConfigTable;

public ConsumeQueueStore(DefaultMessageStore messageStore, MessageStoreConfig messageStoreConfig) {
this.messageStore = messageStore;
this.messageStoreConfig = messageStoreConfig;
this.consumeQueueTable = new ConcurrentHashMap<>(32);
}

public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
this.topicConfigTable = topicConfigTable;
}

private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
return findOrCreateConsumeQueue(topic, queueId);
}
Expand Down Expand Up @@ -173,9 +165,9 @@ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String top
}

private void queueTypeShouldBe(String topic, CQType cqTypeExpected) {
TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
Optional<TopicConfig> topicConfig = this.messageStore.getTopicConfig(topic);

CQType cqTypeActual = QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig));
CQType cqTypeActual = QueueTypeUtils.getCQType(topicConfig);

if (!Objects.equals(cqTypeExpected, cqTypeActual)) {
throw new RuntimeException(format("The queue type of topic: %s should be %s, but is %s", topic, cqTypeExpected, cqTypeActual));
Expand Down Expand Up @@ -341,7 +333,7 @@ private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queue

ConsumeQueueInterface newLogic;

Optional<TopicConfig> topicConfig = this.getTopicConfig(topic);
Optional<TopicConfig> topicConfig = this.messageStore.getTopicConfig(topic);
// TODO maybe the topic has been deleted.
if (Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(topicConfig))) {
newLogic = new BatchConsumeQueue(
Expand Down Expand Up @@ -537,18 +529,6 @@ public void truncateDirty(long phyOffset) {
}
}

public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
return this.topicConfigTable;
}

public Optional<TopicConfig> getTopicConfig(String topic) {
if (this.topicConfigTable == null) {
return Optional.empty();
}

return Optional.ofNullable(this.topicConfigTable.get(topic));
}

public long getTotalSize() {
long totalSize = 0;
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -55,7 +56,7 @@ public void init() throws Exception {
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
//too much reference
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig());
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
CommitLog commitLog = new CommitLog(messageStore);
callback = commitLog.new DefaultAppendMessageCallback();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -78,7 +79,7 @@ private MessageStore buildMessageStore() throws Exception {
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator
+ "putmessagesteststore" + File.separator + "commitlog");
messageStoreConfig.setHaListenPort(0);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig());
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -151,7 +152,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
, brokerConfig, new ConcurrentHashMap<>());

assertThat(master.load()).isTrue();

Expand Down Expand Up @@ -179,7 +180,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
, brokerConfig, new ConcurrentHashMap<>());

assertThat(master.load()).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.store;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
Expand Down Expand Up @@ -483,7 +484,7 @@ private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxU

private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception {
messageStore = new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig());
new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

cleanCommitLogService = getCleanCommitLogService();
cleanConsumeQueueService = getCleanConsumeQueueService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.store;

import java.io.File;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
Expand Down Expand Up @@ -74,7 +75,7 @@ public DefaultMessageStore buildMessageStore() throws Exception {
String storeRootPath = System.getProperty("java.io.tmpdir") + File.separator + "store";
messageStoreConfig.setStorePathRootDir(storeRootPath);
messageStoreConfig.setHaListenPort(0);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig(), new ConcurrentHashMap<>());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void test_repeat_restart() throws Exception {
messageStoreConfig.setMaxIndexNum(100 * 10);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "store");
messageStoreConfig.setHaListenPort(0);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

boolean load = master.load();
assertTrue(load);
Expand Down Expand Up @@ -144,7 +144,7 @@ private MessageStore buildMessageStore(String storePathRootDir) throws Exception
return new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("simpleTest", true),
new MyMessageArrivingListener(),
new BrokerConfig());
new BrokerConfig(), new ConcurrentHashMap<>());
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion store/src/test/java/org/apache/rocketmq/store/HATest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -250,7 +251,7 @@ public void destroy() throws Exception {
private MessageStore buildMessageStore(MessageStoreConfig messageStoreConfig, long brokerId) throws Exception {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerId(brokerId);
return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig);
return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>());
}

private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageConst;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void init() throws Exception {
messageStoreConfig.setEnableMultiDispatch(true);
BrokerConfig brokerConfig = new BrokerConfig();
//too much reference
messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig);
messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>());
consumeQueue = new ConsumeQueue("xxx", 0,
getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
}
Expand Down
Loading

0 comments on commit 3fe81bf

Please sign in to comment.