From dbee4fd414b3576ccd586ae7c03c4db388dce59a Mon Sep 17 00:00:00 2001 From: yongfeigao Date: Thu, 15 Jun 2023 14:30:01 +0800 Subject: [PATCH 1/2] Fix the problem that the proxy in the cluster mode obtains the wrong address of the broker. --- .../route/ClusterTopicRouteService.java | 7 +-- .../proxy/service/route/MessageQueueView.java | 4 ++ .../route/ClusterTopicRouteServiceTest.java | 45 +++++++++++++++++++ 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java index fb97002df7f..ceaaaac3d0e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java @@ -59,11 +59,8 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
@Override public String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception { - List brokerDataList = getAllMessageQueueView(ctx, brokerName).getTopicRouteData().getBrokerDatas(); - if (brokerDataList.isEmpty()) { - return null; - } - return brokerDataList.get(0).getBrokerAddrs().get(MixAll.MASTER_ID); + TopicRouteWrapper topicRouteWrapper = getAllMessageQueueView(ctx, brokerName).getTopicRouteWrapper(); + return topicRouteWrapper.getMasterAddr(brokerName); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index b3a6b9e4ba3..fe5387cfd7e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -37,6 +37,10 @@ public TopicRouteData getTopicRouteData() { return topicRouteWrapper.getTopicRouteData(); } + public TopicRouteWrapper getTopicRouteWrapper() { + return topicRouteWrapper; + } + public String getTopicName() { return topicRouteWrapper.getTopicName(); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java index b5fc1b6713f..15d83483b9d 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java @@ -21,6 +21,8 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.net.HostAndPort; + +import java.util.HashMap; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -32,6 +34,9 @@ import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.service.BaseServiceTest; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.assertj.core.util.Lists; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -51,6 +56,9 @@ public class ClusterTopicRouteServiceTest extends BaseServiceTest { private ClusterTopicRouteService topicRouteService; + protected static final String BROKER2_NAME = "broker2"; + protected static final String BROKER2_ADDR = "127.0.0.2:10911"; + @Before public void before() throws Throwable { super.before(); @@ -58,6 +66,36 @@ public void before() throws Throwable { when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(TOPIC), anyLong())).thenReturn(topicRouteData); when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(ERR_TOPIC), anyLong())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); + + // build broker + BrokerData brokerData = new BrokerData(); + brokerData.setCluster(CLUSTER_NAME); + brokerData.setBrokerName(BROKER_NAME); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR); + brokerData.setBrokerAddrs(brokerAddrs); + + // build broker2 + BrokerData broke2Data = new BrokerData(); + broke2Data.setCluster(CLUSTER_NAME); + broke2Data.setBrokerName(BROKER2_NAME); + HashMap broker2Addrs = new HashMap<>(); + broker2Addrs.put(MixAll.MASTER_ID, BROKER2_ADDR); + broke2Data.setBrokerAddrs(broker2Addrs); + + // add brokers + TopicRouteData brokerTopicRouteData = new TopicRouteData(); + brokerTopicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, broke2Data)); + + // add queue data + QueueData queueData = new QueueData(); + queueData.setBrokerName(BROKER_NAME); + + QueueData queue2Data = new QueueData(); + queue2Data.setBrokerName(BROKER2_NAME); + brokerTopicRouteData.setQueueDatas(Lists.newArrayList(queueData, queue2Data)); + when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER_NAME), anyLong())).thenReturn(brokerTopicRouteData); + when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER2_NAME), anyLong())).thenReturn(brokerTopicRouteData); } @Test @@ -71,6 +109,13 @@ public void testGetCurrentMessageQueueView() throws Throwable { assertEquals(2, this.topicRouteService.topicCache.asMap().size()); } + @Test + public void testGetBrokerAddr() throws Throwable { + ProxyContext ctx = ProxyContext.create(); + assertEquals(BROKER_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER_NAME)); + assertEquals(BROKER2_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER2_NAME)); + } + @Test public void testGetTopicRouteForProxy() throws Throwable { ProxyContext ctx = ProxyContext.create(); From ac5ad228656287e6b52e06ae6788145ef0fe27be Mon Sep 17 00:00:00 2001 From: yongfeigao Date: Thu, 15 Jun 2023 17:32:46 +0800 Subject: [PATCH 2/2] remove the unused imports --- .../rocketmq/proxy/service/route/ClusterTopicRouteService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java index ceaaaac3d0e..84252f8b8e7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.proxy.service.route; import java.util.List; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;