Skip to content

Commit

Permalink
Optimise memory for rest cluster health calls with inline shard aggre…
Browse files Browse the repository at this point in the history
…gations for levels cluster and indices.

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 4, 2024
1 parent 3fc0139 commit d5c5869
Show file tree
Hide file tree
Showing 15 changed files with 656 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- Cluster health API memory optimisation based on health level ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.health.ClusterShardHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -49,6 +51,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -439,4 +442,154 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}

public void testHealthAtLevelParamEqualsClusterIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}

public void testHealthAtLevelParamEqualIndicesIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("indices")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());

assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(1, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
assertEquals("test1", indexName);
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(1, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
assertTrue(indicesHealthValue.getShards().isEmpty());
}
}

public void testHealthAtLevelParamEqualShardsIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(TimeValue.timeValueSeconds(120), "test1");
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(TimeValue.timeValueSeconds(120));
client().admin()
.indices()
.prepareUpdateSettings()
.setIndices("test2")
.setSettings(Settings.builder().put("index.number_of_replicas", 2).build())
.execute()
.actionGet();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("shards")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.YELLOW, healthResponse.getStatus());

assertEquals(4, healthResponse.getActiveShards());
assertEquals(2, healthResponse.getActivePrimaryShards());
assertEquals(1, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(2, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
boolean indexHasMoreReplicas = indexName.equals("test2");
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(2, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
Map<Integer, ClusterShardHealth> shards = indicesHealthValue.getShards();
assertFalse(shards.isEmpty());
assertEquals(1, shards.size());
for (Map.Entry<Integer, ClusterShardHealth> shardHealth : shards.entrySet()) {
ClusterShardHealth clusterShardHealth = shardHealth.getValue();
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards());
assertEquals(0, clusterShardHealth.getDelayedUnassignedShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertTrue(clusterShardHealth.isPrimaryActive());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus());
}
}
}

public void testHealthAtLevelParamEqualsAttributeAwarenessIsHonoured() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("awareness_attributes")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertNotNull(healthResponse.getClusterAwarenessHealth());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
*/
private Level level = Level.CLUSTER;

/**
* This flag will be used by the TransportClusterHealthAction to decide if indices/shards info is required in the ClusterHealthResponse or not.
* When the flag is disabled - indices/shard info will be returned in ClusterHealthResponse regardless of the health level requested.
* When the flag is enabled - indices/shards info will be set according to health level requested.
* For Level.CLUSTER (or) LevelAWARENESS_ATTRIBUTES - information on indices/shards will NOT be returned to the transport client
* For Level.INDICES - information on indices will be returned to the transport client.
* For Level.SHARDS - information on indices and shards will be returned to the transport client
* By default, the flag is disabled.
*/
private boolean applyLevelAtTransportLayer = false;

public ClusterHealthRequest() {}

public ClusterHealthRequest(String... indices) {
Expand Down Expand Up @@ -104,6 +115,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeWeighedIn = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
applyLevelAtTransportLayer = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -139,6 +153,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeWeighedIn);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(applyLevelAtTransportLayer);
}
}

@Override
Expand Down Expand Up @@ -337,6 +354,14 @@ public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

public boolean isApplyLevelAtTransportLayer() {
return applyLevelAtTransportLayer;
}

public void setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
this.applyLevelAtTransportLayer = applyLevelAtTransportLayer;
}

@Override
public ActionRequestValidationException validate() {
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}

public ClusterHealthRequestBuilder setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
request.setApplyLevelAtTransportLayer(applyLevelAtTransportLayer);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ public ClusterHealthResponse(
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = clusterHealthRequest.isApplyLevelAtTransportLayer()
? new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level())
: new ClusterStateHealth(clusterState, concreteIndices);
this.clusterHealthStatus = clusterStateHealth.getStatus();
this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards();
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
Expand All @@ -261,6 +281,29 @@ public ClusterHealthResponse(
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

public ClusterHealthResponse(
String clusterName,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this(
clusterName,
concreteIndices,
clusterHealthRequest,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
taskMaxWaitingTime
);
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

/**
* For XContent Parser and serialization tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -487,7 +486,12 @@ private ClusterHealthResponse clusterHealth(
TimeValue pendingTaskTimeInQueue
) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
logger.trace(
"Calculating health based on state version [{}] for health level [{}] and applyLevelAtTransportLayer set to [{}]",
clusterState.version(),
request.level(),
request.isApplyLevelAtTransportLayer()
);
}

String[] concreteIndices;
Expand All @@ -496,13 +500,13 @@ private ClusterHealthResponse clusterHealth(
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
request,
clusterState,
clusterService.getClusterSettings(),
concreteIndices,
awarenessAttribute,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand All @@ -514,10 +518,10 @@ private ClusterHealthResponse clusterHealth(
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
response.setStatus(ClusterHealthStatus.RED);
Expand All @@ -527,10 +531,10 @@ private ClusterHealthResponse clusterHealth(
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
concreteIndices,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand Down Expand Up @@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq

ClusterHealthStatus clusterStatus = null;
if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus();
}

return new ClusterStatsNodeResponse(
Expand Down
Loading

0 comments on commit d5c5869

Please sign in to comment.