Skip to content

Commit

Permalink
Fix the problem that the proxy in the cluster mode obtains the wrong …
Browse files Browse the repository at this point in the history
…address of broker (apache#6909)

Fix the problem that the proxy in the cluster mode obtains the wrong address of the broker.
  • Loading branch information
gaoyf authored Jun 16, 2023
1 parent 2246c32 commit 0b76f6f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.proxy.service.route;

import java.util.List;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
Expand Down Expand Up @@ -59,11 +58,8 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List<Address>

@Override
public String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception {
List<BrokerData> brokerDataList = getAllMessageQueueView(ctx, brokerName).getTopicRouteData().getBrokerDatas();
if (brokerDataList.isEmpty()) {
return null;
}
return brokerDataList.get(0).getBrokerAddrs().get(MixAll.MASTER_ID);
TopicRouteWrapper topicRouteWrapper = getAllMessageQueueView(ctx, brokerName).getTopicRouteWrapper();
return topicRouteWrapper.getMasterAddr(brokerName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public TopicRouteData getTopicRouteData() {
return topicRouteWrapper.getTopicRouteData();
}

public TopicRouteWrapper getTopicRouteWrapper() {
return topicRouteWrapper;
}

public String getTopicName() {
return topicRouteWrapper.getTopicName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.net.HostAndPort;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,6 +34,9 @@
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.assertj.core.util.Lists;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -51,13 +56,46 @@ public class ClusterTopicRouteServiceTest extends BaseServiceTest {

private ClusterTopicRouteService topicRouteService;

protected static final String BROKER2_NAME = "broker2";
protected static final String BROKER2_ADDR = "127.0.0.2:10911";

@Before
public void before() throws Throwable {
super.before();
this.topicRouteService = new ClusterTopicRouteService(this.mqClientAPIFactory);

when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(TOPIC), anyLong())).thenReturn(topicRouteData);
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(ERR_TOPIC), anyLong())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));

// build broker
BrokerData brokerData = new BrokerData();
brokerData.setCluster(CLUSTER_NAME);
brokerData.setBrokerName(BROKER_NAME);
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
brokerData.setBrokerAddrs(brokerAddrs);

// build broker2
BrokerData broke2Data = new BrokerData();
broke2Data.setCluster(CLUSTER_NAME);
broke2Data.setBrokerName(BROKER2_NAME);
HashMap<Long, String> broker2Addrs = new HashMap<>();
broker2Addrs.put(MixAll.MASTER_ID, BROKER2_ADDR);
broke2Data.setBrokerAddrs(broker2Addrs);

// add brokers
TopicRouteData brokerTopicRouteData = new TopicRouteData();
brokerTopicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, broke2Data));

// add queue data
QueueData queueData = new QueueData();
queueData.setBrokerName(BROKER_NAME);

QueueData queue2Data = new QueueData();
queue2Data.setBrokerName(BROKER2_NAME);
brokerTopicRouteData.setQueueDatas(Lists.newArrayList(queueData, queue2Data));
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER_NAME), anyLong())).thenReturn(brokerTopicRouteData);
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER2_NAME), anyLong())).thenReturn(brokerTopicRouteData);
}

@Test
Expand All @@ -71,6 +109,13 @@ public void testGetCurrentMessageQueueView() throws Throwable {
assertEquals(2, this.topicRouteService.topicCache.asMap().size());
}

@Test
public void testGetBrokerAddr() throws Throwable {
ProxyContext ctx = ProxyContext.create();
assertEquals(BROKER_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER_NAME));
assertEquals(BROKER2_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER2_NAME));
}

@Test
public void testGetTopicRouteForProxy() throws Throwable {
ProxyContext ctx = ProxyContext.create();
Expand Down

0 comments on commit 0b76f6f

Please sign in to comment.