Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6545] Remove getTopicConfigs method in interface MessageStore #6531

Merged
merged 19 commits into from
Apr 15, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,7 @@ public boolean initialize() throws CloneNotSupportedException {

if (result) {
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, (ConcurrentMap<String, TopicConfig>) Collections.unmodifiableMap(topicConfigManager.getTopicConfigTable()));
joeCarf marked this conversation as resolved.
Show resolved Hide resolved

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, defaultMessageStore);
Expand Down Expand Up @@ -907,16 +906,16 @@ private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}",
TransactionalMessageServiceImpl.class.getSimpleName());
TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
AbstractTransactionalMessageCheckListener.class);
AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}",
DefaultTransactionalMessageCheckListener.class.getSimpleName());
DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
, brokerConfig, new ConcurrentHashMap<>());

master.getDispatcherList().addFirst(new CommitLogDispatcher() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void setUp() throws Exception {

brokerConfig = new BrokerConfig();
BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat());
messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig());
messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

assertThat(messageStore.load()).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,20 @@ public class DefaultMessageStore implements MessageStore {

private long stateMachineVersion = 0L;

// this is a unmodifiableMap
private ConcurrentMap<String, TopicConfig> topicConfigTable;

private final ScheduledExecutorService scheduledCleanQueueExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.aliveReplicasNum = messageStoreConfig.getTotalReplicas();
this.brokerStatsManager = brokerStatsManager;
this.topicConfigTable = topicConfigTable;
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
Expand Down Expand Up @@ -2047,20 +2051,22 @@ public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
}
}

@Override
public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
return this.consumeQueueStore.getTopicConfigs();
return this.topicConfigTable;
}

@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return this.consumeQueueStore.getTopicConfig(topic);
}
if (this.topicConfigTable == null) {
return Optional.empty();
}

public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
return Optional.ofNullable(this.topicConfigTable.get(topic));
}

// public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
// this.topicConfigTable = topicConfigTable;
// }

