From 40314b607d28d9a174573009b4bceceb3adf68b6 Mon Sep 17 00:00:00 2001 From: "kaiyi.lk" Date: Tue, 6 Jun 2023 15:26:42 +0800 Subject: [PATCH] [ISSUE #6858] passing through ProxyContext for future expansion --- .../proxy/grpc/v2/client/ClientActivity.java | 2 +- .../grpc/v2/consumer/AckMessageActivity.java | 2 +- .../ChangeInvisibleDurationActivity.java | 2 +- .../v2/consumer/ReceiveMessageActivity.java | 2 +- .../producer/ForwardMessageToDLQActivity.java | 2 +- .../proxy/grpc/v2/route/RouteActivity.java | 2 +- .../proxy/processor/ConsumerProcessor.java | 20 +++++----- .../processor/DefaultMessagingProcessor.java | 4 +- .../proxy/processor/ProducerProcessor.java | 9 +++-- .../processor/ReceiptHandleProcessor.java | 14 +++---- .../proxy/processor/TransactionProcessor.java | 3 +- .../activity/SendMessageActivity.java | 2 +- .../proxy/service/ClusterServiceManager.java | 3 +- .../message/ClusterMessageService.java | 20 +++++----- .../metadata/ClusterMetadataService.java | 7 ++-- .../metadata/LocalMetadataService.java | 5 ++- .../service/metadata/MetadataService.java | 5 ++- .../relay/AbstractProxyRelayService.java | 1 + .../route/ClusterTopicRouteService.java | 17 +++++---- .../service/route/LocalTopicRouteService.java | 13 ++++--- .../service/route/TopicRouteService.java | 11 +++--- .../AbstractSystemMessageSyncer.java | 3 +- .../AbstractTransactionService.java | 8 ++-- .../ClusterTransactionService.java | 21 +++++----- .../transaction/LocalTransactionService.java | 9 +++-- .../transaction/TransactionService.java | 14 +++---- .../grpc/v2/client/ClientActivityTest.java | 4 +- .../ChangeInvisibleDurationActivityTest.java | 2 +- .../ForwardMessageToDLQActivityTest.java | 2 +- .../grpc/v2/route/RouteActivityTest.java | 2 +- .../processor/ConsumerProcessorTest.java | 15 ++++---- .../processor/ProducerProcessorTest.java | 4 +- .../processor/ReceiptHandleProcessorTest.java | 38 +++++++++---------- .../processor/TransactionProcessorTest.java | 2 +- .../activity/SendMessageActivityTest.java | 2 +- .../proxy/service/BaseServiceTest.java | 7 ++-- .../message/ClusterMessageServiceTest.java | 3 +- .../metadata/ClusterMetadataServiceTest.java | 11 ++++-- .../route/ClusterTopicRouteServiceTest.java | 9 +++-- .../route/LocalTopicRouteServiceTest.java | 7 +++- .../sysmessage/HeartbeatSyncerTest.java | 2 +- .../AbstractTransactionServiceTest.java | 14 +++++-- .../ClusterTransactionServiceTest.java | 26 +++++++------ 43 files changed, 193 insertions(+), 158 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index de8fba4a637..a60228eb9f8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -287,7 +287,7 @@ protected GrpcClientChannel registerProducer(ProxyContext ctx, String topicName) // use topic name as producer group ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, languageCode, parseClientVersion(ctx.getClientVersion())); this.messagingProcessor.registerProducer(ctx, topicName, clientChannelInfo); - TopicMessageType topicMessageType = this.messagingProcessor.getMetadataService().getTopicMessageType(topicName); + TopicMessageType topicMessageType = this.messagingProcessor.getMetadataService().getTopicMessageType(ctx, topicName); if (TopicMessageType.TRANSACTION.equals(topicMessageType)) { this.messagingProcessor.addTransactionSubscription(ctx, topicName, topicName); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java index fb31a606242..993f069b947 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java @@ -98,7 +98,7 @@ protected CompletableFuture processAckMessage(ProxyContex String handleString = ackMessageEntry.getReceiptHandle(); String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); + MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); if (messageReceiptHandle != null) { handleString = messageReceiptHandle.getReceiptHandleStr(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java index 0f33cc7aa77..9b7e947e0ba 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java @@ -55,7 +55,7 @@ public CompletableFuture changeInvisibleDuratio ReceiptHandle receiptHandle = ReceiptHandle.decode(request.getReceiptHandle()); String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); + MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); if (messageReceiptHandle != null) { receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index 9df4101f732..22a149004ce 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -144,7 +144,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), messageExt.getQueueOffset(), messageExt.getReconsumeTimes()); - receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java index 789927d693b..6b5c5c7e07b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java @@ -48,7 +48,7 @@ public CompletableFuture forwardMessage String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); String handleString = request.getReceiptHandle(); - MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); + MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); if (messageReceiptHandle != null) { handleString = messageReceiptHandle.getReceiptHandleStr(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index eb7385f8746..c5d485691b6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -71,7 +71,7 @@ public CompletableFuture queryRoute(ProxyContext ctx, QueryR List messageQueueList = new ArrayList<>(); Map> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas()); - TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topicName); + TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(ctx, topicName); for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) { String brokerName = queueData.getBrokerName(); Map brokerIdMap = brokerMap.get(brokerName); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index d67f4b855d9..c860ee8a1a9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -87,7 +87,7 @@ public CompletableFuture popMessage( ) { CompletableFuture future = new CompletableFuture<>(); try { - AddressableMessageQueue messageQueue = queueSelector.select(ctx, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic)); + AddressableMessageQueue messageQueue = queueSelector.select(ctx, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic)); if (messageQueue == null) { throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no readable queue"); } @@ -287,7 +287,7 @@ public CompletableFuture pullMessage(ProxyContext ctx, MessageQueue CompletableFuture future = new CompletableFuture<>(); try { AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() - .buildAddressableMessageQueue(messageQueue); + .buildAddressableMessageQueue(ctx, messageQueue); PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(addressableMessageQueue.getTopic()); @@ -311,7 +311,7 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, MessageQue CompletableFuture future = new CompletableFuture<>(); try { AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() - .buildAddressableMessageQueue(messageQueue); + .buildAddressableMessageQueue(ctx, messageQueue); UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(addressableMessageQueue.getTopic()); @@ -329,7 +329,7 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, MessageQueu CompletableFuture future = new CompletableFuture<>(); try { AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() - .buildAddressableMessageQueue(messageQueue); + .buildAddressableMessageQueue(ctx, messageQueue); QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(addressableMessageQueue.getTopic()); @@ -345,7 +345,7 @@ public CompletableFuture> lockBatchMQ(ProxyContext ctx, Set> future = new CompletableFuture<>(); Set successSet = new CopyOnWriteArraySet<>(); - Set addressableMessageQueueSet = buildAddressableSet(mqSet); + Set addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); Map> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); List> futureList = new ArrayList<>(); messageQueueSetMap.forEach((k, v) -> { @@ -370,7 +370,7 @@ public CompletableFuture> lockBatchMQ(ProxyContext ctx, Set unlockBatchMQ(ProxyContext ctx, Set mqSet, String consumerGroup, String clientId, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); - Set addressableMessageQueueSet = buildAddressableSet(mqSet); + Set addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); Map> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); List> futureList = new ArrayList<>(); messageQueueSetMap.forEach((k, v) -> { @@ -394,7 +394,7 @@ public CompletableFuture getMaxOffset(ProxyContext ctx, MessageQueue messa CompletableFuture future = new CompletableFuture<>(); try { AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() - .buildAddressableMessageQueue(messageQueue); + .buildAddressableMessageQueue(ctx, messageQueue); GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); requestHeader.setTopic(addressableMessageQueue.getTopic()); requestHeader.setQueueId(addressableMessageQueue.getQueueId()); @@ -409,7 +409,7 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa CompletableFuture future = new CompletableFuture<>(); try { AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() - .buildAddressableMessageQueue(messageQueue); + .buildAddressableMessageQueue(ctx, messageQueue); GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); requestHeader.setTopic(addressableMessageQueue.getTopic()); requestHeader.setQueueId(addressableMessageQueue.getQueueId()); @@ -420,10 +420,10 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa return FutureUtils.addExecutor(future, this.executor); } - protected Set buildAddressableSet(Set mqSet) { + protected Set buildAddressableSet(ProxyContext ctx, Set mqSet) { return mqSet.stream().map(mq -> { try { - return serviceManager.getTopicRouteService().buildAddressableMessageQueue(mq); + return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq); } catch (Exception e) { return null; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index bfadc0d3e0c..81d2b9df359 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -127,13 +127,13 @@ protected void init() { @Override public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx, String consumerGroupName) { - return this.serviceManager.getMetadataService().getSubscriptionGroupConfig(consumerGroupName); + return this.serviceManager.getMetadataService().getSubscriptionGroupConfig(ctx, consumerGroupName); } @Override public ProxyTopicRouteData getTopicRouteDataForProxy(ProxyContext ctx, List
requestHostAndPortList, String topicName) throws Exception { - return this.serviceManager.getTopicRouteService().getTopicRouteForProxy(requestHostAndPortList, topicName); + return this.serviceManager.getTopicRouteService().getTopicRouteForProxy(ctx, requestHostAndPortList, topicName); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 749f9da2bec..0d0c6216862 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -73,14 +73,14 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe if (topicMessageTypeValidator != null) { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { - TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic); + TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(ctx, topic); TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(message.getProperties()); topicMessageTypeValidator.validate(topicMessageType, messageType); } } } AddressableMessageQueue messageQueue = queueSelector.select(ctx, - this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic)); + this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic)); if (messageQueue == null) { throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue"); } @@ -102,7 +102,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) && tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE && StringUtils.isNotBlank(sendResult.getTransactionId())) { - fillTransactionData(producerGroup, messageQueue, sendResult, messageList); + fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList); } } return sendResultList; @@ -113,7 +113,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe return FutureUtils.addExecutor(future, this.executor); } - protected void fillTransactionData(String producerGroup, AddressableMessageQueue messageQueue, SendResult sendResult, List messageList) { + protected void fillTransactionData(ProxyContext ctx, String producerGroup, AddressableMessageQueue messageQueue, SendResult sendResult, List messageList) { try { MessageId id; if (sendResult.getOffsetMsgId() != null) { @@ -122,6 +122,7 @@ protected void fillTransactionData(String producerGroup, AddressableMessageQueue id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } this.serviceManager.getTransactionService().addTransactionDataByBrokerName( + ctx, messageQueue.getBrokerName(), producerGroup, sendResult.getQueueOffset(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index c903220bbec..7fe97db7985 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -201,7 +201,7 @@ protected CompletableFuture startRenewMessage(MessageRecei }); } else { SubscriptionGroupConfig subscriptionGroupConfig = - messagingProcessor.getMetadataService().getSubscriptionGroupConfig(messageReceiptHandle.getGroup()); + messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); if (subscriptionGroupConfig == null) { log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); return CompletableFuture.completedFuture(null); @@ -240,12 +240,12 @@ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) { return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), groupKey.group, groupKey.channel) == null; } - public void addReceiptHandle(Channel channel, String group, String msgID, String receiptHandle, + public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle, MessageReceiptHandle messageReceiptHandle) { - this.addReceiptHandle(new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle, messageReceiptHandle); + this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle, messageReceiptHandle); } - protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle, + protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, String receiptHandle, MessageReceiptHandle messageReceiptHandle) { if (key == null) { return; @@ -254,11 +254,11 @@ protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, String k -> new ReceiptHandleGroup()).put(msgID, receiptHandle, messageReceiptHandle); } - public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) { - return this.removeReceiptHandle(new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle); + public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) { + return this.removeReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle); } - protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle) { + protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, String receiptHandle) { if (key == null) { return null; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java index 3b284cd0569..c0ba255f544 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java @@ -36,6 +36,7 @@ public CompletableFuture endTransaction(ProxyContext ctx, String transacti CompletableFuture future = new CompletableFuture<>(); try { EndTransactionRequestData headerData = serviceManager.getTransactionService().genEndTransactionRequestHeader( + ctx, producerGroup, buildCommitOrRollback(transactionStatus), fromTransactionCheck, @@ -70,6 +71,6 @@ protected int buildCommitOrRollback(TransactionStatus transactionStatus) { } public void addTransactionSubscription(ProxyContext ctx, String producerGroup, String topic) { - this.serviceManager.getTransactionService().addTransactionSubscription(producerGroup, topic); + this.serviceManager.getTransactionService().addTransactionSubscription(ctx, producerGroup, topic); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java index 618d4587434..17af0fdcb37 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java @@ -70,7 +70,7 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand if (topicMessageTypeValidator != null) { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { - TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topic); + TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(context, topic); topicMessageTypeValidator.validate(topicMessageType, messageType); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java index 20beeb56669..95cc4d14977 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.admin.AdminService; @@ -191,7 +192,7 @@ protected class ProducerChangeListenerImpl implements ProducerChangeListener { @Override public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) { if (event == ProducerGroupEvent.GROUP_UNREGISTER) { - getTransactionService().unSubscribeAllTransactionTopic(group); + getTransactionService().unSubscribeAllTransactionTopic(ProxyContext.createForInner(this.getClass()), group); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index 7150967d458..9f163f1b987 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -80,7 +80,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, Address public CompletableFuture sendMessageBack(ProxyContext ctx, ReceiptHandle handle, String messageId, ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) { return this.mqClientAPIFactory.getClient().sendMessageBackAsync( - this.resolveBrokerAddrInReceiptHandle(handle), + this.resolveBrokerAddrInReceiptHandle(ctx, handle), requestHeader, timeoutMillis ); @@ -93,7 +93,7 @@ public CompletableFuture endTransactionOneway(ProxyContext ctx, String bro CompletableFuture future = new CompletableFuture<>(); try { this.mqClientAPIFactory.getClient().endTransactionOneway( - this.resolveBrokerAddr(brokerName), + this.resolveBrokerAddr(ctx, brokerName), requestHeader, "end transaction from proxy", timeoutMillis @@ -120,7 +120,7 @@ public CompletableFuture popMessage(ProxyContext ctx, AddressableMess public CompletableFuture changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId, ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) { return this.mqClientAPIFactory.getClient().changeInvisibleTimeAsync( - this.resolveBrokerAddrInReceiptHandle(handle), + this.resolveBrokerAddrInReceiptHandle(ctx, handle), handle.getBrokerName(), requestHeader, timeoutMillis @@ -131,7 +131,7 @@ public CompletableFuture changeInvisibleTime(ProxyContext ctx, Receip public CompletableFuture ackMessage(ProxyContext ctx, ReceiptHandle handle, String messageId, AckMessageRequestHeader requestHeader, long timeoutMillis) { return this.mqClientAPIFactory.getClient().ackMessageAsync( - this.resolveBrokerAddrInReceiptHandle(handle), + this.resolveBrokerAddrInReceiptHandle(ctx, handle), requestHeader, timeoutMillis ); @@ -211,7 +211,7 @@ public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessage public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { try { - String brokerAddress = topicRouteService.getBrokerAddr(brokerName); + String brokerAddress = topicRouteService.getBrokerAddr(ctx, brokerName); return mqClientAPIFactory.getClient().invoke(brokerAddress, request, timeoutMillis); } catch (Throwable t) { return FutureUtils.completeExceptionally(t); @@ -222,24 +222,24 @@ public CompletableFuture request(ProxyContext ctx, String broke public CompletableFuture requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { try { - String brokerAddress = topicRouteService.getBrokerAddr(brokerName); + String brokerAddress = topicRouteService.getBrokerAddr(ctx, brokerName); return mqClientAPIFactory.getClient().invokeOneway(brokerAddress, request, timeoutMillis); } catch (Throwable t) { return FutureUtils.completeExceptionally(t); } } - protected String resolveBrokerAddrInReceiptHandle(ReceiptHandle handle) { + protected String resolveBrokerAddrInReceiptHandle(ProxyContext ctx, ReceiptHandle handle) { try { - return this.topicRouteService.getBrokerAddr(handle.getBrokerName()); + return this.topicRouteService.getBrokerAddr(ctx, handle.getBrokerName()); } catch (Throwable t) { throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "cannot find broker " + handle.getBrokerName(), t); } } - protected String resolveBrokerAddr(String brokerName) { + protected String resolveBrokerAddr(ProxyContext ctx, String brokerName) { try { - return this.topicRouteService.getBrokerAddr(brokerName); + return this.topicRouteService.getBrokerAddr(ctx, brokerName); } catch (Throwable t) { throw new ProxyException(ProxyExceptionCode.INVALID_BROKER_NAME, "cannot find broker " + brokerName, t); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java index 7934d3c860c..bc9582ad816 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.AbstractCacheLoader; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; @@ -83,7 +84,7 @@ protected void init() { } @Override - public TopicMessageType getTopicMessageType(String topic) { + public TopicMessageType getTopicMessageType(ProxyContext ctx, String topic) { TopicConfigAndQueueMapping topicConfigAndQueueMapping; try { topicConfigAndQueueMapping = topicConfigCache.get(topic); @@ -97,7 +98,7 @@ public TopicMessageType getTopicMessageType(String topic) { } @Override - public SubscriptionGroupConfig getSubscriptionGroupConfig(String group) { + public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx, String group) { SubscriptionGroupConfig config; try { config = this.subscriptionGroupConfigCache.get(group); @@ -158,7 +159,7 @@ protected void onErr(String key, Exception e) { protected Optional findOneBroker(String topic) throws Exception { try { - return topicRouteService.getAllMessageQueueView(topic).getTopicRouteData().getBrokerDatas().stream().findAny(); + return topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()), topic).getTopicRouteData().getBrokerDatas().stream().findAny(); } catch (Exception e) { if (TopicRouteHelper.isTopicNotExistError(e)) { return Optional.empty(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java index bc1d03e74ba..7f3c041f259 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class LocalMetadataService implements MetadataService { @@ -30,7 +31,7 @@ public LocalMetadataService(BrokerController brokerController) { } @Override - public TopicMessageType getTopicMessageType(String topic) { + public TopicMessageType getTopicMessageType(ProxyContext ctx, String topic) { TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); if (topicConfig == null) { return TopicMessageType.UNSPECIFIED; @@ -39,7 +40,7 @@ public TopicMessageType getTopicMessageType(String topic) { } @Override - public SubscriptionGroupConfig getSubscriptionGroupConfig(String group) { + public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx, String group) { return this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java index d5e38f145bc..3ee0f3eacd3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java @@ -18,11 +18,12 @@ package org.apache.rocketmq.proxy.service.metadata; import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public interface MetadataService { - TopicMessageType getTopicMessageType(String topic); + TopicMessageType getTopicMessageType(ProxyContext ctx, String topic); - SubscriptionGroupConfig getSubscriptionGroupConfig(String group); + SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx, String group); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java index ed68d1d3ae8..08f00bd83c0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java @@ -43,6 +43,7 @@ public RelayData processCheckTransactionState(ProxyContex CompletableFuture> future = new CompletableFuture<>(); String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); TransactionData transactionData = transactionService.addTransactionDataByBrokerAddr( + context, command.getExtFields().get(ProxyUtils.BROKER_ADDR), group, header.getTranStateTableOffset(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java index 31e1f94c55b..fb97002df7f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java @@ -21,6 +21,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; @@ -31,14 +32,14 @@ public ClusterTopicRouteService(MQClientAPIFactory mqClientAPIFactory) { } @Override - public MessageQueueView getCurrentMessageQueueView(String topicName) throws Exception { - return getAllMessageQueueView(topicName); + public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topicName) throws Exception { + return getAllMessageQueueView(ctx, topicName); } @Override - public ProxyTopicRouteData getTopicRouteForProxy(List
requestHostAndPortList, + public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
requestHostAndPortList, String topicName) throws Exception { - TopicRouteData topicRouteData = getAllMessageQueueView(topicName).getTopicRouteData(); + TopicRouteData topicRouteData = getAllMessageQueueView(ctx, topicName).getTopicRouteData(); ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData(); proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas()); @@ -57,8 +58,8 @@ public ProxyTopicRouteData getTopicRouteForProxy(List
requestHostAndPor } @Override - public String getBrokerAddr(String brokerName) throws Exception { - List brokerDataList = getAllMessageQueueView(brokerName).getTopicRouteData().getBrokerDatas(); + public String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception { + List brokerDataList = getAllMessageQueueView(ctx, brokerName).getTopicRouteData().getBrokerDatas(); if (brokerDataList.isEmpty()) { return null; } @@ -66,8 +67,8 @@ public String getBrokerAddr(String brokerName) throws Exception { } @Override - public AddressableMessageQueue buildAddressableMessageQueue(MessageQueue messageQueue) throws Exception { - String brokerAddress = getBrokerAddr(messageQueue.getBrokerName()); + public AddressableMessageQueue buildAddressableMessageQueue(ProxyContext ctx, MessageQueue messageQueue) throws Exception { + String brokerAddress = getBrokerAddr(ctx, messageQueue.getBrokerName()); return new AddressableMessageQueue(messageQueue, brokerAddress); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java index 3ac7ae75b94..d67b68f38e9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.remoting.protocol.route.BrokerData; @@ -51,15 +52,15 @@ public LocalTopicRouteService(BrokerController brokerController, MQClientAPIFact } @Override - public MessageQueueView getCurrentMessageQueueView(String topic) throws Exception { + public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topic) throws Exception { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic); return new MessageQueueView(topic, toTopicRouteData(topicConfig)); } @Override - public ProxyTopicRouteData getTopicRouteForProxy(List
requestHostAndPortList, + public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
requestHostAndPortList, String topicName) throws Exception { - MessageQueueView messageQueueView = getAllMessageQueueView(topicName); + MessageQueueView messageQueueView = getAllMessageQueueView(ctx, topicName); TopicRouteData topicRouteData = messageQueueView.getTopicRouteData(); ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData(); @@ -83,13 +84,13 @@ public ProxyTopicRouteData getTopicRouteForProxy(List
requestHostAndPor } @Override - public String getBrokerAddr(String brokerName) throws Exception { + public String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception { return this.brokerController.getBrokerAddr(); } @Override - public AddressableMessageQueue buildAddressableMessageQueue(MessageQueue messageQueue) throws Exception { - String brokerAddress = getBrokerAddr(messageQueue.getBrokerName()); + public AddressableMessageQueue buildAddressableMessageQueue(ProxyContext ctx, MessageQueue messageQueue) throws Exception { + String brokerAddress = getBrokerAddr(ctx, messageQueue.getBrokerName()); return new AddressableMessageQueue(messageQueue, brokerAddress); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index ba97e183b03..3fa6414c39b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.proxy.common.AbstractCacheLoader; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; @@ -109,18 +110,18 @@ protected void init() { this.appendStartAndShutdown(this.mqClientAPIFactory); } - public MessageQueueView getAllMessageQueueView(String topicName) throws Exception { + public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String topicName) throws Exception { return getCacheMessageQueueWrapper(this.topicCache, topicName); } - public abstract MessageQueueView getCurrentMessageQueueView(String topicName) throws Exception; + public abstract MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, String topicName) throws Exception; - public abstract ProxyTopicRouteData getTopicRouteForProxy(List
requestHostAndPortList, + public abstract ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, List
requestHostAndPortList, String topicName) throws Exception; - public abstract String getBrokerAddr(String brokerName) throws Exception; + public abstract String getBrokerAddr(ProxyContext ctx, String brokerName) throws Exception; - public abstract AddressableMessageQueue buildAddressableMessageQueue(MessageQueue messageQueue) throws Exception; + public abstract AddressableMessageQueue buildAddressableMessageQueue(ProxyContext ctx, MessageQueue messageQueue) throws Exception; protected static MessageQueueView getCacheMessageQueueWrapper(LoadingCache topicCache, String key) throws Exception { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java index cc988db7821..2ef84973748 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.common.utils.StartAndShutdown; @@ -95,7 +96,7 @@ protected void sendSystemMessage(Object data) { JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8) ); - AddressableMessageQueue messageQueue = this.topicRouteService.getAllMessageQueueView(targetTopic) + AddressableMessageQueue messageQueue = this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()), targetTopic) .getWriteSelector().selectOne(true); this.mqClientAPIFactory.getClient().sendMessageAsync( messageQueue.getBrokerAddr(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java index 3254d711d71..f0e083adead 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java @@ -29,13 +29,13 @@ public abstract class AbstractTransactionService implements TransactionService, protected TransactionDataManager transactionDataManager = new TransactionDataManager(); @Override - public TransactionData addTransactionDataByBrokerAddr(String brokerAddr, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, + public TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx, String brokerAddr, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, Message message) { - return this.addTransactionDataByBrokerName(this.getBrokerNameByAddr(brokerAddr), producerGroup, tranStateTableOffset, commitLogOffset, transactionId, message); + return this.addTransactionDataByBrokerName(ctx, this.getBrokerNameByAddr(brokerAddr), producerGroup, tranStateTableOffset, commitLogOffset, transactionId, message); } @Override - public TransactionData addTransactionDataByBrokerName(String brokerName, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, + public TransactionData addTransactionDataByBrokerName(ProxyContext ctx, String brokerName, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, Message message) { if (StringUtils.isBlank(brokerName)) { return null; @@ -55,7 +55,7 @@ public TransactionData addTransactionDataByBrokerName(String brokerName, String } @Override - public EndTransactionRequestData genEndTransactionRequestHeader(String producerGroup, Integer commitOrRollback, + public EndTransactionRequestData genEndTransactionRequestHeader(ProxyContext ctx, String producerGroup, Integer commitOrRollback, boolean fromTransactionCheck, String msgId, String transactionId) { TransactionData transactionData = this.transactionDataManager.pollNoExpireTransactionData(producerGroup, transactionId); if (transactionData == null) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java index eff3ac49225..1ec42864636 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; @@ -67,20 +68,20 @@ public ClusterTransactionService(TopicRouteService topicRouteService, ProducerMa } @Override - public void addTransactionSubscription(String group, List topicList) { + public void addTransactionSubscription(ProxyContext ctx, String group, List topicList) { for (String topic : topicList) { - addTransactionSubscription(group, topic); + addTransactionSubscription(ctx, group, topic); } } @Override - public void addTransactionSubscription(String group, String topic) { + public void addTransactionSubscription(ProxyContext ctx, String group, String topic) { try { groupClusterData.compute(group, (groupName, clusterDataSet) -> { if (clusterDataSet == null) { clusterDataSet = Sets.newHashSet(); } - clusterDataSet.addAll(getClusterDataFromTopic(topic)); + clusterDataSet.addAll(getClusterDataFromTopic(ctx, topic)); return clusterDataSet; }); } catch (Exception e) { @@ -89,17 +90,17 @@ public void addTransactionSubscription(String group, String topic) { } @Override - public void replaceTransactionSubscription(String group, List topicList) { + public void replaceTransactionSubscription(ProxyContext ctx, String group, List topicList) { Set clusterDataSet = new HashSet<>(); for (String topic : topicList) { - clusterDataSet.addAll(getClusterDataFromTopic(topic)); + clusterDataSet.addAll(getClusterDataFromTopic(ctx, topic)); } groupClusterData.put(group, clusterDataSet); } - private Set getClusterDataFromTopic(String topic) { + private Set getClusterDataFromTopic(ProxyContext ctx, String topic) { try { - MessageQueueView messageQueue = this.topicRouteService.getAllMessageQueueView(topic); + MessageQueueView messageQueue = this.topicRouteService.getAllMessageQueueView(ctx, topic); List brokerDataList = messageQueue.getTopicRouteData().getBrokerDatas(); if (brokerDataList == null) { @@ -117,7 +118,7 @@ private Set getClusterDataFromTopic(String topic) { } @Override - public void unSubscribeAllTransactionTopic(String group) { + public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group) { groupClusterData.remove(group); } @@ -195,7 +196,7 @@ protected void sendHeartBeatToCluster(String clusterName, List he protected void sendHeartBeatToCluster(String clusterName, HeartbeatData heartbeatData, Map brokerAddrNameMap) { try { - MessageQueueView messageQueue = this.topicRouteService.getAllMessageQueueView(clusterName); + MessageQueueView messageQueue = this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()), clusterName); List brokerDataList = messageQueue.getTopicRouteData().getBrokerDatas(); if (brokerDataList == null) { return; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java index 2371b25a243..4a27e4ff24a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java @@ -18,6 +18,7 @@ import java.util.List; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.proxy.common.ProxyContext; /** * no need to implements, because the channel of producer will put into the broker's producerManager @@ -31,22 +32,22 @@ public LocalTransactionService(BrokerConfig brokerConfig) { } @Override - public void addTransactionSubscription(String group, List topicList) { + public void addTransactionSubscription(ProxyContext ctx, String group, List topicList) { } @Override - public void addTransactionSubscription(String group, String topic) { + public void addTransactionSubscription(ProxyContext ctx, String group, String topic) { } @Override - public void replaceTransactionSubscription(String group, List topicList) { + public void replaceTransactionSubscription(ProxyContext ctx, String group, List topicList) { } @Override - public void unSubscribeAllTransactionTopic(String group) { + public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group) { } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java index 2a851051eb1..a7ab3532424 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java @@ -22,21 +22,21 @@ public interface TransactionService { - void addTransactionSubscription(String group, List topicList); + void addTransactionSubscription(ProxyContext ctx, String group, List topicList); - void addTransactionSubscription(String group, String topic); + void addTransactionSubscription(ProxyContext ctx, String group, String topic); - void replaceTransactionSubscription(String group, List topicList); + void replaceTransactionSubscription(ProxyContext ctx, String group, List topicList); - void unSubscribeAllTransactionTopic(String group); + void unSubscribeAllTransactionTopic(ProxyContext ctx, String group); - TransactionData addTransactionDataByBrokerAddr(String brokerAddr, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, + TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx, String brokerAddr, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, Message message); - TransactionData addTransactionDataByBrokerName(String brokerName, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, + TransactionData addTransactionDataByBrokerName(ProxyContext ctx, String brokerName, String producerGroup, long tranStateTableOffset, long commitLogOffset, String transactionId, Message message); - EndTransactionRequestData genEndTransactionRequestHeader(String producerGroup, Integer commitOrRollback, + EndTransactionRequestData genEndTransactionRequestHeader(ProxyContext ctx, String producerGroup, Integer commitOrRollback, boolean fromTransactionCheck, String msgId, String transactionId); void onSendCheckTransactionStateFailed(ProxyContext context, String producerGroup, TransactionData transactionData); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java index 84310770207..a5d4e3c9193 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java @@ -134,7 +134,7 @@ public void testProducerHeartbeat() throws Throwable { txProducerTopicArgumentCaptor.capture() ); - when(this.metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.TRANSACTION); + when(this.metadataService.getTopicMessageType(any(), anyString())).thenReturn(TopicMessageType.TRANSACTION); HeartbeatResponse response = this.sendProducerHeartbeat(context); @@ -222,7 +222,7 @@ public void testProducerNotifyClientTermination() throws Throwable { .build()); ArgumentCaptor channelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); doNothing().when(this.messagingProcessor).unRegisterProducer(any(), anyString(), channelInfoArgumentCaptor.capture()); - when(this.metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL); + when(this.metadataService.getTopicMessageType(any(), anyString())).thenReturn(TopicMessageType.NORMAL); this.sendProducerTelemetry(context); this.sendProducerHeartbeat(context); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java index a861e8c13fa..fdd052da764 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java @@ -92,7 +92,7 @@ public void testChangeInvisibleDurationActivityWhenHasMappingHandle() throws Thr when(this.messagingProcessor.changeInvisibleTime( any(), receiptHandleCaptor.capture(), anyString(), anyString(), anyString(), invisibleTimeArgumentCaptor.capture() )).thenReturn(CompletableFuture.completedFuture(ackResult)); - when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(), anyString(), anyString())) + when(receiptHandleProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0)); ChangeInvisibleDurationResponse response = this.changeInvisibleDurationActivity.changeInvisibleDuration( diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java index 68db3020e35..ec620340c57 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java @@ -75,7 +75,7 @@ public void testForwardMessageToDeadLetterQueueWhenHasMappingHandle() throws Thr .thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, ""))); String savedHandleStr = buildReceiptHandle("topic", System.currentTimeMillis(),3000); - when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(), anyString(), anyString())) + when(receiptHandleProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0)); ForwardMessageToDeadLetterQueueResponse response = this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue( diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java index ce98b7494d2..a7ba69098bc 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java @@ -101,7 +101,7 @@ public void testQueryRoute() throws Throwable { .thenReturn(createProxyTopicRouteData(2, 2, 6)); MetadataService metadataService = Mockito.mock(LocalMetadataService.class); when(this.messagingProcessor.getMetadataService()).thenReturn(metadataService); - when(metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL); + when(metadataService.getTopicMessageType(any(), anyString())).thenReturn(TopicMessageType.NORMAL); QueryRouteResponse response = this.routeActivity.queryRoute( createContext(), diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java index c695eb09442..876b25b30b2 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.utils.ProxyUtils; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.MessageQueueView; @@ -91,7 +92,7 @@ public void testPopMessage() throws Throwable { when(this.messageService.popMessage(any(), messageQueueArgumentCaptor.capture(), requestHeaderArgumentCaptor.capture(), anyLong())) .thenReturn(CompletableFuture.completedFuture(innerPopResult)); - when(this.topicRouteService.getCurrentMessageQueueView(anyString())) + when(this.topicRouteService.getCurrentMessageQueueView(any(), anyString())) .thenReturn(mock(MessageQueueView.class)); ArgumentCaptor ackMessageIdArgumentCaptor = ArgumentCaptor.forClass(String.class); @@ -191,12 +192,12 @@ public void testLockBatch() throws Throwable { AddressableMessageQueue addressableMessageQueue2 = new AddressableMessageQueue(mq2, "127.0.0.1"); mqSet.add(mq1); mqSet.add(mq2); - when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], "127.0.0.1")); + when(this.topicRouteService.buildAddressableMessageQueue(any(), any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) i.getArguments()[1], "127.0.0.1")); when(this.messageService.lockBatchMQ(any(), eq(addressableMessageQueue1), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1))); when(this.messageService.lockBatchMQ(any(), eq(addressableMessageQueue2), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq2))); - Set result = this.consumerProcessor.lockBatchMQ(null, mqSet, CONSUMER_GROUP, CLIENT_ID, 1000) + Set result = this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet, CONSUMER_GROUP, CLIENT_ID, 1000) .get(); assertThat(result).isEqualTo(mqSet); } @@ -210,12 +211,12 @@ public void testLockBatchPartialSuccess() throws Throwable { AddressableMessageQueue addressableMessageQueue2 = new AddressableMessageQueue(mq2, "127.0.0.1"); mqSet.add(mq1); mqSet.add(mq2); - when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], "127.0.0.1")); + when(this.topicRouteService.buildAddressableMessageQueue(any(), any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) i.getArguments()[1], "127.0.0.1")); when(this.messageService.lockBatchMQ(any(), eq(addressableMessageQueue1), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1))); when(this.messageService.lockBatchMQ(any(), eq(addressableMessageQueue2), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet())); - Set result = this.consumerProcessor.lockBatchMQ(null, mqSet, CONSUMER_GROUP, CLIENT_ID, 1000) + Set result = this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet, CONSUMER_GROUP, CLIENT_ID, 1000) .get(); assertThat(result).isEqualTo(Sets.newHashSet(mq1)); } @@ -229,14 +230,14 @@ public void testLockBatchPartialSuccessWithException() throws Throwable { AddressableMessageQueue addressableMessageQueue2 = new AddressableMessageQueue(mq2, "127.0.0.1"); mqSet.add(mq1); mqSet.add(mq2); - when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], "127.0.0.1")); + when(this.topicRouteService.buildAddressableMessageQueue(any(), any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) i.getArguments()[1], "127.0.0.1")); when(this.messageService.lockBatchMQ(any(), eq(addressableMessageQueue1), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1))); CompletableFuture> future = new CompletableFuture<>(); future.completeExceptionally(new MQBrokerException(1, "err")); when(this.messageService.lockBatchMQ(any(), eq(addressableMessageQueue2), any(), anyLong())) .thenReturn(future); - Set result = this.consumerProcessor.lockBatchMQ(null, mqSet, CONSUMER_GROUP, CLIENT_ID, 1000) + Set result = this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet, CONSUMER_GROUP, CLIENT_ID, 1000) .get(); assertThat(result).isEqualTo(Sets.newHashSet(mq1)); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java index 213e6a6bebf..de63b7e75f8 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java @@ -70,7 +70,7 @@ public void before() throws Throwable { @Test public void testSendMessage() throws Throwable { - when(metadataService.getTopicMessageType(eq(TOPIC))).thenReturn(TopicMessageType.NORMAL); + when(metadataService.getTopicMessageType(any(), eq(TOPIC))).thenReturn(TopicMessageType.NORMAL); String txId = MessageClientIDSetter.createUniqID(); String msgId = MessageClientIDSetter.createUniqID(); long commitLogOffset = 1000L; @@ -96,6 +96,7 @@ public void testSendMessage() throws Throwable { ArgumentCaptor tranStateTableOffsetCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor commitLogOffsetCaptor = ArgumentCaptor.forClass(Long.class); when(transactionService.addTransactionDataByBrokerName( + any(), brokerNameCaptor.capture(), anyString(), tranStateTableOffsetCaptor.capture(), @@ -150,6 +151,7 @@ public void testSendRetryMessage() throws Throwable { ArgumentCaptor tranStateTableOffsetCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor commitLogOffsetCaptor = ArgumentCaptor.forClass(Long.class); when(transactionService.addTransactionDataByBrokerName( + any(), brokerNameCaptor.capture(), anyString(), tranStateTableOffsetCaptor.capture(), diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index c0bff981f0e..7206e6b791a 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java @@ -107,8 +107,8 @@ public void setup() { @Test public void testAddReceiptHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleProcessor.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) @@ -120,9 +120,9 @@ public void testAddReceiptHandle() { public void testRenewReceiptHandle() { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); long newInvisibleTime = 18000L; @@ -167,7 +167,7 @@ public void testRenewExceedMaxRenewTimes() { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); CompletableFuture ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); @@ -197,7 +197,7 @@ public void testRenewExceedMaxRenewTimes() { public void testRenewWithInvalidHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); CompletableFuture ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); @@ -221,7 +221,7 @@ public void testRenewWithErrorThenOK() { ProxyConfig config = ConfigurationManager.getProxyConfig(); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); AtomicInteger count = new AtomicInteger(0); List> futureList = new ArrayList<>(); @@ -299,10 +299,10 @@ public void testRenewReceiptHandleWhenTimeout() { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) .thenReturn(CompletableFuture.completedFuture(new AckResult())); receiptHandleProcessor.scheduleRenewTask(); @@ -333,9 +333,9 @@ public void testRenewReceiptHandleWhenTimeoutWithNoSubscription() { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) .thenReturn(CompletableFuture.completedFuture(new AckResult())); receiptHandleProcessor.scheduleRenewTask(); @@ -369,9 +369,9 @@ public void testRenewReceiptHandleWhenNotArrivingTime() { messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleProcessor.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) @@ -382,10 +382,10 @@ public void testRenewReceiptHandleWhenNotArrivingTime() { @Test public void testRemoveReceiptHandle() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); - receiptHandleProcessor.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); receiptHandleProcessor.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), @@ -395,10 +395,10 @@ public void testRemoveReceiptHandle() { @Test public void testClearGroup() { Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); receiptHandleProcessor.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); - Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); receiptHandleProcessor.scheduleRenewTask(); Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), @@ -410,7 +410,7 @@ public void testClientOffline() { ArgumentCaptor listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); Mockito.verify(messagingProcessor, Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture()); Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); + receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle, messageReceiptHandle); listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty()); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java index f9473b450e3..6bffb15bd13 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java @@ -54,7 +54,7 @@ public void testEndTransaction() throws Throwable { protected void testEndTransaction(int sysFlag, TransactionStatus transactionStatus) throws Throwable { when(this.messageService.endTransactionOneway(any(), any(), any(), anyLong())).thenReturn(CompletableFuture.completedFuture(null)); ArgumentCaptor commitOrRollbackCaptor = ArgumentCaptor.forClass(Integer.class); - when(transactionService.genEndTransactionRequestHeader(anyString(), commitOrRollbackCaptor.capture(), anyBoolean(), anyString(), anyString())) + when(transactionService.genEndTransactionRequestHeader(any(), anyString(), commitOrRollbackCaptor.capture(), anyBoolean(), anyString(), anyString())) .thenReturn(new EndTransactionRequestData("brokerName", new EndTransactionRequestHeader())); this.transactionProcessor.endTransaction( diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java index b88f6677ef4..9d897642fdb 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java @@ -76,7 +76,7 @@ public void setup() { @Test public void testSendMessage() throws Exception { - when(metadataServiceMock.getTopicMessageType(eq(topic))).thenReturn(TopicMessageType.NORMAL); + when(metadataServiceMock.getTopicMessageType(any(), eq(topic))).thenReturn(TopicMessageType.NORMAL); Message message = new Message(topic, "123".getBytes()); message.putUserProperty("a", "b"); SendMessageRequestHeader sendMessageRequestHeader = new SendMessageRequestHeader(); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java index baecd474283..c97bd5a72b4 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java @@ -35,6 +35,7 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -76,8 +77,8 @@ public void before() throws Throwable { brokerData.setBrokerAddrs(brokerAddrs); topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData)); - when(this.topicRouteService.getAllMessageQueueView(eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); - when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); - when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData)); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java index 734207f025b..7e4d25f0c09 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -50,7 +51,7 @@ public void before() { @Test public void testAckMessageByInvalidBrokerNameHandle() throws Exception { - when(topicRouteService.getBrokerAddr(anyString())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); + when(topicRouteService.getBrokerAddr(any(), anyString())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")); try { this.clusterMessageService.ackMessage( ProxyContext.create(), diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java index 50afbc48f85..98bf1104f8b 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java @@ -19,6 +19,7 @@ import java.util.HashMap; import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.service.BaseServiceTest; import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; @@ -54,17 +55,19 @@ public void before() throws Throwable { @Test public void testGetTopicMessageType() { - assertEquals(TopicMessageType.UNSPECIFIED, this.clusterMetadataService.getTopicMessageType(ERR_TOPIC)); + ProxyContext ctx = ProxyContext.create(); + assertEquals(TopicMessageType.UNSPECIFIED, this.clusterMetadataService.getTopicMessageType(ctx, ERR_TOPIC)); assertEquals(1, this.clusterMetadataService.topicConfigCache.asMap().size()); - assertEquals(TopicMessageType.UNSPECIFIED, this.clusterMetadataService.getTopicMessageType(ERR_TOPIC)); + assertEquals(TopicMessageType.UNSPECIFIED, this.clusterMetadataService.getTopicMessageType(ctx, ERR_TOPIC)); - assertEquals(TopicMessageType.NORMAL, this.clusterMetadataService.getTopicMessageType(TOPIC)); + assertEquals(TopicMessageType.NORMAL, this.clusterMetadataService.getTopicMessageType(ctx, TOPIC)); assertEquals(2, this.clusterMetadataService.topicConfigCache.asMap().size()); } @Test public void testGetSubscriptionGroupConfig() { - assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(GROUP)); + ProxyContext ctx = ProxyContext.create(); + assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(ctx, GROUP)); assertEquals(1, this.clusterMetadataService.subscriptionGroupConfigCache.asMap().size()); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java index 9b27eb56c48..b5fc1b6713f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.service.BaseServiceTest; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.assertj.core.util.Lists; @@ -61,18 +62,20 @@ public void before() throws Throwable { @Test public void testGetCurrentMessageQueueView() throws Throwable { - MQClientException exception = catchThrowableOfType(() -> this.topicRouteService.getCurrentMessageQueueView(ERR_TOPIC), MQClientException.class); + ProxyContext ctx = ProxyContext.create(); + MQClientException exception = catchThrowableOfType(() -> this.topicRouteService.getCurrentMessageQueueView(ctx, ERR_TOPIC), MQClientException.class); assertTrue(TopicRouteHelper.isTopicNotExistError(exception)); assertEquals(1, this.topicRouteService.topicCache.asMap().size()); - assertNotNull(this.topicRouteService.getCurrentMessageQueueView(TOPIC)); + assertNotNull(this.topicRouteService.getCurrentMessageQueueView(ctx, TOPIC)); assertEquals(2, this.topicRouteService.topicCache.asMap().size()); } @Test public void testGetTopicRouteForProxy() throws Throwable { + ProxyContext ctx = ProxyContext.create(); List
addressList = Lists.newArrayList(new Address(Address.AddressScheme.IPv4, HostAndPort.fromParts("127.0.0.1", 8888))); - ProxyTopicRouteData proxyTopicRouteData = this.topicRouteService.getTopicRouteForProxy(addressList, TOPIC); + ProxyTopicRouteData proxyTopicRouteData = this.topicRouteService.getTopicRouteForProxy(ctx, addressList, TOPIC); assertEquals(1, proxyTopicRouteData.getBrokerDatas().size()); assertEquals(addressList, proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID)); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java index 4948cddc2e2..1ad39a1db64 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.service.BaseServiceTest; import org.apache.rocketmq.remoting.protocol.ResponseCode; @@ -76,8 +77,9 @@ public void before() throws Throwable { @Test public void testGetCurrentMessageQueueView() throws Throwable { + ProxyContext ctx = ProxyContext.create(); this.topicConfigTable.put(TOPIC, new TopicConfig(TOPIC, 3, 2, PermName.PERM_WRITE | PermName.PERM_READ)); - MessageQueueView messageQueueView = this.topicRouteService.getCurrentMessageQueueView(TOPIC); + MessageQueueView messageQueueView = this.topicRouteService.getCurrentMessageQueueView(ctx, TOPIC); assertEquals(3, messageQueueView.getReadSelector().getQueues().size()); assertEquals(2, messageQueueView.getWriteSelector().getQueues().size()); assertEquals(1, messageQueueView.getReadSelector().getBrokerActingQueues().size()); @@ -90,7 +92,8 @@ public void testGetCurrentMessageQueueView() throws Throwable { @Test public void testGetTopicRouteForProxy() throws Throwable { - ProxyTopicRouteData proxyTopicRouteData = this.topicRouteService.getTopicRouteForProxy(new ArrayList<>(), TOPIC); + ProxyContext ctx = ProxyContext.create(); + ProxyTopicRouteData proxyTopicRouteData = this.topicRouteService.getTopicRouteForProxy(ctx, new ArrayList<>(), TOPIC); assertEquals(1, proxyTopicRouteData.getBrokerDatas().size()); assertEquals( diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 65405235985..6373aba3085 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -133,7 +133,7 @@ public void before() throws Throwable { brokerData.setBrokerAddrs(brokerAddr); topicRouteData.getBrokerDatas().add(brokerData); MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData); - when(this.topicRouteService.getAllMessageQueueView(anyString())).thenReturn(messageQueueView); + when(this.topicRouteService.getAllMessageQueueView(any(), anyString())).thenReturn(messageQueueView); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java index 6e4af2e8f60..81de5ec843a 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java @@ -37,6 +37,7 @@ public class AbstractTransactionServiceTest extends InitConfigTest { private static final String BROKER_NAME = "mockBroker"; private static final String PRODUCER_GROUP = "producerGroup"; private static final Random RANDOM = new Random(); + private final ProxyContext ctx = ProxyContext.createForInner(this.getClass()); public static class MockAbstractTransactionServiceTest extends AbstractTransactionService { @@ -46,22 +47,22 @@ protected String getBrokerNameByAddr(String brokerAddr) { } @Override - public void addTransactionSubscription(String group, List topicList) { + public void addTransactionSubscription(ProxyContext ctx, String group, List topicList) { } @Override - public void addTransactionSubscription(String group, String topic) { + public void addTransactionSubscription(ProxyContext ctx, String group, String topic) { } @Override - public void replaceTransactionSubscription(String group, List topicList) { + public void replaceTransactionSubscription(ProxyContext ctx, String group, List topicList) { } @Override - public void unSubscribeAllTransactionTopic(String group) { + public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group) { } } @@ -81,6 +82,7 @@ public void testAddAndGenEndHeader() { String txId = MessageClientIDSetter.createUniqID(); TransactionData transactionData = transactionService.addTransactionDataByBrokerName( + ctx, BROKER_NAME, PRODUCER_GROUP, RANDOM.nextLong(), @@ -91,6 +93,7 @@ public void testAddAndGenEndHeader() { assertNotNull(transactionData); EndTransactionRequestData requestData = transactionService.genEndTransactionRequestHeader( + ctx, PRODUCER_GROUP, MessageSysFlag.TRANSACTION_COMMIT_TYPE, true, @@ -104,6 +107,7 @@ public void testAddAndGenEndHeader() { assertEquals(transactionData.getTranStateTableOffset(), requestData.getRequestHeader().getTranStateTableOffset().longValue()); assertNull(transactionService.genEndTransactionRequestHeader( + ctx, "group", MessageSysFlag.TRANSACTION_COMMIT_TYPE, true, @@ -119,6 +123,7 @@ public void testOnSendCheckTransactionStateFailedFailed() { String txId = MessageClientIDSetter.createUniqID(); TransactionData transactionData = transactionService.addTransactionDataByBrokerName( + ctx, BROKER_NAME, PRODUCER_GROUP, RANDOM.nextLong(), @@ -128,6 +133,7 @@ public void testOnSendCheckTransactionStateFailedFailed() { ); transactionService.onSendCheckTransactionStateFailed(ProxyContext.createForInner(this.getClass()), PRODUCER_GROUP, transactionData); assertNull(transactionService.genEndTransactionRequestHeader( + ctx, PRODUCER_GROUP, MessageSysFlag.TRANSACTION_COMMIT_TYPE, true, diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java index 2b568393016..a0063544ecf 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.service.BaseServiceTest; import org.apache.rocketmq.proxy.service.route.MessageQueueView; @@ -44,6 +45,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -53,7 +55,7 @@ public class ClusterTransactionServiceTest extends BaseServiceTest { @Mock private ProducerManager producerManager; - + private ProxyContext ctx = ProxyContext.create(); private ClusterTransactionService clusterTransactionService; @Before @@ -63,7 +65,7 @@ public void before() throws Throwable { this.mqClientAPIFactory); MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData); - when(this.topicRouteService.getAllMessageQueueView(anyString())) + when(this.topicRouteService.getAllMessageQueueView(any(), anyString())) .thenReturn(messageQueueView); when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt); @@ -71,7 +73,7 @@ public void before() throws Throwable { @Test public void testAddTransactionSubscription() { - this.clusterTransactionService.addTransactionSubscription(GROUP, TOPIC); + this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, TOPIC); assertEquals(1, this.clusterTransactionService.getGroupClusterData().size()); assertEquals(CLUSTER_NAME, this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster()); @@ -79,7 +81,7 @@ public void testAddTransactionSubscription() { @Test public void testAddTransactionSubscriptionTopicList() { - this.clusterTransactionService.addTransactionSubscription(GROUP, Lists.newArrayList(TOPIC + 1, TOPIC + 2)); + this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, Lists.newArrayList(TOPIC + 1, TOPIC + 2)); assertEquals(1, this.clusterTransactionService.getGroupClusterData().size()); assertEquals(CLUSTER_NAME, this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster()); @@ -87,21 +89,21 @@ public void testAddTransactionSubscriptionTopicList() { @Test public void testReplaceTransactionSubscription() { - this.clusterTransactionService.addTransactionSubscription(GROUP, TOPIC); + this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, TOPIC); assertEquals(1, this.clusterTransactionService.getGroupClusterData().size()); assertEquals(CLUSTER_NAME, this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster()); this.brokerData.setCluster(CLUSTER_NAME + 1); - this.clusterTransactionService.replaceTransactionSubscription(GROUP, Lists.newArrayList(TOPIC + 1)); + this.clusterTransactionService.replaceTransactionSubscription(ctx, GROUP, Lists.newArrayList(TOPIC + 1)); assertEquals(1, this.clusterTransactionService.getGroupClusterData().size()); assertEquals(CLUSTER_NAME + 1, this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster()); } @Test public void testUnSubscribeAllTransactionTopic() { - this.clusterTransactionService.addTransactionSubscription(GROUP, TOPIC); - this.clusterTransactionService.unSubscribeAllTransactionTopic(GROUP); + this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, TOPIC); + this.clusterTransactionService.unSubscribeAllTransactionTopic(ctx, GROUP); assertEquals(0, this.clusterTransactionService.getGroupClusterData().size()); } @@ -125,7 +127,7 @@ public void testScanProducerHeartBeat() throws Exception { brokerData.setBrokerAddrs(brokerAddrs); topicRouteData.getQueueDatas().add(queueData); topicRouteData.getBrokerDatas().add(brokerData); - when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData)); TopicRouteData clusterTopicRouteData = new TopicRouteData(); QueueData clusterQueueData = new QueueData(); @@ -139,7 +141,7 @@ public void testScanProducerHeartBeat() throws Exception { brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR); clusterBrokerData.setBrokerAddrs(brokerAddrs); clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData)); - when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, clusterTopicRouteData)); TopicRouteData clusterTopicRouteData2 = new TopicRouteData(); QueueData clusterQueueData2 = new QueueData(); @@ -153,7 +155,7 @@ public void testScanProducerHeartBeat() throws Exception { brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2); clusterBrokerData2.setBrokerAddrs(brokerAddrs); clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2)); - when(this.topicRouteService.getAllMessageQueueView(eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2)); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, clusterTopicRouteData2)); ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2); this.clusterTransactionService.start(); @@ -161,7 +163,7 @@ public void testScanProducerHeartBeat() throws Exception { for (int i = 0; i < 3; i++) { groupSet.add(GROUP + i); - this.clusterTransactionService.addTransactionSubscription(GROUP + i, TOPIC); + this.clusterTransactionService.addTransactionSubscription(ctx, GROUP + i, TOPIC); } ArgumentCaptor brokerAddrArgumentCaptor = ArgumentCaptor.forClass(String.class);