Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Optimized ClusterStatsIndices to precomute shard stats (#14426) #14910

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
- Add rest, transport layer changes for hot to warm tiering - dedicated setup (([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832))
- Optimize Cluster Stats Indices to precomute node level stats ([#14426](https://github.com/opensearch-project/OpenSearch/pull/14426))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public void testNodeCounts() {
Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
int numNodes = randomIntBetween(1, 5);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

for (int i = 0; i < numNodes; i++) {
Expand Down Expand Up @@ -153,7 +157,11 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);

Client client = client();
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE.roleName());
Expand All @@ -176,15 +184,60 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
assertThat(stats.getReplication(), Matchers.equalTo(replicationFactor));
}

public void testIndicesShardStats() throws ExecutionException, InterruptedException {
public void testIndicesShardStatsWithoutNodeLevelAggregations() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();

response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
assertShardStats(response.getIndicesStats().getShards(), 1, 2, 2, 0.0);

// add another node, replicas should get assigned
internalCluster().startNode();
ensureGreen();
index("test1", "type", "1", "f", "f");
refresh(); // make the doc visible
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);

prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
ensureGreen();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);

assertThat(response.getIndicesStats().getShards().getAvgIndexPrimaryShards(), Matchers.equalTo(2.5));
assertThat(response.getIndicesStats().getShards().getMinIndexPrimaryShards(), Matchers.equalTo(2));
assertThat(response.getIndicesStats().getShards().getMaxIndexPrimaryShards(), Matchers.equalTo(3));

assertThat(response.getIndicesStats().getShards().getAvgIndexShards(), Matchers.equalTo(3.5));
assertThat(response.getIndicesStats().getShards().getMinIndexShards(), Matchers.equalTo(3));
assertThat(response.getIndicesStats().getShards().getMaxIndexShards(), Matchers.equalTo(4));

assertThat(response.getIndicesStats().getShards().getAvgIndexReplication(), Matchers.equalTo(0.5));
assertThat(response.getIndicesStats().getShards().getMinIndexReplication(), Matchers.equalTo(0.0));
assertThat(response.getIndicesStats().getShards().getMaxIndexReplication(), Matchers.equalTo(1.0));

}

public void testIndicesShardStatsWithNodeLevelAggregations() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();

response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
Expand All @@ -195,14 +248,14 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
ensureGreen();
index("test1", "type", "1", "f", "f");
refresh(); // make the doc visible
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);

prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
Expand All @@ -225,7 +278,11 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
internalCluster().startNodes(randomIntBetween(1, 3));
index("test1", "type", "1", "f", "f");

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
String msg = response.toString();
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));
Expand Down Expand Up @@ -265,13 +322,21 @@ public void testAllocatedProcessors() throws Exception {
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
waitForNodes(1);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
}

public void testClusterStatusWhenStateNotRecovered() throws Exception {
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));

if (randomBoolean()) {
Expand All @@ -281,14 +346,18 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
}
// wait for the cluster status to settle
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

public void testFieldTypes() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());

Expand All @@ -301,7 +370,7 @@ public void testFieldTypes() {
+ "\"eggplant\":{\"type\":\"integer\"}}}}}"
)
.get();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
assertThat(response.getIndicesStats().getMappings().getFieldTypeStats().size(), equalTo(3));
Set<IndexFeatureStats> stats = response.getIndicesStats().getMappings().getFieldTypeStats();
for (IndexFeatureStats stat : stats) {
Expand Down Expand Up @@ -329,7 +398,11 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
Expand Down Expand Up @@ -359,7 +432,11 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
Expand All @@ -383,7 +460,11 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<String> expectedRoles = Set.of(
Expand All @@ -410,7 +491,11 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<Set<String>> expectedNodesRoles = Set.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,49 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
this.segments = new SegmentsStats();

for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
// Aggregated response from the node
if (r.getAggregatedNodeLevelStats() != null) {

for (Map.Entry<String, ClusterStatsNodeResponse.AggregatedIndexStats> entry : r.getAggregatedNodeLevelStats().indexStatsMap
.entrySet()) {
ShardStats indexShardStats = countsPerIndex.get(entry.getKey());
if (indexShardStats == null) {
indexShardStats = new ShardStats(entry.getValue());
countsPerIndex.put(entry.getKey(), indexShardStats);
} else {
indexShardStats.addStatsFrom(entry.getValue());
}
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}

Expand Down Expand Up @@ -202,6 +225,11 @@ public static class ShardStats implements ToXContentFragment {

public ShardStats() {}

public ShardStats(ClusterStatsNodeResponse.AggregatedIndexStats aggregatedIndexStats) {
this.total = aggregatedIndexStats.total;
this.primaries = aggregatedIndexStats.primaries;
}

/**
* number of indices in the cluster
*/
Expand Down Expand Up @@ -329,6 +357,11 @@ public void addIndexShardCount(ShardStats indexShardCount) {
}
}

public void addStatsFrom(ClusterStatsNodeResponse.AggregatedIndexStats incomingStats) {
this.total += incomingStats.total;
this.primaries += incomingStats.primaries;
}

/**
* Inner Fields used for creating XContent and parsing
*
Expand Down
Loading
Loading