Skip to content

Commit

Permalink
Make NodeEnvironment.availableShardPaths singular (elastic#72441)
Browse files Browse the repository at this point in the history
This commit renames the availableShardPaths method to be singular and
return a single Path instead of an array.

relates elastic#71205
  • Loading branch information
rjernst authored Apr 29, 2021
1 parent 2a446ad commit faede0a
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLogAppender;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -266,12 +267,12 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc
final Index index = resolveIndex("test");

logger.info("--> closing all nodes");
Path[] shardLocation = internalCluster().getInstance(NodeEnvironment.class, node_1).availableShardPaths(new ShardId(index, 0));
Path shardLocation = internalCluster().getInstance(NodeEnvironment.class, node_1).availableShardPath(new ShardId(index, 0));
assertThat(FileSystemUtils.exists(shardLocation), equalTo(true)); // make sure the data is there!
internalCluster().closeNonSharedNodes(false); // don't wipe data directories the index needs to be there!

logger.info("--> deleting the shard data [{}] ", Arrays.toString(shardLocation));
assertThat(FileSystemUtils.exists(shardLocation), equalTo(true)); // verify again after cluster was shut down
logger.info("--> deleting the shard data [{}] ", shardLocation);
assertThat(Files.exists(shardLocation), equalTo(true)); // verify again after cluster was shut down
IOUtils.rm(shardLocation);

logger.info("--> starting nodes back, will not allocate the shard since it has no data, but the index will be there");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,19 +547,17 @@ public Settings onNodeStopped(String nodeName) throws Exception {
});

