diff --git a/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java new file mode 100644 index 00000000000..4eb74c0ca9b --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public interface MqClientAdmin { + CompletableFuture> queryMessage(String address, boolean uniqueKeyFlag, boolean decompressBody, + QueryMessageRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture getTopicStatsInfo(String address, + GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture> queryConsumeTimeSpan(String address, + QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture updateOrCreateTopic(String address, CreateTopicRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture updateOrCreateSubscriptionGroup(String address, SubscriptionGroupConfig config, + long timeoutMillis); + + CompletableFuture deleteTopicInBroker(String address, DeleteTopicRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture deleteTopicInNameserver(String address, DeleteTopicFromNamesrvRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture deleteKvConfig(String address, DeleteKVConfigRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture deleteSubscriptionGroup(String address, DeleteSubscriptionGroupRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture> invokeBrokerToResetOffset(String address, + ResetOffsetRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture viewMessage(String address, ViewMessageRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture getBrokerClusterInfo(String address, long timeoutMillis); + + CompletableFuture getConsumerConnectionList(String address, + GetConsumerConnectionListRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture queryTopicsByConsumer(String address, + QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture querySubscriptionByConsumer(String address, + QuerySubscriptionByConsumerRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture getConsumeStats(String address, GetConsumeStatsRequestHeader requestHeader, + long timeoutMillis); + + CompletableFuture queryTopicConsumeByWho(String address, + QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture getConsumerRunningInfo(String address, + GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis); + + CompletableFuture consumeMessageDirectly(String address, + ConsumeMessageDirectlyResultRequestHeader requestHeader, long timeoutMillis); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java index 3fc889251d4..34f066c7ddd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.MqClientAdmin; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; @@ -69,7 +70,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; -public class MqClientAdminImpl { +public class MqClientAdminImpl implements MqClientAdmin { private final static Logger log = LoggerFactory.getLogger(MqClientAdminImpl.class); private final RemotingClient remotingClient; @@ -77,6 +78,7 @@ public MqClientAdminImpl(RemotingClient remotingClient) { this.remotingClient = remotingClient; } + @Override public CompletableFuture> queryMessage(String address, boolean uniqueKeyFlag, boolean decompressBody, QueryMessageRequestHeader requestHeader, long timeoutMillis) { CompletableFuture> future = new CompletableFuture<>(); @@ -98,6 +100,7 @@ public CompletableFuture> queryMessage(String address, boolean return future; } + @Override public CompletableFuture getTopicStatsInfo(String address, GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -114,6 +117,7 @@ public CompletableFuture getTopicStatsInfo(String address, return future; } + @Override public CompletableFuture> queryConsumeTimeSpan(String address, QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis) { CompletableFuture> future = new CompletableFuture<>(); @@ -130,6 +134,7 @@ public CompletableFuture> queryConsumeTimeSpan(String addres return future; } + @Override public CompletableFuture updateOrCreateTopic(String address, CreateTopicRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -145,6 +150,7 @@ public CompletableFuture updateOrCreateTopic(String address, CreateTopicRe return future; } + @Override public CompletableFuture updateOrCreateSubscriptionGroup(String address, SubscriptionGroupConfig config, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -162,6 +168,7 @@ public CompletableFuture updateOrCreateSubscriptionGroup(String address, S return future; } + @Override public CompletableFuture deleteTopicInBroker(String address, DeleteTopicRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -177,6 +184,7 @@ public CompletableFuture deleteTopicInBroker(String address, DeleteTopicRe return future; } + @Override public CompletableFuture deleteTopicInNameserver(String address, DeleteTopicFromNamesrvRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -192,6 +200,7 @@ public CompletableFuture deleteTopicInNameserver(String address, DeleteTop return future; } + @Override public CompletableFuture deleteKvConfig(String address, DeleteKVConfigRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -207,6 +216,7 @@ public CompletableFuture deleteKvConfig(String address, DeleteKVConfigRequ return future; } + @Override public CompletableFuture deleteSubscriptionGroup(String address, DeleteSubscriptionGroupRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -222,6 +232,7 @@ public CompletableFuture deleteSubscriptionGroup(String address, DeleteSub return future; } + @Override public CompletableFuture> invokeBrokerToResetOffset(String address, ResetOffsetRequestHeader requestHeader, long timeoutMillis) { CompletableFuture> future = new CompletableFuture<>(); @@ -240,6 +251,7 @@ public CompletableFuture> invokeBrokerToResetOffset(Stri return future; } + @Override public CompletableFuture viewMessage(String address, ViewMessageRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -257,6 +269,7 @@ public CompletableFuture viewMessage(String address, ViewMessageRequ return future; } + @Override public CompletableFuture getBrokerClusterInfo(String address, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); @@ -272,6 +285,7 @@ public CompletableFuture getBrokerClusterInfo(String address, long return future; } + @Override public CompletableFuture getConsumerConnectionList(String address, GetConsumerConnectionListRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -288,6 +302,7 @@ public CompletableFuture getConsumerConnectionList(String ad return future; } + @Override public CompletableFuture queryTopicsByConsumer(String address, QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -304,6 +319,7 @@ public CompletableFuture queryTopicsByConsumer(String address, return future; } + @Override public CompletableFuture querySubscriptionByConsumer(String address, QuerySubscriptionByConsumerRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -321,6 +337,7 @@ public CompletableFuture querySubscriptionByConsumer(String ad return future; } + @Override public CompletableFuture getConsumeStats(String address, GetConsumeStatsRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -337,6 +354,7 @@ public CompletableFuture getConsumeStats(String address, GetConsum return future; } + @Override public CompletableFuture queryTopicConsumeByWho(String address, QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -353,6 +371,7 @@ public CompletableFuture queryTopicConsumeByWho(String address, return future; } + @Override public CompletableFuture getConsumerRunningInfo(String address, GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -369,6 +388,7 @@ public CompletableFuture getConsumerRunningInfo(String addr return future; } + @Override public CompletableFuture consumeMessageDirectly(String address, ConsumeMessageDirectlyResultRequestHeader requestHeader, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>();