Skip to content

Commit

Permalink
Replace version with reader cache key in IndicesRequestCache (#34189)
Browse files Browse the repository at this point in the history
Today we use the version of a DirectoryReader as a component of the key
of IndicesRequestCache. This usage is perfectly fine since the version
is advanced every time a new change is made into IndexWriter. In other
words, two DirectoryReaders with the same version should have the same
content. However, this invariant is only guaranteed in the context of a
single IndexWriter because the version is reset to the committed version
value when IndexWriter is re-opened.

Since #33473, each IndexShard may have more than one IndexWriter, and
using the version of a DirectoryReader as a part of the cache key can
cause IndicesRequestCache to return stale cached values. For example, in
#27650, we rollback the engine (i.e., re-open IndexWriter), index new
documents, refresh, then make a count request, but the search layer
mistakenly returns the count of the DirectoryReader of the previous
IndexWriter because the current DirectoryReader has the same version of
the old DirectoryReader even their documents are different. This is
possible because these two readers come from different IndexWriters.

This commit replaces the the version with the reader cache key of
IndexReader as a component of the cache key of IndicesRequestCache.

Closes #27650
Relates #33473
  • Loading branch information
dnhatn committed Oct 4, 2018
1 parent 8de6726 commit eed50b4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ protected void doRun() throws Exception {
return future;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/27650")
public void testRecoveryWithConcurrentIndexing() throws Exception {
final String index = "recovery_with_concurrent_indexing";
Response response = client().performRequest(new Request("GET", "_nodes"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
* The indices request cache allows to cache a shard level request stage responses, helping with improving
* similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
* with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
* with the semantics of NRT (the index reader cache key is part of the cache key), and relies on size based
* eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
* are no longer used or closed shards.
* <p>
Expand Down Expand Up @@ -100,7 +101,7 @@ public void close() {
}

void clear(CacheEntity entity) {
keysToClean.add(new CleanupKey(entity, -1));
keysToClean.add(new CleanupKey(entity, null));
cleanCache();
}

Expand All @@ -111,13 +112,14 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {

BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey) throws Exception {
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
assert reader.getReaderCacheHelper() != null;
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
Expand All @@ -137,7 +139,8 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
* @param cacheKey the cache key to invalidate
*/
void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
cache.invalidate(new Key(cacheEntity, reader.getVersion(), cacheKey));
assert reader.getReaderCacheHelper() != null;
cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey));
}

private static class Loader implements CacheLoader<Key, BytesReference> {
Expand Down Expand Up @@ -206,12 +209,12 @@ static class Key implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

public final CacheEntity entity; // use as identity equality
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
public final IndexReader.CacheKey readerCacheKey;
public final BytesReference value;

Key(CacheEntity entity, long readerVersion, BytesReference value) {
Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) {
this.entity = entity;
this.readerVersion = readerVersion;
this.readerCacheKey = Objects.requireNonNull(readerCacheKey);
this.value = value;
}

Expand All @@ -231,7 +234,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key = (Key) o;
if (readerVersion != key.readerVersion) return false;
if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false;
if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false;
if (!value.equals(key.value)) return false;
return true;
Expand All @@ -240,19 +243,19 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Long.hashCode(readerVersion);
result = 31 * result + readerCacheKey.hashCode();
result = 31 * result + value.hashCode();
return result;
}
}

private class CleanupKey implements IndexReader.ClosedListener {
final CacheEntity entity;
final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
final IndexReader.CacheKey readerCacheKey;

private CleanupKey(CacheEntity entity, long readerVersion) {
private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) {
this.entity = entity;
this.readerVersion = readerVersion;
this.readerCacheKey = readerCacheKey;
}

@Override
Expand All @@ -270,15 +273,15 @@ public boolean equals(Object o) {
return false;
}
CleanupKey that = (CleanupKey) o;
if (readerVersion != that.readerVersion) return false;
if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false;
if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false;
return true;
}

@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Long.hashCode(readerVersion);
result = 31 * result + Objects.hashCode(readerCacheKey);
return result;
}
}
Expand All @@ -293,7 +296,7 @@ synchronized void cleanCache() {
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerVersion == -1 || cleanupKey.entity.isOpen() == false) {
if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) {
// -1 indicates full cleanup, as does a closed shard
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
Expand All @@ -306,7 +309,7 @@ synchronized void cleanCache() {
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
} else {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerVersion))) {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) {
iterator.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,10 +1166,9 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
} else if (request.requestCache() == false) {
return false;
}
// if the reader is not a directory reader, we can't get the version from it
if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) {
return false;
}
// We use the cacheKey of the index reader as a part of a key of the IndicesRequestCache.
assert context.searcher().getIndexReader().getReaderCacheHelper() != null;

// if now in millis is used (or in the future, a more generic "isDeterministic" flag
// then we can't cache based on "now" key within the search request, as it is not deterministic
if (context.getQueryShardContext().isCachable() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -713,4 +715,37 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
assertTrue(notified.get());
}

public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Exception {
createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.refresh_interval", -1).build());
ensureGreen();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("test"), 0));
final SearchRequest countRequest = new SearchRequest("test").source(new SearchSourceBuilder().size(0));
final long numDocs = between(10, 20);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
if (randomBoolean()) {
shard.refresh("test");
}
}
shard.refresh("test");
assertThat(client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs));
assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo()));
shard.resetEngineToGlobalCheckpoint();
final long moreDocs = between(10, 20);
for (int i = 0; i < moreDocs; i++) {
client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get();
if (randomBoolean()) {
shard.refresh("test");
}
}
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs, (long) searcher.reader().numDocs(), equalTo(numDocs + moreDocs));
}
assertThat("numDocs=" + numDocs + " moreDocs=" + moreDocs,
client().search(countRequest).actionGet().getHits().totalHits, equalTo(numDocs + moreDocs));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
Expand Down Expand Up @@ -119,7 +121,11 @@ public void testCacheDifferentReaders() throws Exception {
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, XContentType.JSON, false);

if (randomBoolean()) {
writer.flush();
IOUtils.close(writer);
writer = new IndexWriter(dir, newIndexWriterConfig());
}
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));

Expand Down Expand Up @@ -424,14 +430,23 @@ public void testInvalidate() throws Exception {
assertEquals(0, cache.numRegisteredCloseListeners());
}

public void testEqualsKey() {
public void testEqualsKey() throws IOException {
AtomicBoolean trueBoolean = new AtomicBoolean(true);
AtomicBoolean falseBoolean = new AtomicBoolean(false);
IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1));
IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(1));
IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), 1L, new TestBytesReference(1));
IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 2L, new TestBytesReference(1));
IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), 1L, new TestBytesReference(2));
Directory dir = newDirectory();
IndexWriterConfig config = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, config);
IndexReader reader1 = DirectoryReader.open(writer);
IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey();
writer.addDocument(new Document());
IndexReader reader2 = DirectoryReader.open(writer);
IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey();
IOUtils.close(reader1, reader2, writer, dir);
IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1));
IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1));
IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2));
String s = "Some other random object";
assertEquals(key1, key1);
assertEquals(key1, key2);
Expand Down

0 comments on commit eed50b4

Please sign in to comment.