Move DocsStats into Engine (elastic#33835)
By moving DocsStats into the engine we can easily cache the stats for
read-only engines if necessary. It also moves the responsibility out of IndexShard,
which already has quite some complexity.
s1monw authored Sep 19, 2018
1 parent 6f3b333 commit 0c77f45
Showing 4 changed files with 60 additions and 31 deletions.
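The motivation is straightforward: a writable engine must recompute doc stats on every call because indexing keeps changing them, while a read-only engine's contents never change, so the stats can be computed once at open time and cached. A minimal sketch of that pattern, where every name (DocsStatsSketch, EngineSketch, and so on) is a hypothetical stand-in rather than an actual Elasticsearch class:

record DocsStatsSketch(long count, long deleted, long sizeInBytes) {}

interface EngineSketch {
    DocsStatsSketch docStats();
}

// A writable engine takes a fresh snapshot on every call, since indexing
// may have changed the numbers since the last one.
final class WritableEngineSketch implements EngineSketch {
    @Override
    public DocsStatsSketch docStats() {
        // stand-in for the real per-segment walk over an internal reader
        return new DocsStatsSketch(42, 0, 1024);
    }
}

// A read-only engine computes the stats once, at open time, and serves the
// cached value thereafter; the underlying index cannot change.
final class ReadOnlyEngineSketch implements EngineSketch {
    private final DocsStatsSketch cached;

    ReadOnlyEngineSketch(DocsStatsSketch computedAtOpenTime) {
        this.cached = computedAtOpenTime;
    }

    @Override
    public DocsStatsSketch docStats() {
        return cached;
    }
}

class CachingDemo {
    public static void main(String[] args) {
        EngineSketch engine = new ReadOnlyEngineSketch(new DocsStatsSketch(42, 0, 1024));
        System.out.println(engine.docStats()); // same cached value on every call
    }
}

The diff below implements exactly this trade: Engine gains a default docStats() that recomputes from a fresh internal searcher, and ReadOnlyEngine overrides it to return a value captured in its constructor.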
36 changes: 36 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -66,6 +66,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@@ -175,6 +176,41 @@ public MergeStats getMergeStats() {
/** Returns how many bytes we are currently moving from heap to disk */
public abstract long getWritingBytes();

/**
* Returns the {@link DocsStats} for this engine
*/
public DocsStats docStats() {
// we calculate the doc stats based on the internal reader, which is more up-to-date and not subject
// to external refreshes. For instance, we don't refresh an external reader when we flush, so indices with
// index.refresh_interval=-1 would otherwise never see any doc stats updates. This gives more accurate statistics
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refreshed as well, so we are
// safe here.
try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
return docsStats(searcher.reader());
}
}

protected final DocsStats docsStats(IndexReader indexReader) {
long numDocs = 0;
long numDeletedDocs = 0;
long sizeInBytes = 0;
// we don't wait for pending refreshes here since it's a stats call; instead we only mark the searcher as accessed,
// which will cause the next scheduled refresh to go through and refresh the stats as well
for (LeafReaderContext readerContext : indexReader.leaves()) {
// we go down to the segment level here to get accurate numbers
final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader());
SegmentCommitInfo info = segmentReader.getSegmentInfo();
numDocs += readerContext.reader().numDocs();
numDeletedDocs += readerContext.reader().numDeletedDocs();
try {
sizeInBytes += info.sizeInBytes();
} catch (IOException e) {
logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
}
}
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -34,6 +34,7 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
@@ -63,6 +64,7 @@ public final class ReadOnlyEngine extends Engine {
private final SearcherManager searcherManager;
private final IndexCommit indexCommit;
private final Lock indexWriterLock;
private final DocsStats docsStats;

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -101,6 +103,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
this.indexCommit = reader.getIndexCommit();
this.searcherManager = new SearcherManager(reader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
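// a read-only engine's contents never change, so the stats computed once at open time stay accurate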
this.docsStats = docsStats(reader);
this.indexWriterLock = indexWriterLock;
success = true;
} finally {
@@ -365,4 +368,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
@Override
public void maybePruneDeletes() {
}

@Override
public DocsStats docStats() {
return docsStats;
}
}
33 changes: 3 additions & 30 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -21,13 +21,9 @@

import com.carrotsearch.hppc.ObjectLongMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
@@ -879,32 +875,9 @@ public FlushStats flushStats() {
}

public DocsStats docStats() {
// we calculate the doc stats based on the internal reader, which is more up-to-date and not subject
// to external refreshes. For instance, we don't refresh an external reader when we flush, so indices with
// index.refresh_interval=-1 would otherwise never see any doc stats updates. This gives more accurate statistics
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refreshed as well, so we are
// safe here.
long numDocs = 0;
long numDeletedDocs = 0;
long sizeInBytes = 0;
try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
// we don't wait for pending refreshes here since it's a stats call; instead we only mark the searcher as accessed,
// which will cause the next scheduled refresh to go through and refresh the stats as well
markSearcherAccessed();
for (LeafReaderContext reader : searcher.reader().leaves()) {
// we go down to the segment level here to get accurate numbers
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
SegmentCommitInfo info = segmentReader.getSegmentInfo();
numDocs += reader.reader().numDocs();
numDeletedDocs += reader.reader().numDeletedDocs();
try {
sizeInBytes += info.sizeInBytes();
} catch (IOException e) {
logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
}
}
}
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
DocsStats docsStats = getEngine().docStats();
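// stats calls don't wait for pending refreshes; marking the searcher as accessed lets the next scheduled refresh go through and update the stats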
markSearcherAccessed();
return docsStats;
}

/**
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2438,7 +2438,7 @@ public void testRecoverFromLocalShard() throws IOException {
closeShards(sourceShard, targetShard);
}

public void testDocStats() throws IOException, InterruptedException {
public void testDocStats() throws Exception {
IndexShard indexShard = null;
try {
indexShard = newStartedShard(
@@ -2455,7 +2455,14 @@ public void testDocStats() throws IOException, InterruptedException {
indexShard.flush(new FlushRequest());
}
{
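// first wait until the thread pool's cached clock ticks past the last recorded searcher access,
// so the access-time bump from docStats() below is observable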
IndexShard shard = indexShard;
assertBusy(() -> {
ThreadPool threadPool = shard.getThreadPool();
assertThat(threadPool.relativeTimeInMillis(), greaterThan(shard.getLastSearcherAccess()));
});
long prevAccessTime = shard.getLastSearcherAccess();
final DocsStats docsStats = indexShard.docStats();
assertThat("searcher was not marked as accessed", shard.getLastSearcherAccess(), greaterThan(prevAccessTime));
assertThat(docsStats.getCount(), equalTo(numDocs));
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
@@ -3412,4 +3419,9 @@ public void testResetEngine() throws Exception {
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
closeShard(shard, false);
}

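// use a short estimated-time interval so the thread pool's cached clock (relativeTimeInMillis)
// advances quickly enough for the timing assertions in testDocStats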
@Override
public Settings threadPoolSettings() {
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
}
}
