Skip to content

Commit

Permalink
[ISSUE #6828] return the number of assignments equal to the number of…
Browse files Browse the repository at this point in the history
… messageQueues for order consumer (#6829)
  • Loading branch information
xdkxlk authored May 30, 2023
1 parent f4439c9 commit 40ada80
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

public class RouteActivity extends AbstractMessingActivity {

Expand Down Expand Up @@ -106,23 +107,44 @@ public CompletableFuture<QueryAssignmentResponse> queryAssignment(ProxyContext c
addressList,
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));

boolean fifo = false;
SubscriptionGroupConfig config = this.messagingProcessor.getSubscriptionGroupConfig(ctx,
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()));
if (config != null && config.isConsumeMessageOrderly()) {
fifo = true;
}

List<Assignment> assignments = new ArrayList<>();
Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
if (PermName.isReadable(queueData.getPerm()) && queueData.getReadQueueNums() > 0) {
Map<Long, Broker> brokerIdMap = brokerMap.get(queueData.getBrokerName());
if (brokerIdMap != null) {
Broker broker = brokerIdMap.get(MixAll.MASTER_ID);
MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
.setTopic(request.getTopic())
.setId(-1)
.setPermission(this.convertToPermission(queueData.getPerm()))
.setBroker(broker)
.build();

assignments.add(Assignment.newBuilder()
.setMessageQueue(defaultMessageQueue)
.build());
Permission permission = this.convertToPermission(queueData.getPerm());
if (fifo) {
for (int i = 0; i < queueData.getReadQueueNums(); i++) {
MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
.setTopic(request.getTopic())
.setId(i)
.setPermission(permission)
.setBroker(broker)
.build();
assignments.add(Assignment.newBuilder()
.setMessageQueue(defaultMessageQueue)
.build());
}
} else {
MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
.setTopic(request.getTopic())
.setId(-1)
.setPermission(permission)
.setBroker(broker)
.build();
assignments.add(Assignment.newBuilder()
.setMessageQueue(defaultMessageQueue)
.build());
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -191,6 +192,28 @@ public void testQueryAssignment() throws Throwable {
assertEquals(grpcEndpoints, response.getAssignments(0).getMessageQueue().getBroker().getEndpoints());
}

@Test
public void testQueryFifoAssignment() throws Throwable {
when(this.messagingProcessor.getTopicRouteDataForProxy(any(), any(), anyString()))
.thenReturn(createProxyTopicRouteData(2, 2, 6));
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setConsumeMessageOrderly(true);
when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(subscriptionGroupConfig);

QueryAssignmentResponse response = this.routeActivity.queryAssignment(
createContext(),
QueryAssignmentRequest.newBuilder()
.setEndpoints(grpcEndpoints)
.setTopic(GRPC_TOPIC)
.setGroup(GRPC_GROUP)
.build()
).get();

assertEquals(Code.OK, response.getStatus().getCode());
assertEquals(2, response.getAssignmentsCount());
assertEquals(grpcEndpoints, response.getAssignments(0).getMessageQueue().getBroker().getEndpoints());
}

private static ProxyTopicRouteData createProxyTopicRouteData(int r, int w, int p) {
ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
proxyTopicRouteData.getQueueDatas().add(createQueueData(r, w, p));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.test.grpc.v2;

import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteResponse;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -74,12 +73,12 @@ public void testQueryRoute() throws Exception {

@Test
public void testQueryAssignment() throws Exception {
String topic = initTopic();
String group = "group";

QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
super.testQueryAssignment();
}

assertQueryAssignment(response, BROKER_NUM);
@Test
public void testQueryFifoAssignment() throws Exception {
super.testQueryFifoAssignment();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,29 @@ protected Channel createChannel(int port) throws SSLException {
.build());
}

public void testQueryAssignment() throws Exception {
String topic = initTopic();
String group = "group";

QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));

assertQueryAssignment(response, BROKER_NUM);
}

public void testQueryFifoAssignment() throws Exception {
String topic = initTopic(TopicMessageType.FIFO);
String group = MQRandomUtils.getRandomConsumerGroup();
SubscriptionGroupConfig groupConfig = brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
groupConfig.setConsumeMessageOrderly(true);
brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);

QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));

assertQueryAssignment(response, BROKER_NUM * QUEUE_NUMBERS);
}

public void testTransactionCheckThenCommit() {
String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.TRANSACTION);
String group = MQRandomUtils.getRandomConsumerGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.test.grpc.v2;

import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteResponse;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
Expand Down Expand Up @@ -62,12 +61,12 @@ public void testQueryRoute() throws Exception {

@Test
public void testQueryAssignment() throws Exception {
String topic = initTopic();
String group = "group";

QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
super.testQueryAssignment();
}

assertQueryAssignment(response, BROKER_NUM);
@Test
public void testQueryFifoAssignment() throws Exception {
super.testQueryFifoAssignment();
}

@Test
Expand Down

0 comments on commit 40ada80

Please sign in to comment.