Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the problem that the proxy in the cluster mode obtains the wrong address of broker #6909

Merged
merged 2 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.proxy.service.route;

import java.util.List;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
Expand Down Expand Up @@ -59,11 +58,8 @@ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List<Address>

@Override
public String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception {
List<BrokerData> brokerDataList = getAllMessageQueueView(ctx, brokerName).getTopicRouteData().getBrokerDatas();
if (brokerDataList.isEmpty()) {
return null;
}
return brokerDataList.get(0).getBrokerAddrs().get(MixAll.MASTER_ID);
TopicRouteWrapper topicRouteWrapper = getAllMessageQueueView(ctx, brokerName).getTopicRouteWrapper();
return topicRouteWrapper.getMasterAddr(brokerName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public TopicRouteData getTopicRouteData() {
return topicRouteWrapper.getTopicRouteData();
}

public TopicRouteWrapper getTopicRouteWrapper() {
return topicRouteWrapper;
}

public String getTopicName() {
return topicRouteWrapper.getTopicName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.net.HostAndPort;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,6 +34,9 @@
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.assertj.core.util.Lists;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -51,13 +56,46 @@ public class ClusterTopicRouteServiceTest extends BaseServiceTest {

private ClusterTopicRouteService topicRouteService;

protected static final String BROKER2_NAME = "broker2";
protected static final String BROKER2_ADDR = "127.0.0.2:10911";

@Before
public void before() throws Throwable {
super.before();
this.topicRouteService = new ClusterTopicRouteService(this.mqClientAPIFactory);

when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(TOPIC), anyLong())).thenReturn(topicRouteData);
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(ERR_TOPIC), anyLong())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));

// build broker
BrokerData brokerData = new BrokerData();
brokerData.setCluster(CLUSTER_NAME);
brokerData.setBrokerName(BROKER_NAME);
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
brokerData.setBrokerAddrs(brokerAddrs);

// build broker2
BrokerData broke2Data = new BrokerData();
broke2Data.setCluster(CLUSTER_NAME);
broke2Data.setBrokerName(BROKER2_NAME);
HashMap<Long, String> broker2Addrs = new HashMap<>();
broker2Addrs.put(MixAll.MASTER_ID, BROKER2_ADDR);
broke2Data.setBrokerAddrs(broker2Addrs);

// add brokers
TopicRouteData brokerTopicRouteData = new TopicRouteData();
brokerTopicRouteData.setBrokerDatas(Lists.newArrayList(brokerData, broke2Data));

// add queue data
QueueData queueData = new QueueData();
queueData.setBrokerName(BROKER_NAME);

QueueData queue2Data = new QueueData();
queue2Data.setBrokerName(BROKER2_NAME);
brokerTopicRouteData.setQueueDatas(Lists.newArrayList(queueData, queue2Data));
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER_NAME), anyLong())).thenReturn(brokerTopicRouteData);
when(this.mqClientAPIExt.getTopicRouteInfoFromNameServer(eq(BROKER2_NAME), anyLong())).thenReturn(brokerTopicRouteData);
}

@Test
Expand All @@ -71,6 +109,13 @@ public void testGetCurrentMessageQueueView() throws Throwable {
assertEquals(2, this.topicRouteService.topicCache.asMap().size());
}

@Test
public void testGetBrokerAddr() throws Throwable {
ProxyContext ctx = ProxyContext.create();
assertEquals(BROKER_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER_NAME));
assertEquals(BROKER2_ADDR, topicRouteService.getBrokerAddr(ctx, BROKER2_NAME));
}

@Test
public void testGetTopicRouteForProxy() throws Throwable {
ProxyContext ctx = ProxyContext.create();
Expand Down