Skip to content

Commit

Permalink
[ISSUE #7429] clean channel map when CLIENT_UNREGISTER in proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed Oct 9, 2023
1 parent b9ffe0f commit 3808387
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.proxy.service.sysmessage;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -73,16 +74,8 @@ protected void init() {
);
this.consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
@Override
public void handle(ConsumerGroupEvent event, String s, Object... args) {
if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
if (args == null || args.length < 1) {
return;
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
remoteChannelMap.remove(clientChannelInfo.getChannel().id().asLongText());
}
}
public void handle(ConsumerGroupEvent event, String group, Object... args) {
processConsumerGroupEvent(event, group, args);
}

@Override
Expand All @@ -98,6 +91,18 @@ public void shutdown() throws Exception {
super.shutdown();
}

protected void processConsumerGroupEvent(ConsumerGroupEvent event, String group, Object... args) {
if (event == ConsumerGroupEvent.CLIENT_UNREGISTER) {
if (args == null || args.length < 1) {
return;
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
remoteChannelMap.remove(buildKey(group, clientChannelInfo.getChannel()));
}
}
}

public void onConsumerRegister(String consumerGroup, ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
Set<SubscriptionData> subList) {
Expand Down Expand Up @@ -189,7 +194,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeCo
}

RemoteChannel decodedChannel = RemoteChannel.decode(data.getChannelData());
RemoteChannel channel = remoteChannelMap.computeIfAbsent(data.getGroup() + "@" + decodedChannel.id().asLongText(), key -> decodedChannel);
RemoteChannel channel = remoteChannelMap.computeIfAbsent(buildKey(data.getGroup(), decodedChannel), key -> decodedChannel);
channel.setExtendAttribute(decodedChannel.getChannelExtendAttribute());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
Expand Down Expand Up @@ -228,4 +233,8 @@ private String buildLocalProxyId() {
// use local address, remoting port and grpc port to build unique local proxy Id
return proxyConfig.getLocalServeAddr() + "%" + proxyConfig.getRemotingListenPort() + "%" + proxyConfig.getGrpcServerPort();
}

private static String buildKey(String group, Channel channel) {
return group + "@" + channel.id().asLongText();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -35,6 +36,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIExt;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
Expand Down Expand Up @@ -320,6 +322,72 @@ public void testSyncRemotingChannel() throws Exception {
}
}

@Test
public void testProcessConsumerGroupEventForRemoting() {
String consumerGroup = "consumerGroup";
Channel channel = createMockChannel();
RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class);
RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, channel, clientId, Collections.emptySet());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
remotingChannel,
clientId,
LanguageCode.JAVA,
4
);

testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo);
}

@Test
public void testProcessConsumerGroupEventForGrpcV2() {
String consumerGroup = "consumerGroup";
GrpcClientSettingsManager grpcClientSettingsManager = mock(GrpcClientSettingsManager.class);
GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
clientId);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
grpcClientChannel,
clientId,
LanguageCode.JAVA,
5
);

testProcessConsumerGroupEvent(consumerGroup, clientChannelInfo);
}

private void testProcessConsumerGroupEvent(String consumerGroup, ClientChannelInfo clientChannelInfo) {
HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory, null);
SendResult okSendResult = new SendResult();
okSendResult.setSendStatus(SendStatus.SEND_OK);

ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
doReturn(CompletableFuture.completedFuture(okSendResult)).when(this.mqClientAPIExt)
.sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong());

heartbeatSyncer.onConsumerRegister(
consumerGroup,
clientChannelInfo,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
Collections.emptySet()
);
await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 1);

// change local serve addr, to simulate other proxy receive messages
heartbeatSyncer.localProxyId = RandomStringUtils.randomAlphabetic(10);
ArgumentCaptor<ClientChannelInfo> channelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class);
doReturn(true).when(consumerManager).registerConsumer(anyString(), channelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean());

heartbeatSyncer.consumeMessage(convertFromMessage(messageArgumentCaptor.getAllValues()), null);
assertEquals(1, heartbeatSyncer.remoteChannelMap.size());

heartbeatSyncer.processConsumerGroupEvent(ConsumerGroupEvent.CLIENT_UNREGISTER, consumerGroup, channelInfoArgumentCaptor.getValue());
assertTrue(heartbeatSyncer.remoteChannelMap.isEmpty());
}

private MessageExt convertFromMessage(Message message) {
MessageExt messageExt = new MessageExt();
messageExt.setTopic(message.getTopic());
Expand Down

0 comments on commit 3808387

Please sign in to comment.