Skip to content

Commit

Permalink
Add interface for MqClientAdmin
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Apr 26, 2023
1 parent bfba7d4 commit ead0c57
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 1 deletion.
110 changes: 110 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

public interface MqClientAdmin {
CompletableFuture<List<MessageExt>> queryMessage(String address, boolean uniqueKeyFlag, boolean decompressBody,
QueryMessageRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String address,
QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<Void> updateOrCreateTopic(String address, CreateTopicRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<Void> updateOrCreateSubscriptionGroup(String address, SubscriptionGroupConfig config,
long timeoutMillis);

CompletableFuture<Void> deleteTopicInBroker(String address, DeleteTopicRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<Void> deleteTopicInNameserver(String address, DeleteTopicFromNamesrvRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<Void> deleteKvConfig(String address, DeleteKVConfigRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<Void> deleteSubscriptionGroup(String address, DeleteSubscriptionGroupRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<Map<MessageQueue, Long>> invokeBrokerToResetOffset(String address,
ResetOffsetRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<MessageExt> viewMessage(String address, ViewMessageRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address, long timeoutMillis);

CompletableFuture<ConsumerConnection> getConsumerConnectionList(String address,
GetConsumerConnectionListRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<TopicList> queryTopicsByConsumer(String address,
QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<SubscriptionData> querySubscriptionByConsumer(String address,
QuerySubscriptionByConsumerRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<ConsumeStats> getConsumeStats(String address, GetConsumeStatsRequestHeader requestHeader,
long timeoutMillis);

CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<ConsumerRunningInfo> getConsumerRunningInfo(String address,
GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis);

CompletableFuture<ConsumeMessageDirectlyResult> consumeMessageDirectly(String address,
ConsumeMessageDirectlyResultRequestHeader requestHeader, long timeoutMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.MqClientAdmin;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
Expand Down Expand Up @@ -69,14 +70,15 @@
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

public class MqClientAdminImpl {
public class MqClientAdminImpl implements MqClientAdmin {
private final static Logger log = LoggerFactory.getLogger(MqClientAdminImpl.class);
private final RemotingClient remotingClient;

public MqClientAdminImpl(RemotingClient remotingClient) {
this.remotingClient = remotingClient;
}

@Override
public CompletableFuture<List<MessageExt>> queryMessage(String address, boolean uniqueKeyFlag, boolean decompressBody,
QueryMessageRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<List<MessageExt>> future = new CompletableFuture<>();
Expand All @@ -98,6 +100,7 @@ public CompletableFuture<List<MessageExt>> queryMessage(String address, boolean
return future;
}

@Override
public CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<TopicStatsTable> future = new CompletableFuture<>();
Expand All @@ -114,6 +117,7 @@ public CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
return future;
}

@Override
public CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String address,
QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<List<QueueTimeSpan>> future = new CompletableFuture<>();
Expand All @@ -130,6 +134,7 @@ public CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String addres
return future;
}

@Override
public CompletableFuture<Void> updateOrCreateTopic(String address, CreateTopicRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -145,6 +150,7 @@ public CompletableFuture<Void> updateOrCreateTopic(String address, CreateTopicRe
return future;
}

@Override
public CompletableFuture<Void> updateOrCreateSubscriptionGroup(String address, SubscriptionGroupConfig config,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -162,6 +168,7 @@ public CompletableFuture<Void> updateOrCreateSubscriptionGroup(String address, S
return future;
}

@Override
public CompletableFuture<Void> deleteTopicInBroker(String address, DeleteTopicRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -177,6 +184,7 @@ public CompletableFuture<Void> deleteTopicInBroker(String address, DeleteTopicRe
return future;
}

@Override
public CompletableFuture<Void> deleteTopicInNameserver(String address, DeleteTopicFromNamesrvRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -192,6 +200,7 @@ public CompletableFuture<Void> deleteTopicInNameserver(String address, DeleteTop
return future;
}

@Override
public CompletableFuture<Void> deleteKvConfig(String address, DeleteKVConfigRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -207,6 +216,7 @@ public CompletableFuture<Void> deleteKvConfig(String address, DeleteKVConfigRequ
return future;
}

@Override
public CompletableFuture<Void> deleteSubscriptionGroup(String address, DeleteSubscriptionGroupRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -222,6 +232,7 @@ public CompletableFuture<Void> deleteSubscriptionGroup(String address, DeleteSub
return future;
}

@Override
public CompletableFuture<Map<MessageQueue, Long>> invokeBrokerToResetOffset(String address,
ResetOffsetRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<Map<MessageQueue, Long>> future = new CompletableFuture<>();
Expand All @@ -240,6 +251,7 @@ public CompletableFuture<Map<MessageQueue, Long>> invokeBrokerToResetOffset(Stri
return future;
}

@Override
public CompletableFuture<MessageExt> viewMessage(String address, ViewMessageRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<MessageExt> future = new CompletableFuture<>();
Expand All @@ -257,6 +269,7 @@ public CompletableFuture<MessageExt> viewMessage(String address, ViewMessageRequ
return future;
}

@Override
public CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address, long timeoutMillis) {
CompletableFuture<ClusterInfo> future = new CompletableFuture<>();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
Expand All @@ -272,6 +285,7 @@ public CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address, long
return future;
}

@Override
public CompletableFuture<ConsumerConnection> getConsumerConnectionList(String address,
GetConsumerConnectionListRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<ConsumerConnection> future = new CompletableFuture<>();
Expand All @@ -288,6 +302,7 @@ public CompletableFuture<ConsumerConnection> getConsumerConnectionList(String ad
return future;
}

@Override
public CompletableFuture<TopicList> queryTopicsByConsumer(String address,
QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<TopicList> future = new CompletableFuture<>();
Expand All @@ -304,6 +319,7 @@ public CompletableFuture<TopicList> queryTopicsByConsumer(String address,
return future;
}

@Override
public CompletableFuture<SubscriptionData> querySubscriptionByConsumer(String address,
QuerySubscriptionByConsumerRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<SubscriptionData> future = new CompletableFuture<>();
Expand All @@ -321,6 +337,7 @@ public CompletableFuture<SubscriptionData> querySubscriptionByConsumer(String ad
return future;
}

@Override
public CompletableFuture<ConsumeStats> getConsumeStats(String address, GetConsumeStatsRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<ConsumeStats> future = new CompletableFuture<>();
Expand All @@ -337,6 +354,7 @@ public CompletableFuture<ConsumeStats> getConsumeStats(String address, GetConsum
return future;
}

@Override
public CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<GroupList> future = new CompletableFuture<>();
Expand All @@ -353,6 +371,7 @@ public CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
return future;
}

@Override
public CompletableFuture<ConsumerRunningInfo> getConsumerRunningInfo(String address,
GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<ConsumerRunningInfo> future = new CompletableFuture<>();
Expand All @@ -369,6 +388,7 @@ public CompletableFuture<ConsumerRunningInfo> getConsumerRunningInfo(String addr
return future;
}

@Override
public CompletableFuture<ConsumeMessageDirectlyResult> consumeMessageDirectly(String address,
ConsumeMessageDirectlyResultRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<ConsumeMessageDirectlyResult> future = new CompletableFuture<>();
Expand Down

0 comments on commit ead0c57

Please sign in to comment.