From 310b234946e1f973ba67b303dcc46f1200543cf8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 24 Jun 2024 19:43:55 +0300 Subject: [PATCH] [fix] Revert "[improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908)" This reverts commit 5b1f653e65ccd967fd9642e6d6959de4b1b01a63. --- .../server/src/assemble/LICENSE.bin.txt | 2 +- .../shell/src/assemble/LICENSE.bin.txt | 2 - pom.xml | 2 +- pulsar-common/pom.xml | 5 - .../ConcurrentOpenLongPairRangeSet.java | 12 +- .../collections/ConcurrentRoaringBitSet.java | 439 ------------------ 6 files changed, 9 insertions(+), 453 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index c2216378c278e..52d44839257da 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -513,7 +513,7 @@ The Apache Software License, Version 2.0 * RxJava - io.reactivex.rxjava3-rxjava-3.0.1.jar * RoaringBitmap - - org.roaringbitmap-RoaringBitmap-1.1.0.jar + - org.roaringbitmap-RoaringBitmap-1.0.6.jar * OpenTelemetry - io.opentelemetry-opentelemetry-api-1.38.0.jar - io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 86e7d2d560808..369ac42a660fc 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -382,8 +382,6 @@ The Apache Software License, Version 2.0 - simpleclient_tracer_common-0.16.0.jar - simpleclient_tracer_otel-0.16.0.jar - simpleclient_tracer_otel_agent-0.16.0.jar - * RoaringBitmap - - RoaringBitmap-1.1.0.jar * Log4J - log4j-api-2.23.1.jar - log4j-core-2.23.1.jar diff --git a/pom.xml b/pom.xml index 8325336aa9684..62644d38d167c 100644 --- a/pom.xml +++ b/pom.xml @@ -317,7 +317,7 @@ flexible messaging model and an intuitive client API. 1.3 0.4 9.1.0 - 1.1.0 + 1.0.6 1.6.1 6.4.0 3.33.0 diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 3f73a43698ea4..aa7e4998e5c3e 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -252,11 +252,6 @@ awaitility test - - - org.roaringbitmap - RoaringBitmap - diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index b5ad89d1695d4..72215d7296cc3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.mutable.MutableInt; -import org.roaringbitmap.RoaringBitSet; /** * A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of @@ -45,7 +44,7 @@ public class ConcurrentOpenLongPairRangeSet> implements LongPairRangeSet { protected final NavigableMap rangeBitSetMap = new ConcurrentSkipListMap<>(); - private final boolean threadSafe; + private boolean threadSafe = true; private final int bitSetSize; private final LongPairConsumer consumer; @@ -96,7 +95,9 @@ public void addOpenClosed(long lowerKey, long lowerValueOpen, long upperKey, lon // (2) set 0th-index to upper-index in upperRange.getKey() if (isValid(upperKey, upperValue)) { BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet()); - rangeBitSet.set(0, (int) upperValue + 1); + if (rangeBitSet != null) { + rangeBitSet.set(0, (int) upperValue + 1); + } } // No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing // to set @@ -413,6 +414,7 @@ private int getSafeEntry(long value) { } private BitSet createNewBitSet() { - return this.threadSafe ? new ConcurrentRoaringBitSet() : new RoaringBitSet(); + return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize); } -} \ No newline at end of file + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java deleted file mode 100644 index 814e58400993b..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.BitSet; -import java.util.concurrent.locks.StampedLock; -import java.util.stream.IntStream; -import org.roaringbitmap.RoaringBitSet; - -public class ConcurrentRoaringBitSet extends RoaringBitSet { - private final StampedLock rwLock = new StampedLock(); - - public ConcurrentRoaringBitSet() { - super(); - } - - @Override - public boolean get(int bitIndex) { - long stamp = rwLock.tryOptimisticRead(); - boolean isSet = super.get(bitIndex); - if (!rwLock.validate(stamp)) { - stamp = rwLock.readLock(); - try { - isSet = super.get(bitIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return isSet; - } - - @Override - public void set(int bitIndex) { - long stamp = rwLock.writeLock(); - try { - super.set(bitIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void clear(int bitIndex) { - long stamp = rwLock.writeLock(); - try { - super.clear(bitIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void set(int fromIndex, int toIndex) { - long stamp = rwLock.writeLock(); - try { - super.set(fromIndex, toIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void clear(int fromIndex, int toIndex) { - long stamp = rwLock.writeLock(); - try { - super.clear(fromIndex, toIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void clear() { - long stamp = rwLock.writeLock(); - try { - super.clear(); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public int nextSetBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int nextSetBit = super.nextSetBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - nextSetBit = super.nextSetBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return nextSetBit; - } - - @Override - public int nextClearBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int nextClearBit = super.nextClearBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - nextClearBit = super.nextClearBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return nextClearBit; - } - - @Override - public int previousSetBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int previousSetBit = super.previousSetBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - previousSetBit = super.previousSetBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return previousSetBit; - } - - @Override - public int previousClearBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int previousClearBit = super.previousClearBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - previousClearBit = super.previousClearBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return previousClearBit; - } - - @Override - public int length() { - long stamp = rwLock.tryOptimisticRead(); - int length = super.length(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - length = super.length(); - } finally { - rwLock.unlockRead(stamp); - } - } - return length; - } - - @Override - public boolean isEmpty() { - long stamp = rwLock.tryOptimisticRead(); - boolean isEmpty = super.isEmpty(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - isEmpty = super.isEmpty(); - } finally { - rwLock.unlockRead(stamp); - } - } - return isEmpty; - } - - @Override - public int cardinality() { - long stamp = rwLock.tryOptimisticRead(); - int cardinality = super.cardinality(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - cardinality = super.cardinality(); - } finally { - rwLock.unlockRead(stamp); - } - } - return cardinality; - } - - @Override - public int size() { - long stamp = rwLock.tryOptimisticRead(); - int size = super.size(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - size = super.size(); - } finally { - rwLock.unlockRead(stamp); - } - } - return size; - } - - @Override - public byte[] toByteArray() { - long stamp = rwLock.tryOptimisticRead(); - byte[] byteArray = super.toByteArray(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - byteArray = super.toByteArray(); - } finally { - rwLock.unlockRead(stamp); - } - } - return byteArray; - } - - @Override - public long[] toLongArray() { - long stamp = rwLock.tryOptimisticRead(); - long[] longArray = super.toLongArray(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - longArray = super.toLongArray(); - } finally { - rwLock.unlockRead(stamp); - } - } - return longArray; - } - - @Override - public void flip(int bitIndex) { - long stamp = rwLock.writeLock(); - try { - super.flip(bitIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void flip(int fromIndex, int toIndex) { - long stamp = rwLock.writeLock(); - try { - super.flip(fromIndex, toIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void set(int bitIndex, boolean value) { - long stamp = rwLock.writeLock(); - try { - super.set(bitIndex, value); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void set(int fromIndex, int toIndex, boolean value) { - long stamp = rwLock.writeLock(); - try { - super.set(fromIndex, toIndex, value); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public BitSet get(int fromIndex, int toIndex) { - long stamp = rwLock.tryOptimisticRead(); - BitSet bitSet = super.get(fromIndex, toIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - bitSet = super.get(fromIndex, toIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return bitSet; - } - - @Override - public boolean intersects(BitSet set) { - long stamp = rwLock.writeLock(); - try { - return super.intersects(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void and(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.and(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void or(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.or(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void xor(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.xor(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void andNot(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.andNot(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - /** - * Returns the clone of the internal wrapped {@code BitSet}. - * This won't be a clone of the {@code ConcurrentBitSet} object. - * - * @return a clone of the internal wrapped {@code BitSet} - */ - @Override - public Object clone() { - long stamp = rwLock.tryOptimisticRead(); - RoaringBitSet clone = (RoaringBitSet) super.clone(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - clone = (RoaringBitSet) super.clone(); - } finally { - rwLock.unlockRead(stamp); - } - } - return clone; - } - - @Override - public String toString() { - long stamp = rwLock.tryOptimisticRead(); - String str = super.toString(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - str = super.toString(); - } finally { - rwLock.unlockRead(stamp); - } - } - return str; - } - - /** - * This operation is not supported on {@code ConcurrentBitSet}. - */ - @Override - public IntStream stream() { - throw new UnsupportedOperationException("stream is not supported"); - } - - public boolean equals(final Object o) { - long stamp = rwLock.tryOptimisticRead(); - boolean isEqual = super.equals(o); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - isEqual = super.equals(o); - } finally { - rwLock.unlockRead(stamp); - } - } - return isEqual; - } - - public int hashCode() { - long stamp = rwLock.tryOptimisticRead(); - int hashCode = super.hashCode(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - hashCode = super.hashCode(); - } finally { - rwLock.unlockRead(stamp); - } - } - return hashCode; - } -}