diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 70e59e098de..a35618dc0a5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -737,8 +737,7 @@ public boolean initialize() throws CloneNotSupportedException { if (result) { try { - DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); - defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable()); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, defaultMessageStore); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 678c5079df3..84bca916998 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; @@ -159,7 +160,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { } } - , brokerConfig); + , brokerConfig, new ConcurrentHashMap<>()); master.getDispatcherList().addFirst(new CommitLogDispatcher() { @Override diff --git a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java index d3f6753d23e..b90fb2931d5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java @@ -119,7 +119,7 @@ public void setUp() throws Exception { brokerConfig = new BrokerConfig(); BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()); - messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig()); + messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); assertThat(messageStore.load()).isTrue(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index e51132bbf23..73b0f42e587 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -200,16 +200,20 @@ public class DefaultMessageStore implements MessageStore { private long stateMachineVersion = 0L; + // this is a unmodifiableMap + private ConcurrentMap topicConfigTable; + private final ScheduledExecutorService scheduledCleanQueueExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread")); public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, - final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { + final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap topicConfigTable) throws IOException { this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; this.messageStoreConfig = messageStoreConfig; this.aliveReplicasNum = messageStoreConfig.getTotalReplicas(); this.brokerStatsManager = brokerStatsManager; + this.topicConfigTable = topicConfigTable; this.allocateMappedFileService = new AllocateMappedFileService(this); if (messageStoreConfig.isEnableDLegerCommitLog()) { this.commitLog = new DLedgerCommitLog(this); @@ -2047,18 +2051,16 @@ public void assignOffset(MessageExtBrokerInner msg, short messageNum) { } } - @Override public ConcurrentMap getTopicConfigs() { - return this.consumeQueueStore.getTopicConfigs(); + return this.topicConfigTable; } - @Override public Optional getTopicConfig(String topic) { - return this.consumeQueueStore.getTopicConfig(topic); - } + if (this.topicConfigTable == null) { + return Optional.empty(); + } - public void setTopicConfigTable(ConcurrentMap topicConfigTable) { - this.consumeQueueStore.setTopicConfigTable(topicConfigTable); + return Optional.ofNullable(this.topicConfigTable.get(topic)); } public BrokerIdentity getBrokerIdentity() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index f77739fc475..a7da245551e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -25,15 +25,12 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.SystemClock; -import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBrokerInner; @@ -736,21 +733,6 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma */ void assignOffset(MessageExtBrokerInner msg, short messageNum); - /** - * get all topic config - * - * @return all topic config info - */ - Map getTopicConfigs(); - - /** - * get topic config - * - * @param topic topic name - * @return topic config info - */ - Optional getTopicConfig(String topic); - /** * Get master broker message store in process in broker container * diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java index b6d73b81c79..3ba37df9571 100644 --- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java @@ -29,9 +29,9 @@ import org.apache.rocketmq.common.utils.CleanupPolicyUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -53,7 +53,7 @@ public class CompactionStore { private final String compactionPath; private final String compactionLogPath; private final String compactionCqPath; - private final MessageStore defaultMessageStore; + private final DefaultMessageStore defaultMessageStore; private final CompactionPositionMgr positionMgr; private final ConcurrentHashMap compactionLogTable; private final ScheduledExecutorService compactionSchedule; @@ -65,7 +65,7 @@ public class CompactionStore { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - public CompactionStore(MessageStore defaultMessageStore) { + public CompactionStore(DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; this.compactionLogTable = new ConcurrentHashMap<>(); MessageStoreConfig config = defaultMessageStore.getMessageStoreConfig(); diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 3f43adc12d8..89c3e53b6bf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -26,15 +26,12 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.SystemClock; -import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBrokerInner; @@ -595,16 +592,6 @@ public void assignOffset(MessageExtBrokerInner msg, short messageNum) { next.assignOffset(msg, messageNum); } - @Override - public Map getTopicConfigs() { - return next.getTopicConfigs(); - } - - @Override - public Optional getTopicConfig(String topic) { - return next.getTopicConfig(topic); - } - @Override public List getPutMessageHookList() { return next.getPutMessageHookList(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 90f2e74aadb..151c36f168b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -64,20 +64,12 @@ public class ConsumeQueueStore { protected final QueueOffsetAssigner queueOffsetAssigner = new QueueOffsetAssigner(); protected final ConcurrentMap> consumeQueueTable; - // Should be careful, do not change the topic config - // TopicConfigManager is more suitable here. - private ConcurrentMap topicConfigTable; - public ConsumeQueueStore(DefaultMessageStore messageStore, MessageStoreConfig messageStoreConfig) { this.messageStore = messageStore; this.messageStoreConfig = messageStoreConfig; this.consumeQueueTable = new ConcurrentHashMap<>(32); } - public void setTopicConfigTable(ConcurrentMap topicConfigTable) { - this.topicConfigTable = topicConfigTable; - } - private FileQueueLifeCycle getLifeCycle(String topic, int queueId) { return (FileQueueLifeCycle) findOrCreateConsumeQueue(topic, queueId); } @@ -173,9 +165,9 @@ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String top } private void queueTypeShouldBe(String topic, CQType cqTypeExpected) { - TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic); + Optional topicConfig = this.messageStore.getTopicConfig(topic); - CQType cqTypeActual = QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig)); + CQType cqTypeActual = QueueTypeUtils.getCQType(topicConfig); if (!Objects.equals(cqTypeExpected, cqTypeActual)) { throw new RuntimeException(format("The queue type of topic: %s should be %s, but is %s", topic, cqTypeExpected, cqTypeActual)); @@ -341,7 +333,7 @@ private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queue ConsumeQueueInterface newLogic; - Optional topicConfig = this.getTopicConfig(topic); + Optional topicConfig = this.messageStore.getTopicConfig(topic); // TODO maybe the topic has been deleted. if (Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(topicConfig))) { newLogic = new BatchConsumeQueue( @@ -537,18 +529,6 @@ public void truncateDirty(long phyOffset) { } } - public ConcurrentMap getTopicConfigs() { - return this.topicConfigTable; - } - - public Optional getTopicConfig(String topic) { - if (this.topicConfigTable == null) { - return Optional.empty(); - } - - return Optional.ofNullable(this.topicConfigTable.get(topic)); - } - public long getTotalSize() { long totalSize = 0; for (ConcurrentMap maps : this.consumeQueueTable.values()) { diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index dc1af78b3fd..87bfe85da23 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; @@ -55,7 +56,7 @@ public void init() throws Exception { messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); //too much reference - DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig()); + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); CommitLog commitLog = new CommitLog(messageStore); callback = commitLog.new DefaultAppendMessageCallback(); } diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 8332f38c3c5..43ca38eb484 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; @@ -78,7 +79,7 @@ private MessageStore buildMessageStore() throws Exception { messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "putmessagesteststore" + File.separator + "commitlog"); messageStoreConfig.setHaListenPort(0); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig()); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); } @Test diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index d80a6f25fac..2e08369bde9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -24,6 +24,7 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -151,7 +152,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { } } - , brokerConfig); + , brokerConfig, new ConcurrentHashMap<>()); assertThat(master.load()).isTrue(); @@ -179,7 +180,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { } } - , brokerConfig); + , brokerConfig, new ConcurrentHashMap<>()); assertThat(master.load()).isTrue(); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 601d50c0f52..083aabc48b3 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -483,7 +484,7 @@ private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxU private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception { messageStore = new DefaultMessageStore(messageStoreConfig, - new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig()); + new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); cleanCommitLogService = getCleanCommitLogService(); cleanConsumeQueueService = getCleanConsumeQueueService(); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java index 7329098a38e..515a4845a4e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store; import java.io.File; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.FlushDiskType; @@ -74,7 +75,7 @@ public DefaultMessageStore buildMessageStore() throws Exception { String storeRootPath = System.getProperty("java.io.tmpdir") + File.separator + "store"; messageStoreConfig.setStorePathRootDir(storeRootPath); messageStoreConfig.setHaListenPort(0); - return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig()); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig(), new ConcurrentHashMap<>()); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 46751571666..2f22de4d110 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -99,7 +99,7 @@ public void test_repeat_restart() throws Exception { messageStoreConfig.setMaxIndexNum(100 * 10); messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "store"); messageStoreConfig.setHaListenPort(0); - MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); + MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); boolean load = master.load(); assertTrue(load); @@ -144,7 +144,7 @@ private MessageStore buildMessageStore(String storePathRootDir) throws Exception return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), - new BrokerConfig()); + new BrokerConfig(), new ConcurrentHashMap<>()); } @Test diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index e1dc16bdc13..38a04358174 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -27,6 +27,7 @@ import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; @@ -250,7 +251,7 @@ public void destroy() throws Exception { private MessageStore buildMessageStore(MessageStoreConfig messageStoreConfig, long brokerId) throws Exception { BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setBrokerId(brokerId); - return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig); + return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>()); } private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig) { diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java index daa17eef88e..3ae4b2be565 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.nio.charset.Charset; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageConst; @@ -57,7 +58,7 @@ public void init() throws Exception { messageStoreConfig.setEnableMultiDispatch(true); BrokerConfig brokerConfig = new BrokerConfig(); //too much reference - messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig); + messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>()); consumeQueue = new ConsumeQueue("xxx", 0, getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore); } diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java index ebeba5013a9..5eb83207322 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.DefaultMessageStore; @@ -107,7 +108,7 @@ protected DefaultMessageStore createDLedgerMessageStore(String base, String grou storeConfig.setdLegerSelfId(selfId); DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitLogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { - }, new BrokerConfig()); + }, new BrokerConfig(), new ConcurrentHashMap<>()); Assert.assertTrue(defaultMessageStore.load()); defaultMessageStore.start(); return defaultMessageStore; diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index 3ae0cb64eca..a21806ffcf6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -21,6 +21,7 @@ import java.io.File; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -60,7 +61,7 @@ protected DefaultMessageStore createDledgerMessageStore(String base, String grou storeConfig.setRecheckReputOffsetFromCq(true); DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { - }, new BrokerConfig()); + }, new BrokerConfig(), new ConcurrentHashMap<>()); DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); if (leaderId != null) { dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); @@ -109,7 +110,7 @@ protected DefaultMessageStore createMessageStore(String base, boolean createAbor storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { - }, new BrokerConfig()); + }, new BrokerConfig(), new ConcurrentHashMap<>()); if (createAbort) { String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index 3c4e7af8d57..6d105289f05 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.ha.autoswitch; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; @@ -497,7 +498,7 @@ private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreCon BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setBrokerId(brokerId); brokerConfig.setEnableControllerMode(true); - return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig); + return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>()); } private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig, int mappedFileSize) { diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java index 10c2454afa0..e3ac1b6bdac 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java @@ -27,6 +27,9 @@ import java.util.Queue; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.message.MessageAccessor; @@ -37,10 +40,10 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.utils.QueueTypeUtils; import org.apache.rocketmq.store.ConsumeQueueExt; +import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageFilter; -import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -55,11 +58,13 @@ public class BatchConsumeMessageTest extends QueueTestBase { private static final int BATCH_NUM = 10; private static final int TOTAL_MSGS = 200; - private MessageStore messageStore; + private DefaultMessageStore messageStore; + private ConcurrentMap topicConfigTableMap; @Before public void init() throws Exception { - messageStore = createMessageStore(null, true); + this.topicConfigTableMap = new ConcurrentHashMap<>(); + messageStore = (DefaultMessageStore) createMessageStore(null, true, this.topicConfigTableMap); messageStore.load(); messageStore.start(); } @@ -76,7 +81,8 @@ public void destroy() { @Test public void testSendMessagesToCqTopic() { String topic = UUID.randomUUID().toString(); - createTopic(topic, CQType.SimpleCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.SimpleCQ); + this.topicConfigTableMap.putAll(topicConfigTable); // int batchNum = 10; @@ -100,7 +106,8 @@ public void testSendMessagesToCqTopic() { @Test public void testSendMessagesToBcqTopic() { String topic = UUID.randomUUID().toString(); - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); // case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG // MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1); @@ -123,7 +130,8 @@ public void testSendMessagesToBcqTopic() { @Test public void testConsumeBatchMessage() { String topic = UUID.randomUUID().toString(); - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); int batchNum = 10; MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum); @@ -153,7 +161,8 @@ public void testConsumeBatchMessage() { @Test public void testNextBeginOffsetConsumeBatchMessage() { String topic = UUID.randomUUID().toString(); - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); Random random = new Random(); int putMessageCount = 1000; @@ -191,7 +200,8 @@ public void testNextBeginOffsetConsumeBatchMessage() { public void testGetOffsetInQueueByTime() throws Exception { String topic = "testGetOffsetInQueueByTime"; - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); Assert.assertTrue(QueueTypeUtils.isBatchCq(messageStore.getTopicConfig(topic))); // The initial min max offset, before and after the creation of consume queue @@ -231,7 +241,8 @@ public void testGetOffsetInQueueByTime() throws Exception { @Test public void testDispatchNormalConsumeQueue() throws Exception { String topic = "TestDispatchBuildConsumeQueue"; - createTopic(topic, CQType.SimpleCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.SimpleCQ); + this.topicConfigTableMap.putAll(topicConfigTable); long timeStart = -1; long timeMid = -1; @@ -291,7 +302,8 @@ public void testDispatchBuildBatchConsumeQueue() throws Exception { long timeStart = -1; long timeMid = -1; - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); for (int i = 0; i < 100; i++) { PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum)); @@ -349,7 +361,8 @@ public void testDispatchBuildBatchConsumeQueue() throws Exception { public void testGetBatchMessageWithinNumber() { String topic = UUID.randomUUID().toString(); - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); int batchNum = 20; for (int i = 0; i < 200; i++) { @@ -409,7 +422,8 @@ public void testGetBatchMessageWithinNumber() { @Test public void testGetBatchMessageWithinSize() { String topic = UUID.randomUUID().toString(); - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); int batchNum = 10; for (int i = 0; i < 100; i++) { @@ -463,7 +477,8 @@ public void testGetBatchMessageWithinSize() { } protected void putMsg(String topic) { - createTopic(topic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(topic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); for (int i = 0; i < TOTAL_MSGS; i++) { MessageExtBrokerInner message = buildMessage(topic, BATCH_NUM * (i % 2 + 1)); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java index c0a9c4276f4..c6525bd8365 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.queue; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.DefaultMessageStore; @@ -299,7 +300,7 @@ protected MessageStore createMessageStore(String baseDir) throws Exception { new BrokerStatsManager("simpleTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { }, - new BrokerConfig()); + new BrokerConfig(), new ConcurrentHashMap<>()); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java index a8379fcf023..59e1d08791f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java @@ -16,9 +16,11 @@ */ package org.apache.rocketmq.store.queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; -import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; @@ -36,10 +38,14 @@ public class ConsumeQueueStoreTest extends QueueTestBase { private MessageStore messageStore; + private ConcurrentMap topicConfigTableMap; + + @Before public void init() throws Exception { - messageStore = createMessageStore(null, true); + this.topicConfigTableMap = new ConcurrentHashMap<>(); + messageStore = createMessageStore(null, true, topicConfigTableMap); messageStore.load(); messageStore.start(); } @@ -56,7 +62,8 @@ public void destroy() { @Test public void testLoadConsumeQueuesWithWrongAttribute() { String normalTopic = UUID.randomUUID().toString(); - createTopic(normalTopic, CQType.SimpleCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(normalTopic, CQType.SimpleCQ); + this.topicConfigTableMap.putAll(topicConfigTable); for (int i = 0; i < 10; i++) { PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(normalTopic, -1)); @@ -66,10 +73,10 @@ public void testLoadConsumeQueuesWithWrongAttribute() { await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); // simulate delete topic but with files left. - ((DefaultMessageStore)messageStore).setTopicConfigTable(null); + this.topicConfigTableMap.clear(); - createTopic(normalTopic, CQType.BatchCQ, messageStore); - messageStore.shutdown(); + topicConfigTable = createTopicConfigTable(normalTopic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load()); Assert.assertTrue(runtimeException.getMessage().endsWith("should be SimpleCQ, but is BatchCQ")); @@ -78,7 +85,8 @@ public void testLoadConsumeQueuesWithWrongAttribute() { @Test public void testLoadBatchConsumeQueuesWithWrongAttribute() { String batchTopic = UUID.randomUUID().toString(); - createTopic(batchTopic, CQType.BatchCQ, messageStore); + ConcurrentMap topicConfigTable = createTopicConfigTable(batchTopic, CQType.BatchCQ); + this.topicConfigTableMap.putAll(topicConfigTable); for (int i = 0; i < 10; i++) { PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(batchTopic, 10)); @@ -88,9 +96,10 @@ public void testLoadBatchConsumeQueuesWithWrongAttribute() { await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); // simulate delete topic but with files left. - ((DefaultMessageStore)messageStore).setTopicConfigTable(null); + this.topicConfigTableMap.clear(); - createTopic(batchTopic, CQType.SimpleCQ, messageStore); + topicConfigTable = createTopicConfigTable(batchTopic, CQType.SimpleCQ); + this.topicConfigTableMap.putAll(topicConfigTable); messageStore.shutdown(); RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load()); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java index 6a8bfc5bc66..c3c8be52ddd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; @@ -74,7 +75,7 @@ protected DefaultMessageStore gen() throws Exception { DefaultMessageStore master = new DefaultMessageStore( messageStoreConfig, new BrokerStatsManager(brokerConfig), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { - }, brokerConfig); + }, brokerConfig, new ConcurrentHashMap<>()); assertThat(master.load()).isTrue(); @@ -112,7 +113,7 @@ protected void putMsg(DefaultMessageStore messageStore) throws Exception { public void testIterator() throws Exception { final int msgNum = 100; final int msgSize = 1000; - MessageStore messageStore = createMessageStore(null, true); + MessageStore messageStore = createMessageStore(null, true, null); messageStore.load(); String topic = UUID.randomUUID().toString(); //The initial min max offset, before and after the creation of consume queue diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java index a1e1cc1f5c0..81dc158db53 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java @@ -41,7 +41,7 @@ public class QueueTestBase extends StoreTestBase { - protected void createTopic(String topic, CQType cqType, MessageStore messageStore) { + protected ConcurrentMap createTopicConfigTable(String topic, CQType cqType) { ConcurrentMap topicConfigTable = new ConcurrentHashMap<>(); TopicConfig topicConfigToBeAdded = new TopicConfig(); @@ -51,14 +51,14 @@ protected void createTopic(String topic, CQType cqType, MessageStore messageStor topicConfigToBeAdded.setAttributes(attributes); topicConfigTable.put(topic, topicConfigToBeAdded); - ((DefaultMessageStore) messageStore).setTopicConfigTable(topicConfigTable); + return topicConfigTable; } protected Callable fullyDispatched(MessageStore messageStore) { return () -> messageStore.dispatchBehindBytes() == 0; } - protected MessageStore createMessageStore(String baseDir, boolean extent) throws Exception { + protected MessageStore createMessageStore(String baseDir, boolean extent, ConcurrentMap topicConfigTable) throws Exception { if (baseDir == null) { baseDir = createBaseDir(); } @@ -86,7 +86,7 @@ protected MessageStore createMessageStore(String baseDir, boolean extent) throws new BrokerStatsManager("simpleTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { }, - new BrokerConfig()); + new BrokerConfig(), topicConfigTable); } public MessageExtBrokerInner buildMessage(String topic, int batchNum) { diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java index 7ace2d9fe61..63ec97cdb0b 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; @@ -99,7 +100,7 @@ public void init() throws Exception { storeConfig.setTimerInterceptDelayLevel(true); storeConfig.setTimerPrecisionMs(precisionMs); - messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new BrokerConfig()); + messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); boolean load = messageStore.load(); assertTrue(load); messageStore.start();