Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global and index level blocks to IndexSettings #35695

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,13 @@ public ClusterState execute(ClusterState currentState) throws Exception {
"]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]");
}

final ClusterBlocks clusterBlocks = currentState.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(tmpImd.getIndex().getName());

// create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
final IndexService indexService = indicesService.createIndex(tmpImd, globalBlocks, indexBlocks, Collections.emptyList());
createdIndex = indexService.index();
// now add the mappings
MapperService mapperService = indexService.mapperService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasAction.NewAliasValidator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -124,6 +126,11 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
if (index == null) {
throw new IndexNotFoundException(action.getIndex());
}

final ClusterBlocks clusterBlocks = currentState.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(index.getIndex().getName());

NewAliasValidator newAliasValidator = (alias, indexRouting, filter, writeIndex) -> {
/* It is important that we look up the index using the metadata builder we are modifying so we can remove an
* index and replace it with an alias. */
Expand All @@ -136,7 +143,7 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(index, emptyList());
indexService = indicesService.createIndex(index, globalBlocks, indexBlocks, emptyList());
indicesToClose.add(index.getIndex());
} catch (IOException e) {
throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private static void validateAndAddTemplate(final PutRequest request, IndexTempla
.build();

final IndexMetaData tmpIndexMetadata = IndexMetaData.builder(temporaryIndexName).settings(dummySettings).build();
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList());
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, null, null, Collections.emptyList());
createdIndex = dummyIndexService.index();

templateBuilder.order(request.order);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;

Expand Down Expand Up @@ -144,8 +147,12 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
boolean removeIndex = false;
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
final ClusterBlocks clusterBlocks = currentState.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(indexMetaData.getIndex().getName());

// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(indexMetaData, globalBlocks, indexBlocks, Collections.emptyList());
removeIndex = true;
indexService.mapperService().merge(indexMetaData, MergeReason.MAPPING_RECOVERY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -679,6 +680,11 @@ public boolean isForceExecution() {
}
}

@Override
public void updateBlocks(@Nullable final Set<ClusterBlock> globalBlocks, @Nullable final Set<ClusterBlock> indexBlocks) {
indexSettings.updateIndexBlocks(globalBlocks, indexBlocks);
}

