Skip to content

Commit

Permalink
add some tests for nameserver (#8349)
Browse files Browse the repository at this point in the history
  • Loading branch information
TanXiang7o authored Jul 18, 2024
1 parent 73d0c33 commit d9d53d5
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ public void doAfterResponse(String remoteAddr, RemotingCommand request, Remoting
return;
}
TopicRouteData topicRouteData = RemotingSerializable.decode(response.getBody(), TopicRouteData.class);

response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
}

private TopicRouteData filterByZoneName(TopicRouteData topicRouteData, String zoneName) {
List<BrokerData> brokerDataReserved = new ArrayList<>();
Map<String, BrokerData> brokerDataRemoved = new HashMap<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (brokerData.getBrokerAddrs() == null) {
continue;
}
//master down, consume from slave. break nearby route rule.
if (brokerData.getBrokerAddrs().get(MixAll.MASTER_ID) == null
|| StringUtils.equalsIgnoreCase(brokerData.getZoneName(), zoneName)) {
Expand All @@ -85,9 +87,6 @@ private TopicRouteData filterByZoneName(TopicRouteData topicRouteData, String zo
if (topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
for (Entry<String, BrokerData> entry : brokerDataRemoved.entrySet()) {
BrokerData brokerData = entry.getValue();
if (brokerData.getBrokerAddrs() == null) {
continue;
}
brokerData.getBrokerAddrs().values()
.forEach(brokerAddr -> topicRouteData.getFilterServerTable().remove(brokerAddr));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,42 @@ public void testGetBrokerClusterInfo() throws RemotingCommandException {
assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testQueryDataVersion()throws RemotingCommandException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand request = getRemotingCommand(RequestCode.QUERY_DATA_VERSION);
RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testGetBrokerMemberBroker() throws RemotingCommandException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand request = getRemotingCommand(RequestCode.GET_BROKER_MEMBER_GROUP);
RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testBrokerHeartBeat() throws RemotingCommandException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand request = getRemotingCommand(RequestCode.BROKER_HEARTBEAT);
RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testAddWritePermOfBroker() throws RemotingCommandException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand request = getRemotingCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER);
RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testWipeWritePermOfBroker() throws RemotingCommandException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.namesrv.route;

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


public class ZoneRouteRPCHookTest {

private ZoneRouteRPCHook zoneRouteRPCHook;

@Before
public void setup() {
zoneRouteRPCHook = new ZoneRouteRPCHook();
}

@Test
public void testDoAfterResponseWithNoZoneMode() {
RemotingCommand request1 = RemotingCommand.createRequestCommand(106,null);
zoneRouteRPCHook.doAfterResponse("", request1, null);

HashMap<String, String> extFields = new HashMap<>();
extFields.put(MixAll.ZONE_MODE, "false");
RemotingCommand request = RemotingCommand.createRequestCommand(105,null);
request.setExtFields(extFields);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
zoneRouteRPCHook.doAfterResponse("", request, response);
}

@Test
public void testDoAfterResponseWithNoZoneName() {
HashMap<String, String> extFields = new HashMap<>();
extFields.put(MixAll.ZONE_MODE, "true");
RemotingCommand request = RemotingCommand.createRequestCommand(105,null);
request.setExtFields(extFields);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
zoneRouteRPCHook.doAfterResponse("", request, response);
}

@Test
public void testDoAfterResponseWithNoResponse() {
HashMap<String, String> extFields = new HashMap<>();
extFields.put(MixAll.ZONE_MODE, "true");
RemotingCommand request = RemotingCommand.createRequestCommand(105,null);
request.setExtFields(extFields);
zoneRouteRPCHook.doAfterResponse("", request, null);

RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
zoneRouteRPCHook.doAfterResponse("", request, response);

response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
response.setCode(ResponseCode.NO_PERMISSION);
zoneRouteRPCHook.doAfterResponse("", request, response);
}


@Test
public void testDoAfterResponseWithValidZoneFiltering() throws Exception {
HashMap<String, String> extFields = new HashMap<>();
extFields.put(MixAll.ZONE_MODE, "true");
extFields.put(MixAll.ZONE_NAME,"zone1");
RemotingCommand request = RemotingCommand.createRequestCommand(105,null);
request.setExtFields(extFields);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
TopicRouteData topicRouteData = createSampleTopicRouteData();
response.setBody(RemotingSerializable.encode(topicRouteData));
zoneRouteRPCHook.doAfterResponse("", request, response);

HashMap<Long,String> brokeraddrs = new HashMap<>();
brokeraddrs.put(MixAll.MASTER_ID,"127.0.0.1:10911");
topicRouteData.getBrokerDatas().get(0).setBrokerAddrs(brokeraddrs);
response.setBody(RemotingSerializable.encode(topicRouteData));
zoneRouteRPCHook.doAfterResponse("", request, response);

topicRouteData.getQueueDatas().add(createQueueData("BrokerB"));
HashMap<Long,String> brokeraddrsB = new HashMap<>();
brokeraddrsB.put(MixAll.MASTER_ID,"127.0.0.1:10912");
BrokerData brokerData1 = createBrokerData("BrokerB","zone2",brokeraddrsB);
BrokerData brokerData2 = createBrokerData("BrokerC","zone1",null);
topicRouteData.getBrokerDatas().add(brokerData1);
topicRouteData.getBrokerDatas().add(brokerData2);
response.setBody(RemotingSerializable.encode(topicRouteData));
zoneRouteRPCHook.doAfterResponse("", request, response);

topicRouteData.getFilterServerTable().put("127.0.0.1:10911",new ArrayList<>());
response.setBody(RemotingSerializable.encode(topicRouteData));
zoneRouteRPCHook.doAfterResponse("", request, response);
Assert.assertEquals(1,RemotingSerializable
.decode(response.getBody(), TopicRouteData.class)
.getFilterServerTable()
.size());

topicRouteData.getFilterServerTable().put("127.0.0.1:10912",new ArrayList<>());
response.setBody(RemotingSerializable.encode(topicRouteData));
zoneRouteRPCHook.doAfterResponse("", request, response);
Assert.assertEquals(1,RemotingSerializable
.decode(response.getBody(), TopicRouteData.class)
.getFilterServerTable()
.size());
}

private TopicRouteData createSampleTopicRouteData() {
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDatas = new ArrayList<>();
BrokerData brokerData = createBrokerData("BrokerA","zone1",new HashMap<>());
List<QueueData> queueDatas = new ArrayList<>();
QueueData queueData = createQueueData("BrokerA");
queueDatas.add(queueData);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(queueDatas);
return topicRouteData;
}

private BrokerData createBrokerData(String brokerName,String zoneName,HashMap<Long,String> brokerAddrs) {
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName(brokerName);
brokerData.setZoneName(zoneName);
brokerData.setBrokerAddrs(brokerAddrs);
return brokerData;
}

private QueueData createQueueData(String brokerName) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setReadQueueNums(8);
queueData.setWriteQueueNums(8);
queueData.setPerm(6);
return queueData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,12 @@ public void pickupTopicRouteDataWithSlave() {
}

@Test
public void scanNotActiveBroker() {
public void scanNotActiveBroker() throws InterruptedException {
registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic");
routeInfoManager.scanNotActiveBroker();
registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo.defaultBroker(),"TestTopic");
Thread.sleep(30000);
routeInfoManager.scanNotActiveBroker();
}

@Test
Expand Down Expand Up @@ -589,6 +592,16 @@ public void onChannelDestroy() {
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
}

@Test
public void onChannelDestroyByBrokerInfo() {
registerBroker(BrokerBasicInfo.defaultBroker(), mock(Channel.class), null, "TestTopic", "TestTopic1");
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(DEFAULT_CLUSTER, DEFAULT_ADDR);
routeInfoManager.onChannelDestroy(brokerAddrInfo);
await().atMost(Duration.ofSeconds(5)).until(() -> routeInfoManager.blockedUnRegisterRequests() == 0);
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
}

@Test
public void switchBrokerRole_ChannelDestroy() {
final BrokerBasicInfo masterBroker = BrokerBasicInfo.defaultBroker().enableActingMaster(false);
Expand Down Expand Up @@ -728,6 +741,23 @@ private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo broke
return registerBroker(brokerInfo, mock(Channel.class), topicConfigConcurrentHashMap, topics);
}

private RegisterBrokerResult registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo brokerInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
TopicConfig baseTopic = new TopicConfig("baseTopic");
topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(topic);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(8);
topicConfig.setOrder(false);
topicConfigConcurrentHashMap.put(topic, topicConfig);
}

return registerBrokerWithExpiredTime(brokerInfo, mock(Channel.class), topicConfigConcurrentHashMap, topics);
}

private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo brokerBasicInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -785,7 +815,7 @@ private RegisterBrokerResult registerBroker(BrokerBasicInfo brokerInfo, Channel
topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);

RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker(
return routeInfoManager.registerBroker(
brokerInfo.clusterName,
brokerInfo.brokerAddr,
brokerInfo.brokerName,
Expand All @@ -795,7 +825,40 @@ private RegisterBrokerResult registerBroker(BrokerBasicInfo brokerInfo, Channel
null,
brokerInfo.enableActingMaster,
topicConfigSerializeWrapper, new ArrayList<>(), channel);
return registerBrokerResult;
}

private RegisterBrokerResult registerBrokerWithExpiredTime(BrokerBasicInfo brokerInfo, Channel channel,
ConcurrentMap<String, TopicConfig> topicConfigConcurrentHashMap, String... topics) {

if (topicConfigConcurrentHashMap == null) {
topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
TopicConfig baseTopic = new TopicConfig("baseTopic");
topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(topic);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(8);
topicConfig.setOrder(false);
topicConfigConcurrentHashMap.put(topic, topicConfig);
}
}

TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);

return routeInfoManager.registerBroker(
brokerInfo.clusterName,
brokerInfo.brokerAddr,
brokerInfo.brokerName,
brokerInfo.brokerId,
brokerInfo.haAddr,
"",
30000L,
brokerInfo.enableActingMaster,
topicConfigSerializeWrapper, new ArrayList<>(), channel);
}

private void registerSingleTopicWithBrokerName(String brokerName, String... topics) {
Expand Down

0 comments on commit d9d53d5

Please sign in to comment.