Skip to content

Commit

Permalink
[ISSUE apache#7326] split the request to register to the nameserver
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziy1-Tan committed Sep 8, 2023
1 parent e11e294 commit 1c01a0f
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.broker;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand Down Expand Up @@ -1766,29 +1767,32 @@ public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConf
}

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();

topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());

topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
for (TopicConfig topicConfig : topicConfigMap.values()) {
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
topicConfigTable.put(topicConfig.getTopicName(),
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
topicConfig.getPerm() & getBrokerConfig().getBrokerPermission()));
} else {
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

if (this.brokerConfig.isEnableSplitRegistration() && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = buildSerializeWrapper(topicConfigTable);
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
topicConfigTable.clear();
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
.map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
Expand All @@ -1798,6 +1802,25 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole
}
}


private TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) {
return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
}

private TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
final ConcurrentMap<String, TopicConfig> topicConfigTable,
final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
if (this.brokerConfig.isEnableSplitRegistration()) {
this.getTopicConfigManager().getDataVersion().nextVersion();
}
return topicConfigWrapper;
}

protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {

Expand Down
20 changes: 20 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ public class BrokerConfig extends BrokerIdentity {

private boolean enableMixedMessageType = false;

private boolean enableSplitRegistration = false;

private int splitRegistrationSize = 800;

public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
Expand Down Expand Up @@ -1731,4 +1735,20 @@ public boolean isEnableMixedMessageType() {
public void setEnableMixedMessageType(boolean enableMixedMessageType) {
this.enableMixedMessageType = enableMixedMessageType;
}

public boolean isEnableSplitRegistration() {
return enableSplitRegistration;
}

public void setEnableSplitRegistration(boolean enableSplitRegistration) {
this.enableSplitRegistration = enableSplitRegistration;
}

public int getSplitRegistrationSize() {
return splitRegistrationSize;
}

public void setSplitRegistrationSize(int splitRegistrationSize) {
this.splitRegistrationSize = splitRegistrationSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.test.route;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
Expand Down Expand Up @@ -111,4 +112,34 @@ public void testStaticTopicNotAffected() throws Exception {
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
}

@Test
public void testCreateOrUpdateTopic_EnableSplitRegistration() {
brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
brokerController3.getBrokerConfig().setEnableSplitRegistration(true);

String testTopic = "test-topic-";

for (int i = 0; i < 1000; i++) {
TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
}

brokerController1.registerBrokerAll(false, true, true);
brokerController2.registerBrokerAll(false, true, true);
brokerController3.registerBrokerAll(false, true, true);

for (int i = 0; i < 801; i++) {
TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
assertThat(route.getBrokerDatas()).hasSize(3);
assertThat(route.getQueueDatas()).hasSize(3);
}

brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
}
}

0 comments on commit 1c01a0f

Please sign in to comment.