Skip to content

Commit

Permalink
Optimize Cluster Stats Indices to precomute node level stats
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jul 12, 2024
1 parent bda8393 commit e4329b8
Show file tree
Hide file tree
Showing 9 changed files with 536 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public void testNodeCounts() {
Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
int numNodes = randomIntBetween(1, 5);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

for (int i = 0; i < numNodes; i++) {
Expand Down Expand Up @@ -153,7 +157,11 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);

Client client = client();
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client.admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE.roleName());
Expand All @@ -179,7 +187,11 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
public void testIndicesShardStats() throws ExecutionException, InterruptedException {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();
Expand All @@ -195,14 +207,14 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
ensureGreen();
index("test1", "type", "1", "f", "f");
refresh(); // make the doc visible
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);

prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
Expand All @@ -225,7 +237,11 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
internalCluster().startNodes(randomIntBetween(1, 3));
index("test1", "type", "1", "f", "f");

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
String msg = response.toString();
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));
Expand Down Expand Up @@ -265,13 +281,21 @@ public void testAllocatedProcessors() throws Exception {
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
waitForNodes(1);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
}

public void testClusterStatusWhenStateNotRecovered() throws Exception {
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));

if (randomBoolean()) {
Expand All @@ -281,14 +305,18 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
}
// wait for the cluster status to settle
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

public void testFieldTypes() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());

Expand All @@ -301,7 +329,7 @@ public void testFieldTypes() {
+ "\"eggplant\":{\"type\":\"integer\"}}}}}"
)
.get();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useOptimizedClusterStatsResponse(randomBoolean()).get();
assertThat(response.getIndicesStats().getMappings().getFieldTypeStats().size(), equalTo(3));
Set<IndexFeatureStats> stats = response.getIndicesStats().getMappings().getFieldTypeStats();
for (IndexFeatureStats stat : stats) {
Expand Down Expand Up @@ -329,7 +357,11 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
Expand Down Expand Up @@ -359,7 +391,11 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
Expand All @@ -383,7 +419,11 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<String> expectedRoles = Set.of(
Expand All @@ -410,7 +450,11 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useOptimizedClusterStatsResponse(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<Set<String>> expectedNodesRoles = Set.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@

import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.cache.query.QueryCacheStats;
Expand Down Expand Up @@ -78,26 +81,44 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
this.segments = new SegmentsStats();

for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
// Optimized response from the node
if (r.getNodeIndexShardStats() != null) {
r.getNodeIndexShardStats().indexStatsMap.forEach(
(index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> {
v1.addStatsFrom(v2);
return v1;
})
);

docs.add(r.getNodeIndexShardStats().docs);
store.add(r.getNodeIndexShardStats().store);
fieldData.add(r.getNodeIndexShardStats().fieldData);
queryCache.add(r.getNodeIndexShardStats().queryCache);
completion.add(r.getNodeIndexShardStats().completion);
segments.add(r.getNodeIndexShardStats().segments);
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}

Expand Down Expand Up @@ -185,11 +206,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class ShardStats implements ToXContentFragment {
public static class ShardStats implements ToXContentFragment, Writeable {

int indices;
int total;
int primaries;
int indices = 0;
int total = 0;
int primaries = 0;

// min/max
int minIndexShards = -1;
Expand All @@ -202,6 +223,12 @@ public static class ShardStats implements ToXContentFragment {

public ShardStats() {}

public ShardStats(StreamInput in) throws IOException {
indices = in.readVInt();
total = in.readVInt();
primaries = in.readVInt();
}

/**
* number of indices in the cluster
*/
Expand Down Expand Up @@ -329,6 +356,19 @@ public void addIndexShardCount(ShardStats indexShardCount) {
}
}

public void addStatsFrom(ShardStats incomingStats) {
this.total += incomingStats.getTotal();
this.indices += incomingStats.getIndices();
this.primaries += incomingStats.getPrimaries();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indices);
out.writeVInt(total);
out.writeVInt(primaries);
}

/**
* Inner Fields used for creating XContent and parsing
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand All @@ -55,6 +56,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
private final NodeStats nodeStats;
private final ShardStats[] shardsStats;
private ClusterHealthStatus clusterStatus;
private NodeIndexShardStats nodeIndexShardStats;

public ClusterStatsNodeResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -64,7 +66,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
}
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new);
this.nodeIndexShardStats = in.readOptionalWriteable(NodeIndexShardStats::new);
} else {
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
}
}

public ClusterStatsNodeResponse(
Expand All @@ -81,6 +88,24 @@ public ClusterStatsNodeResponse(
this.clusterStatus = clusterStatus;
}

public ClusterStatsNodeResponse(
DiscoveryNode node,
@Nullable ClusterHealthStatus clusterStatus,
NodeInfo nodeInfo,
NodeStats nodeStats,
ShardStats[] shardsStats,
boolean optimized
) {
super(node);
this.nodeInfo = nodeInfo;
this.nodeStats = nodeStats;
if (optimized) {
this.nodeIndexShardStats = new NodeIndexShardStats(node, shardsStats);
}
this.shardsStats = shardsStats;
this.clusterStatus = clusterStatus;
}

public NodeInfo nodeInfo() {
return this.nodeInfo;
}
Expand All @@ -101,6 +126,10 @@ public ShardStats[] shardsStats() {
return this.shardsStats;
}

public NodeIndexShardStats getNodeIndexShardStats() {
return nodeIndexShardStats;
}

public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
return new ClusterStatsNodeResponse(in);
}
Expand All @@ -116,6 +145,16 @@ public void writeTo(StreamOutput out) throws IOException {
}
nodeInfo.writeTo(out);
nodeStats.writeTo(out);
out.writeArray(shardsStats);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (nodeIndexShardStats != null) {
out.writeOptionalArray(null);
out.writeOptionalWriteable(nodeIndexShardStats);
} else {
out.writeOptionalArray(shardsStats);
out.writeOptionalWriteable(null);
}
} else {
out.writeArray(shardsStats);
}
}
}
Loading

0 comments on commit e4329b8

Please sign in to comment.