From 36a6bb17df431e46f776c7ddb79a3c9bd3f2fa2c Mon Sep 17 00:00:00 2001
From: panguixin
Date: Sat, 3 Feb 2024 01:00:37 +0800
Subject: [PATCH] add index store listener

Signed-off-by: panguixin
---
 .../org/opensearch/env/NodeEnvironment.java   |  78 ++++-------
 .../remote/filecache/FileCacheCleaner.java    | 116 +++++++++++++++++
 .../main/java/org/opensearch/node/Node.java   |   8 +-
 .../opensearch/env/NodeEnvironmentTests.java  |  56 ++++++++
 .../filecache/FileCacheCleanerTests.java      | 123 ++++++++++++++++++
 5 files changed, 326 insertions(+), 55 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java
 create mode 100644 server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java

diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java
index 931c83e75bc8e..2748938d8b761 100644
--- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java
+++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java
@@ -44,7 +44,6 @@
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.NIOFSDirectory;
 import org.apache.lucene.store.NativeFSLockFactory;
-import org.apache.lucene.util.SetOnce;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
 import org.opensearch.cluster.metadata.IndexMetadata;
@@ -72,7 +71,6 @@
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.shard.ShardPath;
 import org.opensearch.index.store.FsDirectoryFactory;
-import org.opensearch.index.store.remote.filecache.FileCache;
 import org.opensearch.monitor.fs.FsInfo;
 import org.opensearch.monitor.fs.FsProbe;
 import org.opensearch.monitor.jvm.JvmInfo;
@@ -107,7 +105,6 @@
 import java.util.stream.Stream;
 
 import static java.util.Collections.unmodifiableSet;
-import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
 
 /**
  * A component that holds all data paths for a single node.
@@ -202,7 +199,7 @@ public String toString() {
 
     private final NodeMetadata nodeMetadata;
 
-    private final SetOnce<FileCache> fileCache = new SetOnce<>();
+    private final IndexStoreListener indexStoreListener;
 
     /**
      * Maximum number of data nodes that should run in an environment.
@@ -300,18 +297,23 @@ public void close() {
         }
     }
 
+    public NodeEnvironment(Settings settings, Environment environment) throws IOException {
+        this(settings, environment, IndexStoreListener.EMPTY);
+    }
+
     /**
      * Setup the environment.
      * @param settings settings from opensearch.yml
      */
-    public NodeEnvironment(Settings settings, Environment environment) throws IOException {
-        if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
+    public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
+        if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) {
             nodePaths = null;
             fileCacheNodePath = null;
             sharedDataPath = null;
             locks = null;
             nodeLockId = -1;
             nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
+            this.indexStoreListener = IndexStoreListener.EMPTY;
             return;
         }
         boolean success = false;
@@ -390,6 +392,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
             }
 
             this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
+            this.indexStoreListener = indexStoreListener;
             success = true;
         } finally {
             if (success == false) {
@@ -398,10 +401,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
         }
     }
 
-    public void setFileCache(final FileCache fileCache) {
-        this.fileCache.set(fileCache);
-    }
-
     /**
      * Resolve a specific nodes/{node.id} path for the specified path and node lock id.
     *
@@ -587,12 +586,7 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
         final ShardId shardId = lock.getShardId();
         assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
 
-        if (indexSettings.isRemoteSnapshot()) {
-            final ShardPath shardPath = ShardPath.loadFileCachePath(this, shardId);
-            cleanupShardFileCache(shardPath);
-            deleteShardFileCacheDirectory(shardPath);
-            logger.trace("deleted shard {} file cache directory, path: [{}]", shardId, shardPath.getDataPath());
-        }
+        indexStoreListener.beforeShardPathDeleted(shardId, indexSettings, this);
 
         final Path[] paths = availableShardPaths(shardId);
         logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
@@ -609,40 +603,6 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
         assert assertPathsDoNotExist(paths);
     }
 
-    /**
-     * Cleans up the corresponding index file path entries from FileCache
-     *
-     * @param shardPath the shard path
-     */
-    private void cleanupShardFileCache(ShardPath shardPath) {
-        try {
-            final FileCache fc = fileCache.get();
-            assert fc != null;
-            final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
-            try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
-                for (Path subPath : ds) {
-                    fc.remove(subPath.toRealPath());
-                }
-            }
-        } catch (IOException ioe) {
-            logger.error(
-                () -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
-                ioe
-            );
-        }
-    }
-
-    private void deleteShardFileCacheDirectory(ShardPath shardPath) {
-        final Path path = shardPath.getDataPath();
-        try {
-            if (Files.exists(path)) {
-                IOUtils.rm(path);
-            }
-        } catch (IOException e) {
-            logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);
-        }
-    }
-
     private static boolean assertPathsDoNotExist(final Path[] paths) {
         Set<Path> existingPaths = Stream.of(paths).filter(FileSystemUtils::exists).filter(leftOver -> {
             // Relaxed assertion for the special case where only the empty state directory exists after deleting
@@ -704,9 +664,7 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
      * @param indexSettings settings for the index being deleted
      */
     public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
-        if (indexSettings.isRemoteSnapshot()) {
-            deleteIndexFileCacheDirectory(index);
-        }
+        indexStoreListener.beforeIndexPathDeleted(index, indexSettings, this);
 
         final Path[] indexPaths = indexPaths(index);
         logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
@@ -1454,4 +1412,18 @@ private static void tryWriteTempFile(Path path) throws IOException {
             }
         }
     }
+
+    /**
+     * A listener that is executed on per-index and per-shard store events, such as deleting the shard path
+     *
+     * @opensearch.internal
+     */
+    public interface IndexStoreListener {
+        default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}
+
+        default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}
+
+        IndexStoreListener EMPTY = new IndexStoreListener() {
+        };
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java
new file mode 100644
index 0000000000000..0261ab24dfa7a
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java
@@ -0,0 +1,116 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store.remote.filecache;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.common.inject.Provider;
+import org.opensearch.common.util.io.IOUtils;
+import org.opensearch.core.index.Index;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.env.NodeEnvironment;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.shard.ShardPath;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
+
+/**
+ * IndexStoreListener to clean up the file cache when an index is deleted. The cached entries will be eligible
+ * for eviction when the shard is deleted, but this listener deterministically removes entries from memory and
+ * from disk at the time of shard deletion as opposed to waiting for the cache to need to perform eviction.
+ *
+ * @opensearch.internal
+ */
+public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
+    private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);
+
+    private final Provider<FileCache> fileCacheProvider;
+
+    public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
+        this.fileCacheProvider = fileCacheProvider;
+    }
+
+    /**
+     * Before the shard path is deleted, cleans up the corresponding index file path entries from the FileCache
+     * and deletes the corresponding shard file cache path.
+     *
+     * @param shardId the shard id
+     * @param indexSettings the index settings
+     * @param nodeEnvironment the node environment
+     */
+    @Override
+    public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
+        if (indexSettings.isRemoteSnapshot()) {
+            final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
+            cleanupShardFileCache(shardPath);
+            deleteShardFileCacheDirectory(shardPath);
+        }
+    }
+
+    /**
+     * Cleans up the corresponding index file path entries from FileCache
+     *
+     * @param shardPath the shard path
+     */
+    private void cleanupShardFileCache(ShardPath shardPath) {
+        try {
+            final FileCache fc = fileCacheProvider.get();
+            assert fc != null;
+            final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
+            try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
+                for (Path subPath : ds) {
+                    fc.remove(subPath.toRealPath());
+                }
+            }
+        } catch (IOException ioe) {
+            logger.error(
+                () -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
+                ioe
+            );
+        }
+    }
+
+    private void deleteShardFileCacheDirectory(ShardPath shardPath) {
+        final Path path = shardPath.getDataPath();
+        try {
+            if (Files.exists(path)) {
+                IOUtils.rm(path);
+            }
+        } catch (IOException e) {
+            logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);
+        }
+    }
+
+    /**
+     * Before the index path is deleted, deletes the corresponding index file cache path.
+     *
+     * @param index the index
+     * @param indexSettings the index settings
+     * @param nodeEnvironment the node environment
+     */
+    @Override
+    public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
+        if (indexSettings.isRemoteSnapshot()) {
+            final Path indexCachePath = nodeEnvironment.fileCacheNodePath().fileCachePath.resolve(index.getUUID());
+            if (Files.exists(indexCachePath)) {
+                try {
+                    IOUtils.rm(indexCachePath);
+                } catch (IOException e) {
+                    logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
+                }
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 47420b0db7042..547f610f4a752 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -146,6 +146,7 @@
 import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
 import org.opensearch.index.store.remote.filecache.FileCacheFactory;
 import org.opensearch.indices.IndicesModule;
 import org.opensearch.indices.IndicesService;
@@ -526,7 +527,11 @@ protected Node(
             */
            this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
            Environment.assertEquivalent(initialEnvironment, this.environment);
-           nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
+           if (DiscoveryNode.isSearchNode(settings) == false) {
+               nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
+           } else {
+               nodeEnvironment = new NodeEnvironment(tmpSettings, environment, new FileCacheCleaner(this::fileCache));
+           }
            logger.info(
                "node name [{}], node ID [{}], cluster name [{}], roles {}",
                NODE_NAME_SETTING.get(tmpSettings),
@@ -677,7 +682,6 @@ protected Node(
            );
            // File cache will be initialized by the node once circuit breakers are in place.
            initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
-           nodeEnvironment.setFileCache(fileCache);
            final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);
 
            pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
index 7f669934579ee..962eb743dca6e 100644
--- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
+++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
@@ -359,6 +359,57 @@ protected void doRun() throws Exception {
         env.close();
     }
 
+    public void testIndexStoreListener() throws Exception {
+        final AtomicInteger shardCounter = new AtomicInteger(0);
+        final AtomicInteger indexCounter = new AtomicInteger(0);
+        final Index index = new Index("foo", "fooUUID");
+        final ShardId shardId = new ShardId(index, 0);
+        final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
+            @Override
+            public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
+                assertEquals(shardId, inShardId);
+                shardCounter.incrementAndGet();
+            }
+
+            @Override
+            public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
+                assertEquals(index, inIndex);
+                indexCounter.incrementAndGet();
+            }
+        };
+        final NodeEnvironment env = newNodeEnvironment(listener);
+
+        for (Path path : env.indexPaths(index)) {
+            Files.createDirectories(path.resolve("0"));
+        }
+
+        for (Path path : env.indexPaths(index)) {
+            assertTrue(Files.exists(path.resolve("0")));
+        }
+        assertEquals(0, shardCounter.get());
+
+        env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);
+
+        for (Path path : env.indexPaths(index)) {
+            assertFalse(Files.exists(path.resolve("0")));
+        }
+        assertEquals(1, shardCounter.get());
+
+        for (Path path : env.indexPaths(index)) {
+            assertTrue(Files.exists(path));
+        }
+        assertEquals(0, indexCounter.get());
+
+        env.deleteIndexDirectorySafe(index, 5000, idxSettings);
+
+        for (Path path : env.indexPaths(index)) {
+            assertFalse(Files.exists(path));
+        }
+        assertEquals(1, indexCounter.get());
+        assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
+        env.close();
+    }
+
     public void testStressShardLock() throws IOException, InterruptedException {
         class Int {
             int value = 0;
@@ -629,6 +680,11 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
         return newNodeEnvironment(Settings.EMPTY);
     }
 
+    public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
+        Settings build = buildEnvSettings(Settings.EMPTY);
+        return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
+    }
+
     @Override
     public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
         Settings build = buildEnvSettings(settings);
diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java
new file mode 100644
index 0000000000000..e2a6a4011a6b7
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java
@@ -0,0 +1,123 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store.remote.filecache;
+
+import org.apache.lucene.store.IndexInput;
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.common.breaker.CircuitBreaker;
+import org.opensearch.core.common.breaker.NoopCircuitBreaker;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.env.NodeEnvironment;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.test.OpenSearchTestCase;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
+import static org.hamcrest.Matchers.equalTo;
+
+public class FileCacheCleanerTests extends OpenSearchTestCase {
+    private static final ShardId SHARD_0 = new ShardId("index", "uuid-0", 0);
+    private static final ShardId SHARD_1 = new ShardId("index", "uuid-1", 0);
+    private static final Settings SETTINGS = Settings.builder()
+        .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+        .put("index.store.type", "remote_snapshot")
+        .build();
+    private static final IndexSettings INDEX_SETTINGS = new IndexSettings(
+        IndexMetadata.builder("index").settings(SETTINGS).build(),
+        SETTINGS
+    );
+
+    private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
+        1024 * 1024,
+        1,
+        new NoopCircuitBreaker(CircuitBreaker.REQUEST)
+    );
+    private final Map<ShardId, Path> files = new HashMap<>();
+    private NodeEnvironment env;
+    private FileCacheCleaner cleaner;
+
+    @Before
+    public void setUpFileCache() throws IOException {
+        env = newNodeEnvironment(SETTINGS);
+        cleaner = new FileCacheCleaner(() -> fileCache);
+        files.put(SHARD_0, addFile(fileCache, env, SHARD_0));
+        files.put(SHARD_1, addFile(fileCache, env, SHARD_1));
+        MatcherAssert.assertThat(fileCache.size(), equalTo(2L));
+    }
+
+    private static Path addFile(FileCache fileCache, NodeEnvironment env, ShardId shardId) throws IOException {
+        final ShardPath shardPath = ShardPath.loadFileCachePath(env, shardId);
+        final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
+        Files.createDirectories(localStorePath);
+        final Path file = Files.createFile(localStorePath.resolve("file"));
+        fileCache.put(file, new CachedIndexInput() {
+            @Override
+            public IndexInput getIndexInput() {
+                return null;
+            }
+
+            @Override
+            public long length() {
+                return 1024;
+            }
+
+            @Override
+            public boolean isClosed() {
+                return false;
+            }
+
+            @Override
+            public void close() {
+
+            }
+        });
+        return file;
+    }
+
+    @After
+    public void tearDownFileCache() {
+        env.close();
+    }
+
+    public void testShardRemoved() {
+        final Path cachePath = ShardPath.loadFileCachePath(env, SHARD_0).getDataPath();
+        assertTrue(Files.exists(cachePath));
+
+        cleaner.beforeShardPathDeleted(SHARD_0, INDEX_SETTINGS, env);
+        MatcherAssert.assertThat(fileCache.size(), equalTo(1L));
+        assertNull(fileCache.get(files.get(SHARD_0)));
+        assertFalse(Files.exists(files.get(SHARD_0)));
+        assertTrue(Files.exists(files.get(SHARD_1)));
+        assertFalse(Files.exists(cachePath));
+    }
+
+    public void testIndexRemoved() {
+        final Path indexCachePath = env.fileCacheNodePath().fileCachePath.resolve(SHARD_0.getIndex().getUUID());
+        assertTrue(Files.exists(indexCachePath));
+
+        cleaner.beforeShardPathDeleted(SHARD_0, INDEX_SETTINGS, env);
+        cleaner.beforeShardPathDeleted(SHARD_1, INDEX_SETTINGS, env);
+        cleaner.beforeIndexPathDeleted(SHARD_0.getIndex(), INDEX_SETTINGS, env);
+        MatcherAssert.assertThat(fileCache.size(), equalTo(0L));
+        assertFalse(Files.exists(indexCachePath));
+    }
+}
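
For reference, the NodeEnvironment.IndexStoreListener hook introduced by this patch can be implemented by any component that needs to act before index or shard store paths are removed, in the same way Node wires in FileCacheCleaner above. The following is a minimal sketch only; the LoggingIndexStoreListener class and its wiring are hypothetical and not part of the patch or the OpenSearch codebase.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;

// Hypothetical example: a listener that only logs upcoming store deletions.
public class LoggingIndexStoreListener implements NodeEnvironment.IndexStoreListener {
    private static final Logger logger = LogManager.getLogger(LoggingIndexStoreListener.class);

    @Override
    public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {
        // Invoked by NodeEnvironment#deleteShardDirectoryUnderLock before the shard paths are removed.
        logger.info("about to delete store paths for shard {}", shardId);
    }

    @Override
    public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {
        // Invoked by NodeEnvironment#deleteIndexDirectoryUnderLock before the index paths are removed.
        logger.info("about to delete store paths for index {}", index);
    }
}

// Hypothetical wiring, mirroring the Node.java change above:
// nodeEnvironment = new NodeEnvironment(settings, environment, new LoggingIndexStoreListener());

Because both interface methods have empty default implementations and IndexStoreListener.EMPTY is supplied when no listener is given, existing NodeEnvironment callers are unaffected by the new hook.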