Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Replace ConcurrentOpenHashMap with ConcurrentHashMap in Topic classes #36

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1265,14 +1265,14 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr
return;
}
} else {
asyncResponse.resume(new ArrayList<>(subscriptions));
asyncResponse.resume(subscriptions);
}
});
}

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
getTopicReferenceAsync(topicName)
.thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys())))
.thenAccept(topic -> asyncResponse.resume(topic.getSubscriptions().keySet()))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -2024,7 +2024,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
new ArrayList<>((int) topic.getSubscriptions().size());
subNames.addAll(topic.getSubscriptions().keys().stream().filter(
subNames.addAll(topic.getSubscriptions().keySet().stream().filter(
subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
for (int i = 0; i < subNames.size(); i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -576,7 +577,7 @@ && getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= maxSameAddres
public abstract int getNumberOfSameAddressConsumers(String clientAddress);

protected int getNumberOfSameAddressConsumers(final String clientAddress,
final List<? extends Subscription> subscriptions) {
final Collection<? extends Subscription> subscriptions) {
int count = 0;
if (clientAddress != null) {
for (Subscription subscription : subscriptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1176,15 +1176,15 @@ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceD
// v2 topics have a global name so check if the topic is replicated.
if (t.isReplicated()) {
// Delete is disallowed on global topic
final List<String> clusters = t.getReplicators().keys();
final var clusters = t.getReplicators().keySet();
log.error("Delete forbidden topic {} is replicated on clusters {}", topic, clusters);
return FutureUtil.failedFuture(
new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
}

// shadow topic should be deleted first.
if (t.isShadowReplicated()) {
final List<String> shadowTopics = t.getShadowReplicators().keys();
final var shadowTopics = t.getShadowReplicators().keySet();
log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics);
return FutureUtil.failedFuture(new IllegalStateException(
"Delete forbidden. Topic " + topic + " is replicated to shadow topics."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -183,7 +182,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> unsubscribe(String subName);

ConcurrentOpenHashMap<String, ? extends Subscription> getSubscriptions();
Map<String, ? extends Subscription> getSubscriptions();

CompletableFuture<Void> delete();

Expand Down Expand Up @@ -265,9 +264,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

Subscription getSubscription(String subscription);

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();
Map<String, ? extends Replicator> getReplicators();

ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();
Map<String, ? extends Replicator> getShadowReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -96,7 +97,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
Expand All @@ -105,9 +105,9 @@
public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener {

// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
private final Map<String, NonPersistentSubscription> subscriptions = new ConcurrentHashMap<>();

private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
private final Map<String, NonPersistentReplicator> replicators = new ConcurrentHashMap<>();

// Ever increasing counter of entries added
private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER =
Expand Down Expand Up @@ -152,17 +152,6 @@ public void reset() {

public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);

this.subscriptions =
ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators =
ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.isFenced = false;
registerTopicPolicyListener();
}
Expand Down Expand Up @@ -446,8 +435,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(
new TopicBusyException("Topic has subscriptions:" + subscriptions.keys()));
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions:"
+ subscriptions.keySet().stream().toList()));
return;
}
} else {
Expand Down Expand Up @@ -714,18 +703,18 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) {
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
public Map<String, NonPersistentSubscription> getSubscriptions() {
return subscriptions;
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
public Map<String, NonPersistentReplicator> getReplicators() {
return replicators;
}

@Override
public ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators() {
return ConcurrentOpenHashMap.emptyMap();
public Map<String, ? extends Replicator> getShadowReplicators() {
return Map.of();
}

@Override
Expand Down Expand Up @@ -1043,7 +1032,6 @@ private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {

private CompletableFuture<Void> disconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
futures.add(replicator.terminate());
});
Expand Down
Loading
Loading