Skip to content

Commit

Permalink
[improve][broker] Remove ConcurrentOpenHashMap and ConcurrentOpenHash…
Browse files Browse the repository at this point in the history
…Set (#23329)
  • Loading branch information
BewareMyPower committed Sep 23, 2024
1 parent 1ce7855 commit 9012422
Show file tree
Hide file tree
Showing 17 changed files with 40 additions and 2,573 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -1207,8 +1206,7 @@ public void calculateCursorBacklogs(final TopicName topicName,
BookKeeper bk = getBookKeeper().get();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
ConcurrentOpenHashMap.<String, Long>newBuilder().build();
final var ledgerRetryMap = new ConcurrentHashMap<String, Long>();

final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final Position lastLedgerPosition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
Expand Down Expand Up @@ -150,7 +149,7 @@ public class NamespaceService implements AutoCloseable {
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s";

private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
private final Map<ClusterDataImpl, PulsarClientImpl> namespaceClients = new ConcurrentHashMap<>();

private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;

Expand Down Expand Up @@ -204,8 +203,6 @@ public NamespaceService(PulsarService pulsar) {
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, this);
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
Expand Down Expand Up @@ -461,16 +458,10 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
}
}

private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
private final Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative = new ConcurrentHashMap<>();
private final Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative = new ConcurrentHashMap<>();

/**
* Main internal method to lookup and setup ownership of service unit to a broker.
Expand All @@ -485,7 +476,7 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
}

ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
if (options.isAuthoritative()) {
targetMap = findingBundlesAuthoritative;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,20 +100,12 @@ public MessageDupUnknownException() {
// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
final Map<String, Long> highestSequencedPushed = new ConcurrentHashMap<>();

// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
final Map<String, Long> highestSequencedPersisted = new ConcurrentHashMap<>();

// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
Expand Down Expand Up @@ -434,7 +425,7 @@ public void resetHighestSequenceIdPushed() {
}

highestSequencedPushed.clear();
for (String producer : highestSequencedPersisted.keys()) {
for (String producer : highestSequencedPersisted.keySet()) {
highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,22 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

/**
*/
public class ClusterReplicationMetrics {
private final List<Metrics> metricsList;
private final String localCluster;
private final ConcurrentOpenHashMap<String, ReplicationMetrics> metricsMap;
private final Map<String, ReplicationMetrics> metricsMap = new ConcurrentHashMap<>();
public static final String SEPARATOR = "_";
public final boolean metricsEnabled;

public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) {
metricsList = new ArrayList<>();
this.localCluster = localCluster;
metricsMap = ConcurrentOpenHashMap.<String, ReplicationMetrics>newBuilder()
.build();
this.metricsEnabled = metricsEnabled;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -230,9 +229,7 @@ public void testInactiveProducerRemove() throws Exception {
messageDeduplication.purgeInactiveProducers();
assertFalse(inactiveProducers.containsKey(producerName2));
assertFalse(inactiveProducers.containsKey(producerName3));
field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
field.setAccessible(true);
ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
final var highestSequencedPushed = messageDeduplication.highestSequencedPushed;

assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
assertFalse(highestSequencedPushed.containsKey(producerName2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -67,7 +68,6 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -88,7 +88,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected Map<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected final int maxReceiverQueueSize;
private volatile int currentReceiverQueueSize;
Expand Down Expand Up @@ -138,8 +138,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.messageListenerExecutor = conf.getMessageListenerExecutor() == null
? (conf.getSubscriptionType() == SubscriptionType.Key_Shared
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -207,8 +206,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

protected volatile boolean paused;

protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
protected Map<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentHashMap<>();
private int pendingChunkedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
private final AtomicBoolean expireChunkMessageTaskScheduled = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -52,15 +54,14 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedProducerImpl<T> extends ProducerBase<T> {

private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);

private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
private final Map<Integer, ProducerImpl<T>> producers = new ConcurrentHashMap<>();
private final MessageRouter routerPolicy;
private final PartitionedTopicProducerStatsRecorderImpl stats;
private TopicMetadata topicMetadata;
Expand All @@ -76,8 +77,6 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture,
Schema<T> schema, ProducerInterceptors interceptors) {
super(client, topic, conf, producerCreatedFuture, schema, interceptors);
this.producers =
ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -32,15 +34,14 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

public abstract class ProducerBase<T> extends HandlerState implements Producer<T> {

protected final CompletableFuture<Producer<T>> producerCreatedFuture;
protected final ProducerConfigurationData conf;
protected final Schema<T> schema;
protected final ProducerInterceptors interceptors;
protected final ConcurrentOpenHashMap<SchemaHash, byte[]> schemaCache;
protected final Map<SchemaHash, byte[]> schemaCache = new ConcurrentHashMap<>();
protected volatile MultiSchemaMode multiSchemaMode = MultiSchemaMode.Auto;

protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
Expand All @@ -50,8 +51,6 @@ protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurat
this.conf = conf;
this.schema = schema;
this.interceptors = interceptors;
this.schemaCache =
ConcurrentOpenHashMap.<SchemaHash, byte[]>newBuilder().build();
if (!conf.isMultiSchema()) {
multiSchemaMode = MultiSchemaMode.Disabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -62,8 +61,7 @@ public class AcknowledgementsGroupingTrackerTest {
public void setup() throws NoSuchFieldException, IllegalAccessException {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
consumer.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
cnx = spy(new ClientCnxTest(new ClientConfigurationData(), eventLoopGroup));
PulsarClientImpl client = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -113,8 +111,7 @@ public void testTrackChunkedMessageId() {
ChunkMessageIdImpl chunkedMessageId =
new ChunkMessageIdImpl(chunkMsgIds[0], chunkMsgIds[chunkMsgIds.length - 1]);

consumer.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId, chunkMsgIds);

// Redeliver chunked message
Expand Down
Loading

0 comments on commit 9012422

Please sign in to comment.