Skip to content

Commit

Permalink
Optimise memory for rest cluster health calls with inline shard aggre…
Browse files Browse the repository at this point in the history
…gations for levels cluster and indices.

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 2, 2024
1 parent ed65482 commit ebadf42
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.health.ClusterShardHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -49,6 +51,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -439,4 +442,130 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}

public void testHealthAtLevelParamEqualsClusterIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setHonourClusterHealthLevel(true)
.execute()
.actionGet();
assertEquals(healthResponse.getStatus(), ClusterHealthStatus.GREEN);
assertTrue(healthResponse.getIndices().isEmpty());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}

public void testHealthAtLevelParamEqualIndicesIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("indices")
.setHonourClusterHealthLevel(true)
.execute()
.actionGet();
assertEquals(healthResponse.getStatus(), ClusterHealthStatus.GREEN);

assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(1, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
assertEquals("test1", indexName);
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(1, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
assertTrue(indicesHealthValue.getShards().isEmpty());
}
}

public void testHealthAtLevelParamEqualShardsIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(TimeValue.timeValueSeconds(120), "test1");
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(TimeValue.timeValueSeconds(120));
client().admin()
.indices()
.prepareUpdateSettings()
.setIndices("test2")
.setSettings(Settings.builder().put("index.number_of_replicas", 2).build())
.execute()
.actionGet();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("shards")
.setHonourClusterHealthLevel(true)
.execute()
.actionGet();
assertEquals(healthResponse.getStatus(), ClusterHealthStatus.YELLOW);

assertEquals(4, healthResponse.getActiveShards());
assertEquals(2, healthResponse.getActivePrimaryShards());
assertEquals(1, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(2, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
boolean indexHasMoreReplicas = indexName.equals("test2");
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(2, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
Map<Integer, ClusterShardHealth> shards = indicesHealthValue.getShards();
assertFalse(shards.isEmpty());
assertEquals(1, shards.size());
for (Map.Entry<Integer, ClusterShardHealth> shardHealth : shards.entrySet()) {
ClusterShardHealth clusterShardHealth = shardHealth.getValue();
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards());
assertEquals(0, clusterShardHealth.getDelayedUnassignedShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertTrue(clusterShardHealth.isPrimaryActive());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
*/
private Level level = Level.CLUSTER;

/**
* This flag will be used by the TransportClusterHealthAction whether to return indices/shards info in the ClusterHealthResponse or not.
* When the flag is disabled - indices/shard info will be returned in ClusterHealthResponse regardless of the health level received in the request.
* When the flag is enabled - indices/shards info will be set according to health level received in the request.
* For Level.CLUSTER, information on indices/shards will NOT be returned to the transport client
* For Level.INDICES, information on indices will be returned to the transport client.
* For Level.SHARDS, information on indices and shards will be returned to the transport client
* By default, the flag is disabled.
*/
private boolean honourClusterHealthLevel = false;

public ClusterHealthRequest() {}

public ClusterHealthRequest(String... indices) {
Expand Down Expand Up @@ -104,6 +115,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeWeighedIn = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
honourClusterHealthLevel = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -139,6 +153,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeWeighedIn);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(honourClusterHealthLevel);
}
}

@Override
Expand Down Expand Up @@ -337,6 +354,14 @@ public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

public boolean isHonourClusterHealthLevel() {
return honourClusterHealthLevel;
}

public void setHonourClusterHealthLevel(boolean honourClusterHealthLevel) {
this.honourClusterHealthLevel = honourClusterHealthLevel;
}

@Override
public ActionRequestValidationException validate() {
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}

public ClusterHealthRequestBuilder setHonourClusterHealthLevel(boolean honourClusterHealthLevel) {
request.setHonourClusterHealthLevel(honourClusterHealthLevel);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,28 @@ public ClusterHealthResponse(
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.taskMaxWaitingTime = taskMaxWaitingTime;
if (clusterHealthRequest.isHonourClusterHealthLevel()) {
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level());
} else {
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
}
this.clusterHealthStatus = clusterStateHealth.getStatus();
this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards();
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
Expand All @@ -261,6 +283,29 @@ public ClusterHealthResponse(
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

public ClusterHealthResponse(
String clusterName,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this(
clusterName,
concreteIndices,
clusterHealthRequest,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
taskMaxWaitingTime
);
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

/**
* For XContent Parser and serialization tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -496,13 +495,13 @@ private ClusterHealthResponse clusterHealth(
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
request,
clusterState,
clusterService.getClusterSettings(),
concreteIndices,
awarenessAttribute,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand All @@ -514,10 +513,10 @@ private ClusterHealthResponse clusterHealth(
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
response.setStatus(ClusterHealthStatus.RED);
Expand All @@ -527,10 +526,10 @@ private ClusterHealthResponse clusterHealth(
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
concreteIndices,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand Down Expand Up @@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq

ClusterHealthStatus clusterStatus = null;
if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus();
}

return new ClusterStatsNodeResponse(
Expand Down
Loading

0 comments on commit ebadf42

Please sign in to comment.