if (corrupt) {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
logger.debug("--> deleting [{}]", item);
Files.delete(item);
}
Path path = internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPath(shardId);
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
logger.debug("--> deleting [{}]", item);
Files.delete(item);
}
}
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ public void testLockTryingToDelete() throws Exception {

ClusterService cs = getInstanceFromNode(ClusterService.class);
final Index index = cs.state().metadata().index("test").getIndex();
Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0));
logger.info("--> paths: [{}]", (Object)shardPaths);
Path shardPath = env.availableShardPath(new ShardId(index, 0));
logger.info("--> path: [{}]", shardPath);
// Should not be able to acquire the lock because it's already open
try {
NodeEnvironment.acquireFSLockForPaths(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), shardPaths);
NodeEnvironment.acquireFSLockForPaths(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), shardPath);
fail("should not have been able to acquire the lock");
} catch (LockObtainFailedException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,14 +607,12 @@ private int numShards(String... index) {

private List<Path> findFilesToCorruptOnNode(final String nodeName, final ShardId shardId) throws IOException {
List<Path> files = new ArrayList<>();
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
path = path.resolve("index");
if (Files.exists(path)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
files.add(item);
}
Path path = internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPath(shardId).resolve("index");
if (Files.exists(path)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
files.add(item);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,7 @@ private Path indexDirectory(String server, Index index) {

private Path shardDirectory(String server, Index index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
assert paths.length == 1;
return paths[0];
return env.availableShardPath(new ShardId(index, shard));
}

private void assertShardDeleted(final String server, final Index index, final int shard) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,18 +424,17 @@ public void testCancellationCleansTempFiles() throws Exception {
logger.info("--> verifying no temporary recoveries are left");
for (String node : internalCluster().getNodeNames()) {
NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, node);
for (final Path shardLoc : nodeEnvironment.availableShardPaths(new ShardId(indexName, "_na_", 0))) {
if (Files.exists(shardLoc)) {
assertBusy(() -> {
try {
forEachFileRecursively(shardLoc,
(file, attrs) -> assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
not(startsWith("recovery."))));
} catch (IOException e) {
throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
}
});
}
final Path shardLoc = nodeEnvironment.availableShardPath(new ShardId(indexName, "_na_", 0));
if (Files.exists(shardLoc)) {
assertBusy(() -> {
try {
forEachFileRecursively(shardLoc,
(file, attrs) -> assertThat("found a temporary recovery file: " + file, file.getFileName().toString(),
not(startsWith("recovery."))));
} catch (IOException e) {
throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
}
});
}
}
}
Expand Down
29 changes: 12 additions & 17 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,8 @@ public void deleteShardDirectorySafe(
IndexSettings indexSettings,
Consumer<Path[]> listener
) throws IOException, ShardLockObtainFailedException {
final Path[] paths = availableShardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
final Path path = availableShardPath(shardId);
logger.trace("deleting shard {} directory, path: [{}]", shardId, path);
try (ShardLock lock = shardLock(shardId, "shard deletion under lock")) {
deleteShardDirectoryUnderLock(lock, indexSettings, listener);
}
Expand Down Expand Up @@ -604,11 +604,11 @@ public void deleteShardDirectoryUnderLock(
) throws IOException {
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
listener.accept(paths);
IOUtils.rm(paths);
final Path path = availableShardPath(shardId);
logger.trace("acquiring locks for {}, path: [{}]", shardId, path);
acquireFSLockForPaths(indexSettings, path);
listener.accept(new Path[] { path });
IOUtils.rm(path);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId);
logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation);
Expand All @@ -617,11 +617,11 @@ public void deleteShardDirectoryUnderLock(
listener.accept(new Path[]{customLocation});
IOUtils.rm(customLocation);
}
logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths);
assert assertPathsDoNotExist(paths);
logger.trace("deleted shard {} directory, path: [{}]", shardId, path);
assert assertPathsDoNotExist(path);
}

private static boolean assertPathsDoNotExist(final Path[] paths) {
private static boolean assertPathsDoNotExist(final Path... paths) {
Set<Path> existingPaths = Stream.of(paths)
.filter(FileSystemUtils::exists)
.filter(leftOver -> {
Expand Down Expand Up @@ -950,14 +950,9 @@ public Path indexPath(Index index) {
* @see #resolveCustomLocation(String, ShardId)
*
*/
public Path[] availableShardPaths(ShardId shardId) {
public Path availableShardPath(ShardId shardId) {
assertEnvIsLocked();
final NodePath[] nodePaths = nodePaths();
final Path[] shardLocations = new Path[nodePaths.length];
for (int i = 0; i < nodePaths.length; i++) {
shardLocations[i] = nodePaths[i].resolve(shardId);
}
return shardLocations;
return nodePaths[0].resolve(shardId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request, Task task)
final ShardId shardId = request.getShardId();
logger.trace("{} loading local shard state info", shardId);
ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(logger, namedXContentRegistry,
nodeEnv.availableShardPaths(request.shardId));
nodeEnv.availableShardPath(request.shardId));
if (shardStateMetadata != null) {
if (indicesService.getShardOrNull(shardId) == null) {
final String customDataPath;
Expand Down
26 changes: 12 additions & 14 deletions server/src/main/java/org/elasticsearch/index/shard/ShardPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ public boolean isCustomDataPath() {
*/
public static ShardPath loadShardPath(Logger logger, NodeEnvironment env,
ShardId shardId, String customDataPath) throws IOException {
final Path[] paths = env.availableShardPaths(shardId);
final Path shardPath = env.availableShardPath(shardId);
final Path sharedDataPath = env.sharedDataPath();
return loadShardPath(logger, shardId, customDataPath, paths, sharedDataPath);
return loadShardPath(logger, shardId, customDataPath, new Path[] { shardPath }, sharedDataPath);
}

/**
Expand Down Expand Up @@ -170,18 +170,16 @@ public static void deleteLeftoverShardDirectory(
final Consumer<Path[]> listener
) throws IOException {
final String indexUUID = indexSettings.getUUID();
final Path[] paths = env.availableShardPaths(lock.getShardId());
for (Path path : paths) {
// EMPTY is safe here because we never call namedObject
ShardStateMetadata load = ShardStateMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
if (load != null) {
if (load.indexUUID.equals(indexUUID) == false && IndexMetadata.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) {
logger.warn("{} deleting leftover shard on path: [{}] with a different index UUID", lock.getShardId(), path);
assert Files.isDirectory(path) : path + " is not a directory";
NodeEnvironment.acquireFSLockForPaths(indexSettings, path);
listener.accept(new Path[]{path});
IOUtils.rm(path);
}
final Path path = env.availableShardPath(lock.getShardId());
// EMPTY is safe here because we never call namedObject
ShardStateMetadata load = ShardStateMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
if (load != null) {
if (load.indexUUID.equals(indexUUID) == false && IndexMetadata.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) {
logger.warn("{} deleting leftover shard on path: [{}] with a different index UUID", lock.getShardId(), path);
assert Files.isDirectory(path) : path + " is not a directory";
NodeEnvironment.acquireFSLockForPaths(indexSettings, path);
listener.accept(new Path[]{path});
IOUtils.rm(path);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ public ShardDeletionCheckResult canDeleteShardContent(ShardId shardId, IndexSett
} else {
// lets see if it's path is available (return false if the shard doesn't exist)
// we don't need to delete anything that is not there
return FileSystemUtils.exists(nodeEnv.availableShardPaths(shardId)) ?
return Files.exists(nodeEnv.availableShardPath(shardId)) ?
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE :
ShardDeletionCheckResult.NO_FOLDER_FOUND;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,23 +342,23 @@ public void testCustomDataPaths() throws Exception {
Index index = new Index("myindex", "myindexUUID");
ShardId sid = new ShardId(index, 0);

assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid)));
assertThat(env.availableShardPath(sid), equalTo(env.availableShardPath(sid)));
assertThat(env.resolveCustomLocation("/tmp/foo", sid).toAbsolutePath(),
equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0").toAbsolutePath()));

assertThat("shard paths with a custom data_path should contain only regular paths",
env.availableShardPaths(sid)[0],
env.availableShardPath(sid),
equalTo(dataPath.resolve("indices/" + index.getUUID() + "/0")));

assertThat("index paths uses the regular template",
env.indexPath(index), equalTo(dataPath.resolve("indices/" + index.getUUID())));

assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid)));
assertThat(env.availableShardPath(sid), equalTo(env.availableShardPath(sid)));
assertThat(env.resolveCustomLocation("/tmp/foo", sid).toAbsolutePath(),
equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0").toAbsolutePath()));

assertThat("shard paths with a custom data_path should contain only regular paths",
env.availableShardPaths(sid)[0],
env.availableShardPath(sid),
equalTo(dataPath.resolve("indices/" + index.getUUID() + "/0")));

assertThat("index paths uses the regular template",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,18 @@ public void testWriteShardState() throws Exception {
boolean primary = randomBoolean();
AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
ShardStateMetadata state1 = new ShardStateMetadata(primary, "fooUUID", allocationId);
write(state1, env.availableShardPaths(id));
ShardStateMetadata shardStateMetadata = load(logger, env.availableShardPaths(id));
write(state1, env.availableShardPath(id));
ShardStateMetadata shardStateMetadata = load(logger, env.availableShardPath(id));
assertEquals(shardStateMetadata, state1);

ShardStateMetadata state2 = new ShardStateMetadata(primary, "fooUUID", allocationId);
write(state2, env.availableShardPaths(id));
shardStateMetadata = load(logger, env.availableShardPaths(id));
write(state2, env.availableShardPath(id));
shardStateMetadata = load(logger, env.availableShardPath(id));
assertEquals(shardStateMetadata, state1);

ShardStateMetadata state3 = new ShardStateMetadata(primary, "fooUUID", allocationId);
write(state3, env.availableShardPaths(id));
shardStateMetadata = load(logger, env.availableShardPaths(id));
write(state3, env.availableShardPath(id));
shardStateMetadata = load(logger, env.availableShardPath(id));
assertEquals(shardStateMetadata, state3);
assertEquals("fooUUID", state3.indexUUID);
}
Expand Down
Loading

0 comments on commit faede0a

Please sign in to comment.