Skip to content

Commit

Permalink
[improve] [broker] high CPU usage caused by list topics under namespa…
Browse files Browse the repository at this point in the history
…ce (apache#23049)

(cherry picked from commit 3e4f338)
(cherry picked from commit 3f7206c)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Jul 26, 2024
1 parent d63205c commit 40a797c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -52,6 +53,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -100,6 +102,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand Down Expand Up @@ -159,6 +162,9 @@ public class NamespaceService implements AutoCloseable {
.register();


private ConcurrentHashMap<String, CompletableFuture<List<String>>> inProgressQueryUserTopics =
new ConcurrentHashMap<>();

/**
* Default constructor.
*/
Expand Down Expand Up @@ -1452,6 +1458,23 @@ public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceNa
}
}

public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName namespaceName, Mode mode) {
String key = String.format("%s://%s", mode, namespaceName);
final MutableBoolean initializedByCurrentThread = new MutableBoolean();
CompletableFuture<List<String>> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> {
initializedByCurrentThread.setTrue();
return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> {
return TopicList.filterSystemTopic(list);
}, pulsar.getExecutor());
});
if (initializedByCurrentThread.getValue()) {
queryRes.whenComplete((ignore, ex) -> {
inProgressQueryUserTopics.remove(key, queryRes);
});
}
return queryRes;
}

public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
return getPartitions(namespaceName, TopicDomain.persistent)
.thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2348,11 +2348,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
if (lookupSemaphore.tryAcquire()) {
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode)
.thenAccept(topics -> {
boolean filterTopics = false;
// filter system topic
List<String> filteredTopics = TopicList.filterSystemTopic(topics);
List<String> filteredTopics = topics;

if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
if (topicsPattern.get().length() <= maxSubscriptionPatternLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ public void setup() throws Exception {
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
NamespaceName.get("use", "ns-abc"));

Expand Down

0 comments on commit 40a797c

Please sign in to comment.