Skip to content

Commit

Permalink
Add search node repurpose commands (#6517)
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal authored Mar 11, 2023
1 parent 6707d6b commit 71a76da
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The truncation limit of the OpenSearchJsonLayout logger is now configurable ([#6569](https://github.com/opensearch-project/OpenSearch/pull/6569))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,9 @@ static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IO
for (Path indexPath : indexStream) {
if (Files.isDirectory(indexPath)) {
try (Stream<Path> shardStream = Files.list(indexPath)) {
shardStream.map(Path::toAbsolutePath).forEach(indexSubPaths::add);
shardStream.filter(NodeEnvironment::isShardPath)
.map(Path::toAbsolutePath)
.forEach(indexSubPaths::add);
}
}
}
Expand Down
195 changes: 153 additions & 42 deletions server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -57,6 +58,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.opensearch.env.NodeEnvironment.CACHE_FOLDER;
import static org.opensearch.env.NodeEnvironment.INDICES_FOLDER;

/**
Expand All @@ -68,12 +70,14 @@ public class NodeRepurposeCommand extends OpenSearchNodeCommand {

static final String ABORTED_BY_USER_MSG = OpenSearchNodeCommand.ABORTED_BY_USER_MSG;
static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = OpenSearchNodeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG;
static final String NO_CLEANUP = "Node has node.data=true -> no clean up necessary";
static final String NO_CLEANUP = "Node has node.data=true and node.search=true -> no clean up necessary";
static final String NO_DATA_TO_CLEAN_UP_FOUND = "No data to clean-up found";
static final String NO_SHARD_DATA_TO_CLEAN_UP_FOUND = "No shard data to clean-up found";
static final String NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND = "No file cache to clean-up found";
private static final int FILE_CACHE_NODE_PATH_LOCATION = 0;

public NodeRepurposeCommand() {
super("Repurpose this node to another cluster-manager/data role, cleaning up any excess persisted data");
super("Repurpose this node to another cluster-manager/data/search role, cleaning up any excess persisted data");
}

void testExecute(Terminal terminal, OptionSet options, Environment env) throws Exception {
Expand All @@ -83,7 +87,7 @@ void testExecute(Terminal terminal, OptionSet options, Environment env) throws E
@Override
protected boolean validateBeforeLock(Terminal terminal, Environment env) {
Settings settings = env.settings();
if (DiscoveryNode.isDataNode(settings)) {
if (DiscoveryNode.isDataNode(settings) && DiscoveryNode.isSearchNode(settings)) {
terminal.println(Terminal.Verbosity.NORMAL, NO_CLEANUP);
return false;
}
Expand All @@ -94,85 +98,186 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env)
throws IOException {
assert DiscoveryNode.isDataNode(env.settings()) == false;
assert DiscoveryNode.isDataNode(env.settings()) == false || DiscoveryNode.isSearchNode(env.settings()) == false;

boolean repurposeData = DiscoveryNode.isDataNode(env.settings()) == false;
boolean repurposeSearch = DiscoveryNode.isSearchNode(env.settings()) == false;

if (DiscoveryNode.isClusterManagerNode(env.settings()) == false) {
processNoClusterManagerNoDataNode(terminal, dataPaths, env);
processNoClusterManagerRepurposeNode(terminal, dataPaths, env, repurposeData, repurposeSearch);
} else {
processClusterManagerNoDataNode(terminal, dataPaths, env);
processClusterManagerRepurposeNode(terminal, dataPaths, env, repurposeData, repurposeSearch);
}
}

private void processNoClusterManagerNoDataNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
private void processNoClusterManagerRepurposeNode(
Terminal terminal,
Path[] dataPaths,
Environment env,
boolean repurposeData,
boolean repurposeSearch
) throws IOException {
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
NodeEnvironment.NodePath fileCacheNodePath = toNodePaths(dataPaths)[FILE_CACHE_NODE_PATH_LOCATION];
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();

terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
Set<Path> indexPaths = Set.of();
List<Path> shardDataPaths = List.of();
Set<Path> fileCachePaths = Set.of();
List<Path> fileCacheDataPaths = List.of();

terminal.println(Terminal.Verbosity.VERBOSE, "Collecting index metadata paths");
List<Path> indexMetadataPaths = NodeEnvironment.collectIndexMetadataPaths(nodePaths);

Set<Path> indexPaths = uniqueParentPaths(shardDataPaths, indexMetadataPaths);
if (repurposeData) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
indexPaths = uniqueParentPaths(shardDataPaths, indexMetadataPaths);
}

final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
if (repurposeSearch) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
fileCachePaths = uniqueParentPaths(fileCacheDataPaths, indexMetadataPaths);
}

final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();
if (indexPaths.isEmpty() && metadata.indices().isEmpty()) {
if (repurposeData && repurposeSearch && fileCacheDataPaths.isEmpty() && indexPaths.isEmpty() && metadata.indices().isEmpty()) {
terminal.println(Terminal.Verbosity.NORMAL, NO_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (repurposeData && !repurposeSearch && indexPaths.isEmpty() && metadata.indices().isEmpty()) {
terminal.println(Terminal.Verbosity.NORMAL, NO_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (!repurposeData && repurposeSearch && fileCacheDataPaths.isEmpty() && metadata.indices().isEmpty()) {
terminal.println(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND);
return;
}

final Set<String> indexUUIDs = Sets.union(
indexUUIDsFor(indexPaths),
StreamSupport.stream(metadata.indices().values().spliterator(), false)
.map(imd -> imd.value.getIndexUUID())
.collect(Collectors.toSet())
indexUUIDsFor(fileCachePaths),
Sets.union(
indexUUIDsFor(indexPaths),
StreamSupport.stream(metadata.indices().values().spliterator(), false)
.map(imd -> imd.value.getIndexUUID())
.collect(Collectors.toSet())
)
);

outputVerboseInformation(terminal, indexPaths, indexUUIDs, metadata);

terminal.println(noClusterManagerMessage(indexUUIDs.size(), shardDataPaths.size(), indexMetadataPaths.size()));
List<Path> cleanUpPaths = new ArrayList<>(shardDataPaths);
cleanUpPaths.addAll(fileCacheDataPaths);
outputVerboseInformation(terminal, cleanUpPaths, indexUUIDs, metadata);
terminal.println(noClusterManagerMessage(indexUUIDs.size(), cleanUpPaths.size(), indexMetadataPaths.size()));
outputHowToSeeVerboseInformation(terminal);

terminal.println("Node is being re-purposed as no-cluster-manager and no-data. Clean-up of index data will be performed.");
if (repurposeData && repurposeSearch) {
terminal.println(
"Node is being re-purposed as no-cluster-manager, no-data and no-search. Clean-up of index data and file cache will be performed."
);
} else if (repurposeData) {
terminal.println("Node is being re-purposed as no-cluster-manager and no-data. Clean-up of index data will be performed.");
} else if (repurposeSearch) {
terminal.println(
"Node is being re-purposed as no-cluster-manager and no-search. Clean-up of file cache and corresponding index metadata will be performed."
);
}
confirm(terminal, "Do you want to proceed?");

removePaths(terminal, indexPaths); // clean-up shard dirs
// clean-up all metadata dirs
MetadataStateFormat.deleteMetaState(dataPaths);
IOUtils.rm(Stream.of(dataPaths).map(path -> path.resolve(INDICES_FOLDER)).toArray(Path[]::new));
if (repurposeData) {
removePaths(terminal, indexPaths); // clean-up shard dirs
IOUtils.rm(Stream.of(dataPaths).map(path -> path.resolve(INDICES_FOLDER)).toArray(Path[]::new));
}

if (repurposeSearch) {
removePaths(terminal, fileCachePaths); // clean-up file cache dirs
IOUtils.rm(dataPaths[FILE_CACHE_NODE_PATH_LOCATION].resolve(CACHE_FOLDER));
}

terminal.println("Node successfully repurposed to no-cluster-manager and no-data.");
if (repurposeData && repurposeSearch) {
terminal.println("Node successfully repurposed to no-cluster-manager, no-data and no-search.");
} else if (repurposeData) {
terminal.println("Node successfully repurposed to no-cluster-manager and no-data.");
} else if (repurposeSearch) {
terminal.println("Node successfully repurposed to no-cluster-manager and no-search.");
}
}

private void processClusterManagerNoDataNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
private void processClusterManagerRepurposeNode(
Terminal terminal,
Path[] dataPaths,
Environment env,
boolean repurposeData,
boolean repurposeSearch
) throws IOException {
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
NodeEnvironment.NodePath fileCacheNodePath = toNodePaths(dataPaths)[FILE_CACHE_NODE_PATH_LOCATION];
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();

terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
if (shardDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
}
Set<Path> indexPaths = Set.of();
List<Path> shardDataPaths = List.of();
Set<Path> fileCachePaths = Set.of();
List<Path> fileCacheDataPaths = List.of();

final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
if (repurposeData) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
indexPaths = uniqueParentPaths(shardDataPaths);
}

final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();
if (repurposeSearch) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
fileCachePaths = uniqueParentPaths(fileCacheDataPaths);
}

final Set<Path> indexPaths = uniqueParentPaths(shardDataPaths);
final Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
if (repurposeData && repurposeSearch && shardDataPaths.isEmpty() && fileCacheDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (repurposeData && !repurposeSearch && shardDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (!repurposeData && repurposeSearch && fileCacheDataPaths.isEmpty()) {
terminal.println(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND);
return;
}

outputVerboseInformation(terminal, shardDataPaths, indexUUIDs, metadata);
final Set<String> indexUUIDs = Sets.union(indexUUIDsFor(indexPaths), indexUUIDsFor(fileCachePaths));

terminal.println(shardMessage(shardDataPaths.size(), indexUUIDs.size()));
List<Path> cleanUpPaths = new ArrayList<>(shardDataPaths);
cleanUpPaths.addAll(fileCacheDataPaths);
outputVerboseInformation(terminal, cleanUpPaths, indexUUIDs, metadata);
terminal.println(shardMessage(cleanUpPaths.size(), indexUUIDs.size()));
outputHowToSeeVerboseInformation(terminal);

terminal.println("Node is being re-purposed as cluster-manager and no-data. Clean-up of shard data will be performed.");
if (repurposeData && repurposeSearch) {
terminal.println(
"Node is being re-purposed as cluster-manager, no-data and no-search. Clean-up of shard data and file cache data will be performed."
);
} else if (repurposeData) {
terminal.println("Node is being re-purposed as cluster-manager and no-data. Clean-up of shard data will be performed.");
} else if (repurposeSearch) {
terminal.println("Node is being re-purposed as cluster-manager and no-search. Clean-up of file cache data will be performed.");
}

confirm(terminal, "Do you want to proceed?");

removePaths(terminal, shardDataPaths); // clean-up shard dirs
if (repurposeData) {
removePaths(terminal, shardDataPaths); // clean-up shard dirs
}

if (repurposeSearch) {
removePaths(terminal, fileCacheDataPaths); // clean-up file cache dirs
}

terminal.println("Node successfully repurposed to cluster-manager and no-data.");
if (repurposeData && repurposeSearch) {
terminal.println("Node successfully repurposed to cluster-manager, no-data and no-search.");
} else if (repurposeData) {
terminal.println("Node successfully repurposed to cluster-manager and no-data.");
} else if (repurposeSearch) {
terminal.println("Node successfully repurposed to cluster-manager and no-search.");
}
}

private ClusterState loadClusterState(Terminal terminal, Environment env, PersistedClusterStateService psf) throws IOException {
Expand Down Expand Up @@ -211,11 +316,17 @@ private Set<String> indexUUIDsFor(Set<Path> indexPaths) {
}

static String noClusterManagerMessage(int indexes, int shards, int indexMetadata) {
return "Found " + indexes + " indices (" + shards + " shards and " + indexMetadata + " index meta data) to clean up";
return "Found "
+ indexes
+ " indices ("
+ shards
+ " shards/file cache folders and "
+ indexMetadata
+ " index meta data) to clean up";
}

static String shardMessage(int shards, int indices) {
return "Found " + shards + " shards in " + indices + " indices to clean up";
return "Found " + shards + " shards/file cache folders in " + indices + " indices to clean up";
}

private void removePaths(Terminal terminal, Collection<Path> paths) {
Expand Down
Loading

0 comments on commit 71a76da

Please sign in to comment.