diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java index a5b31335f35df..1de9429d7c0b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; /** @@ -73,12 +74,13 @@ public RangeCache(Weighter weighter, TimestampExtractor timestampE * @return whether the entry was inserted in the cache */ public boolean put(Key key, Value value) { - if (entries.putIfAbsent(key, value) == null) { + MutableBoolean flag = new MutableBoolean(); + entries.computeIfAbsent(key, (k) -> { size.addAndGet(weighter.getSize(value)); - return true; - } else { - return false; - } + flag.setValue(true); + return value; + }); + return flag.booleanValue(); } public Value get(Key key) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java index 95896d24f35f2..f31aa4a74f9d1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java @@ -29,11 +29,15 @@ import io.netty.util.ReferenceCounted; import org.apache.commons.lang3.tuple.Pair; import org.testng.annotations.Test; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class RangeCacheTest { class RefString extends AbstractReferenceCounted implements ReferenceCounted { - final String s; + String s; RefString(String s) { super(); @@ -43,7 +47,7 @@ class RefString extends AbstractReferenceCounted implements ReferenceCounted { @Override protected void deallocate() { - // no-op + s = null; } @Override @@ -122,6 +126,7 @@ public void customWeighter() { assertEquals(cache.getNumberOfEntries(), 2); } + @Test public void customTimeExtraction() { RangeCache cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length()); @@ -268,4 +273,24 @@ public void evictions() { assertEquals((long) res.getRight(), 10); assertEquals(cache.getSize(), 90); } + + @Test + public void testInParallel() { + RangeCache cache = new RangeCache<>(value -> value.s.length(), x -> 0); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleWithFixedDelay(cache::clear, 10, 10, TimeUnit.MILLISECONDS); + for (int i = 0; i < 1000; i++) { + cache.put(UUID.randomUUID().toString(), new RefString("zero")); + } + executor.shutdown(); + } + + @Test + public void testPutSameObj() { + RangeCache cache = new RangeCache<>(value -> value.s.length(), x -> 0); + RefString s0 = new RefString("zero"); + assertEquals(s0.refCnt(), 1); + assertTrue(cache.put(0, s0)); + assertFalse(cache.put(0, s0)); + } }