Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Jul 18, 2024
1 parent f637670 commit d77cc2f
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
Expand All @@ -60,20 +58,13 @@
import org.opensearch.common.settings.SettingsException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;

/**
* Transport action for updating cluster settings
Expand Down Expand Up @@ -159,8 +150,6 @@ protected void clusterManagerOperation(

private volatile boolean changed = false;

private volatile boolean isSwitchingToStrictMode = false;

@Override
public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() {
return clusterUpdateSettingTaskKey;
Expand Down Expand Up @@ -269,27 +258,13 @@ public void onFailure(String source, Exception e) {
@Override
public ClusterState execute(final ClusterState currentState) {
boolean isCompatibilityModeChanging = validateCompatibilityModeSettingRequest(request, state);
final ClusterState clusterState = updater.updateSettings(
ClusterState clusterState = updater.updateSettings(
currentState,
clusterSettings.upgradeSettings(request.transientSettings()),
clusterSettings.upgradeSettings(request.persistentSettings()),
logger
);
/*
Remote Store migration: Checks if the applied cluster settings
has switched the cluster to STRICT mode. If so, checks and applies
appropriate index settings depending on the current set of node types
in the cluster
This has been intentionally done after the cluster settings update
flow. That way we are not interfering with the usual settings update
and the cluster state mutation that comes along with it
*/
if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) {
ClusterState newStateAfterIndexMdChanges = finalizeMigration(clusterState);
changed = newStateAfterIndexMdChanges != currentState;
return newStateAfterIndexMdChanges;
}
clusterState = checkAndFinalizeRemoteStoreMigration(isCompatibilityModeChanging, request, clusterState, logger);
changed = clusterState != currentState;
return clusterState;
}
Expand All @@ -307,9 +282,10 @@ public ClusterState execute(final ClusterState currentState) {
public boolean validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) {
String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode;
validateAllNodesOfSameVersion(clusterState.nodes());
if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) {
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(
settings
) == RemoteStoreNodeService.CompatibilityMode.STRICT) {
validateAllNodesOfSameType(clusterState.nodes());
}
return true;
Expand All @@ -334,99 +310,18 @@ private void validateAllNodesOfSameVersion(DiscoveryNodes discoveryNodes) {
* @param discoveryNodes current discovery nodes in the cluster
*/
private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) {
Set<Boolean> nodeTypes = discoveryNodes.getNodes()
boolean allNodesDocrepEnabled = discoveryNodes.getNodes()
.values()
.stream()
.allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode() == false);
boolean allNodesRemoteStoreEnabled = discoveryNodes.getNodes()
.values()
.stream()
.map(DiscoveryNode::isRemoteStoreNode)
.collect(Collectors.toSet());
if (nodeTypes.size() != 1) {
.allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode());
if (allNodesDocrepEnabled == false && allNodesRemoteStoreEnabled == false) {
throw new SettingsException(
"can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes"
);
}
}

/**
* Finalizes the docrep to remote-store migration process by applying remote store based index settings
* on indices that are missing them. No-Op if all indices already have the settings applied through
* IndexMetadataUpdater
*
* @param incomingState mutated cluster state after cluster settings were applied
* @return new cluster state with index settings updated
*/
public ClusterState finalizeMigration(ClusterState incomingState) {
Map<String, DiscoveryNode> discoveryNodeMap = incomingState.nodes().getNodes();
if (discoveryNodeMap.isEmpty() == false) {
// At this point, we have already validated that all nodes in the cluster are of uniform type.
// Either all of them are remote store enabled, or all of them are docrep enabled
boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode();
if (remoteStoreEnabledNodePresent == true) {
List<IndexMetadata> indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState);
if (indicesWithoutRemoteStoreSettings.isEmpty() == true) {
logger.info("All indices in the cluster has remote store based index settings");
} else {
Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings);
return ClusterState.builder(incomingState).metadata(mutatedMetadata).build();
}
} else {
logger.debug("All nodes in the cluster are not remote nodes. Skipping.");
}
}
return incomingState;
}

/**
* Filters out indices which does not have remote store based
* index settings applied even after all shard copies have
* migrated to remote store enabled nodes
*/
private List<IndexMetadata> getIndicesWithoutRemoteStoreSettings(ClusterState clusterState) {
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
if (allIndicesMetadata.isEmpty() == false) {
List<IndexMetadata> indicesWithoutRemoteSettings = allIndicesMetadata.stream()
.filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false)
.collect(Collectors.toList());
logger.debug(
"Attempting to switch to strict mode. Count of indices without remote store settings {}",
indicesWithoutRemoteSettings.size()
);
return indicesWithoutRemoteSettings;
}
return Collections.emptyList();
}

