Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7326] Split the request to register to the nameserver #7325

Merged
merged 1 commit into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1766,29 +1766,34 @@ public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConf
}

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();

topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());

topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
for (TopicConfig topicConfig : topicConfigMap.values()) {
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
topicConfigTable.put(topicConfig.getTopicName(),
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
topicConfig.getPerm() & getBrokerConfig().getBrokerPermission()));
} else {
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

if (this.brokerConfig.isEnableSplitRegistration()
&& topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
topicConfigTable.clear();
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
.map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.google.common.collect.ImmutableMap;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
Expand All @@ -47,7 +48,9 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;

import static com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -589,6 +592,24 @@ public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}

public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) {
return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
}

public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
final ConcurrentMap<String, TopicConfig> topicConfigTable,
final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
topicConfigWrapper.setDataVersion(this.getDataVersion());
if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) {
this.getDataVersion().nextVersion();
}
return topicConfigWrapper;
}

@Override
public String encode() {
return encode(false);
Expand Down
24 changes: 24 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity {

private boolean enableMixedMessageType = false;

/**
* This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time,
* otherwise there will be a loss of routing
*/
private boolean enableSplitRegistration = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a reminder to the users that this switch cannot be used together with deleteTopicWithBrokerRegistration, otherwise there will be a loss of routing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


private int splitRegistrationSize = 800;

public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
Expand Down Expand Up @@ -1731,4 +1739,20 @@ public boolean isEnableMixedMessageType() {
public void setEnableMixedMessageType(boolean enableMixedMessageType) {
this.enableMixedMessageType = enableMixedMessageType;
}

public boolean isEnableSplitRegistration() {
return enableSplitRegistration;
}

public void setEnableSplitRegistration(boolean enableSplitRegistration) {
this.enableSplitRegistration = enableSplitRegistration;
}

public int getSplitRegistrationSize() {
return splitRegistrationSize;
}

public void setSplitRegistrationSize(int splitRegistrationSize) {
this.splitRegistrationSize = splitRegistrationSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.test.route;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
Expand Down Expand Up @@ -111,4 +112,34 @@ public void testStaticTopicNotAffected() throws Exception {
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
}

@Test
public void testCreateOrUpdateTopic_EnableSplitRegistration() {
brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
brokerController3.getBrokerConfig().setEnableSplitRegistration(true);

String testTopic = "test-topic-";

for (int i = 0; i < 1000; i++) {
TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
}

brokerController1.registerBrokerAll(false, true, true);
brokerController2.registerBrokerAll(false, true, true);
brokerController3.registerBrokerAll(false, true, true);

for (int i = 0; i < 1000; i++) {
TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
assertThat(route.getBrokerDatas()).hasSize(3);
assertThat(route.getQueueDatas()).hasSize(3);
}

brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
}
}
Loading