private void rescheduleFsyncTask(Translog.Durability durability) {
try {
if (fsyncTask != null) {
Expand Down
60 changes: 59 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -37,6 +40,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -327,6 +331,11 @@ public final class IndexSettings {
private volatile String defaultPipeline;
private volatile boolean searchThrottled;

/**
* A {@link ClusterBlocks} containing global level and index level blocks
*/
private volatile ClusterBlocks indexBlocks;

/**
* The maximum number of refresh listeners allows on this shard.
*/
Expand Down Expand Up @@ -397,8 +406,27 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
*
* @param indexMetaData the index metadata this settings object is associated with
* @param nodeSettings the nodes settings this index is allocated on.
* @param indexScopedSettings the index level settings
*/
public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) {
this(indexMetaData, nodeSettings, indexScopedSettings, null, null);
}

/**
* Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata
* while index level settings will overwrite node settings.
*
* @param indexMetaData the index metadata this settings object is associated with
* @param nodeSettings the nodes settings this index is allocated on.
* @param indexScopedSettings the index level settings
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
*/
public IndexSettings(final IndexMetaData indexMetaData,
final Settings nodeSettings,
final IndexScopedSettings indexScopedSettings,
@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks) {
scopedSettings = indexScopedSettings.copy(nodeSettings, indexMetaData);
this.nodeSettings = nodeSettings;
this.settings = Settings.builder().put(nodeSettings).put(indexMetaData.getSettings()).build();
Expand All @@ -408,7 +436,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
nodeName = Node.NODE_NAME_SETTING.get(settings);
this.indexMetaData = indexMetaData;
numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);

this.indexBlocks = buildBlocks(globalBlocks, indexBlocks);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -626,6 +654,36 @@ public static boolean same(final Settings left, final Settings right) {
.equals(right.filter(IndexScopedSettings.INDEX_SETTINGS_KEY_PREDICATE));
}

/**
* Updates the global level and index level blocks.
*
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
*/
public synchronized void updateIndexBlocks(@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks) {
this.indexBlocks = buildBlocks(globalBlocks, indexBlocks);
}

private ClusterBlocks buildBlocks(@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks) {
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
if (globalBlocks != null) {
globalBlocks.forEach(builder::addGlobalBlock);
}
if (indexBlocks != null) {
indexBlocks.forEach(block -> builder.addIndexBlock(index.getName(), block));
}
return builder.build();
}

/**
* @return the current global level and index level blocks
*/
public ClusterBlocks getIndexBlocks() {
return indexBlocks;
}

/**
* Returns the translog durability for this index.
*/
Expand Down
15 changes: 11 additions & 4 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -448,8 +449,10 @@ public IndexService indexServiceSafe(Index index) {
* @throws ResourceAlreadyExistsException if the index already exists.
*/
@Override
public synchronized IndexService createIndex(
final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
public synchronized IndexService createIndex(final IndexMetaData indexMetaData,
@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks,
final List<IndexEventListener> builtInListeners) throws IOException {
ensureChangesAllowed();
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
Expand All @@ -471,6 +474,8 @@ public void onStoreClosed(ShardId shardId) {
createIndexService(
"create index",
indexMetaData,
globalBlocks,
indexBlocks,
indicesQueryCache,
indicesFieldDataCache,
finalListeners,
Expand All @@ -493,11 +498,13 @@ public void onStoreClosed(ShardId shardId) {
*/
private synchronized IndexService createIndexService(final String reason,
IndexMetaData indexMetaData,
@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks,
IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
List<IndexEventListener> builtInListeners,
IndexingOperationListener... indexingOperationListeners) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, settings, indexScopedSettings);
final IndexSettings idxSettings = new IndexSettings(indexMetaData, settings, indexScopedSettings, globalBlocks, indexBlocks);
// we ignore private settings since they are not registered settings
indexScopedSettings.validate(indexMetaData.getSettings(), true, true, true);
logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
Expand Down Expand Up @@ -587,7 +594,7 @@ public synchronized void verifyIndexMetadata(IndexMetaData metaData, IndexMetaDa
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service =
createIndexService("metadata verification", metaData, indicesQueryCache, indicesFieldDataCache, emptyList());
createIndexService("metadata verification", metaData, null, null, indicesQueryCache, indicesFieldDataCache, emptyList());
closeables.add(() -> service.close("metadata verification", false));
service.mapperService().merge(metaData, MapperService.MergeReason.MAPPING_RECOVERY);
if (metaData.equals(metaDataUpdate) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -457,7 +459,11 @@ private void createIndices(final ClusterState state) {

AllocatedIndex<? extends Shard> indexService = null;
try {
indexService = indicesService.createIndex(indexMetaData, buildInIndexListener);
final ClusterBlocks clusterBlocks = state.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(index.getName());

indexService = indicesService.createIndex(indexMetaData, globalBlocks, indexBlocks, buildInIndexListener);
if (indexService.updateMapping(null, indexMetaData) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(),
Expand All @@ -479,8 +485,8 @@ private void createIndices(final ClusterState state) {
}
}

private void updateIndices(ClusterChangedEvent event) {
if (!event.metaDataChanged()) {
private void updateIndices(final ClusterChangedEvent event) {
if (event.metaDataChanged() == false && event.blocksChanged() == false) {
return;
}
final ClusterState state = event.state();
Expand Down Expand Up @@ -512,6 +518,11 @@ private void updateIndices(ClusterChangedEvent event) {
}
}
}

if (event.blocksChanged()) {
final ClusterBlocks clusterBlocks = state.blocks();
indexService.updateBlocks(clusterBlocks.global(), clusterBlocks.indices().get(index.getName()));
}
}
}

Expand Down Expand Up @@ -780,6 +791,14 @@ public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexCompo
*/
void updateMetaData(IndexMetaData currentIndexMetaData, IndexMetaData newIndexMetaData);

/**
* Updates the global level and index level blocks of this index. Changes become visible through {@link #getIndexSettings()}.
*
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
*/
void updateBlocks(@Nullable Set<ClusterBlock> globalBlocks, @Nullable Set<ClusterBlock> indexBlocks);

/**
* Checks if index requires refresh from master.
*/
Expand All @@ -801,12 +820,16 @@ public interface AllocatedIndices<T extends Shard, U extends AllocatedIndex<T>>
/**
* Creates a new {@link IndexService} for the given metadata.
*
* @param indexMetaData the index metadata to create the index for
* @param builtInIndexListener a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with
* the per-index listeners
* @param indexMetaData the index metadata to create the index for
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
* @param builtInIndexListener a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with
* the per-index listeners
* @throws ResourceAlreadyExistsException if the index already exists.
*/
U createIndex(IndexMetaData indexMetaData,
@Nullable Set<ClusterBlock> globalBlocks,
@Nullable Set<ClusterBlock> indexBlocks,
List<IndexEventListener> builtInIndexListener) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,6 @@ private void setupIndicesService() throws Exception {
when(service.getIndexSortSupplier()).thenReturn(supplier);
when(service.getIndexEventListener()).thenReturn(mock(IndexEventListener.class));

when(indicesService.createIndex(anyObject(), anyObject())).thenReturn(service);
when(indicesService.createIndex(anyObject(), anyObject(), anyObject(), anyObject())).thenReturn(service);
}
}
Loading