Skip to content

Commit

Permalink
[ISSUE #7326] Split the request to register to the nameserver (#7325)
Browse files Browse the repository at this point in the history
Signed-off-by: Ziy1-Tan <[email protected]>
  • Loading branch information
Ziy1-Tan authored Sep 11, 2023
1 parent dad6b4d commit 0dbd077
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1765,29 +1765,34 @@ public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConf
}

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();

topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());

topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
for (TopicConfig topicConfig : topicConfigMap.values()) {
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
topicConfigTable.put(topicConfig.getTopicName(),
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
topicConfig.getPerm() & getBrokerConfig().getBrokerPermission()));
} else {
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

if (this.brokerConfig.isEnableSplitRegistration()
&& topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
topicConfigTable.clear();
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
.map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.google.common.collect.ImmutableMap;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
Expand All @@ -47,7 +48,9 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;

import static com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -609,6 +612,24 @@ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}

public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) {
return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
}

public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
final ConcurrentMap<String, TopicConfig> topicConfigTable,
final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
topicConfigWrapper.setDataVersion(this.getDataVersion());
if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) {
this.getDataVersion().nextVersion();
}
return topicConfigWrapper;
}

@Override
public String encode() {
return encode(false);
Expand Down
24 changes: 24 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity {

private boolean enableMixedMessageType = false;

/**
* This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time,
* otherwise there will be a loss of routing
*/
private boolean enableSplitRegistration = false;

private int splitRegistrationSize = 800;

public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
Expand Down Expand Up @@ -1731,4 +1739,20 @@ public boolean isEnableMixedMessageType() {
public void setEnableMixedMessageType(boolean enableMixedMessageType) {
this.enableMixedMessageType = enableMixedMessageType;
}

public boolean isEnableSplitRegistration() {
return enableSplitRegistration;
}

public void setEnableSplitRegistration(boolean enableSplitRegistration) {
this.enableSplitRegistration = enableSplitRegistration;
}

public int getSplitRegistrationSize() {
return splitRegistrationSize;
}

public void setSplitRegistrationSize(int splitRegistrationSize) {
this.splitRegistrationSize = splitRegistrationSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.test.route;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
Expand Down Expand Up @@ -111,4 +112,34 @@ public void testStaticTopicNotAffected() throws Exception {
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
}

@Test
public void testCreateOrUpdateTopic_EnableSplitRegistration() {
brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
brokerController3.getBrokerConfig().setEnableSplitRegistration(true);

String testTopic = "test-topic-";

for (int i = 0; i < 1000; i++) {
TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
}

brokerController1.registerBrokerAll(false, true, true);
brokerController2.registerBrokerAll(false, true, true);
brokerController3.registerBrokerAll(false, true, true);

for (int i = 0; i < 1000; i++) {
TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
assertThat(route.getBrokerDatas()).hasSize(3);
assertThat(route.getQueueDatas()).hasSize(3);
}

brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
}
}

0 comments on commit 0dbd077

Please sign in to comment.