Merge branch '2.x' into backport/backport-5605-to-2.x
PritLadani committed Feb 7, 2023
2 parents a77f009 + 7164ef2 commit a73f3a3
Showing 6 changed files with 250 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support to disallow search request with preference parameter with strict weighted shard routing ([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874))
- Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603))
- Adds support for minimum compatible version for extensions ([#6003](https://github.com/opensearch-project/OpenSearch/pull/6003))
+ - Add a guardrail to limit the maximum number of shards on the cluster ([#6143](https://github.com/opensearch-project/OpenSearch/pull/6143))
- Add cancellation of in-flight SearchTasks based on resource consumption ([#5605](https://github.com/opensearch-project/OpenSearch/pull/5605))

### Dependencies
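The guardrail entry above adds a cluster-wide cap alongside the existing per-node cap. As a rough sketch of the combined semantics implied by the test changes below (where the expected failure is verified against Math.min of the two caps, and a value of -1 disables the cluster-wide cap), not the actual OpenSearch implementation:

```java
// Hedged sketch of the guardrail arithmetic implied by the test below; the real
// validation lives in ShardLimitValidator and is not shown in this diff.
static int effectiveShardCap(int maxShardsPerNode, int dataNodeCount, int maxShardsPerCluster) {
    int perNodeCap = maxShardsPerNode * dataNodeCount; // cluster.max_shards_per_node semantics
    return maxShardsPerCluster <= 0 ? perNodeCap : Math.min(perNodeCap, maxShardsPerCluster);
}
```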
ClusterShardLimitIT.java
@@ -41,6 +41,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
+ import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.Priority;
import org.opensearch.common.network.NetworkModule;
@@ -68,19 +69,21 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+ import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
+ import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER_KEY;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class ClusterShardLimitIT extends OpenSearchIntegTestCase {
- private static final String shardsPerNodeKey = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey();
+ private static final String shardsPerNodeKey = SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey();
private static final String ignoreDotIndexKey = ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES.getKey();

public void testSettingClusterMaxShards() {
int shardsPerNode = between(1, 500_000);
- setShardsPerNode(shardsPerNode);
+ setMaxShardLimit(shardsPerNode, shardsPerNodeKey);
}

public void testSettingIgnoreDotIndexes() {
@@ -118,7 +121,7 @@ public void testIndexCreationOverLimit() {

ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes);

- setShardsPerNode(counts.getShardsPerNode());
+ setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey);
// Create an index that will bring us up to the limit
createIndex(
"test",
@@ -155,7 +158,7 @@ public void testIndexCreationOverLimitForDotIndexesSucceeds() {
int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();

// Setting the cluster.max_shards_per_node setting according to the data node count.
- setShardsPerNode(dataNodes);
+ setMaxShardLimit(dataNodes, shardsPerNodeKey);
setIgnoreDotIndex(true);

/*
@@ -176,9 +179,7 @@ public void testIndexCreationOverLimitForDotIndexesSucceeds() {

// Getting cluster.max_shards_per_node setting
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- String maxShardsPerNode = clusterState.getMetadata()
-     .settings()
-     .get(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
+ String maxShardsPerNode = clusterState.getMetadata().settings().get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());

// Checking if the total shards created are equivalent to dataNodes * cluster.max_shards_per_node
assertEquals(dataNodes * Integer.parseInt(maxShardsPerNode), currentActiveShards);
@@ -203,7 +204,7 @@ public void testIndexCreationOverLimitForDotIndexesFail() {
int maxAllowedShards = dataNodes * dataNodes;

// Setting the cluster.max_shards_per_node setting according to the data node count.
- setShardsPerNode(dataNodes);
+ setMaxShardLimit(dataNodes, shardsPerNodeKey);

/*
Create an index that will bring us up to the limit. It would create index with primary equal to the
@@ -223,9 +224,7 @@ public void testIndexCreationOverLimitForDotIndexesFail() {

// Getting cluster.max_shards_per_node setting
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- String maxShardsPerNode = clusterState.getMetadata()
-     .settings()
-     .get(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
+ String maxShardsPerNode = clusterState.getMetadata().settings().get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());

// Checking if the total shards created are equivalent to dataNodes * cluster.max_shards_per_node
assertEquals(dataNodes * Integer.parseInt(maxShardsPerNode), currentActiveShards);
@@ -247,6 +246,27 @@ public void testIndexCreationOverLimitForDotIndexesFail() {
assertFalse(clusterState.getMetadata().hasIndex(".test-index"));
}

+ public void testCreateIndexWithMaxClusterShardSetting() {
+ int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+ ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+ setMaxShardLimit(dataNodes, shardsPerNodeKey);
+
+ int maxAllowedShards = dataNodes + 1;
+ int extraShardCount = maxAllowedShards + 1;
+ // Getting total active shards in the cluster.
+ int currentActiveShards = client().admin().cluster().prepareHealth().get().getActiveShards();
+ try {
+ setMaxShardLimit(maxAllowedShards, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
+ prepareCreate("test_index_with_cluster_shard_limit").setSettings(
+ Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, extraShardCount).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
+ ).get();
+ } catch (final IllegalArgumentException ex) {
+ verifyException(Math.min(maxAllowedShards, dataNodes * dataNodes), currentActiveShards, extraShardCount, ex);
+ } finally {
+ setMaxShardLimit(-1, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
+ }
+ }

/**
* The test checks if the index starting with the .ds- can be created if the node has
* number of shards equivalent to the cluster.max_shards_per_node and the cluster.ignore_Dot_indexes
@@ -258,7 +278,7 @@ public void testIndexCreationOverLimitForDataStreamIndexes() {
int maxAllowedShards = dataNodes * dataNodes;

// Setting the cluster.max_shards_per_node setting according to the data node count.
- setShardsPerNode(dataNodes);
+ setMaxShardLimit(dataNodes, shardsPerNodeKey);
setIgnoreDotIndex(true);

/*
@@ -279,9 +299,7 @@ public void testIndexCreationOverLimitForDataStreamIndexes() {

// Getting cluster.max_shards_per_node setting
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- String maxShardsPerNode = clusterState.getMetadata()
-     .settings()
-     .get(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
+ String maxShardsPerNode = clusterState.getMetadata().settings().get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());

// Checking if the total shards created are equivalent to dataNodes * cluster.max_shards_per_node
assertEquals(dataNodes * Integer.parseInt(maxShardsPerNode), currentActiveShards);
@@ -308,7 +326,7 @@ public void testIndexCreationOverLimitFromTemplate() {

final ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes);

- setShardsPerNode(counts.getShardsPerNode());
+ setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey);

if (counts.getFirstIndexShards() > 0) {
createIndex(
@@ -351,7 +369,7 @@ public void testIncreaseReplicasOverLimit() {

int firstShardCount = between(2, 10);
int shardsPerNode = firstShardCount - 1;
- setShardsPerNode(shardsPerNode);
+ setMaxShardLimit(shardsPerNode, shardsPerNodeKey);

prepareCreate(
"growing-should-fail",
@@ -397,7 +415,7 @@ public void testChangingMultipleIndicesOverLimit() {
int secondIndexReplicas = dataNodes;

int shardsPerNode = firstIndexFactor + (secondIndexFactor * (1 + secondIndexReplicas));
- setShardsPerNode(shardsPerNode);
+ setMaxShardLimit(shardsPerNode, shardsPerNodeKey);

createIndex(
"test-1-index",
@@ -448,7 +466,7 @@ public void testPreserveExistingSkipsCheck() {

int firstShardCount = between(2, 10);
int shardsPerNode = firstShardCount - 1;
- setShardsPerNode(shardsPerNode);
+ setMaxShardLimit(shardsPerNode, shardsPerNodeKey);

prepareCreate(
"test-index",
@@ -521,7 +539,7 @@ public void testRestoreSnapshotOverLimit() {
cluster().wipeIndices("snapshot-index");

// Reduce the shard limit and fill it up
- setShardsPerNode(counts.getShardsPerNode());
+ setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey);
createIndex(
"test-fill",
Settings.builder()
@@ -570,7 +588,7 @@ public void testOpenIndexOverLimit() {
assertTrue(closeIndexResponse.isAcknowledged());

// Fill up the cluster
- setShardsPerNode(counts.getShardsPerNode());
+ setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey);
createIndex(
"test-fill",
Settings.builder()
@@ -704,27 +722,34 @@ private int ensureMultipleDataNodes(int dataNodes) {
return dataNodes;
}

- private void setShardsPerNode(int shardsPerNode) {
+ /**
+  * Set the max shard limit at either the per-node level or the cluster level.
+  *
+  * @param limit the limit value to set.
+  * @param key node-level or cluster-level setting key.
+  */
+ private void setMaxShardLimit(int limit, String key) {
try {
ClusterUpdateSettingsResponse response;
if (frequently()) {
response = client().admin()
.cluster()
.prepareUpdateSettings()
- .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, shardsPerNode).build())
+ .setPersistentSettings(Settings.builder().put(key, limit).build())
.get();
- assertEquals(shardsPerNode, response.getPersistentSettings().getAsInt(shardsPerNodeKey, -1).intValue());
+ assertEquals(limit, response.getPersistentSettings().getAsInt(key, -1).intValue());
} else {
response = client().admin()
.cluster()
.prepareUpdateSettings()
- .setTransientSettings(Settings.builder().put(shardsPerNodeKey, shardsPerNode).build())
+ .setTransientSettings(Settings.builder().put(key, limit).build())
.get();
- assertEquals(shardsPerNode, response.getTransientSettings().getAsInt(shardsPerNodeKey, -1).intValue());
+ assertEquals(limit, response.getTransientSettings().getAsInt(key, -1).intValue());
}
} catch (IllegalArgumentException ex) {
fail(ex.getMessage());
}

}

private void setIgnoreDotIndex(boolean ignoreDotIndex) {
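Outside the renamed setMaxShardLimit helper, the same cluster-wide cap can be applied and cleared with one settings update each. A minimal sketch using the same client API the test exercises, in the same OpenSearchIntegTestCase context (the value 100 is illustrative, not from the diff):

```java
// Apply a cluster-wide cap of 100 shards; mirrors the persistent-settings branch
// of setMaxShardLimit above.
ClusterUpdateSettingsResponse response = client().admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(Settings.builder().put(SETTING_MAX_SHARDS_PER_CLUSTER_KEY, 100).build())
    .get();
assertEquals(100, response.getPersistentSettings().getAsInt(SETTING_MAX_SHARDS_PER_CLUSTER_KEY, -1).intValue());

// A value of -1 removes the cap again, as the new test's finally block does.
client().admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(Settings.builder().put(SETTING_MAX_SHARDS_PER_CLUSTER_KEY, -1).build())
    .get();
```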
ClusterSettings.java
@@ -261,6 +261,7 @@ public void apply(Settings value, Settings current, Settings previous) {
Metadata.DEFAULT_REPLICA_COUNT_SETTING,
Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
+ ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
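Registering ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER in this list is what makes it a recognized, dynamically updatable cluster setting. The definition itself is not part of this diff; by analogy with SETTING_CLUSTER_MAX_SHARDS_PER_NODE it plausibly looks like the following (the key string and default value are assumptions):

```java
// Assumed shape of the definition in ShardLimitValidator; not shown in this diff.
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER = Setting.intSetting(
    "cluster.max_shards_per_cluster", // assumed key behind SETTING_MAX_SHARDS_PER_CLUSTER_KEY
    -1,                               // assumed default: cap disabled
    -1,                               // minimum allowed value
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);
```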
ExtensionsManager.java
@@ -66,6 +66,7 @@
import org.opensearch.transport.TransportService;
import org.yaml.snakeyaml.Yaml;
import org.opensearch.env.EnvironmentSettingsResponse;
+ import org.yaml.snakeyaml.constructor.SafeConstructor;

/**
* The main class for managing Extension communication with the OpenSearch Node.
@@ -555,7 +556,7 @@ public String executor() {
}

private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException {
- Yaml yaml = new Yaml();
+ Yaml yaml = new Yaml(new SafeConstructor());
try (InputStream inputStream = Files.newInputStream(filePath)) {
Map<String, Object> obj = yaml.load(inputStream);
if (obj == null) {
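The readFromExtensionsYml change is a hardening fix: SnakeYAML's bare new Yaml() can instantiate arbitrary Java classes from !!-tagged input, whereas SafeConstructor limits parsing to plain data types such as maps, lists, strings, and numbers. A self-contained sketch of the safe pattern, with a hypothetical class and method name:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;

final class SafeYamlLoading {
    // Hypothetical helper mirroring readFromExtensionsYml: SafeConstructor rejects
    // global tags (e.g. !!javax.script.ScriptEngineManager), so untrusted YAML
    // cannot trigger arbitrary object construction during parsing.
    static Map<String, Object> loadYamlAsMap(Path filePath) throws IOException {
        Yaml yaml = new Yaml(new SafeConstructor());
        try (InputStream inputStream = Files.newInputStream(filePath)) {
            return yaml.load(inputStream);
        }
    }
}
```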