diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 586beb412d297..34dd3610d4ec9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -104,7 +104,6 @@ import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Runnables; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -1207,8 +1206,7 @@ public void calculateCursorBacklogs(final TopicName topicName, BookKeeper bk = getBookKeeper().get(); final CountDownLatch allCursorsCounter = new CountDownLatch(1); final long errorInReadingCursor = -1; - ConcurrentOpenHashMap ledgerRetryMap = - ConcurrentOpenHashMap.newBuilder().build(); + final var ledgerRetryMap = new ConcurrentHashMap(); final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue(); final Position lastLedgerPosition = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 0b1661fb9540a..b2ee299bb030e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -109,7 +109,6 @@ import org.apache.pulsar.common.stats.MetricsUtil; import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; @@ -150,7 +149,7 @@ public class NamespaceService implements AutoCloseable { public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s"; public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s"; - private final ConcurrentOpenHashMap namespaceClients; + private final Map namespaceClients = new ConcurrentHashMap<>(); private final List bundleOwnershipListeners; @@ -204,8 +203,6 @@ public NamespaceService(PulsarService pulsar) { this.loadManager = pulsar.getLoadManager(); this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); this.ownershipCache = new OwnershipCache(pulsar, this); - this.namespaceClients = - ConcurrentOpenHashMap.newBuilder().build(); this.bundleOwnershipListeners = new CopyOnWriteArrayList<>(); this.bundleSplitListeners = new CopyOnWriteArrayList<>(); this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); @@ -461,16 +458,10 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro } } - private final ConcurrentOpenHashMap>> - findingBundlesAuthoritative = - ConcurrentOpenHashMap.>>newBuilder() - .build(); - private final ConcurrentOpenHashMap>> - findingBundlesNotAuthoritative = - ConcurrentOpenHashMap.>>newBuilder() - .build(); + private final Map>> + findingBundlesAuthoritative = new ConcurrentHashMap<>(); + private final Map>> + findingBundlesNotAuthoritative = new ConcurrentHashMap<>(); /** * Main internal method to lookup and setup ownership of service unit to a broker. @@ -485,7 +476,7 @@ private CompletableFuture> findBrokerServiceUrl( LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options); } - ConcurrentOpenHashMap>> targetMap; + Map>> targetMap; if (options.isAuthoritative()) { targetMap = findingBundlesAuthoritative; } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e8d19d2e2eca1..dfb8b9d2edb12 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -42,7 +42,6 @@ import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,20 +100,12 @@ public MessageDupUnknownException() { // Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before // the messages are persisted @VisibleForTesting - final ConcurrentOpenHashMap highestSequencedPushed = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final Map highestSequencedPushed = new ConcurrentHashMap<>(); // Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated // after the messages are persisted @VisibleForTesting - final ConcurrentOpenHashMap highestSequencedPersisted = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final Map highestSequencedPersisted = new ConcurrentHashMap<>(); // Number of persisted entries after which to store a snapshot of the sequence ids map private final int snapshotInterval; @@ -434,7 +425,7 @@ public void resetHighestSequenceIdPushed() { } highestSequencedPushed.clear(); - for (String producer : highestSequencedPersisted.keys()) { + for (String producer : highestSequencedPersisted.keySet()) { highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java index 6b274b26b57fb..828cb48be429d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java @@ -20,23 +20,22 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; /** */ public class ClusterReplicationMetrics { private final List metricsList; private final String localCluster; - private final ConcurrentOpenHashMap metricsMap; + private final Map metricsMap = new ConcurrentHashMap<>(); public static final String SEPARATOR = "_"; public final boolean metricsEnabled; public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) { metricsList = new ArrayList<>(); this.localCluster = localCluster; - metricsMap = ConcurrentOpenHashMap.newBuilder() - .build(); this.metricsEnabled = metricsEnabled; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index e7dcbc602134c..5b1c78574b462 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -60,7 +60,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.broker.qos.AsyncTokenBucket; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; import org.testng.Assert; @@ -230,9 +229,7 @@ public void testInactiveProducerRemove() throws Exception { messageDeduplication.purgeInactiveProducers(); assertFalse(inactiveProducers.containsKey(producerName2)); assertFalse(inactiveProducers.containsKey(producerName3)); - field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed"); - field.setAccessible(true); - ConcurrentOpenHashMap highestSequencedPushed = (ConcurrentOpenHashMap) field.get(messageDeduplication); + final var highestSequencedPushed = messageDeduplication.highestSequencedPushed; assertEquals((long) highestSequencedPushed.get(producerName1), 2L); assertFalse(highestSequencedPushed.containsKey(producerName2)); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 03256a3e139b6..111cbdb8a8ef3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -67,7 +68,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +88,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer> incomingMessages; - protected ConcurrentOpenHashMap unAckedChunkedMessageIdSequenceMap; + protected Map unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>(); protected final ConcurrentLinkedQueue>> pendingReceives; protected final int maxReceiverQueueSize; private volatile int currentReceiverQueueSize; @@ -138,8 +138,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.consumerEventListener = conf.getConsumerEventListener(); // Always use growable queue since items can exceed the advertised size this.incomingMessages = new GrowableArrayBlockingQueue<>(); - this.unAckedChunkedMessageIdSequenceMap = - ConcurrentOpenHashMap.newBuilder().build(); this.executorProvider = executorProvider; this.messageListenerExecutor = conf.getMessageListenerExecutor() == null ? (conf.getSubscriptionType() == SubscriptionType.Key_Shared diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 60b9d145c4897..03ccbae01c276 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -130,7 +130,6 @@ import org.apache.pulsar.common.util.SafeCollectionUtils; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,8 +206,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle protected volatile boolean paused; - protected ConcurrentOpenHashMap chunkedMessagesMap = - ConcurrentOpenHashMap.newBuilder().build(); + protected Map chunkedMessagesMap = new ConcurrentHashMap<>(); private int pendingChunkedMessageCount = 0; protected long expireTimeOfIncompleteChunkedMessageMillis = 0; private final AtomicBoolean expireChunkMessageTaskScheduled = new AtomicBoolean(false); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index bf7f1066173f6..2dc826d9e3af3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -27,9 +27,11 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -52,7 +54,6 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,7 @@ public class PartitionedProducerImpl extends ProducerBase { private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class); - private final ConcurrentOpenHashMap> producers; + private final Map> producers = new ConcurrentHashMap<>(); private final MessageRouter routerPolicy; private final PartitionedTopicProducerStatsRecorderImpl stats; private TopicMetadata topicMetadata; @@ -76,8 +77,6 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo int numPartitions, CompletableFuture> producerCreatedFuture, Schema schema, ProducerInterceptors interceptors) { super(client, topic, conf, producerCreatedFuture, schema, interceptors); - this.producers = - ConcurrentOpenHashMap.>newBuilder().build(); this.topicMetadata = new TopicMetadataImpl(numPartitions); this.routerPolicy = getMessageRouter(); stats = client.getConfiguration().getStatsIntervalSeconds() > 0 diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 7dc5f78398434..12e380fdd510c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -19,7 +19,9 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -32,7 +34,6 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; public abstract class ProducerBase extends HandlerState implements Producer { @@ -40,7 +41,7 @@ public abstract class ProducerBase extends HandlerState implements Producer schema; protected final ProducerInterceptors interceptors; - protected final ConcurrentOpenHashMap schemaCache; + protected final Map schemaCache = new ConcurrentHashMap<>(); protected volatile MultiSchemaMode multiSchemaMode = MultiSchemaMode.Auto; protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf, @@ -50,8 +51,6 @@ protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurat this.conf = conf; this.schema = schema; this.interceptors = interceptors; - this.schemaCache = - ConcurrentOpenHashMap.newBuilder().build(); if (!conf.isMultiSchema()) { multiSchemaMode = MultiSchemaMode.Disabled; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 514e3dde14070..a62d9e7479852 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -45,7 +45,6 @@ import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -62,8 +61,7 @@ public class AcknowledgementsGroupingTrackerTest { public void setup() throws NoSuchFieldException, IllegalAccessException { eventLoopGroup = new NioEventLoopGroup(1); consumer = mock(ConsumerImpl.class); - consumer.unAckedChunkedMessageIdSequenceMap = - ConcurrentOpenHashMap.newBuilder().build(); + consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>(); cnx = spy(new ClientCnxTest(new ClientConfigurationData(), eventLoopGroup)); PulsarClientImpl client = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java index b01fbcb879f80..eaac165818a56 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java @@ -31,13 +31,11 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Duration; import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.testng.annotations.Test; @@ -113,8 +111,7 @@ public void testTrackChunkedMessageId() { ChunkMessageIdImpl chunkedMessageId = new ChunkMessageIdImpl(chunkMsgIds[0], chunkMsgIds[chunkMsgIds.length - 1]); - consumer.unAckedChunkedMessageIdSequenceMap = - ConcurrentOpenHashMap.newBuilder().build(); + consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>(); consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId, chunkMsgIds); // Redeliver chunked message diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java deleted file mode 100644 index 7f0dbb4379265..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java +++ /dev/null @@ -1,658 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.StampedLock; -import java.util.function.BiConsumer; -import java.util.function.Function; - -/** - * Concurrent hash map. - * - *

Provides similar methods as a {@code ConcurrentMap} but since it's an open hash map with linear probing, - * no node allocations are required to store the values. - * - *
- * WARN: method forEach do not guarantee thread safety, nor do the keys and values method. - *
- * The forEach method is specifically designed for single-threaded usage. When iterating over a map - * with concurrent writes, it becomes possible for new values to be either observed or not observed. - * There is no guarantee that if we write value1 and value2, and are able to see value2, then we will also see value1. - * In some cases, it is even possible to encounter two mappings with the same key, - * leading the keys method to return a List containing two identical keys. - * - *
- * It is crucial to understand that the results obtained from aggregate status methods such as keys and values - * are typically reliable only when the map is not undergoing concurrent updates from other threads. - * When concurrent updates are involved, the results of these methods reflect transient states - * that may be suitable for monitoring or estimation purposes, but not for program control. - * @param - */ -@SuppressWarnings("unchecked") -public class ConcurrentOpenHashMap { - - private static final Object EmptyKey = null; - private static final Object DeletedKey = new Object(); - private static final ConcurrentOpenHashMap EmptyMap = new ConcurrentOpenHashMap<>(1, 1); - - /** - * This object is used to delete empty value in this map. - * EmptyValue.equals(null) = true. - */ - private static final Object EmptyValue = new Object() { - - @SuppressFBWarnings - @Override - public boolean equals(Object obj) { - return obj == null; - } - - /** - * This is just for avoiding spotbugs errors - */ - @Override - public int hashCode() { - return super.hashCode(); - } - }; - - private static final int DefaultExpectedItems = 256; - private static final int DefaultConcurrencyLevel = 16; - - private static final float DefaultMapFillFactor = 0.66f; - private static final float DefaultMapIdleFactor = 0.15f; - - private static final float DefaultExpandFactor = 2; - private static final float DefaultShrinkFactor = 2; - - private static final boolean DefaultAutoShrink = false; - - private final Section[] sections; - - public static Builder newBuilder() { - return new Builder<>(); - } - - /** - * Builder of ConcurrentOpenHashMap. - */ - public static class Builder { - int expectedItems = DefaultExpectedItems; - int concurrencyLevel = DefaultConcurrencyLevel; - float mapFillFactor = DefaultMapFillFactor; - float mapIdleFactor = DefaultMapIdleFactor; - float expandFactor = DefaultExpandFactor; - float shrinkFactor = DefaultShrinkFactor; - boolean autoShrink = DefaultAutoShrink; - - public Builder expectedItems(int expectedItems) { - this.expectedItems = expectedItems; - return this; - } - - public Builder concurrencyLevel(int concurrencyLevel) { - this.concurrencyLevel = concurrencyLevel; - return this; - } - - public Builder mapFillFactor(float mapFillFactor) { - this.mapFillFactor = mapFillFactor; - return this; - } - - public Builder mapIdleFactor(float mapIdleFactor) { - this.mapIdleFactor = mapIdleFactor; - return this; - } - - public Builder expandFactor(float expandFactor) { - this.expandFactor = expandFactor; - return this; - } - - public Builder shrinkFactor(float shrinkFactor) { - this.shrinkFactor = shrinkFactor; - return this; - } - - public Builder autoShrink(boolean autoShrink) { - this.autoShrink = autoShrink; - return this; - } - - public ConcurrentOpenHashMap build() { - return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel, - mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); - } - } - - @Deprecated - public ConcurrentOpenHashMap() { - this(DefaultExpectedItems); - } - - @Deprecated - public ConcurrentOpenHashMap(int expectedItems) { - this(expectedItems, DefaultConcurrencyLevel); - } - - @Deprecated - public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) { - this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, - DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); - } - - public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel, - float mapFillFactor, float mapIdleFactor, - boolean autoShrink, float expandFactor, float shrinkFactor) { - checkArgument(expectedItems > 0); - checkArgument(concurrencyLevel > 0); - checkArgument(expectedItems >= concurrencyLevel); - checkArgument(mapFillFactor > 0 && mapFillFactor < 1); - checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); - checkArgument(mapFillFactor > mapIdleFactor); - checkArgument(expandFactor > 1); - checkArgument(shrinkFactor > 1); - - int numSections = concurrencyLevel; - int perSectionExpectedItems = expectedItems / numSections; - int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor); - this.sections = (Section[]) new Section[numSections]; - - for (int i = 0; i < numSections; i++) { - sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor, - autoShrink, expandFactor, shrinkFactor); - } - } - - public static ConcurrentOpenHashMap emptyMap() { - return (ConcurrentOpenHashMap) EmptyMap; - } - - long getUsedBucketCount() { - long usedBucketCount = 0; - for (Section s : sections) { - usedBucketCount += s.usedBuckets; - } - return usedBucketCount; - } - - public long size() { - long size = 0; - for (Section s : sections) { - size += s.size; - } - return size; - } - - public long capacity() { - long capacity = 0; - for (Section s : sections) { - capacity += s.capacity; - } - return capacity; - } - - public boolean isEmpty() { - for (Section s : sections) { - if (s.size != 0) { - return false; - } - } - - return true; - } - - public V get(K key) { - requireNonNull(key); - long h = hash(key); - return getSection(h).get(key, (int) h); - } - - public boolean containsKey(K key) { - return get(key) != null; - } - - public V put(K key, V value) { - requireNonNull(key); - requireNonNull(value); - long h = hash(key); - return getSection(h).put(key, value, (int) h, false, null); - } - - public V putIfAbsent(K key, V value) { - requireNonNull(key); - requireNonNull(value); - long h = hash(key); - return getSection(h).put(key, value, (int) h, true, null); - } - - public V computeIfAbsent(K key, Function provider) { - requireNonNull(key); - requireNonNull(provider); - long h = hash(key); - return getSection(h).put(key, null, (int) h, true, provider); - } - - public V remove(K key) { - requireNonNull(key); - long h = hash(key); - return getSection(h).remove(key, null, (int) h); - } - - public boolean remove(K key, Object value) { - requireNonNull(key); - requireNonNull(value); - long h = hash(key); - return getSection(h).remove(key, value, (int) h) != null; - } - - public void removeNullValue(K key) { - remove(key, EmptyValue); - } - - private Section getSection(long hash) { - // Use 32 msb out of long to get the section - final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); - return sections[sectionIdx]; - } - - public void clear() { - for (int i = 0; i < sections.length; i++) { - sections[i].clear(); - } - } - - /** - * Iterate over all the entries in the map and apply the processor function to each of them. - *

- * Warning: Do Not Guarantee Thread-Safety. - * @param processor the function to apply to each entry - */ - public void forEach(BiConsumer processor) { - for (int i = 0; i < sections.length; i++) { - sections[i].forEach(processor); - } - } - - /** - * @return a new list of all keys (makes a copy) - */ - public List keys() { - List keys = new ArrayList<>((int) size()); - forEach((key, value) -> keys.add(key)); - return keys; - } - - public List values() { - List values = new ArrayList<>((int) size()); - forEach((key, value) -> values.add(value)); - return values; - } - - // A section is a portion of the hash map that is covered by a single - @SuppressWarnings("serial") - private static final class Section extends StampedLock { - // Each item take up 2 continuous array space. - private static final int ITEM_SIZE = 2; - - // Keys and values are stored interleaved in the table array - private volatile Object[] table; - - private volatile int capacity; - private final int initCapacity; - private static final AtomicIntegerFieldUpdater

SIZE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(Section.class, "size"); - private volatile int size; - private int usedBuckets; - private int resizeThresholdUp; - private int resizeThresholdBelow; - private final float mapFillFactor; - private final float mapIdleFactor; - private final float expandFactor; - private final float shrinkFactor; - private final boolean autoShrink; - - Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, - float expandFactor, float shrinkFactor) { - this.capacity = alignToPowerOfTwo(capacity); - this.initCapacity = this.capacity; - this.table = new Object[ITEM_SIZE * this.capacity]; - this.size = 0; - this.usedBuckets = 0; - this.autoShrink = autoShrink; - this.mapFillFactor = mapFillFactor; - this.mapIdleFactor = mapIdleFactor; - this.expandFactor = expandFactor; - this.shrinkFactor = shrinkFactor; - this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); - this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); - } - - V get(K key, int keyHash) { - long stamp = tryOptimisticRead(); - boolean acquiredLock = false; - - // add local variable here, so OutOfBound won't happen - Object[] table = this.table; - // calculate table.length / 2 as capacity to avoid rehash changing capacity - int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); - - try { - while (true) { - // First try optimistic locking - K storedKey = (K) table[bucket]; - V storedValue = (V) table[bucket + 1]; - - if (!acquiredLock && validate(stamp)) { - // The values we have read are consistent - if (key.equals(storedKey)) { - return storedValue; - } else if (storedKey == EmptyKey) { - // Not found - return null; - } - } else { - // Fallback to acquiring read lock - if (!acquiredLock) { - stamp = readLock(); - acquiredLock = true; - - // update local variable - table = this.table; - bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); - storedKey = (K) table[bucket]; - storedValue = (V) table[bucket + 1]; - } - - if (key.equals(storedKey)) { - return storedValue; - } else if (storedKey == EmptyKey) { - // Not found - return null; - } - } - - bucket = (bucket + ITEM_SIZE) & (table.length - 1); - } - } finally { - if (acquiredLock) { - unlockRead(stamp); - } - } - } - - V put(K key, V value, int keyHash, boolean onlyIfAbsent, Function valueProvider) { - long stamp = writeLock(); - int bucket = signSafeMod(keyHash, capacity); - - // Remember where we find the first available spot - int firstDeletedKey = -1; - - try { - while (true) { - K storedKey = (K) table[bucket]; - V storedValue = (V) table[bucket + 1]; - - if (key.equals(storedKey)) { - if (!onlyIfAbsent) { - // Over written an old value for same key - table[bucket + 1] = value; - return storedValue; - } else { - return storedValue; - } - } else if (storedKey == EmptyKey) { - // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted - // key, we should write at that position - if (firstDeletedKey != -1) { - bucket = firstDeletedKey; - } else { - ++usedBuckets; - } - - if (value == null) { - value = valueProvider.apply(key); - } - - table[bucket] = key; - table[bucket + 1] = value; - SIZE_UPDATER.incrementAndGet(this); - return valueProvider != null ? value : null; - } else if (storedKey == DeletedKey) { - // The bucket contained a different deleted key - if (firstDeletedKey == -1) { - firstDeletedKey = bucket; - } - } - - bucket = (bucket + ITEM_SIZE) & (table.length - 1); - } - } finally { - if (usedBuckets > resizeThresholdUp) { - try { - // Expand the hashmap - int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); - rehash(newCapacity); - } finally { - unlockWrite(stamp); - } - } else { - unlockWrite(stamp); - } - } - } - - private V remove(K key, Object value, int keyHash) { - long stamp = writeLock(); - int bucket = signSafeMod(keyHash, capacity); - - try { - while (true) { - K storedKey = (K) table[bucket]; - V storedValue = (V) table[bucket + 1]; - if (key.equals(storedKey)) { - if (value == null || value.equals(storedValue)) { - SIZE_UPDATER.decrementAndGet(this); - - int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1); - if (table[nextInArray] == EmptyKey) { - table[bucket] = EmptyKey; - table[bucket + 1] = null; - --usedBuckets; - - // Cleanup all the buckets that were in `DeletedKey` state, - // so that we can reduce unnecessary expansions - int lastBucket = (bucket - ITEM_SIZE) & (table.length - 1); - while (table[lastBucket] == DeletedKey) { - table[lastBucket] = EmptyKey; - table[lastBucket + 1] = null; - --usedBuckets; - - lastBucket = (lastBucket - ITEM_SIZE) & (table.length - 1); - } - } else { - table[bucket] = DeletedKey; - table[bucket + 1] = null; - } - - return storedValue; - } else { - return null; - } - } else if (storedKey == EmptyKey) { - // Key wasn't found - return null; - } - - bucket = (bucket + ITEM_SIZE) & (table.length - 1); - } - - } finally { - if (autoShrink && size < resizeThresholdBelow) { - try { - // Shrinking must at least ensure initCapacity, - // so as to avoid frequent shrinking and expansion near initCapacity, - // frequent shrinking and expansion, - // additionally opened arrays will consume more memory and affect GC - int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity); - int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); - if (newCapacity < capacity && newResizeThresholdUp > size) { - // shrink the hashmap - rehash(newCapacity); - } - } finally { - unlockWrite(stamp); - } - } else { - unlockWrite(stamp); - } - } - } - - void clear() { - long stamp = writeLock(); - - try { - if (autoShrink && capacity > initCapacity) { - shrinkToInitCapacity(); - } else { - Arrays.fill(table, EmptyKey); - this.size = 0; - this.usedBuckets = 0; - } - } finally { - unlockWrite(stamp); - } - } - - public void forEach(BiConsumer processor) { - // Take a reference to the data table, if there is a rehashing event, we'll be - // simply iterating over a snapshot of the data. - Object[] table = this.table; - - // Go through all the buckets for this section. We try to renew the stamp only after a validation - // error, otherwise we keep going with the same. - long stamp = 0; - for (int bucket = 0; bucket < table.length; bucket += ITEM_SIZE) { - if (stamp == 0) { - stamp = tryOptimisticRead(); - } - - K storedKey = (K) table[bucket]; - V storedValue = (V) table[bucket + 1]; - - if (!validate(stamp)) { - // Fallback to acquiring read lock - stamp = readLock(); - - try { - storedKey = (K) table[bucket]; - storedValue = (V) table[bucket + 1]; - } finally { - unlockRead(stamp); - } - - stamp = 0; - } - - if (storedKey != DeletedKey && storedKey != EmptyKey) { - processor.accept(storedKey, storedValue); - } - } - } - - private void rehash(int newCapacity) { - // Expand the hashmap - Object[] newTable = new Object[ITEM_SIZE * newCapacity]; - - // Re-hash table - for (int i = 0; i < table.length; i += ITEM_SIZE) { - K storedKey = (K) table[i]; - V storedValue = (V) table[i + 1]; - if (storedKey != EmptyKey && storedKey != DeletedKey) { - insertKeyValueNoLock(newTable, newCapacity, storedKey, storedValue); - } - } - - table = newTable; - capacity = newCapacity; - usedBuckets = size; - resizeThresholdUp = (int) (capacity * mapFillFactor); - resizeThresholdBelow = (int) (capacity * mapIdleFactor); - } - - private void shrinkToInitCapacity() { - Object[] newTable = new Object[ITEM_SIZE * initCapacity]; - - table = newTable; - size = 0; - usedBuckets = 0; - // Capacity needs to be updated after the values, so that we won't see - // a capacity value bigger than the actual array size - capacity = initCapacity; - resizeThresholdUp = (int) (capacity * mapFillFactor); - resizeThresholdBelow = (int) (capacity * mapIdleFactor); - } - - private static void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) { - int bucket = signSafeMod(hash(key), capacity); - - while (true) { - K storedKey = (K) table[bucket]; - - if (storedKey == EmptyKey) { - // The bucket is empty, so we can use it - table[bucket] = key; - table[bucket + 1] = value; - return; - } - - bucket = (bucket + ITEM_SIZE) & (table.length - 1); - } - } - } - - private static final long HashMixer = 0xc6a4a7935bd1e995L; - private static final int R = 47; - - static final long hash(K key) { - long hash = key.hashCode() * HashMixer; - hash ^= hash >>> R; - hash *= HashMixer; - return hash; - } - - static final int signSafeMod(long n, int max) { - // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2 - // that is to left shift 1 bit - return (int) (n & (max - 1)) << 1; - } - - private static int alignToPowerOfTwo(int n) { - return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java deleted file mode 100644 index 0a9f802037bce..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java +++ /dev/null @@ -1,622 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.StampedLock; -import java.util.function.Consumer; -import java.util.function.Predicate; - -/** - * Concurrent hash set. - * - *

Provides similar methods as a {@code ConcurrentMap} but since it's an open hash map with linear probing, - * no node allocations are required to store the values. - * - *
- * WARN: method forEach do not guarantee thread safety, nor does the values method. - *
- * The forEach method is specifically designed for single-threaded usage. When iterating over a set - * with concurrent writes, it becomes possible for new values to be either observed or not observed. - * There is no guarantee that if we write value1 and value2, and are able to see value2, then we will also see value1. - * - *
- * It is crucial to understand that the results obtained from aggregate status methods such as values - * are typically reliable only when the map is not undergoing concurrent updates from other threads. - * When concurrent updates are involved, the results of these methods reflect transient states - * that may be suitable for monitoring or estimation purposes, but not for program control. - * @param - */ -@SuppressWarnings("unchecked") -public class ConcurrentOpenHashSet { - - private static final Object EmptyValue = null; - private static final Object DeletedValue = new Object(); - - private static final int DefaultExpectedItems = 256; - private static final int DefaultConcurrencyLevel = 16; - - private static final float DefaultMapFillFactor = 0.66f; - private static final float DefaultMapIdleFactor = 0.15f; - - private static final float DefaultExpandFactor = 2; - private static final float DefaultShrinkFactor = 2; - - private static final boolean DefaultAutoShrink = false; - - private final Section[] sections; - - public static Builder newBuilder() { - return new Builder<>(); - } - - /** - * Builder of ConcurrentOpenHashSet. - */ - public static class Builder { - int expectedItems = DefaultExpectedItems; - int concurrencyLevel = DefaultConcurrencyLevel; - float mapFillFactor = DefaultMapFillFactor; - float mapIdleFactor = DefaultMapIdleFactor; - float expandFactor = DefaultExpandFactor; - float shrinkFactor = DefaultShrinkFactor; - boolean autoShrink = DefaultAutoShrink; - - public Builder expectedItems(int expectedItems) { - this.expectedItems = expectedItems; - return this; - } - - public Builder concurrencyLevel(int concurrencyLevel) { - this.concurrencyLevel = concurrencyLevel; - return this; - } - - public Builder mapFillFactor(float mapFillFactor) { - this.mapFillFactor = mapFillFactor; - return this; - } - - public Builder mapIdleFactor(float mapIdleFactor) { - this.mapIdleFactor = mapIdleFactor; - return this; - } - - public Builder expandFactor(float expandFactor) { - this.expandFactor = expandFactor; - return this; - } - - public Builder shrinkFactor(float shrinkFactor) { - this.shrinkFactor = shrinkFactor; - return this; - } - - public Builder autoShrink(boolean autoShrink) { - this.autoShrink = autoShrink; - return this; - } - - public ConcurrentOpenHashSet build() { - return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel, - mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); - } - } - - @Deprecated - public ConcurrentOpenHashSet() { - this(DefaultExpectedItems); - } - - @Deprecated - public ConcurrentOpenHashSet(int expectedItems) { - this(expectedItems, DefaultConcurrencyLevel); - } - - @Deprecated - public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) { - this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, - DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); - } - - public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel, - float mapFillFactor, float mapIdleFactor, - boolean autoShrink, float expandFactor, float shrinkFactor) { - checkArgument(expectedItems > 0); - checkArgument(concurrencyLevel > 0); - checkArgument(expectedItems >= concurrencyLevel); - checkArgument(mapFillFactor > 0 && mapFillFactor < 1); - checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); - checkArgument(mapFillFactor > mapIdleFactor); - checkArgument(expandFactor > 1); - checkArgument(shrinkFactor > 1); - - int numSections = concurrencyLevel; - int perSectionExpectedItems = expectedItems / numSections; - int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor); - this.sections = (Section[]) new Section[numSections]; - - for (int i = 0; i < numSections; i++) { - sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor, - autoShrink, expandFactor, shrinkFactor); - } - } - - long getUsedBucketCount() { - long usedBucketCount = 0; - for (Section s : sections) { - usedBucketCount += s.usedBuckets; - } - return usedBucketCount; - } - - public long size() { - long size = 0; - for (int i = 0; i < sections.length; i++) { - size += sections[i].size; - } - return size; - } - - public long capacity() { - long capacity = 0; - for (int i = 0; i < sections.length; i++) { - capacity += sections[i].capacity; - } - return capacity; - } - - public boolean isEmpty() { - for (int i = 0; i < sections.length; i++) { - if (sections[i].size != 0) { - return false; - } - } - - return true; - } - - public boolean contains(V value) { - requireNonNull(value); - long h = hash(value); - return getSection(h).contains(value, (int) h); - } - - public boolean add(V value) { - requireNonNull(value); - long h = hash(value); - return getSection(h).add(value, (int) h); - } - - public boolean remove(V value) { - requireNonNull(value); - long h = hash(value); - return getSection(h).remove(value, (int) h); - } - - private Section getSection(long hash) { - // Use 32 msb out of long to get the section - final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); - return sections[sectionIdx]; - } - - public void clear() { - for (int i = 0; i < sections.length; i++) { - sections[i].clear(); - } - } - - /** - * Iterate over all the elements in the set and apply the provided function. - *

- * Warning: Do Not Guarantee Thread-Safety. - * @param processor the function to apply to each element - */ - public void forEach(Consumer processor) { - for (int i = 0; i < sections.length; i++) { - sections[i].forEach(processor); - } - } - - public int removeIf(Predicate filter) { - requireNonNull(filter); - - int removedCount = 0; - for (int i = 0; i < sections.length; i++) { - removedCount += sections[i].removeIf(filter); - } - - return removedCount; - } - - /** - * @return a new list of all values (makes a copy) - */ - public List values() { - List values = new ArrayList<>(); - forEach(value -> values.add(value)); - return values; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append('{'); - final AtomicBoolean first = new AtomicBoolean(true); - forEach(value -> { - if (!first.getAndSet(false)) { - sb.append(", "); - } - - sb.append(value.toString()); - }); - sb.append('}'); - return sb.toString(); - } - - // A section is a portion of the hash map that is covered by a single - @SuppressWarnings("serial") - private static final class Section extends StampedLock { - private volatile V[] values; - - private volatile int capacity; - private final int initCapacity; - private static final AtomicIntegerFieldUpdater

SIZE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(Section.class, "size"); - private volatile int size; - private int usedBuckets; - private int resizeThresholdUp; - private int resizeThresholdBelow; - private final float mapFillFactor; - private final float mapIdleFactor; - private final float expandFactor; - private final float shrinkFactor; - private final boolean autoShrink; - - Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, - float expandFactor, float shrinkFactor) { - this.capacity = alignToPowerOfTwo(capacity); - this.initCapacity = this.capacity; - this.values = (V[]) new Object[this.capacity]; - this.size = 0; - this.usedBuckets = 0; - this.autoShrink = autoShrink; - this.mapFillFactor = mapFillFactor; - this.mapIdleFactor = mapIdleFactor; - this.expandFactor = expandFactor; - this.shrinkFactor = shrinkFactor; - this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); - this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); - } - - boolean contains(V value, int keyHash) { - long stamp = tryOptimisticRead(); - boolean acquiredLock = false; - - // add local variable here, so OutOfBound won't happen - V[] values = this.values; - // calculate table.length as capacity to avoid rehash changing capacity - int bucket = signSafeMod(keyHash, values.length); - - try { - while (true) { - // First try optimistic locking - V storedValue = values[bucket]; - - if (!acquiredLock && validate(stamp)) { - // The values we have read are consistent - if (value.equals(storedValue)) { - return true; - } else if (storedValue == EmptyValue) { - // Not found - return false; - } - } else { - // Fallback to acquiring read lock - if (!acquiredLock) { - stamp = readLock(); - acquiredLock = true; - - // update local variable - values = this.values; - bucket = signSafeMod(keyHash, values.length); - storedValue = values[bucket]; - } - - if (value.equals(storedValue)) { - return true; - } else if (storedValue == EmptyValue) { - // Not found - return false; - } - } - bucket = (bucket + 1) & (values.length - 1); - } - } finally { - if (acquiredLock) { - unlockRead(stamp); - } - } - } - - boolean add(V value, int keyHash) { - int bucket = keyHash; - - long stamp = writeLock(); - int capacity = this.capacity; - - // Remember where we find the first available spot - int firstDeletedValue = -1; - - try { - while (true) { - bucket = signSafeMod(bucket, capacity); - - V storedValue = values[bucket]; - - if (value.equals(storedValue)) { - return false; - } else if (storedValue == EmptyValue) { - // Found an empty bucket. This means the value is not in the set. If we've already seen a - // deleted value, we should write at that position - if (firstDeletedValue != -1) { - bucket = firstDeletedValue; - } else { - ++usedBuckets; - } - - values[bucket] = value; - SIZE_UPDATER.incrementAndGet(this); - return true; - } else if (storedValue == DeletedValue) { - // The bucket contained a different deleted key - if (firstDeletedValue == -1) { - firstDeletedValue = bucket; - } - } - - ++bucket; - } - } finally { - if (usedBuckets > resizeThresholdUp) { - try { - // Expand the hashmap - int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); - rehash(newCapacity); - } finally { - unlockWrite(stamp); - } - } else { - unlockWrite(stamp); - } - } - } - - private boolean remove(V value, int keyHash) { - int bucket = keyHash; - long stamp = writeLock(); - - try { - while (true) { - int capacity = this.capacity; - bucket = signSafeMod(bucket, capacity); - - V storedValue = values[bucket]; - if (value.equals(storedValue)) { - SIZE_UPDATER.decrementAndGet(this); - cleanBucket(bucket); - return true; - } else if (storedValue == EmptyValue) { - // Value wasn't found - return false; - } - - ++bucket; - } - - } finally { - if (autoShrink && size < resizeThresholdBelow) { - try { - // Shrinking must at least ensure initCapacity, - // so as to avoid frequent shrinking and expansion near initCapacity, - // frequent shrinking and expansion, - // additionally opened arrays will consume more memory and affect GC - int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity); - int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); - if (newCapacity < capacity && newResizeThresholdUp > size) { - // shrink the hashmap - rehash(newCapacity); - } - } finally { - unlockWrite(stamp); - } - } else { - unlockWrite(stamp); - } - } - } - - void clear() { - long stamp = writeLock(); - - try { - if (autoShrink && capacity > initCapacity) { - shrinkToInitCapacity(); - } else { - Arrays.fill(values, EmptyValue); - this.size = 0; - this.usedBuckets = 0; - } - } finally { - unlockWrite(stamp); - } - } - - int removeIf(Predicate filter) { - long stamp = writeLock(); - - int removedCount = 0; - try { - // Go through all the buckets for this section - for (int bucket = capacity - 1; bucket >= 0; bucket--) { - V storedValue = values[bucket]; - - if (storedValue != DeletedValue && storedValue != EmptyValue) { - if (filter.test(storedValue)) { - // Removing item - SIZE_UPDATER.decrementAndGet(this); - ++removedCount; - cleanBucket(bucket); - } - } - } - - return removedCount; - } finally { - unlockWrite(stamp); - } - } - - private void cleanBucket(int bucket) { - int nextInArray = signSafeMod(bucket + 1, capacity); - if (values[nextInArray] == EmptyValue) { - values[bucket] = (V) EmptyValue; - --usedBuckets; - - // Cleanup all the buckets that were in `DeletedValue` state, - // so that we can reduce unnecessary expansions - int lastBucket = signSafeMod(bucket - 1, capacity); - while (values[lastBucket] == DeletedValue) { - values[lastBucket] = (V) EmptyValue; - --usedBuckets; - - lastBucket = signSafeMod(lastBucket - 1, capacity); - } - } else { - values[bucket] = (V) DeletedValue; - } - } - - public void forEach(Consumer processor) { - V[] values = this.values; - - // Go through all the buckets for this section. We try to renew the stamp only after a validation - // error, otherwise we keep going with the same. - long stamp = 0; - for (int bucket = 0; bucket < capacity; bucket++) { - if (stamp == 0) { - stamp = tryOptimisticRead(); - } - - V storedValue = values[bucket]; - - if (!validate(stamp)) { - // Fallback to acquiring read lock - stamp = readLock(); - - try { - storedValue = values[bucket]; - } finally { - unlockRead(stamp); - } - - stamp = 0; - } - - if (storedValue != DeletedValue && storedValue != EmptyValue) { - processor.accept(storedValue); - } - } - } - - private void rehash(int newCapacity) { - // Expand the hashmap - V[] newValues = (V[]) new Object[newCapacity]; - - // Re-hash table - for (int i = 0; i < values.length; i++) { - V storedValue = values[i]; - if (storedValue != EmptyValue && storedValue != DeletedValue) { - insertValueNoLock(newValues, storedValue); - } - } - - values = newValues; - capacity = newCapacity; - usedBuckets = size; - resizeThresholdUp = (int) (capacity * mapFillFactor); - resizeThresholdBelow = (int) (capacity * mapIdleFactor); - } - - private void shrinkToInitCapacity() { - V[] newValues = (V[]) new Object[initCapacity]; - - values = newValues; - size = 0; - usedBuckets = 0; - // Capacity needs to be updated after the values, so that we won't see - // a capacity value bigger than the actual array size - capacity = initCapacity; - resizeThresholdUp = (int) (capacity * mapFillFactor); - resizeThresholdBelow = (int) (capacity * mapIdleFactor); - } - - private static void insertValueNoLock(V[] values, V value) { - int bucket = (int) hash(value); - - while (true) { - bucket = signSafeMod(bucket, values.length); - - V storedValue = values[bucket]; - - if (storedValue == EmptyValue) { - // The bucket is empty, so we can use it - values[bucket] = value; - return; - } - - ++bucket; - } - } - } - - private static final long HashMixer = 0xc6a4a7935bd1e995L; - private static final int R = 47; - - static final long hash(K key) { - long hash = key.hashCode() * HashMixer; - hash ^= hash >>> R; - hash *= HashMixer; - return hash; - } - - static final int signSafeMod(long n, int max) { - return (int) n & (max - 1); - } - - private static int alignToPowerOfTwo(int n) { - return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java deleted file mode 100644 index 48a1a705a3202..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java +++ /dev/null @@ -1,700 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import lombok.Cleanup; -import org.testng.annotations.Test; - -import com.google.common.collect.Lists; - -public class ConcurrentOpenHashMapTest { - - @Test - public void testConstructor() { - try { - ConcurrentOpenHashMap.newBuilder() - .expectedItems(0) - .build(); - fail("should have thrown exception"); - } catch (IllegalArgumentException e) { - // ok - } - - try { - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(0) - .build(); - fail("should have thrown exception"); - } catch (IllegalArgumentException e) { - // ok - } - - try { - ConcurrentOpenHashMap.newBuilder() - .expectedItems(4) - .concurrencyLevel(8) - .build(); - fail("should have thrown exception"); - } catch (IllegalArgumentException e) { - // ok - } - } - - @Test - public void simpleInsertions() { - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .build(); - - assertTrue(map.isEmpty()); - assertNull(map.put("1", "one")); - assertFalse(map.isEmpty()); - - assertNull(map.put("2", "two")); - assertNull(map.put("3", "three")); - - assertEquals(map.size(), 3); - - assertEquals(map.get("1"), "one"); - assertEquals(map.size(), 3); - - assertEquals(map.remove("1"), "one"); - assertEquals(map.size(), 2); - assertNull(map.get("1")); - assertNull(map.get("5")); - assertEquals(map.size(), 2); - - assertNull(map.put("1", "one")); - assertEquals(map.size(), 3); - assertEquals(map.put("1", "uno"), "one"); - assertEquals(map.size(), 3); - } - - @Test - public void testReduceUnnecessaryExpansions() { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .build(); - assertNull(map.put("1", "1")); - assertNull(map.put("2", "2")); - assertNull(map.put("3", "3")); - assertNull(map.put("4", "4")); - - assertEquals(map.remove("1"), "1"); - assertEquals(map.remove("2"), "2"); - assertEquals(map.remove("3"), "3"); - assertEquals(map.remove("4"), "4"); - - assertEquals(0, map.getUsedBucketCount()); - } - - @Test - public void testClear() { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - assertTrue(map.capacity() == 4); - - assertNull(map.put("k1", "v1")); - assertNull(map.put("k2", "v2")); - assertNull(map.put("k3", "v3")); - - assertTrue(map.capacity() == 8); - map.clear(); - assertTrue(map.capacity() == 4); - } - - @Test - public void testExpandAndShrink() { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - assertTrue(map.capacity() == 4); - - assertNull(map.put("k1", "v1")); - assertNull(map.put("k2", "v2")); - assertNull(map.put("k3", "v3")); - - // expand hashmap - assertTrue(map.capacity() == 8); - - assertTrue(map.remove("k1", "v1")); - // not shrink - assertTrue(map.capacity() == 8); - assertTrue(map.remove("k2", "v2")); - // shrink hashmap - assertTrue(map.capacity() == 4); - - // expand hashmap - assertNull(map.put("k4", "v4")); - assertNull(map.put("k5", "v5")); - assertTrue(map.capacity() == 8); - - //verify that the map does not keep shrinking at every remove() operation - assertNull(map.put("k6", "v6")); - assertTrue(map.remove("k6", "v6")); - assertTrue(map.capacity() == 8); - } - - @Test - public void testExpandShrinkAndClear() { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - final long initCapacity = map.capacity(); - assertTrue(map.capacity() == 4); - assertNull(map.put("k1", "v1")); - assertNull(map.put("k2", "v2")); - assertNull(map.put("k3", "v3")); - - // expand hashmap - assertTrue(map.capacity() == 8); - - assertTrue(map.remove("k1", "v1")); - // not shrink - assertTrue(map.capacity() == 8); - assertTrue(map.remove("k2", "v2")); - // shrink hashmap - assertTrue(map.capacity() == 4); - - assertTrue(map.remove("k3", "v3")); - // Will not shrink the hashmap again because shrink capacity is less than initCapacity - // current capacity is equal than the initial capacity - assertTrue(map.capacity() == initCapacity); - map.clear(); - // after clear, because current capacity is equal than the initial capacity, so not shrinkToInitCapacity - assertTrue(map.capacity() == initCapacity); - } - - @Test - public void testConcurrentExpandAndShrinkAndGet() throws Throwable { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - assertEquals(map.capacity(), 4); - - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newCachedThreadPool(); - final int readThreads = 16; - final int writeThreads = 1; - final int n = 1_000; - CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); - Future future = null; - AtomicReference ex = new AtomicReference<>(); - - for (int i = 0; i < readThreads; i++) { - executor.submit(() -> { - try { - barrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - while (!Thread.currentThread().isInterrupted()) { - try { - map.get("k2"); - } catch (Exception e) { - ex.set(e); - } - } - }); - } - - assertNull(map.put("k1","v1")); - future = executor.submit(() -> { - try { - barrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - for (int i = 0; i < n; i++) { - // expand hashmap - assertNull(map.put("k2", "v2")); - assertNull(map.put("k3", "v3")); - assertEquals(map.capacity(), 8); - - // shrink hashmap - assertTrue(map.remove("k2", "v2")); - assertTrue(map.remove("k3", "v3")); - assertEquals(map.capacity(), 4); - } - }); - - future.get(); - assertTrue(ex.get() == null); - // shut down pool - executor.shutdown(); - } - - @Test - public void testRemove() { - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder().build(); - - assertTrue(map.isEmpty()); - assertNull(map.put("1", "one")); - assertFalse(map.isEmpty()); - - assertFalse(map.remove("0", "zero")); - assertFalse(map.remove("1", "uno")); - - assertFalse(map.isEmpty()); - assertTrue(map.remove("1", "one")); - assertTrue(map.isEmpty()); - } - - @Test - public void testRehashing() { - int n = 16; - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(n / 2) - .concurrencyLevel(1) - .build(); - assertEquals(map.capacity(), n); - assertEquals(map.size(), 0); - - for (int i = 0; i < n; i++) { - map.put(Integer.toString(i), i); - } - - assertEquals(map.capacity(), 2 * n); - assertEquals(map.size(), n); - } - - @Test - public void testRehashingWithDeletes() { - int n = 16; - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(n / 2) - .concurrencyLevel(1) - .build(); - assertEquals(map.capacity(), n); - assertEquals(map.size(), 0); - - for (int i = 0; i < n / 2; i++) { - map.put(i, i); - } - - for (int i = 0; i < n / 2; i++) { - map.remove(i); - } - - for (int i = n; i < (2 * n); i++) { - map.put(i, i); - } - - assertEquals(map.capacity(), 2 * n); - assertEquals(map.size(), n); - } - - @Test - public void concurrentInsertions() throws Throwable { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newCachedThreadPool(); - - final int nThreads = 16; - final int N = 100_000; - String value = "value"; - - List> futures = new ArrayList<>(); - for (int i = 0; i < nThreads; i++) { - final int threadIdx = i; - - futures.add(executor.submit(() -> { - Random random = new Random(); - - for (int j = 0; j < N; j++) { - long key = random.nextLong(); - // Ensure keys are uniques - key -= key % (threadIdx + 1); - - map.put(key, value); - } - })); - } - - for (Future future : futures) { - future.get(); - } - - assertEquals(map.size(), N * nThreads); - } - - @Test - public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder().build(); - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newCachedThreadPool(); - - final int nThreads = 16; - final int N = 100_000; - String value = "value"; - - List> futures = new ArrayList<>(); - for (int i = 0; i < nThreads; i++) { - final int threadIdx = i; - - futures.add(executor.submit(() -> { - Random random = new Random(); - - for (int j = 0; j < N; j++) { - long key = random.nextLong(); - // Ensure keys are uniques - key -= key % (threadIdx + 1); - - map.put(key, value); - } - })); - } - - for (Future future : futures) { - future.get(); - } - - assertEquals(map.size(), N * nThreads); - } - - @Test - public void testIteration() { - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder().build(); - - assertEquals(map.keys(), Collections.emptyList()); - assertEquals(map.values(), Collections.emptyList()); - - map.put(0l, "zero"); - - assertEquals(map.keys(), Lists.newArrayList(0l)); - assertEquals(map.values(), Lists.newArrayList("zero")); - - map.remove(0l); - - assertEquals(map.keys(), Collections.emptyList()); - assertEquals(map.values(), Collections.emptyList()); - - map.put(0l, "zero"); - map.put(1l, "one"); - map.put(2l, "two"); - - List keys = map.keys(); - keys.sort(null); - assertEquals(keys, Lists.newArrayList(0l, 1l, 2l)); - - List values = map.values(); - values.sort(null); - assertEquals(values, Lists.newArrayList("one", "two", "zero")); - - map.put(1l, "uno"); - - keys = map.keys(); - keys.sort(null); - assertEquals(keys, Lists.newArrayList(0l, 1l, 2l)); - - values = map.values(); - values.sort(null); - assertEquals(values, Lists.newArrayList("two", "uno", "zero")); - - map.clear(); - assertTrue(map.isEmpty()); - } - - @Test - public void testHashConflictWithDeletion() { - final int Buckets = 16; - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(Buckets) - .concurrencyLevel(1) - .build(); - - // Pick 2 keys that fall into the same bucket - long key1 = 1; - long key2 = 27; - - int bucket1 = ConcurrentOpenHashMap.signSafeMod(ConcurrentOpenHashMap.hash(key1), Buckets); - int bucket2 = ConcurrentOpenHashMap.signSafeMod(ConcurrentOpenHashMap.hash(key2), Buckets); - assertEquals(bucket1, bucket2); - - assertNull(map.put(key1, "value-1")); - assertNull(map.put(key2, "value-2")); - assertEquals(map.size(), 2); - - assertEquals(map.remove(key1), "value-1"); - assertEquals(map.size(), 1); - - assertNull(map.put(key1, "value-1-overwrite")); - assertEquals(map.size(), 2); - - assertEquals(map.remove(key1), "value-1-overwrite"); - assertEquals(map.size(), 1); - - assertEquals(map.put(key2, "value-2-overwrite"), "value-2"); - assertEquals(map.get(key2), "value-2-overwrite"); - - assertEquals(map.size(), 1); - assertEquals(map.remove(key2), "value-2-overwrite"); - assertTrue(map.isEmpty()); - } - - @Test - public void testPutIfAbsent() { - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder().build(); - assertNull(map.putIfAbsent(1l, "one")); - assertEquals(map.get(1l), "one"); - - assertEquals(map.putIfAbsent(1l, "uno"), "one"); - assertEquals(map.get(1l), "one"); - } - - @Test - public void testComputeIfAbsent() { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - AtomicInteger counter = new AtomicInteger(); - Function provider = key -> counter.getAndIncrement(); - - assertEquals(map.computeIfAbsent(0, provider).intValue(), 0); - assertEquals(map.get(0).intValue(), 0); - - assertEquals(map.computeIfAbsent(1, provider).intValue(), 1); - assertEquals(map.get(1).intValue(), 1); - - assertEquals(map.computeIfAbsent(1, provider).intValue(), 1); - assertEquals(map.get(1).intValue(), 1); - - assertEquals(map.computeIfAbsent(2, provider).intValue(), 2); - assertEquals(map.get(2).intValue(), 2); - } - - @Test - public void testEqualsKeys() { - class T { - int value; - - T(int value) { - this.value = value; - } - - @Override - public int hashCode() { - return Integer.hashCode(value); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof T) { - return value == ((T) obj).value; - } - - return false; - } - } - - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder().build(); - - T t1 = new T(1); - T t1_b = new T(1); - T t2 = new T(2); - - assertEquals(t1, t1_b); - assertNotEquals(t2, t1); - assertNotEquals(t2, t1_b); - - assertNull(map.put(t1, "t1")); - assertEquals(map.get(t1), "t1"); - assertEquals(map.get(t1_b), "t1"); - assertNull(map.get(t2)); - - assertEquals(map.remove(t1_b), "t1"); - assertNull(map.get(t1)); - assertNull(map.get(t1_b)); - } - - @Test - public void testNullValue() { - ConcurrentOpenHashMap map = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - String key = "a"; - assertThrows(NullPointerException.class, () -> map.put(key, null)); - - //put a null value. - assertNull(map.computeIfAbsent(key, k -> null)); - assertEquals(1, map.size()); - assertEquals(1, map.keys().size()); - assertEquals(1, map.values().size()); - assertNull(map.get(key)); - assertFalse(map.containsKey(key)); - - //test remove null value - map.removeNullValue(key); - assertTrue(map.isEmpty()); - assertEquals(0, map.keys().size()); - assertEquals(0, map.values().size()); - assertNull(map.get(key)); - assertFalse(map.containsKey(key)); - - - //test not remove non-null value - map.put(key, "V"); - assertEquals(1, map.size()); - map.removeNullValue(key); - assertEquals(1, map.size()); - - } - - static final int Iterations = 1; - static final int ReadIterations = 1000; - static final int N = 1_000_000; - - public void benchConcurrentOpenHashMap() throws Exception { - ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() - .expectedItems(N) - .concurrencyLevel(1) - .build(); - - for (long i = 0; i < Iterations; i++) { - for (int j = 0; j < N; j++) { - map.put(i, "value"); - } - - for (long h = 0; h < ReadIterations; h++) { - for (int j = 0; j < N; j++) { - map.get(i); - } - } - - for (long j = 0; j < N; j++) { - map.remove(i); - } - } - } - - public void benchConcurrentHashMap() throws Exception { - ConcurrentHashMap map = new ConcurrentHashMap(N, 0.66f, 1); - - for (long i = 0; i < Iterations; i++) { - for (int j = 0; j < N; j++) { - map.put(i, "value"); - } - - for (long h = 0; h < ReadIterations; h++) { - for (int j = 0; j < N; j++) { - map.get(i); - } - } - - for (int j = 0; j < N; j++) { - map.remove(i); - } - } - } - - void benchHashMap() { - HashMap map = new HashMap<>(N, 0.66f); - - for (long i = 0; i < Iterations; i++) { - for (int j = 0; j < N; j++) { - map.put(i, "value"); - } - - for (long h = 0; h < ReadIterations; h++) { - for (int j = 0; j < N; j++) { - map.get(i); - } - } - - for (int j = 0; j < N; j++) { - map.remove(i); - } - } - } - - public static void main(String[] args) throws Exception { - ConcurrentOpenHashMapTest t = new ConcurrentOpenHashMapTest(); - - long start = System.nanoTime(); - t.benchHashMap(); - long end = System.nanoTime(); - - System.out.println("HM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); - - start = System.nanoTime(); - t.benchConcurrentHashMap(); - end = System.nanoTime(); - - System.out.println("CHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); - - start = System.nanoTime(); - t.benchConcurrentOpenHashMap(); - end = System.nanoTime(); - - System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); - - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java deleted file mode 100644 index d509002e21998..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java +++ /dev/null @@ -1,503 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicReference; - -import lombok.Cleanup; -import org.testng.annotations.Test; - -import com.google.common.collect.Lists; - -// Deprecation warning suppressed as this test targets deprecated class -@SuppressWarnings("deprecation") -public class ConcurrentOpenHashSetTest { - - @Test - public void testConstructor() { - assertThrows(IllegalArgumentException.class, () -> new ConcurrentOpenHashSet(0)); - assertThrows(IllegalArgumentException.class, () -> new ConcurrentOpenHashSet(16, 0)); - assertThrows(IllegalArgumentException.class, () -> new ConcurrentOpenHashSet(4, 8)); - } - - @Test - public void simpleInsertions() { - ConcurrentOpenHashSet set = new ConcurrentOpenHashSet<>(16); - - assertTrue(set.isEmpty()); - assertTrue(set.add("1")); - assertFalse(set.isEmpty()); - - assertTrue(set.add("2")); - assertTrue(set.add("3")); - - assertEquals(set.size(), 3); - - assertTrue(set.contains("1")); - assertEquals(set.size(), 3); - - assertTrue(set.remove("1")); - assertEquals(set.size(), 2); - assertFalse(set.contains("1")); - assertFalse(set.contains("5")); - assertEquals(set.size(), 2); - - assertTrue(set.add("1")); - assertEquals(set.size(), 3); - assertFalse(set.add("1")); - assertEquals(set.size(), 3); - } - - @Test - public void testReduceUnnecessaryExpansions() { - ConcurrentOpenHashSet set = - ConcurrentOpenHashSet.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .build(); - - assertTrue(set.add("1")); - assertTrue(set.add("2")); - assertTrue(set.add("3")); - assertTrue(set.add("4")); - - assertTrue(set.remove("1")); - assertTrue(set.remove("2")); - assertTrue(set.remove("3")); - assertTrue(set.remove("4")); - assertEquals(0, set.getUsedBucketCount()); - } - - @Test - public void testClear() { - ConcurrentOpenHashSet set = - ConcurrentOpenHashSet.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - assertEquals(set.capacity(), 4); - - assertTrue(set.add("k1")); - assertTrue(set.add("k2")); - assertTrue(set.add("k3")); - - assertEquals(set.capacity(), 8); - set.clear(); - assertEquals(set.capacity(), 4); - } - - @Test - public void testExpandAndShrink() { - ConcurrentOpenHashSet map = - ConcurrentOpenHashSet.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - assertEquals(map.capacity(), 4); - - assertTrue(map.add("k1")); - assertTrue(map.add("k2")); - assertTrue(map.add("k3")); - - // expand hashmap - assertEquals(map.capacity(), 8); - - assertTrue(map.remove("k1")); - // not shrink - assertEquals(map.capacity(), 8); - assertTrue(map.remove("k2")); - // shrink hashmap - assertEquals(map.capacity(), 4); - - // expand hashmap - assertTrue(map.add("k4")); - assertTrue(map.add("k5")); - assertEquals(map.capacity(), 8); - - //verify that the map does not keep shrinking at every remove() operation - assertTrue(map.add("k6")); - assertTrue(map.remove("k6")); - assertEquals(map.capacity(), 8); - } - - @Test - public void testExpandShrinkAndClear() { - ConcurrentOpenHashSet map = ConcurrentOpenHashSet.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - final long initCapacity = map.capacity(); - assertTrue(map.capacity() == 4); - - assertTrue(map.add("k1")); - assertTrue(map.add("k2")); - assertTrue(map.add("k3")); - - // expand hashmap - assertTrue(map.capacity() == 8); - - assertTrue(map.remove("k1")); - // not shrink - assertTrue(map.capacity() == 8); - assertTrue(map.remove("k2")); - // shrink hashmap - assertTrue(map.capacity() == 4); - - assertTrue(map.remove("k3")); - // Will not shrink the hashmap again because shrink capacity is less than initCapacity - // current capacity is equal than the initial capacity - assertTrue(map.capacity() == initCapacity); - map.clear(); - // after clear, because current capacity is equal than the initial capacity, so not shrinkToInitCapacity - assertTrue(map.capacity() == initCapacity); - } - - @Test - public void testConcurrentExpandAndShrinkAndGet() throws Throwable { - ConcurrentOpenHashSet set = ConcurrentOpenHashSet.newBuilder() - .expectedItems(2) - .concurrencyLevel(1) - .autoShrink(true) - .mapIdleFactor(0.25f) - .build(); - assertEquals(set.capacity(), 4); - - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newCachedThreadPool(); - final int readThreads = 16; - final int writeThreads = 1; - final int n = 1_000; - CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); - Future future = null; - AtomicReference ex = new AtomicReference<>(); - - for (int i = 0; i < readThreads; i++) { - executor.submit(() -> { - try { - barrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - while (!Thread.currentThread().isInterrupted()) { - try { - set.contains("k2"); - } catch (Exception e) { - ex.set(e); - } - } - }); - } - - assertTrue(set.add("k1")); - future = executor.submit(() -> { - try { - barrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - for (int i = 0; i < n; i++) { - // expand hashmap - assertTrue(set.add("k2")); - assertTrue(set.add("k3")); - assertEquals(set.capacity(), 8); - - // shrink hashmap - assertTrue(set.remove("k2")); - assertTrue(set.remove("k3")); - assertEquals(set.capacity(), 4); - } - }); - - future.get(); - assertTrue(ex.get() == null); - // shut down pool - executor.shutdown(); - } - - @Test - public void testRemove() { - ConcurrentOpenHashSet set = - ConcurrentOpenHashSet.newBuilder().build(); - - assertTrue(set.isEmpty()); - assertTrue(set.add("1")); - assertFalse(set.isEmpty()); - - assertFalse(set.remove("0")); - assertFalse(set.isEmpty()); - assertTrue(set.remove("1")); - assertTrue(set.isEmpty()); - } - - @Test - public void testRehashing() { - int n = 16; - ConcurrentOpenHashSet set = new ConcurrentOpenHashSet<>(n / 2, 1); - assertEquals(set.capacity(), n); - assertEquals(set.size(), 0); - - for (int i = 0; i < n; i++) { - set.add(i); - } - - assertEquals(set.capacity(), 2 * n); - assertEquals(set.size(), n); - } - - @Test - public void testRehashingWithDeletes() { - int n = 16; - ConcurrentOpenHashSet set = new ConcurrentOpenHashSet<>(n / 2, 1); - assertEquals(set.capacity(), n); - assertEquals(set.size(), 0); - - for (int i = 0; i < n / 2; i++) { - set.add(i); - } - - for (int i = 0; i < n / 2; i++) { - set.remove(i); - } - - for (int i = n; i < (2 * n); i++) { - set.add(i); - } - - assertEquals(set.capacity(), 2 * n); - assertEquals(set.size(), n); - } - - @Test - public void concurrentInsertions() throws Throwable { - ConcurrentOpenHashSet set = - ConcurrentOpenHashSet.newBuilder().build(); - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newCachedThreadPool(); - - final int nThreads = 16; - final int N = 100_000; - - List> futures = new ArrayList<>(); - for (int i = 0; i < nThreads; i++) { - final int threadIdx = i; - - futures.add(executor.submit(() -> { - Random random = new Random(); - - for (int j = 0; j < N; j++) { - long key = random.nextLong(); - // Ensure keys are unique - key -= key % (threadIdx + 1); - - set.add(key); - } - })); - } - - for (Future future : futures) { - future.get(); - } - - assertEquals(set.size(), N * nThreads); - } - - @Test - public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentOpenHashSet map = - ConcurrentOpenHashSet.newBuilder().build(); - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newCachedThreadPool(); - - final int nThreads = 16; - final int N = 100_000; - - List> futures = new ArrayList<>(); - for (int i = 0; i < nThreads; i++) { - final int threadIdx = i; - - futures.add(executor.submit(() -> { - Random random = new Random(); - - for (int j = 0; j < N; j++) { - long key = random.nextLong(); - // Ensure keys are unique - key -= key % (threadIdx + 1); - - map.add(key); - } - })); - } - - for (Future future : futures) { - future.get(); - } - - assertEquals(map.size(), N * nThreads); - } - - @Test - public void testIteration() { - ConcurrentOpenHashSet set = ConcurrentOpenHashSet.newBuilder().build(); - - assertEquals(set.values(), Collections.emptyList()); - - set.add(0l); - - assertEquals(set.values(), Lists.newArrayList(0l)); - - set.remove(0l); - - assertEquals(set.values(), Collections.emptyList()); - - set.add(0l); - set.add(1l); - set.add(2l); - - List values = set.values(); - values.sort(null); - assertEquals(values, Lists.newArrayList(0l, 1l, 2l)); - - set.clear(); - assertTrue(set.isEmpty()); - } - - @Test - public void testRemoval() { - ConcurrentOpenHashSet set = - ConcurrentOpenHashSet.newBuilder().build(); - - set.add(0); - set.add(1); - set.add(3); - set.add(6); - set.add(7); - - List values = set.values(); - values.sort(null); - assertEquals(values, Lists.newArrayList(0, 1, 3, 6, 7)); - - int numOfItemsDeleted = set.removeIf(i -> i < 5); - assertEquals(numOfItemsDeleted, 3); - assertEquals(set.size(), values.size() - numOfItemsDeleted); - values = set.values(); - values.sort(null); - assertEquals(values, Lists.newArrayList(6, 7)); - } - - @Test - public void testHashConflictWithDeletion() { - final int Buckets = 16; - ConcurrentOpenHashSet set = new ConcurrentOpenHashSet<>(Buckets, 1); - - // Pick 2 keys that fall into the same bucket - long key1 = 1; - long key2 = 27; - - int bucket1 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key1), Buckets); - int bucket2 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key2), Buckets); - assertEquals(bucket1, bucket2); - - assertTrue(set.add(key1)); - assertTrue(set.add(key2)); - assertEquals(set.size(), 2); - - assertTrue(set.remove(key1)); - assertEquals(set.size(), 1); - - assertTrue(set.add(key1)); - assertEquals(set.size(), 2); - - assertTrue(set.remove(key1)); - assertEquals(set.size(), 1); - - assertFalse(set.add(key2)); - assertTrue(set.contains(key2)); - - assertEquals(set.size(), 1); - assertTrue(set.remove(key2)); - assertTrue(set.isEmpty()); - } - - @Test - public void testEqualsObjects() { - class T { - int value; - - T(int value) { - this.value = value; - } - - @Override - public int hashCode() { - return Integer.hashCode(value); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof T) { - return value == ((T) obj).value; - } - - return false; - } - } - - ConcurrentOpenHashSet set = - ConcurrentOpenHashSet.newBuilder().build(); - - T t1 = new T(1); - T t1_b = new T(1); - T t2 = new T(2); - - assertEquals(t1, t1_b); - assertNotEquals(t2, t1); - assertNotEquals(t2, t1_b); - - set.add(t1); - assertTrue(set.contains(t1)); - assertTrue(set.contains(t1_b)); - assertFalse(set.contains(t2)); - - assertTrue(set.remove(t1_b)); - assertFalse(set.contains(t1)); - assertFalse(set.contains(t1_b)); - } - -} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 889f4431cc35b..7bb4df7baa533 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -23,7 +23,10 @@ import java.io.Closeable; import java.io.IOException; import java.net.MalformedURLException; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -44,8 +47,6 @@ import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -73,9 +74,9 @@ public class WebSocketService implements Closeable { private Optional cryptoKeyReader = Optional.empty(); private ClusterData localCluster; - private final ConcurrentOpenHashMap> topicProducerMap; - private final ConcurrentOpenHashMap> topicConsumerMap; - private final ConcurrentOpenHashMap> topicReaderMap; + private final Map> topicProducerMap = new ConcurrentHashMap<>(); + private final Map> topicConsumerMap = new ConcurrentHashMap<>(); + private final Map> topicReaderMap = new ConcurrentHashMap<>(); private final ProxyStats proxyStats; public WebSocketService(WebSocketProxyConfiguration config) { @@ -88,17 +89,6 @@ public WebSocketService(ClusterData localCluster, ServiceConfiguration config) { .newScheduledThreadPool(config.getWebSocketNumServiceThreads(), new DefaultThreadFactory("pulsar-websocket")); this.localCluster = localCluster; - this.topicProducerMap = - ConcurrentOpenHashMap.>newBuilder() - .build(); - this.topicConsumerMap = - ConcurrentOpenHashMap.>newBuilder() - .build(); - this.topicReaderMap = - ConcurrentOpenHashMap.>newBuilder() - .build(); this.proxyStats = new ProxyStats(this); } @@ -288,11 +278,11 @@ public boolean isAuthorizationEnabled() { public boolean addProducer(ProducerHandler producer) { return topicProducerMap .computeIfAbsent(producer.getProducer().getTopic(), - topic -> ConcurrentOpenHashSet.newBuilder().build()) + topic -> ConcurrentHashMap.newKeySet()) .add(producer); } - public ConcurrentOpenHashMap> getProducers() { + public Map> getProducers() { return topicProducerMap; } @@ -306,12 +296,11 @@ public boolean removeProducer(ProducerHandler producer) { public boolean addConsumer(ConsumerHandler consumer) { return topicConsumerMap - .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> - ConcurrentOpenHashSet.newBuilder().build()) + .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> ConcurrentHashMap.newKeySet()) .add(consumer); } - public ConcurrentOpenHashMap> getConsumers() { + public Map> getConsumers() { return topicConsumerMap; } @@ -324,12 +313,11 @@ public boolean removeConsumer(ConsumerHandler consumer) { } public boolean addReader(ReaderHandler reader) { - return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> - ConcurrentOpenHashSet.newBuilder().build()) + return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> ConcurrentHashMap.newKeySet()) .add(reader); } - public ConcurrentOpenHashMap> getReaders() { + public Map> getReaders() { return topicReaderMap; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java index eb1566ef7d412..4660340e9cc54 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java @@ -24,11 +24,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.JvmMetrics; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.websocket.WebSocketService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,7 @@ public class ProxyStats { private final WebSocketService service; private final JvmMetrics jvmMetrics; - private ConcurrentOpenHashMap topicStats; + private final Map topicStats = new ConcurrentHashMap<>(); private List metricsCollection; private List tempMetricsCollection; @@ -50,9 +50,6 @@ public ProxyStats(WebSocketService service) { this.service = service; this.jvmMetrics = JvmMetrics.create( service.getExecutor(), "prx", service.getConfig().getJvmGCMetricsLoggerClassName()); - this.topicStats = - ConcurrentOpenHashMap.newBuilder() - .build(); this.metricsCollection = new ArrayList<>(); this.tempMetricsCollection = new ArrayList<>(); // schedule stat generation task every 1 minute