Skip to content

Commit

Permalink
IndexShard should not return null stats (#31528)
Browse files Browse the repository at this point in the history
IndexShard should not return null stats - empty stats or AlreadyCloseException if it's closed is better
  • Loading branch information
vladimirdolzhenko authored Jun 22, 2018
1 parent 7313a98 commit f04c579
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -36,6 +37,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeService;
Expand Down Expand Up @@ -96,13 +99,23 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
shardsStats.add(
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
indexShard.commitStats(),
indexShard.seqNoStats()));
commitStats,
seqNoStats));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -167,57 +168,61 @@ public CommonStats(CommonStatsFlags flags) {
public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
CommonStatsFlags.Flag[] setFlags = flags.getFlags();
for (CommonStatsFlags.Flag flag : setFlags) {
switch (flag) {
case Docs:
docs = indexShard.docStats();
break;
case Store:
store = indexShard.storeStats();
break;
case Indexing:
indexing = indexShard.indexingStats(flags.types());
break;
case Get:
get = indexShard.getStats();
break;
case Search:
search = indexShard.searchStats(flags.groups());
break;
case Merge:
merge = indexShard.mergeStats();
break;
case Refresh:
refresh = indexShard.refreshStats();
break;
case Flush:
flush = indexShard.flushStats();
break;
case Warmer:
warmer = indexShard.warmerStats();
break;
case QueryCache:
queryCache = indicesQueryCache.getStats(indexShard.shardId());
break;
case FieldData:
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
break;
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats(flags.includeSegmentFileSizes());
break;
case Translog:
translog = indexShard.translogStats();
break;
case RequestCache:
requestCache = indexShard.requestCache().stats();
break;
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
try {
switch (flag) {
case Docs:
docs = indexShard.docStats();
break;
case Store:
store = indexShard.storeStats();
break;
case Indexing:
indexing = indexShard.indexingStats(flags.types());
break;
case Get:
get = indexShard.getStats();
break;
case Search:
search = indexShard.searchStats(flags.groups());
break;
case Merge:
merge = indexShard.mergeStats();
break;
case Refresh:
refresh = indexShard.refreshStats();
break;
case Flush:
flush = indexShard.flushStats();
break;
case Warmer:
warmer = indexShard.warmerStats();
break;
case QueryCache:
queryCache = indicesQueryCache.getStats(indexShard.shardId());
break;
case FieldData:
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
break;
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats(flags.includeSegmentFileSizes());
break;
case Translog:
translog = indexShard.translogStats();
break;
case RequestCache:
requestCache = indexShard.requestCache().stats();
break;
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public CommonStats getStats() {
return this.commonStats;
}

@Nullable
public CommitStats getCommitStats() {
return this.commitStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
Expand All @@ -33,6 +34,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -100,7 +103,17 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
}

CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats,
indexShard.commitStats(), indexShard.seqNoStats());
commitStats, seqNoStats);
}
}
16 changes: 6 additions & 10 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,21 +868,19 @@ public DocsStats docStats() {
}

/**
* @return {@link CommitStats} if engine is open, otherwise null
* @return {@link CommitStats}
* @throws AlreadyClosedException if shard is closed
*/
@Nullable
public CommitStats commitStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.commitStats();
return getEngine().commitStats();
}

/**
* @return {@link SeqNoStats} if engine is open, otherwise null
* @return {@link SeqNoStats}
* @throws AlreadyClosedException if shard is closed
*/
@Nullable
public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
}

public IndexingStats indexingStats(String... types) {
Expand Down Expand Up @@ -912,8 +910,6 @@ public StoreStats storeStats() {
return store.stats();
} catch (IOException e) {
throw new ElasticsearchException("io exception while building 'store stats'", e);
} catch (AlreadyClosedException ex) {
return null; // already closed
}
}

Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
Expand All @@ -91,6 +92,7 @@
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -333,13 +335,24 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
return null;
}

CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}

return new IndexShardStats(indexShard.shardId(),
new ShardStats[] {
new ShardStats(indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
indexShard.commitStats(),
indexShard.seqNoStats())
commitStats,
seqNoStats)
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase;
Expand All @@ -88,6 +89,7 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -3082,4 +3084,36 @@ public void onShardInactive(IndexShard indexShard) {
closeShards(primary);
}

public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);

for (int i = 0; i < 3; i++) {
indexDoc(indexShard, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
indexShard.refresh("test"); // produce segments
}

// check stats on closed and on opened shard
if (randomBoolean()) {
closeShards(indexShard);

expectThrows(AlreadyClosedException.class, () -> indexShard.seqNoStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.commitStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.storeStats());

} else {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(2L));

final CommitStats commitStats = indexShard.commitStats();
assertThat(commitStats.getGeneration(), equalTo(2L));

final StoreStats storeStats = indexShard.storeStats();

assertThat(storeStats.sizeInBytes(), greaterThan(0L));

closeShards(indexShard);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1111,17 +1111,21 @@ private void assertSameSyncIdSameDocs() {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
CommitStats commitStats = indexShard.commitStats();
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
try {
CommitStats commitStats = indexShard.commitStats();
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs();
if (docsOnShards.get(syncId) != null) {
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
assertThat("sync id is equal but number of docs does not match on node "
+ nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got "
+ liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
} else {
docsOnShards.put(syncId, liveDocsOnShard);
}
}
} catch (AlreadyClosedException e) {
// the engine is closed or if the shard is recovering
}
}
}
Expand Down

0 comments on commit f04c579

Please sign in to comment.