diff --git a/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java new file mode 100644 index 00000000000..4eb74c0ca9b --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public interface MqClientAdmin { + CompletableFuture> queryMessage(String address, boolean uniqueKeyFlag, boolean decompressBody, + QueryMessageRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture getTopicStatsInfo(String address, + GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture> queryConsumeTimeSpan(String address, + QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture updateOrCreateTopic(String address, CreateTopicRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture updateOrCreateSubscriptionGroup(String address, SubscriptionGroupConfig config, + long timeoutMillis); + + CompletableFuture deleteTopicInBroker(String address, DeleteTopicRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture deleteTopicInNameserver(String address, DeleteTopicFromNamesrvRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture deleteKvConfig(String address, DeleteKVConfigRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture deleteSubscriptionGroup(String address, DeleteSubscriptionGroupRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture> invokeBrokerToResetOffset(String address, + ResetOffsetRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture viewMessage(String address, ViewMessageRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture getBrokerClusterInfo(String address, long timeoutMillis); + + CompletableFuture getConsumerConnectionList(String address, + GetConsumerConnectionListRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture queryTopicsByConsumer(String address, + QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture querySubscriptionByConsumer(String address, + QuerySubscriptionByConsumerRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture getConsumeStats(String address, GetConsumeStatsRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture queryTopicConsumeByWho(String address, + QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture getConsumerRunningInfo(String address, + GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture consumeMessageDirectly(String address, + ConsumeMessageDirectlyResultRequestHeader requestHeader, long timeoutMillis); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java new file mode 100644 index 00000000000..34f066c7ddd --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.impl.admin; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.MqClientAdmin; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody; +import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public class MqClientAdminImpl implements MqClientAdmin { + private final static Logger log = LoggerFactory.getLogger(MqClientAdminImpl.class); + private final RemotingClient remotingClient; + + public MqClientAdminImpl(RemotingClient remotingClient) { + this.remotingClient = remotingClient; + } + + @Override + public CompletableFuture> queryMessage(String address, boolean uniqueKeyFlag, boolean decompressBody, + QueryMessageRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture> future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader); + request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, String.valueOf(uniqueKeyFlag)); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + List wrappers = MessageDecoder.decodesBatch(ByteBuffer.wrap(response.getBody()), true, decompressBody, true); + future.complete(filterMessages(wrappers, requestHeader.getTopic(), requestHeader.getKey(), uniqueKeyFlag)); + } else if (response.getCode() == ResponseCode.QUERY_NOT_FOUND) { + List wrappers = new ArrayList<>(); + future.complete(wrappers); + } else { + log.warn("queryMessage getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + + return future; + } + + @Override + public CompletableFuture getTopicStatsInfo(String address, + GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class); + future.complete(topicStatsTable); + } else { + log.warn("getTopicStatsInfo getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture> queryConsumeTimeSpan(String address, + QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture> future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class); + future.complete(consumeTimeSpanBody.getConsumeTimeSpanSet()); + } else { + log.warn("queryConsumerTimeSpan getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture updateOrCreateTopic(String address, CreateTopicRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + future.complete(null); + } else { + log.warn("updateOrCreateTopic getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture updateOrCreateSubscriptionGroup(String address, SubscriptionGroupConfig config, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); + byte[] body = RemotingSerializable.encode(config); + request.setBody(body); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + future.complete(null); + } else { + log.warn("updateOrCreateSubscriptionGroup getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), config); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture deleteTopicInBroker(String address, DeleteTopicRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + future.complete(null); + } else { + log.warn("deleteTopicInBroker getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture deleteTopicInNameserver(String address, DeleteTopicFromNamesrvRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + future.complete(null); + } else { + log.warn("deleteTopicInNameserver getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture deleteKvConfig(String address, DeleteKVConfigRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + future.complete(null); + } else { + log.warn("deleteKvConfig getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture deleteSubscriptionGroup(String address, DeleteSubscriptionGroupRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + future.complete(null); + } else { + log.warn("deleteSubscriptionGroup getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture> invokeBrokerToResetOffset(String address, + ResetOffsetRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture> future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS && null != response.getBody()) { + Map offsetTable = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class).getOffsetTable(); + future.complete(offsetTable); + log.info("Invoke broker to reset offset success. address:{}, header:{}, offsetTable:{}", + address, requestHeader, offsetTable); + } else { + log.warn("invokeBrokerToResetOffset getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture viewMessage(String address, ViewMessageRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody()); + MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true); + future.complete(messageExt); + } else { + log.warn("viewMessage getResponseCommand failed, {} {}, header={}", response.getCode(), response.getRemark(), requestHeader); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture getBrokerClusterInfo(String address, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ClusterInfo clusterInfo = ClusterInfo.decode(response.getBody(), ClusterInfo.class); + future.complete(clusterInfo); + } else { + log.warn("getBrokerClusterInfo getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture getConsumerConnectionList(String address, + GetConsumerConnectionListRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); + future.complete(consumerConnection); + } else { + log.warn("getConsumerConnectionList getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture queryTopicsByConsumer(String address, + QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPICS_BY_CONSUMER, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); + future.complete(topicList); + } else { + log.warn("queryTopicsByConsumer getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture querySubscriptionByConsumer(String address, + QuerySubscriptionByConsumerRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + QuerySubscriptionResponseBody subscriptionResponseBody = + QuerySubscriptionResponseBody.decode(response.getBody(), QuerySubscriptionResponseBody.class); + future.complete(subscriptionResponseBody.getSubscriptionData()); + } else { + log.warn("querySubscriptionByConsumer getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture getConsumeStats(String address, GetConsumeStatsRequestHeader requestHeader, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class); + future.complete(consumeStats); + } else { + log.warn("getConsumeStats getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture queryTopicConsumeByWho(String address, + QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + GroupList groupList = GroupList.decode(response.getBody(), GroupList.class); + future.complete(groupList); + } else { + log.warn("queryTopicConsumeByWho getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture getConsumerRunningInfo(String address, + GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumerRunningInfo info = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class); + future.complete(info); + } else { + log.warn("getConsumerRunningInfo getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + @Override + public CompletableFuture consumeMessageDirectly(String address, + ConsumeMessageDirectlyResultRequestHeader requestHeader, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader); + remotingClient.invoke(address, request, timeoutMillis).thenAccept(response -> { + if (response.getCode() == ResponseCode.SUCCESS) { + ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class); + future.complete(info); + } else { + log.warn("consumeMessageDirectly getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + future.completeExceptionally(new MQClientException(response.getCode(), response.getRemark())); + } + }); + return future; + } + + private List filterMessages(List messageFoundList, String topic, String key, + boolean uniqueKeyFlag) { + List matchedMessages = new ArrayList<>(); + if (uniqueKeyFlag) { + matchedMessages.addAll(messageFoundList.stream() + .filter(msg -> topic.equals(msg.getTopic())) + .filter(msg -> key.equals(msg.getMsgId())) + .collect(Collectors.toList()) + ); + } else { + matchedMessages.addAll(messageFoundList.stream() + .filter(msg -> topic.equals(msg.getTopic())) + .filter(msg -> { + boolean matched = false; + if (StringUtils.isNotBlank(msg.getKeys())) { + String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); + for (String s : keyArray) { + if (key.equals(s)) { + matched = true; + break; + } + } + } + + return matched; + }).collect(Collectors.toList())); + } + + return matchedMessages; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java index ec81e815cef..cc8252c2e6b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.constant.LoggerName; @@ -83,6 +84,8 @@ public class MQClientAPIExt extends MQClientAPIImpl { private final ClientConfig clientConfig; + private MqClientAdminImpl mqClientAdmin; + public MQClientAPIExt( ClientConfig clientConfig, NettyClientConfig nettyClientConfig, @@ -91,6 +94,7 @@ public MQClientAPIExt( ) { super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig); this.clientConfig = clientConfig; + this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient()); } public boolean updateNameServerAddressList() { @@ -621,20 +625,7 @@ public CompletableFuture notification(String brokerAddr, NotificationRe } public CompletableFuture invoke(String brokerAddr, RemotingCommand request, long timeoutMillis) { - CompletableFuture future = new CompletableFuture<>(); - try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - future.complete(response); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); - } - }); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; + return getRemotingClient().invoke(brokerAddr, request, timeoutMillis); } public CompletableFuture invokeOneway(String brokerAddr, RemotingCommand request, long timeoutMillis) { @@ -647,4 +638,8 @@ public CompletableFuture invokeOneway(String brokerAddr, RemotingCommand r } return future; } + + public MqClientAdminImpl getMqClientAdmin() { + return mqClientAdmin; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index 5c3766b2d27..ff0b3df95a6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -17,8 +17,10 @@ package org.apache.rocketmq.remoting; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; @@ -45,6 +47,30 @@ void invokeOneway(final String addr, final RemotingCommand request, final long t throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; + default CompletableFuture invoke(final String addr, final RemotingCommand request, + final long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + try { + invokeAsync(addr, request, timeoutMillis, responseFuture -> { + RemotingCommand response = responseFuture.getResponseCommand(); + if (response != null) { + future.complete(response); + } else { + if (!responseFuture.isSendRequestOK()) { + future.completeExceptionally(new RemotingSendRequestException(addr, responseFuture.getCause())); + } else if (responseFuture.isTimeout()) { + future.completeExceptionally(new RemotingTimeoutException(addr, timeoutMillis, responseFuture.getCause())); + } else { + future.completeExceptionally(new RemotingException(request.toString(), responseFuture.getCause())); + } + } + }); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java index 4b38ce9524c..efa3eb3d592 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java @@ -16,23 +16,111 @@ */ package org.apache.rocketmq.remoting.netty; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; @RunWith(MockitoJUnitRunner.class) public class NettyRemotingClientTest { + @Spy private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig()); @Test public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { ExecutorService customized = Executors.newCachedThreadPool(); remotingClient.setCallbackExecutor(customized); - assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); } + + @Test + public void testInvokeResponse() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); + responseFuture.setResponseCommand(response); + callback.operationComplete(responseFuture); + return null; + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + + CompletableFuture future = remotingClient.invoke("0.0.0.0", request, 1000); + RemotingCommand actual = future.get(); + assertThat(actual).isEqualTo(response); + } + + @Test + public void testRemotingSendRequestException() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); + responseFuture.setSendRequestOK(false); + callback.operationComplete(responseFuture); + return null; + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + + CompletableFuture future = remotingClient.invoke("0.0.0.0", request, 1000); + Throwable thrown = catchThrowable(future::get); + assertThat(thrown.getCause()).isInstanceOf(RemotingSendRequestException.class); + } + + @Test + public void testRemotingTimeoutException() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), -1L, null, null); + callback.operationComplete(responseFuture); + return null; + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + + CompletableFuture future = remotingClient.invoke("0.0.0.0", request, 1000); + Throwable thrown = catchThrowable(future::get); + assertThat(thrown.getCause()).isInstanceOf(RemotingTimeoutException.class); + } + + @Test + public void testRemotingException() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); + callback.operationComplete(responseFuture); + return null; + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + + CompletableFuture future = remotingClient.invoke("0.0.0.0", request, 1000); + Throwable thrown = catchThrowable(future::get); + assertThat(thrown.getCause()).isInstanceOf(RemotingException.class); + } }