/**
* Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater}
*/
private Metadata applyRemoteStoreSettings(ClusterState clusterState, List<IndexMetadata> indicesWithRemoteStoreSettings) {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata());
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes();
Settings currentClusterSettings = clusterState.metadata().settings();
for (IndexMetadata indexMetadata : indicesWithRemoteStoreSettings) {
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata);
RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater(
currentDiscoveryNodes,
currentRoutingTable,
indexMetadata,
currentClusterSettings,
logger
);
indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName());
metadataBuilder.put(indexMetadataBuilder);
}
return metadataBuilder.build();
}

/**
* Checks if the incoming cluster settings payload is attempting to switch
* the cluster to `STRICT` compatibility mode
* Visible only for tests
*/
public boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsRequest request) {
Settings incomingSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
return RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(incomingSettings).mode
);
}
}
123 changes: 123 additions & 0 deletions server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,33 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
Expand Down Expand Up @@ -250,4 +258,119 @@ public static Map<String, String> getRemoteStoreRepoName(DiscoveryNodes discover
.findFirst();
return remoteNode.map(RemoteStoreNodeAttribute::getDataRepoNames).orElseGet(HashMap::new);
}

/**
* Invoked after a cluster settings update.
* Checks if the applied cluster settings has switched the cluster to STRICT mode.
* If so, checks and applies appropriate index settings depending on the current set
* of node types in the cluster
* This has been intentionally done after the cluster settings update
* flow. That way we are not interfering with the usual settings update
* and the cluster state mutation that comes along with it
*
* @param isCompatibilityModeChanging flag passed from cluster settings update call to denote if a compatibility mode change has been done
* @param request request payload passed from cluster settings update
* @param currentState cluster state generated after changing cluster settings were applied
* @param logger Logger reference
* @return Mutated cluster state with remote store index settings applied, no-op if the cluster is not switching to `STRICT` compatibility mode
*/
public static ClusterState checkAndFinalizeRemoteStoreMigration(
boolean isCompatibilityModeChanging,
ClusterUpdateSettingsRequest request,
ClusterState currentState,
Logger logger
) {
if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) {
return finalizeMigration(currentState, logger);
}
return currentState;
}

/**
* Finalizes the docrep to remote-store migration process by applying remote store based index settings
* on indices that are missing them. No-Op if all indices already have the settings applied through
* IndexMetadataUpdater
*
* @param incomingState mutated cluster state after cluster settings were applied
* @return new cluster state with index settings updated
*/
public static ClusterState finalizeMigration(ClusterState incomingState, Logger logger) {
Map<String, DiscoveryNode> discoveryNodeMap = incomingState.nodes().getNodes();
if (discoveryNodeMap.isEmpty() == false) {
// At this point, we have already validated that all nodes in the cluster are of uniform type.
// Either all of them are remote store enabled, or all of them are docrep enabled
boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode();
if (remoteStoreEnabledNodePresent == true) {
List<IndexMetadata> indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState, logger);
if (indicesWithoutRemoteStoreSettings.isEmpty() == true) {
logger.info("All indices in the cluster has remote store based index settings");
} else {
Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings, logger);
return ClusterState.builder(incomingState).metadata(mutatedMetadata).build();
}
} else {
logger.debug("All nodes in the cluster are not remote nodes. Skipping.");
}
}
return incomingState;
}

/**
* Filters out indices which does not have remote store based
* index settings applied even after all shard copies have
* migrated to remote store enabled nodes
*/
private static List<IndexMetadata> getIndicesWithoutRemoteStoreSettings(ClusterState clusterState, Logger logger) {
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
if (allIndicesMetadata.isEmpty() == false) {
List<IndexMetadata> indicesWithoutRemoteSettings = allIndicesMetadata.stream()
.filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false)
.collect(Collectors.toList());
logger.debug(
"Attempting to switch to strict mode. Count of indices without remote store settings {}",
indicesWithoutRemoteSettings.size()
);
return indicesWithoutRemoteSettings;
}
return Collections.emptyList();
}

/**
* Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater}
*/
private static Metadata applyRemoteStoreSettings(
ClusterState clusterState,
List<IndexMetadata> indicesWithoutRemoteStoreSettings,
Logger logger
) {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata());
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes();
Settings currentClusterSettings = clusterState.metadata().settings();
for (IndexMetadata indexMetadata : indicesWithoutRemoteStoreSettings) {
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata);
RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater(
currentDiscoveryNodes,
currentRoutingTable,
indexMetadata,
currentClusterSettings,
logger
);
indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName());
metadataBuilder.put(indexMetadataBuilder);
}
return metadataBuilder.build();
}

/**
* Checks if the incoming cluster settings payload is attempting to switch
* the cluster to `STRICT` compatibility mode
* Visible only for tests
*/
public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsRequest request) {
Settings incomingSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
return RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(
incomingSettings
) == RemoteStoreNodeService.CompatibilityMode.STRICT;
}
}
Loading

0 comments on commit d77cc2f

Please sign in to comment.