public BrokerIdentity getBrokerIdentity() {
if (messageStoreConfig.isEnableDLegerCommitLog()) {
return new BrokerIdentity(
Expand Down
18 changes: 0 additions & 18 deletions store/src/main/java/org/apache/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
Expand Down Expand Up @@ -736,21 +733,6 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma
*/
void assignOffset(MessageExtBrokerInner msg, short messageNum);

/**
* get all topic config
*
* @return all topic config info
*/
Map<String, TopicConfig> getTopicConfigs();

/**
* get topic config
*
* @param topic topic name
* @return topic config info
*/
Optional<TopicConfig> getTopicConfig(String topic);

/**
* Get master broker message store in process in broker container
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;

Expand All @@ -53,7 +53,7 @@ public class CompactionStore {
private final String compactionPath;
private final String compactionLogPath;
private final String compactionCqPath;
private final MessageStore defaultMessageStore;
private final DefaultMessageStore defaultMessageStore;
private final CompactionPositionMgr positionMgr;
private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
private final ScheduledExecutorService compactionSchedule;
Expand All @@ -65,7 +65,7 @@ public class CompactionStore {

private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

public CompactionStore(MessageStore defaultMessageStore) {
public CompactionStore(DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.compactionLogTable = new ConcurrentHashMap<>();
MessageStoreConfig config = defaultMessageStore.getMessageStoreConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
Expand Down Expand Up @@ -595,16 +592,6 @@ public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
next.assignOffset(msg, messageNum);
}

@Override
public Map<String, TopicConfig> getTopicConfigs() {
return next.getTopicConfigs();
}

@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return next.getTopicConfig(topic);
}

@Override
public List<PutMessageHook> getPutMessageHookList() {
return next.getPutMessageHookList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,12 @@ public class ConsumeQueueStore {
protected final QueueOffsetAssigner queueOffsetAssigner = new QueueOffsetAssigner();
protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;

// Should be careful, do not change the topic config
// TopicConfigManager is more suitable here.
private ConcurrentMap<String, TopicConfig> topicConfigTable;

public ConsumeQueueStore(DefaultMessageStore messageStore, MessageStoreConfig messageStoreConfig) {
this.messageStore = messageStore;
this.messageStoreConfig = messageStoreConfig;
this.consumeQueueTable = new ConcurrentHashMap<>(32);
}

public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
this.topicConfigTable = topicConfigTable;
}

private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
return (FileQueueLifeCycle) findOrCreateConsumeQueue(topic, queueId);
}
Expand Down Expand Up @@ -173,9 +165,9 @@ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String top
}

private void queueTypeShouldBe(String topic, CQType cqTypeExpected) {
TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
Optional<TopicConfig> topicConfig = this.messageStore.getTopicConfig(topic);

CQType cqTypeActual = QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig));
CQType cqTypeActual = QueueTypeUtils.getCQType(topicConfig);

if (!Objects.equals(cqTypeExpected, cqTypeActual)) {
throw new RuntimeException(format("The queue type of topic: %s should be %s, but is %s", topic, cqTypeExpected, cqTypeActual));
Expand Down Expand Up @@ -341,7 +333,7 @@ private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queue

ConsumeQueueInterface newLogic;

Optional<TopicConfig> topicConfig = this.getTopicConfig(topic);
Optional<TopicConfig> topicConfig = this.messageStore.getTopicConfig(topic);
// TODO maybe the topic has been deleted.
if (Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(topicConfig))) {
newLogic = new BatchConsumeQueue(
Expand Down Expand Up @@ -537,18 +529,6 @@ public void truncateDirty(long phyOffset) {
}
}

public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
return this.topicConfigTable;
}

public Optional<TopicConfig> getTopicConfig(String topic) {
if (this.topicConfigTable == null) {
return Optional.empty();
}

return Optional.ofNullable(this.topicConfigTable.get(topic));
}

public long getTotalSize() {
long totalSize = 0;
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -55,7 +56,7 @@ public void init() throws Exception {
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog");
//too much reference
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig());
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
CommitLog commitLog = new CommitLog(messageStore);
callback = commitLog.new DefaultAppendMessageCallback();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -78,7 +79,7 @@ private MessageStore buildMessageStore() throws Exception {
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator
+ "putmessagesteststore" + File.separator + "commitlog");
messageStoreConfig.setHaListenPort(0);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig());
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -151,7 +152,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
, brokerConfig, new ConcurrentHashMap<>());

assertThat(master.load()).isTrue();

Expand Down Expand Up @@ -179,7 +180,7 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
, brokerConfig, new ConcurrentHashMap<>());

assertThat(master.load()).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.store;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
Expand Down Expand Up @@ -483,7 +484,7 @@ private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxU

private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception {
messageStore = new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig());
new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

cleanCommitLogService = getCleanCommitLogService();
cleanConsumeQueueService = getCleanConsumeQueueService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.store;

import java.io.File;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
Expand Down Expand Up @@ -74,7 +75,7 @@ public DefaultMessageStore buildMessageStore() throws Exception {
String storeRootPath = System.getProperty("java.io.tmpdir") + File.separator + "store";
messageStoreConfig.setStorePathRootDir(storeRootPath);
messageStoreConfig.setHaListenPort(0);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig(), new ConcurrentHashMap<>());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void test_repeat_restart() throws Exception {
messageStoreConfig.setMaxIndexNum(100 * 10);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "store");
messageStoreConfig.setHaListenPort(0);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());

boolean load = master.load();
assertTrue(load);
Expand Down Expand Up @@ -144,7 +144,7 @@ private MessageStore buildMessageStore(String storePathRootDir) throws Exception
return new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("simpleTest", true),
new MyMessageArrivingListener(),
new BrokerConfig());
new BrokerConfig(), new ConcurrentHashMap<>());
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion store/src/test/java/org/apache/rocketmq/store/HATest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -250,7 +251,7 @@ public void destroy() throws Exception {
private MessageStore buildMessageStore(MessageStoreConfig messageStoreConfig, long brokerId) throws Exception {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerId(brokerId);
return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig);
return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>());
}

private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
Expand Down
Loading