diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
index fb97002df7f..84252f8b8e7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.proxy.service.route;
import java.util.List;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -59,11 +58,8 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
@Override
public String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception {
- List brokerDataList = getAllMessageQueueView(ctx, brokerName).getTopicRouteData().getBrokerDatas();
- if (brokerDataList.isEmpty()) {
- return null;
- }
- return brokerDataList.get(0).getBrokerAddrs().get(MixAll.MASTER_ID);
+ TopicRouteWrapper topicRouteWrapper = getAllMessageQueueView(ctx, brokerName).getTopicRouteWrapper();
+ return topicRouteWrapper.getMasterAddr(brokerName);
}
@Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index b3a6b9e4ba3..fe5387cfd7e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -37,6 +37,10 @@ public TopicRouteData getTopicRouteData() {
return topicRouteWrapper.getTopicRouteData();
}
+ public TopicRouteWrapper getTopicRouteWrapper() {
+ return topicRouteWrapper;
+ }
+
public String getTopicName() {
return topicRouteWrapper.getTopicName();
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
index b5fc1b6713f..15d83483b9d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
@@ -21,6 +21,8 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.net.HostAndPort;
+
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -32,6 +34,9 @@
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.assertj.core.util.Lists;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -51,6 +56,9 @@ public class ClusterTopicRouteServiceTest extends BaseServiceTest {
private ClusterTopicRouteService topicRouteService;
+ protected static final String BROKER2_NAME = "broker2";
+ protected static final String BROKER2_ADDR = "127.0.0.2:10911";
+
@Before
public void before() throws Throwable {
super.before();
@@ -58,6 +66,36 @@ public void before() throws Throwable {
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(TOPIC), anyLong())).thenReturn(topicRouteData);
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(ERR_TOPIC), anyLong())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
+
+ // build broker
+ BrokerData brokerData = new BrokerData();
+ brokerData.setCluster(CLUSTER_NAME);
+ brokerData.setBrokerName(BROKER_NAME);
+ HashMap brokerAddrs = new HashMap<>();
+ brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
+ brokerData.setBrokerAddrs(brokerAddrs);
+
+ // build broker2
+ BrokerData broke2Data = new BrokerData();
+ broke2Data.setCluster(CLUSTER_NAME);
+ broke2Data.setBrokerName(BROKER2_NAME);
+ HashMap broker2Addrs = new HashMap<>();
+ broker2Addrs.put(MixAll.MASTER_ID, BROKER2_ADDR);
+ broke2Data.setBrokerAddrs(broker2Addrs);
+
+ // add brokers
+ TopicRouteData brokerTopicRouteData = new TopicRouteData();
+ brokerTopicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, broke2Data));
+
+ // add queue data
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName(BROKER_NAME);
+
+ QueueData queue2Data = new QueueData();
+ queue2Data.setBrokerName(BROKER2_NAME);
+ brokerTopicRouteData.setQueueDatas(Lists.newArrayList(queueData, queue2Data));
+ when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER_NAME), anyLong())).thenReturn(brokerTopicRouteData);
+ when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER2_NAME), anyLong())).thenReturn(brokerTopicRouteData);
}
@Test
@@ -71,6 +109,13 @@ public void testGetCurrentMessageQueueView() throws Throwable {
assertEquals(2, this.topicRouteService.topicCache.asMap().size());
}
+ @Test
+ public void testGetBrokerAddr() throws Throwable {
+ ProxyContext ctx = ProxyContext.create();
+ assertEquals(BROKER_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER_NAME));
+ assertEquals(BROKER2_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER2_NAME));
+ }
+
@Test
public void testGetTopicRouteForProxy() throws Throwable {
ProxyContext ctx = ProxyContext.create();