Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Enabling serialization for IndicesRequestCache key o… #10275

Merged
merged 23 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
df8b26e
[Tiered Caching] Enabling serialization for IndicesRequestCache key o…
sgup432 Sep 29, 2023
93c1172
Fixing javadoc issue
sgup432 Sep 29, 2023
a03af2d
Addressing comments
sgup432 Oct 26, 2023
02d9864
Merge branch 'main' into cache_key_serialization
sgup432 Oct 26, 2023
40cc1e2
Adding changelog
sgup432 Oct 26, 2023
7f2c833
Update server/src/main/java/org/opensearch/indices/IndicesRequestCach…
sgup432 Nov 6, 2023
4d2b5ed
Update server/src/main/java/org/opensearch/indices/IndicesRequestCach…
sgup432 Nov 6, 2023
ebeadf9
Update server/src/main/java/org/opensearch/indices/IndicesRequestCach…
sgup432 Nov 6, 2023
6acc85d
Making DelegatingCache helper and key members private
sgup432 Nov 6, 2023
314f5d0
Merge branch 'main' into cache_key_serialization
sgup432 Nov 6, 2023
25b12e5
Merge branch 'main' into cache_key_serialization
sgup432 Dec 19, 2023
00091c3
Fixing issue where RamUsageEstimator is getting initialized for every…
sgup432 Jan 2, 2024
dc3f903
Making delegating cache helper/key members final
sgup432 Jan 2, 2024
faacca9
Merge branch 'main' into cache_key_serialization
sgup432 Jan 2, 2024
f051604
Using shardId as cache entity in IndicesRequestCache key
sgup432 Jan 6, 2024
bb232f2
Changing cacheEntity function name and removing volatile with concurr…
sgup432 Jan 8, 2024
62b0058
Reverting to old changes for indices map
sgup432 Jan 8, 2024
5c5af1d
Fixing issue where index shard might get closed before it is removed …
sgup432 Jan 9, 2024
ccde0ea
Fixing IndicesServiceCloseTests unit test
sgup432 Jan 9, 2024
bde4ec9
Merge branch 'main' into cache_key_serialization
sgup432 Jan 9, 2024
0850e11
Merge branch 'main' into cache_key_serialization
sgup432 Jan 9, 2024
b6e2c4a
Changing logic to fetch Index shard from shardId
sgup432 Jan 10, 2024
d744da2
Merge branch 'main' into cache_key_serialization
sgup432 Jan 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.core.index.shard;

import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -55,6 +56,8 @@ public class ShardId implements Comparable<ShardId>, ToXContentFragment, Writeab
private final int shardId;
private final int hashCode;

private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class);

/**
* Constructs a new shard id.
* @param index the index name
Expand Down Expand Up @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException {
hashCode = computeHashCode();
}

public long getBaseRamBytesUsed() {
return BASE_RAM_BYTES_USED;
}

/**
* Writes this shard id to a stream.
* @param out the stream to write to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,45 @@ public void testProfileDisableCache() throws Exception {
}
}

public void testCacheWithInvalidation() throws Exception {
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));

assertCacheState(client, "index", 0, 1);
// Index but don't refresh
indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2);
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
Expand All @@ -650,6 +689,7 @@ private static void assertCacheState(Client client, String index, long expectedH
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.index.shard.ShardId;

import java.io.IOException;
import java.util.UUID;

/**
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
Expand All @@ -53,11 +54,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader {
private final ShardId shardId;
private final FilterDirectoryReader.SubReaderWrapper wrapper;

private final DelegatingCacheHelper delegatingCacheHelper;

private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId)
throws IOException {
super(in, wrapper);
this.wrapper = wrapper;
this.shardId = shardId;
this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
}

/**
Expand All @@ -70,7 +74,61 @@ public ShardId shardId() {
@Override
public CacheHelper getReaderCacheHelper() {
// safe to delegate since this reader does not alter the index
return in.getReaderCacheHelper();
return this.delegatingCacheHelper;
}

public DelegatingCacheHelper getDelegatingCacheHelper() {
return this.delegatingCacheHelper;
}

/**
* Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey.
* @opensearch.internal
*/
public class DelegatingCacheHelper implements CacheHelper {
private final CacheHelper cacheHelper;
private final DelegatingCacheKey serializableCacheKey;

DelegatingCacheHelper(CacheHelper cacheHelper) {
this.cacheHelper = cacheHelper;
this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey());
}

@Override
public CacheKey getKey() {
return this.cacheHelper.getKey();
}

public DelegatingCacheKey getDelegatingCacheKey() {
return this.serializableCacheKey;
}

@Override
public void addClosedListener(ClosedListener listener) {
this.cacheHelper.addClosedListener(listener);
}
}

/**
* Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of
* object itself for serialization purposes.
*/
public class DelegatingCacheKey {
private final CacheKey cacheKey;
private final String uniqueId;

DelegatingCacheKey(CacheKey cacheKey) {
this.cacheKey = cacheKey;
this.uniqueId = UUID.randomUUID().toString();
}

public CacheKey getCacheKey() {
return this.cacheKey;
}

public String getId() {
return uniqueId;
}
}

@Override
Expand Down
Loading
Loading