Add cluster-wide shard limit warnings #34021

Merged (12 commits) on Oct 23, 2018
6 changes: 6 additions & 0 deletions docs/reference/migration/migrate_7_0/cluster.asciidoc
@@ -14,3 +14,9 @@ primary shards of the opened index to be allocated.

==== Shard preferences `_primary`, `_primary_first`, `_replica`, and `_replica_first` are removed
These shard preferences are removed in favour of the `_prefer_nodes` and `_only_nodes` preferences.

==== Cluster-wide shard soft limit
Clusters now have soft limits on the total number of open shards in the cluster
based on the number of nodes and the `cluster.shards.max_per_node` cluster
Member:
How about `cluster.shards.max_per_node` -> `cluster.max_shards_per_node`.

setting, to prevent accidental operations that would destabilize the cluster.
More information can be found in the <<misc-cluster,documentation for that setting>>.
44 changes: 43 additions & 1 deletion docs/reference/modules/cluster/misc.asciidoc
@@ -22,6 +22,48 @@ user with access to the <<cluster-update-settings,cluster-update-settings>>
API can make the cluster read-write again.


[[cluster-shard-limit]]
==== Cluster Shard Limit

In Elasticsearch 7.0 and later, there will be a soft cap on the number of
Contributor:
Should we call it a "soft limit" to be in line with the terminology on similar settings elsewhere?

Contributor Author:
Yes, I'll change all the instances of "cap" to "limit" - thanks!

shards in a cluster, based on the number of nodes in the cluster. This is
intended to prevent operations which may unintentionally destabilize the
cluster. Until 7.0, actions which would result in the cluster going over the
Member:
Until -> Prior to

limit will issue a deprecation warning. Strict enforcement of this behavior can
be enabled by setting the cluster setting `cluster.shards.enforce_max_per_node`
to `true`.

NOTE: `cluster.shards.enforce_max_per_node` cannot be set to `false`, as this
setting will be removed in 7.0 and the limit will always be enforced. To return
to the default behavior for your Elasticsearch version, set this setting to
`"default"`.
Contributor:
Do we need to have a "default" value for the setting here? There are two reasons why I am concerned about having this:

  1. It means the setting accepts values of different types (boolean and String) which we have tried to avoid and remove instances of in other APIs
  2. Users who set the setting to "default" explicitly are going to need to make a subsequent change to their settings in 8.0 (I presume?) to remove the setting which will no longer be valid

Instead could we maybe have the default behaviour enabled if the setting is not set, meaning that users who want to maintain the default behaviour through the version changes don't end up defining this setting and so don't need to make any setting changes at all?

Contributor Author:
I did this based on a conversation with @jasontedor a while ago, where he made very similar comments, but I think I misunderstood what he was suggesting at the time. I'll reevaluate this setting and change it as appropriate.

Contributor Author:
Just talked with @jasontedor again - this setting is going to go away and become a system property, which can only be unset or true.


If an operation, such as creating a new index, restoring a snapshot of an index,
or opening a closed index, would lead to the number of shards in the cluster
going over this limit, the operation will issue a deprecation warning, or fail
if `cluster.shards.enforce_max_per_node` is set.

If the cluster is already over the cap, due to changes in node membership or
setting changes, all operations that create or open indices will issue warnings
or fail until either the cap is increased as described below, or some indices
are closed or deleted to bring the number of shards below the cap.
Contributor:
As above I wonder if we should use "limit" instead of "cap"?


Replicas count towards this cap, but closed indices do not. An index with 5
primary shards and 2 replicas will be counted as 15 shards. Any closed index
is counted as 0, no matter how many shards and replicas it contains.
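The counting rule above can be sketched as a tiny standalone calculation (the class and method names here are illustrative, not part of the PR):

```java
public class ShardCount {
    // An index counts every primary plus each replica copy against the limit;
    // closed indices count as zero regardless of their configuration.
    static int countedShards(int primaries, int replicas, boolean open) {
        return open ? primaries * (1 + replicas) : 0;
    }

    public static void main(String[] args) {
        System.out.println(countedShards(5, 2, true));  // 15: the example from the docs
        System.out.println(countedShards(5, 2, false)); // 0: same index, closed
    }
}
```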

The limit defaults to 1,000 shards per node, and can be dynamically adjusted
using the following property:

`cluster.shards.max_per_node`::
Member:
I am doubting whether this needs to be in a shards namespace. How about `cluster.shards.max_per_node` -> `cluster.max_shards_per_node`.


Controls the number of shards allowed in the cluster per node.

For example, a 3-node cluster with the default setting would allow 3,000 shards
total, across all open indices. If the above setting is changed to 1,500, then
the cluster would allow 4,500 shards total.
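The scaling in this example is a straight multiplication of the per-node setting by the data-node count; a minimal sketch (illustrative names, not from the codebase):

```java
public class ClusterLimitMath {
    // The cluster-wide limit is simply the per-node setting times the node count.
    static int maxShardsInCluster(int maxShardsPerNode, int dataNodes) {
        return maxShardsPerNode * dataNodes;
    }

    public static void main(String[] args) {
        System.out.println(maxShardsInCluster(1000, 3)); // 3000: the default on 3 nodes
        System.out.println(maxShardsInCluster(1500, 3)); // 4500: after raising the setting
    }
}
```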

[[user-defined-data]]
==== User Defined Cluster Metadata

@@ -103,4 +145,4 @@ Enable or disable allocation for persistent tasks:
This setting does not affect the persistent tasks that are already being executed.
Only newly created persistent tasks, or tasks that must be reassigned (after a node
left the cluster, for example), are impacted by this setting.
--
--
@@ -22,7 +22,6 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.AliasesRequest;
@@ -127,6 +126,20 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust

}

public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
Setting.intSetting("cluster.shards.max_per_node", 1000, 1, Property.Dynamic, Property.NodeScope);
Member:
cluster.shards.max_per_node -> cluster.max_shards_per_node.

public static final Setting<Boolean> SETTING_ENFORCE_CLUSTER_MAX_SHARDS_PER_NODE =
new Setting<>("cluster.shards.enforce_max_per_node", "default", (s) -> {
switch (s) {
case "default":
return false;
case "true":
return true;
default:
throw new IllegalArgumentException("unrecognized [cluster.shards.enforce_max_per_node] \"" + s + "\": must be default or true");
}
}, Property.Dynamic, Property.NodeScope);
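The value parser above accepts only `"default"` and `"true"`, so the limit can be switched on but never explicitly switched off; that behavior can be seen in isolation in this sketch, which mirrors the switch in the diff (the class name is illustrative):

```java
public class EnforceSettingParser {
    // Mirrors the parser for cluster.shards.enforce_max_per_node in this diff:
    // "default" means warn-only, "true" means strict enforcement, anything else is rejected.
    static boolean parse(String s) {
        switch (s) {
            case "default":
                return false;
            case "true":
                return true;
            default:
                throw new IllegalArgumentException(
                    "unrecognized [cluster.shards.enforce_max_per_node] \"" + s + "\": must be default or true");
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("default")); // false: deprecation warnings only
        System.out.println(parse("true"));    // true: operations over the limit fail
        try {
            parse("false");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected"); // "false" is deliberately not a legal value
        }
    }
}
```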

public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope);

@@ -162,6 +175,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
private final ImmutableOpenMap<String, Custom> customs;

private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int totalOpenIndexShards;
private final int numberOfShards;

private final String[] allIndices;
@@ -183,12 +197,17 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
this.customs = customs;
this.templates = templates;
int totalNumberOfShards = 0;
int totalOpenIndexShards = 0;
int numberOfShards = 0;
for (ObjectCursor<IndexMetaData> cursor : indices.values()) {
totalNumberOfShards += cursor.value.getTotalNumberOfShards();
numberOfShards += cursor.value.getNumberOfShards();
if (IndexMetaData.State.OPEN.equals(cursor.value.getState())) {
totalOpenIndexShards += cursor.value.getTotalNumberOfShards();
}
}
this.totalNumberOfShards = totalNumberOfShards;
this.totalOpenIndexShards = totalOpenIndexShards;
this.numberOfShards = numberOfShards;

this.allIndices = allIndices;
@@ -676,10 +695,29 @@ public <T extends Custom> T custom(String type) {
}


/**
* Gets the total number of shards from all indices, including replicas and
* closed indices.
* @return The total number shards from all indices.
*/
public int getTotalNumberOfShards() {
return this.totalNumberOfShards;
}

/**
* Gets the total number of active shards from all indices. Includes
Member:
Not all open indices are active. Let us be precise here and say open.

* replicas, but does not include shards that are part of closed indices.
* @return The total number of active shards from all indices.
Member:
active -> open

*/
public int getTotalOpenIndexShards() {
return this.totalOpenIndexShards;
}

/**
* Gets the number of primary shards from all indices, not including
* replicas.
* @return The number of primary shards from all indices.
*/
public int getNumberOfShards() {
return this.numberOfShards;
}
@@ -53,6 +53,7 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -82,6 +83,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -587,19 +589,29 @@ public void onFailure(String source, Exception e) {

private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
validateIndexName(request.index(), state);
validateIndexSettings(request.index(), request.settings(), forbidPrivateIndexSettings);
validateIndexSettings(request.index(), request.settings(), state, forbidPrivateIndexSettings);
}

public void validateIndexSettings(
final String indexName, final Settings settings, final boolean forbidPrivateIndexSettings) throws IndexCreationException {
public void validateIndexSettings(String indexName, final Settings settings, ClusterState clusterState, final boolean forbidPrivateIndexSettings) throws IndexCreationException {
Member:
clusterState can be final.

List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);

Optional<String> shardAllocation = checkShardLimit(settings, clusterState, deprecationLogger);
shardAllocation.ifPresent(validationErrors::add);

if (validationErrors.isEmpty() == false) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
throw new IndexCreationException(indexName, validationException);
}
}

static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState, DeprecationLogger deprecationLogger) {
gwbrown marked this conversation as resolved.
int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings)
* (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));

return IndicesService.checkShardLimit(shardsToCreate, clusterState, deprecationLogger);
}

List<String> getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) {
String customPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(settings);
List<String> validationErrors = new ArrayList<>();
@@ -36,8 +36,10 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
@@ -50,6 +52,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
@@ -175,6 +178,8 @@ public ClusterState execute(ClusterState currentState) {
}
}

validateShardLimit(currentState, request.indices(), deprecationLogger);

if (indicesToOpen.isEmpty()) {
return currentState;
}
@@ -217,4 +222,25 @@ public ClusterState execute(ClusterState currentState) {
});
}

static void validateShardLimit(ClusterState currentState, Index[] indices, DeprecationLogger deprecationLogger) {
gwbrown marked this conversation as resolved.
int shardsToOpen = Arrays.stream(indices)
.filter(index -> currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE))
.mapToInt(index -> getTotalShardCount(currentState, index))
.sum();

Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState, deprecationLogger);
if (error.isPresent()) {
ValidationException ex = new ValidationException();
ex.addValidationError(error.get());
throw ex;
}

}


Member:
There is an extraneous newline here.

private static int getTotalShardCount(ClusterState state, Index index) {
IndexMetaData indexMetaData = state.metaData().index(index);
return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas());
}

}
@@ -33,6 +33,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@@ -45,9 +46,11 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
@@ -114,6 +117,7 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());

@@ -140,6 +144,18 @@ public ClusterState execute(ClusterState currentState) {

int updatedNumberOfReplicas = openSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1);
if (updatedNumberOfReplicas != -1 && preserveExisting == false) {

// Verify that this won't take us over the cluster shard limit.
int totalNewShards = Arrays.stream(request.indices())
.mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
.sum();
Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState, deprecationLogger);
if (error.isPresent()) {
ValidationException ex = new ValidationException();
ex.addValidationError(error.get());
throw ex;
}

// we do *not* update the in sync allocation ids as they will be removed upon the first index
// operation which make these copies stale
// TODO: update the list once the data is deleted by the node?
@@ -215,6 +231,14 @@ public ClusterState execute(ClusterState currentState) {
});
}

private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) {
IndexMetaData indexMetaData = currentState.metaData().index(index);
int shardsInIndex = indexMetaData.getNumberOfShards();
int oldNumberOfReplicas = indexMetaData.getNumberOfReplicas();
int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas;
return replicaIncrease * shardsInIndex;
}
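The replica-update arithmetic in `getTotalNewShards` can be exercised on its own; note that lowering the replica count yields a negative delta, which the `checkShardLimit` guard in this PR skips (illustrative class name, same formula):

```java
public class ReplicaDeltaMath {
    // Mirrors getTotalNewShards: only the change in replica count matters,
    // scaled by the number of primaries in the index.
    static int totalNewShards(int primaries, int oldReplicas, int updatedReplicas) {
        return (updatedReplicas - oldReplicas) * primaries;
    }

    public static void main(String[] args) {
        System.out.println(totalNewShards(5, 1, 3)); // 10: two extra copies of 5 primaries
        System.out.println(totalNewShards(5, 2, 1)); // -5: reducing replicas frees shards
    }
}
```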

/**
* Updates the cluster block only iff the setting exists in the given settings
*/
@@ -196,6 +196,8 @@ public void apply(Settings value, Settings current, Settings previous) {
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
MetaData.SETTING_ENFORCE_CLUSTER_MAX_SHARDS_PER_NODE,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
40 changes: 40 additions & 0 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -38,6 +38,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedFunction;
@@ -52,6 +53,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -1347,4 +1349,42 @@ public Function<String, Predicate<String>> getFieldFilter() {
public boolean isMetaDataField(String field) {
return mapperRegistry.isMetaDataField(field);
}

/**
* Checks to see if an operation can be performed without taking the cluster
* over the cluster-wide shard limit. Adds a deprecation warning or returns
* an error message as appropriate
Member:
You don't have to wrap these so narrowly, we can use the full 140-column line length here.

*
* @param newShards The number of shards to be added by this operation
* @param state The current cluster state
* @param deprecationLogger The logger to use for deprecation warnings
* @return If present, an error message to be given as the reason for failing
* an operation. If empty, a sign that the operation is valid.
*/
public static Optional<String> checkShardLimit(int newShards, ClusterState state, DeprecationLogger deprecationLogger) {
Settings theseSettings = state.metaData().settings();
int nodeCount = state.getNodes().getDataNodes().size();

// Only enforce the shard limit if we have at least one data node, so that we don't block
// index creation during cluster setup
if (nodeCount == 0 || newShards < 0) {
return Optional.empty();
}
int maxShardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings);
int maxShardsInCluster = maxShardsPerNode * nodeCount;
int currentOpenShards = state.getMetaData().getTotalOpenIndexShards();

if ((currentOpenShards + newShards) > maxShardsInCluster) {
String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
if (MetaData.SETTING_ENFORCE_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings)) {
return Optional.of(errorMessage);
} else {
deprecationLogger.deprecated("In a future major version, this request will fail because {}. Before upgrading, " +
"reduce the number of shards in your cluster or adjust the cluster setting [{}].",
errorMessage, MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
}
}
return Optional.empty();
}
}
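A condensed, dependency-free sketch of the `checkShardLimit` logic added in this file. It is simplified: plain ints stand in for `ClusterState`, and instead of logging a deprecation warning when enforcement is off, the caller decides what to do with the returned message.

```java
import java.util.Optional;

public class ShardLimitSketch {
    static Optional<String> checkShardLimit(int newShards, int currentOpenShards,
                                            int dataNodes, int maxShardsPerNode) {
        // No data nodes (cluster still forming) or a negative delta: skip the check.
        if (dataNodes == 0 || newShards < 0) {
            return Optional.empty();
        }
        int maxShardsInCluster = maxShardsPerNode * dataNodes;
        if (currentOpenShards + newShards > maxShardsInCluster) {
            return Optional.of("this action would add [" + newShards + "] total shards, but this cluster currently has ["
                    + currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // 3 data nodes x 1000 shards/node = 3000; 2900 open + 200 new exceeds it.
        System.out.println(checkShardLimit(200, 2900, 3, 1000).isPresent()); // true
        // 2900 + 100 lands exactly on the limit, which is still allowed.
        System.out.println(checkShardLimit(100, 2900, 3, 1000).isPresent()); // false
    }
}
```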
@@ -270,7 +270,7 @@ public ClusterState execute(ClusterState currentState) {
// Index doesn't exist - create it and start recovery
// Make sure that the index we are about to create has a validate name
MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false);
createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), currentState, false);
IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN).index(renamedIndexName);
indexMdBuilder.settings(Settings.builder().put(snapshotIndexMetaData.getSettings()).put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {