Skip to content

Commit

Permalink
[fix][ML]Fix NPE when put value to RangeCache. (#15707)
Browse files Browse the repository at this point in the history
### Motivation

When `ReferenceCounted` object overrides the method `deallocate` to make the `getLength` value equal null will cause NPE because the `RangeCache#put` method is not thread-safe. (The process of describing this abstraction is not very clear, please refer to the code modification :)

Pulsar implementation may throw an exception to make `OpAddEntry` fail abnormal and fence the topic. relative code as below:

https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L211-L217

**Exception screenshot:**

```java
java.lang.NullPointerException: Cannot invoke "String.length()" because "value.s" is null

	at org.apache.bookkeeper.mledger.util.RangeCacheTest.lambda$testInParallel$6(RangeCacheTest.java:279)
	at org.apache.bookkeeper.mledger.util.RangeCache.put(RangeCache.java:77)
	at org.apache.bookkeeper.mledger.util.RangeCacheTest.testInParallel(RangeCacheTest.java:283)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
	at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
	at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
	at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
	at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:764)
	at org.testng.TestRunner.run(TestRunner.java:585)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
	at org.testng.TestNG.runSuites(TestNG.java:1069)
	at org.testng.TestNG.run(TestNG.java:1037)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
```

### Modifications

- Make the `RangeCache#put` method to thread-safe.
  • Loading branch information
mattisonchao authored May 24, 2022
1 parent 20a22a0 commit b155d84
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;

/**
Expand Down Expand Up @@ -73,12 +74,13 @@ public RangeCache(Weighter<Value> weighter, TimestampExtractor<Value> timestampE
* @return whether the entry was inserted in the cache
*/
public boolean put(Key key, Value value) {
if (entries.putIfAbsent(key, value) == null) {
MutableBoolean flag = new MutableBoolean();
entries.computeIfAbsent(key, (k) -> {
size.addAndGet(weighter.getSize(value));
return true;
} else {
return false;
}
flag.setValue(true);
return value;
});
return flag.booleanValue();
}

public Value get(Key key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@
import io.netty.util.ReferenceCounted;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.annotations.Test;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RangeCacheTest {

class RefString extends AbstractReferenceCounted implements ReferenceCounted {
final String s;
String s;

RefString(String s) {
super();
Expand All @@ -43,7 +47,7 @@ class RefString extends AbstractReferenceCounted implements ReferenceCounted {

@Override
protected void deallocate() {
// no-op
s = null;
}

@Override
Expand Down Expand Up @@ -122,6 +126,7 @@ public void customWeighter() {
assertEquals(cache.getNumberOfEntries(), 2);
}


@Test
public void customTimeExtraction() {
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
Expand Down Expand Up @@ -268,4 +273,24 @@ public void evictions() {
assertEquals((long) res.getRight(), 10);
assertEquals(cache.getSize(), 90);
}

@Test
public void testInParallel() {
RangeCache<String, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(cache::clear, 10, 10, TimeUnit.MILLISECONDS);
for (int i = 0; i < 1000; i++) {
cache.put(UUID.randomUUID().toString(), new RefString("zero"));
}
executor.shutdown();
}

@Test
public void testPutSameObj() {
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
RefString s0 = new RefString("zero");
assertEquals(s0.refCnt(), 1);
assertTrue(cache.put(0, s0));
assertFalse(cache.put(0, s0));
}
}

0 comments on commit b155d84

Please sign in to comment.