[Backport 2.15] [Tiered Caching] [Bug Fix] Use concurrentMap instead of HashMap to fix Concurrent Modification Exception #14254

Merged 2 commits on Jun 13, 2024
release-notes/opensearch.release-notes-2.15.0.md (1 change: 1 addition & 0 deletions)
@@ -72,3 +72,4 @@
 - Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146))
 - Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
 - Fix the rewrite method for MatchOnlyText field query ([#14248](https://github.com/opensearch-project/OpenSearch/pull/14248))
+- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221))
server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
@@ -75,7 +75,6 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -508,7 +507,7 @@ public int hashCode() {
      * */
     class IndicesRequestCacheCleanupManager implements Closeable {
         private final Set<CleanupKey> keysToClean;
-        private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
+        private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
         private final AtomicInteger staleKeysCount;
         private volatile double stalenessThreshold;
         private final IndicesRequestCacheCleaner cacheCleaner;
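Editorial note: this field change is the heart of the fix. A `ConcurrentHashMap` only guards access to its own entries; a plain `HashMap` stored as a value is still fail-fast when one thread iterates it while another mutates it, which is the race the cleanup manager hit. The following standalone sketch (not part of this PR; class and key names are illustrative) reproduces that failure mode:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative repro: the outer ConcurrentMap protects only its own entries;
// the nested HashMap value is still unsafe under concurrent iteration.
public class NestedHashMapRepro {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, Map<String, Integer>> outer = new ConcurrentHashMap<>();
        outer.put("shard-0", new HashMap<>());

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                outer.get("shard-0").merge("key-" + i, 1, Integer::sum); // unsafe on HashMap
            }
        });
        writer.start();

        try {
            for (int i = 0; i < 1_000; i++) {
                outer.get("shard-0").forEach((k, v) -> {}); // iterate while the writer mutates
            }
            System.out.println("no exception this run (the race is timing-dependent)");
        } catch (ConcurrentModificationException e) {
            System.out.println("ConcurrentModificationException reproduced: " + e);
        }
        writer.join();
    }
}
```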
@@ -569,7 +568,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

             // If the key doesn't exist, it's added with a value of 1.
             // If the key exists, its value is incremented by 1.
-            cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
+            addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId);
         }
+
+        // pkg-private for testing
+        void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
+            cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
+                .merge(readerCacheKeyId, 1, Integer::sum);
+        }
 
         /**
@@ -827,7 +832,7 @@ public void close() {
         }
 
         // for testing
-        ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
+        ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
             return cleanupKeyToCountMap;
         }
 
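Editorial note: the insert path added above composes two per-key atomic operations. Here is a self-contained sketch of the same pattern with plain JDK types; `ConcurrentCollections.newConcurrentMap()` is an OpenSearch helper, and the sketch assumes it yields a `ConcurrentHashMap`-backed map:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the pattern addToCleanupKeyToCountMap uses, with JDK types only.
public class NestedConcurrentMapSketch {
    public static void main(String[] args) {
        ConcurrentMap<String, ConcurrentMap<String, Integer>> counts = new ConcurrentHashMap<>();

        // computeIfAbsent installs the inner map exactly once per key, and
        // merge atomically increments (or initializes) the counter. Iterators
        // over ConcurrentHashMap are weakly consistent, so concurrent readers
        // never see a ConcurrentModificationException.
        counts.computeIfAbsent("shard-0", k -> new ConcurrentHashMap<>()).merge("reader-key", 1, Integer::sum);
        counts.computeIfAbsent("shard-0", k -> new ConcurrentHashMap<>()).merge("reader-key", 1, Integer::sum);

        System.out.println(counts); // {shard-0={reader-key=2}}
    }
}
```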
server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
@@ -95,7 +95,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -105,7 +105,9 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyMap;
@@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
             indexShard.hashCode()
         );
         // test the mapping
-        ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
+        ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
         // shard id should exist
         assertTrue(cleanupKeyToCountMap.containsKey(shardId));
         // reader CacheKeyId should NOT exist
@@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
         );
 
         // test the mapping
-        ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
+        ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
         // shard id should exist
         assertTrue(cleanupKeyToCountMap.containsKey(shardId));
         // reader CacheKeyId should NOT exist
@@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
         cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
         assertEquals(1, cache.count());
         // test the mappings
-        ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
+        ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
         assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));
 
         cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
@@ -793,8 +795,54 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
         IOUtils.close(secondReader);
     }
 
-    private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
-        return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
+    // test adding to cleanupKeyToCountMap with multiple threads
+    public void testAddToCleanupKeyToCountMap() throws Exception {
+        threadPool = getThreadPool();
+        Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
+        cache = getIndicesRequestCache(settings);
+
+        int numberOfThreads = 10;
+        int numberOfIterations = 1000;
+        Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
+        AtomicBoolean exceptionDetected = new AtomicBoolean(false);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
+
+        for (int i = 0; i < numberOfThreads; i++) {
+            executorService.submit(() -> {
+                phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time
+                try {
+                    for (int j = 0; j < numberOfIterations; j++) {
+                        cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString());
+                    }
+                } catch (ConcurrentModificationException e) {
+                    logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
+                    exceptionDetected.set(true); // Set flag if exception is detected
+                }
+            });
+        }
+        phaser.arriveAndAwaitAdvance(); // Start all threads
+
+        // Main thread iterates over the map
+        executorService.submit(() -> {
+            try {
+                for (int j = 0; j < numberOfIterations; j++) {
+                    cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> {
+                        v.forEach((k1, v1) -> {
+                            // Accessing the map to create contention
+                            v.get(k1);
+                        });
+                    });
+                }
+            } catch (ConcurrentModificationException e) {
+                logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
+                exceptionDetected.set(true); // Set flag if exception is detected
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(60, TimeUnit.SECONDS);
+        assertFalse(exceptionDetected.get());
     }
 
     private IndicesRequestCache getIndicesRequestCache(Settings settings) {
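Editorial note: the new test uses a `Phaser` as a start barrier so all writer threads enter the racy section at the same instant, maximizing contention. A simplified standalone sketch of that idiom (class and names are illustrative, not from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

// Start-barrier idiom: every worker plus the coordinating thread registers as
// a party; arriveAndAwaitAdvance() blocks until all parties have arrived.
public class PhaserStartBarrierSketch {
    public static void main(String[] args) throws InterruptedException {
        int numberOfThreads = 4;
        Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for this thread
        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            final int id = i;
            pool.submit(() -> {
                phaser.arriveAndAwaitAdvance(); // wait for the common start signal
                System.out.println("worker " + id + " racing now");
            });
        }

        phaser.arriveAndAwaitAdvance(); // releases all workers at once
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```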
@@ -808,6 +856,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
         );
     }
 
+    private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
+        return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
+    }
+
     private Loader getLoader(DirectoryReader reader) {
         return new Loader(reader, 0);
     }