Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster-wide shard limit #32856

Closed
wants to merge 12 commits into from
29 changes: 28 additions & 1 deletion docs/reference/modules/cluster/misc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@ user with access to the <<cluster-update-settings,cluster-update-settings>>
API can make the cluster read-write again.


[[cluster-shard-limit]]

There is a cap on the number of shards in a cluster, based on the number of
nodes in the cluster. If an operation, such as creating a new index, restoring
a snapshot of an index, or opening a closed index would lead to the number of
shards in the cluster going over this limit, the operation will fail.

If the cluster is already over the cap, due to changes in node membership or
setting changes, all operations that create or open indices will fail until
the cap is increased as described below, or some indices are closed or deleted
to bring the number of shards below the cap.

Replicas count towards this cap, but closed indexes do not. An index with 5
primary shards and 2 replicas will be counted as 15 shards. Any closed index
is counted as 0, no matter how many shards and replicas it contains.

The limit defaults to 1,000 shards per node, and be dynamically adjusted using
the following property:

`cluster.shards.max_per_node`::

Controls the number of shards allowed in the cluster per node.

For example, a 3-node cluster with the default setting would allow 3,000 shards
total, across all open indexes. If the above setting is changed to 1,500, then
the cluster would allow 4,500 shards total.

[[cluster-max-tombstones]]
==== Index Tombstones

Expand Down Expand Up @@ -82,4 +109,4 @@ Enable or disable allocation for persistent tasks:
This setting does not affect the persistent tasks that are already being executed.
Only newly created persistent tasks, or tasks that must be reassigned (after a node
left the cluster, for example), are impacted by this setting.
--
--
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust

}

public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
Setting.intSetting("cluster.shards.max_per_node", 1000, 1, Property.Dynamic, Property.NodeScope);

public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope);

Expand Down Expand Up @@ -162,6 +165,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
private final ImmutableOpenMap<String, Custom> customs;

private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int totalOpenIndexShards;
private final int numberOfShards;

private final String[] allIndices;
Expand All @@ -183,12 +187,17 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
this.customs = customs;
this.templates = templates;
int totalNumberOfShards = 0;
int totalOpenIndexShards = 0;
int numberOfShards = 0;
for (ObjectCursor<IndexMetaData> cursor : indices.values()) {
totalNumberOfShards += cursor.value.getTotalNumberOfShards();
numberOfShards += cursor.value.getNumberOfShards();
if (IndexMetaData.State.OPEN.equals(cursor.value.getState())) {
totalOpenIndexShards += cursor.value.getTotalNumberOfShards();
}
}
this.totalNumberOfShards = totalNumberOfShards;
this.totalOpenIndexShards = totalOpenIndexShards;
this.numberOfShards = numberOfShards;

this.allIndices = allIndices;
Expand Down Expand Up @@ -661,10 +670,29 @@ public <T extends Custom> T custom(String type) {
}


/**
* Gets the total number of shards from all indices, including replicas and
* closed indices.
* @return The total number shards from all indices.
*/
public int getTotalNumberOfShards() {
return this.totalNumberOfShards;
}

/**
* Gets the total number of active shards from all indices. Includes
* replicas, but does not include shards that are part of closed indices.
* @return The total number of active shards from all indices.
*/
public int getTotalOpenIndexShards() {
return this.totalOpenIndexShards;
}

/**
* Gets the number of primary shards from all indices, not including
* replicas.
* @return The number of primary shards from all indices.
*/
public int getNumberOfShards() {
return this.numberOfShards;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -597,18 +598,45 @@ public void onFailure(String source, Exception e) {

private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
validateIndexName(request.index(), state);
validateIndexSettings(request.index(), request.settings());
validateIndexSettings(request.index(), request.settings(), state);
}

public void validateIndexSettings(String indexName, Settings settings) throws IndexCreationException {
public void validateIndexSettings(String indexName, Settings settings, ClusterState clusterState) throws IndexCreationException {
List<String> validationErrors = getIndexSettingsValidationErrors(settings);

Optional<String> shardAllocation = getShardLimitError(indexName, settings, clusterState);
shardAllocation.ifPresent(validationErrors::add);

if (validationErrors.isEmpty() == false) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
throw new IndexCreationException(indexName, validationException);
}
}

private Optional<String> getShardLimitError(String indexName, Settings settings, ClusterState clusterState) {
int nodeCount = clusterState.getNodes().getDataNodes().size();

// Only enforce the shard limit if we have at least one data node, so that we don't block
// index creation during cluster setup
if (nodeCount == 0) {
return Optional.empty();
}

int currentOpenShards = clusterState.getMetaData().getTotalOpenIndexShards();
int maxShardsPerNode = clusterService.getClusterSettings().get(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE);
long maxShardsInCluster = maxShardsPerNode * nodeCount;
int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings)
* (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));

if ((currentOpenShards + shardsToCreate) > maxShardsInCluster) {
return Optional.of("creating [" + indexName + "] "
+ "would create " + shardsToCreate + " total shards, but this cluster currently has "
+ currentOpenShards + "/" + maxShardsInCluster + " maximum shards open");
}
return Optional.empty();
}

List<String> getIndexSettingsValidationErrors(Settings settings) {
String customPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(settings);
List<String> validationErrors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardsObserver;
Expand Down Expand Up @@ -175,6 +176,8 @@ public ClusterState execute(ClusterState currentState) {
}
}

validateShardLimit(currentState, request);

if (indicesToOpen.isEmpty()) {
return currentState;
}
Expand Down Expand Up @@ -217,4 +220,36 @@ public ClusterState execute(ClusterState currentState) {
});
}

private void validateShardLimit(ClusterState currentState, OpenIndexClusterStateUpdateRequest request) {
int nodeCount = currentState.getNodes().getDataNodes().size();

// Only enforce the shard limit if we have at least one data node, so that we don't block
// index creation during cluster setup
if (nodeCount == 0) {
return;
}

int currentOpenShards = currentState.getMetaData().getTotalOpenIndexShards();
int maxShardsPerNode = clusterService.getClusterSettings().get(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE);
int maxShardsInCluster = maxShardsPerNode * nodeCount;
int shardsToOpen = Arrays.stream(request.indices())
.mapToInt(index -> getTotalShardCount(currentState, index))
.sum();

if ((currentOpenShards + shardsToOpen) > maxShardsInCluster) {
ActionRequestValidationException exception = new ActionRequestValidationException();
String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
exception.addValidationError("opening " + Arrays.toString(indexNames)
+ " would open " + shardsToOpen + " total shards, but this cluster currently has "
+ currentOpenShards + "/" + maxShardsInCluster + " maximum shards open");
throw exception;
}
}


private int getTotalShardCount(ClusterState state, Index index) {
IndexMetaData indexMetaData = state.metaData().index(index);
return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public void apply(Settings value, Settings current, Settings previous) {
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public ClusterState execute(ClusterState currentState) {
// Index doesn't exist - create it and start recovery
// Make sure that the index we are about to create has a validate name
MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings());
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), currentState);
IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN).index(renamedIndexName);
indexMdBuilder.settings(Settings.builder().put(snapshotIndexMetaData.getSettings()).put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {
Expand Down
Loading