From 0c77f45dc66a448fc37e5cf8a1984282083fc28f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 19 Sep 2018 11:03:11 +0200 Subject: [PATCH] Move DocsStats into Engine (#33835) By moving DocStats into the engine we can easily cache the stats for read-only engines if necessary. It also moves the responsibility out of IndexShard which has quite some complexity already. --- .../elasticsearch/index/engine/Engine.java | 36 +++++++++++++++++++ .../index/engine/ReadOnlyEngine.java | 8 +++++ .../elasticsearch/index/shard/IndexShard.java | 33 ++--------------- .../index/shard/IndexShardTests.java | 14 +++++++- 4 files changed, 60 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index fc693113fee53..f513a8577b6a1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -175,6 +176,41 @@ public MergeStats getMergeStats() { /** Returns how many bytes we are currently moving from heap to disk */ public abstract long getWritingBytes(); + /** + * Returns the {@link DocsStats} for this engine + */ + public DocsStats docStats() { + // we calculate the doc stats based on the internal reader that is more up-to-date and not subject + // to external refreshes. For instance we don't refresh an external reader if we flush and indices with + // index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics + // when indexing but not refreshing in general. 
Yet, if a refresh happens the internal reader is refresh as well so we are + // safe here. + try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) { + return docsStats(searcher.reader()); + } + } + + protected final DocsStats docsStats(IndexReader indexReader) { + long numDocs = 0; + long numDeletedDocs = 0; + long sizeInBytes = 0; + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accessed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + for (LeafReaderContext readerContext : indexReader.leaves()) { + // we go on the segment level here to get accurate numbers + final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader()); + SegmentCommitInfo info = segmentReader.getSegmentInfo(); + numDocs += readerContext.reader().numDocs(); + numDeletedDocs += readerContext.reader().numDeletedDocs(); + try { + sizeInBytes += info.sizeInBytes(); + } catch (IOException e) { + logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e); + } + } + return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); + } + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b958bd84b76a6..80b653939299f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; 
import org.elasticsearch.index.translog.TranslogStats; @@ -63,6 +64,7 @@ public final class ReadOnlyEngine extends Engine { private final SearcherManager searcherManager; private final IndexCommit indexCommit; private final Lock indexWriterLock; + private final DocsStats docsStats; /** * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened @@ -101,6 +103,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats this.indexCommit = reader.getIndexCommit(); this.searcherManager = new SearcherManager(reader, new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); + this.docsStats = docsStats(reader); this.indexWriterLock = indexWriterLock; success = true; } finally { @@ -365,4 +368,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) { @Override public void maybePruneDeletes() { } + + @Override + public DocsStats docStats() { + return docsStats; + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 168444a226750..51549b439a37b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -21,13 +21,9 @@ import com.carrotsearch.hppc.ObjectLongMap; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; @@ -879,32 +875,9 @@ public FlushStats flushStats() { } public DocsStats docStats() { - // we 
calculate the doc stats based on the internal reader that is more up-to-date and not subject - // to external refreshes. For instance we don't refresh an external reader if we flush and indices with - // index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics - // when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are - // safe here. - long numDocs = 0; - long numDeletedDocs = 0; - long sizeInBytes = 0; - try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) { - // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accessed only which will cause - // the next scheduled refresh to go through and refresh the stats as well - markSearcherAccessed(); - for (LeafReaderContext reader : searcher.reader().leaves()) { - // we go on the segment level here to get accurate numbers - final SegmentReader segmentReader = Lucene.segmentReader(reader.reader()); - SegmentCommitInfo info = segmentReader.getSegmentInfo(); - numDocs += reader.reader().numDocs(); - numDeletedDocs += reader.reader().numDeletedDocs(); - try { - sizeInBytes += info.sizeInBytes(); - } catch (IOException e) { - logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e); - } - } - } - return new DocsStats(numDocs, numDeletedDocs, sizeInBytes); + DocsStats docsStats = getEngine().docStats(); + markSearcherAccessed(); + return docsStats; } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b74b5343a82a1..c1803619ed5f3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2438,7 +2438,7 @@ public void testRecoverFromLocalShard() throws IOException { 
closeShards(sourceShard, targetShard); } - public void testDocStats() throws IOException, InterruptedException { + public void testDocStats() throws Exception { IndexShard indexShard = null; try { indexShard = newStartedShard( @@ -2455,7 +2455,14 @@ public void testDocStats() throws IOException, InterruptedException { indexShard.flush(new FlushRequest()); } { + IndexShard shard = indexShard; + assertBusy(() -> { + ThreadPool threadPool = shard.getThreadPool(); + assertThat(threadPool.relativeTimeInMillis(), greaterThan(shard.getLastSearcherAccess())); + }); + long prevAccessTime = shard.getLastSearcherAccess(); final DocsStats docsStats = indexShard.docStats(); + assertThat("searcher was not marked as accessed", shard.getLastSearcherAccess(), greaterThan(prevAccessTime)); assertThat(docsStats.getCount(), equalTo(numDocs)); try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) { assertTrue(searcher.reader().numDocs() <= docsStats.getCount()); @@ -3412,4 +3419,9 @@ public void testResetEngine() throws Exception { assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); closeShard(shard, false); } + + @Override + public Settings threadPoolSettings() { + return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build(); + } }