diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 275b64b1abd..9e49f636d29 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1765,29 +1765,34 @@ public synchronized void registerIncrementBrokerData(List topicConf } public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { + ConcurrentMap topicConfigMap = this.getTopicConfigManager().getTopicConfigTable(); + ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); - TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper(); - - topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion()); - topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable()); - - topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map( - entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())) - ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); - - if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { - ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); - for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { - TopicConfig tmp = + for (TopicConfig topicConfig : topicConfigMap.values()) { + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + topicConfigTable.put(topicConfig.getTopicName(), new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag()); - topicConfigTable.put(topicConfig.getTopicName(), tmp); + topicConfig.getPerm() & getBrokerConfig().getBrokerPermission())); + } else { + topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + + if (this.brokerConfig.isEnableSplitRegistration() + && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) { + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable); + doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); + topicConfigTable.clear(); } - topicConfigWrapper.setTopicConfigTable(topicConfigTable); } - if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), + Map topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream() + .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager(). + buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap); + if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 754605438d8..8537929be75 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; @@ -47,7 +48,9 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.body.KVTable; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; import static com.google.common.base.Preconditions.checkNotNull; @@ -609,6 +612,24 @@ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { return topicConfigSerializeWrapper; } + public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap topicConfigTable) { + return buildSerializeWrapper(topicConfigTable, Maps.newHashMap()); + } + + public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper( + final ConcurrentMap topicConfigTable, + final Map topicQueueMappingInfoMap + ) { + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper(); + topicConfigWrapper.setTopicConfigTable(topicConfigTable); + topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); + topicConfigWrapper.setDataVersion(this.getDataVersion()); + if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) { + this.getDataVersion().nextVersion(); + } + return topicConfigWrapper; + } + @Override public String encode() { return encode(false); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 45d26b29cba..0d248c4e170 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity { private boolean enableMixedMessageType = false; + /** + * This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time, + * otherwise there will be a loss of routing + */ + private boolean enableSplitRegistration = false; + + private int splitRegistrationSize = 800; + public long getMaxPopPollingSize() { return maxPopPollingSize; } @@ -1731,4 +1739,20 @@ public boolean isEnableMixedMessageType() { public void setEnableMixedMessageType(boolean enableMixedMessageType) { this.enableMixedMessageType = enableMixedMessageType; } + + public boolean isEnableSplitRegistration() { + return enableSplitRegistration; + } + + public void setEnableSplitRegistration(boolean enableSplitRegistration) { + this.enableSplitRegistration = enableSplitRegistration; + } + + public int getSplitRegistrationSize() { + return splitRegistrationSize; + } + + public void setSplitRegistrationSize(int splitRegistrationSize) { + this.splitRegistrationSize = splitRegistrationSize; + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java index 7e3c7b871dc..2370e68c0f1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.route; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.util.MQAdminTestUtils; @@ -111,4 +112,34 @@ public void testStaticTopicNotAffected() throws Exception { brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); } + + @Test + public void testCreateOrUpdateTopic_EnableSplitRegistration() { + brokerController1.getBrokerConfig().setEnableSplitRegistration(true); + brokerController2.getBrokerConfig().setEnableSplitRegistration(true); + brokerController3.getBrokerConfig().setEnableSplitRegistration(true); + + String testTopic = "test-topic-"; + + for (int i = 0; i < 1000; i++) { + TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8); + brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig); + brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig); + brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig); + } + + brokerController1.registerBrokerAll(false, true, true); + brokerController2.registerBrokerAll(false, true, true); + brokerController3.registerBrokerAll(false, true, true); + + for (int i = 0; i < 1000; i++) { + TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i); + assertThat(route.getBrokerDatas()).hasSize(3); + assertThat(route.getQueueDatas()).hasSize(3); + } + + brokerController1.getBrokerConfig().setEnableSplitRegistration(false); + brokerController2.getBrokerConfig().setEnableSplitRegistration(false); + brokerController3.getBrokerConfig().setEnableSplitRegistration(false); + } }