From 3467ca714871dc8ccca38e24a88a584481dede76 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Sat, 8 Jun 2024 15:26:00 +0530 Subject: [PATCH] Add remote routing table read Signed-off-by: Arpit Bandejiya --- .../cluster/routing/RoutingTable.java | 2 +- .../remote/RemoteRoutingTableService.java | 62 ++++- .../remote/RemoteClusterStateService.java | 47 +++- .../RemoteRoutingTableServiceTests.java | 245 +++++++++++++++++- .../remote/ClusterMetadataManifestTests.java | 4 +- 5 files changed, 339 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index e4095a84be081..6c7b94f316da2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -79,7 +79,7 @@ public class RoutingTable implements Iterable, Diffable indicesRouting; - private RoutingTable(long version, final Map indicesRouting) { + public RoutingTable(long version, final Map indicesRouting) { this.version = version; this.indicesRouting = Collections.unmodifiableMap(indicesRouting); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 058fc3bff772a..89aa898544d30 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,23 +10,37 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -41,6 +55,7 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent { private final Settings settings; private final Supplier repositoriesService; private BlobStoreRepository blobStoreRepository; + private final ThreadPool threadPool; private static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer() { @@ -55,10 +70,55 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { } }; - public RemoteRoutingTableService(Supplier repositoriesService, Settings settings) { + public RemoteRoutingTableService(Supplier repositoriesService, Settings settings, ThreadPool threadPool) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; + this.threadPool = threadPool; + } + + public CheckedRunnable getAsyncIndexMetadataReadAction( + String uploadedFilename, + Index index, + LatchedActionListener latchedActionListener) { + int idx = uploadedFilename.lastIndexOf("/"); + String blobFileName = uploadedFilename.substring(idx+1); + BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0,idx))); + + return () -> readAsync( + blobContainer, + blobFileName, + index, + threadPool.executor(ThreadPool.Names.GENERIC), + ActionListener.wrap(response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), latchedActionListener::onFailure) + ); + } + + public void readAsync(BlobContainer blobContainer, String name, Index index, ExecutorService executorService, ActionListener listener) throws IOException { + executorService.execute(() -> { + try { + listener.onResponse(read(blobContainer, name, index)); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + public RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) { + try { + return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index); + } catch (IOException | AssertionError e) { + logger.info("RoutingTable read failed with error: {}", e.toString()); + throw new RemoteClusterStateService.RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e); + } + } + + public List getUpdatedIndexRoutingTableMetadata(List updatedIndicesRouting, List allIndicesRouting) { + return updatedIndicesRouting.stream().map(idx -> { + Optional uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst(); + assert uploadedIndexMetadataOptional.isPresent() == true; + return uploadedIndexMetadataOptional.get(); + }).collect(Collectors.toList()); } public static DiffableUtils.MapDiff> getIndicesRoutingMapDiff( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 2f32ae2ffb605..35819800e81ae 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -24,7 +24,7 @@ import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS; import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemotePublicationEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; import java.io.Closeable; @@ -193,6 +193,9 @@ public RemoteClusterStateService( this.namedWriteableRegistry = namedWriteableRegistry; this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; + this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) + ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings, threadPool)) + : Optional.empty(); } /** @@ -203,7 +206,6 @@ public RemoteClusterStateService( */ @Nullable public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { - logger.info("WRITING FULL STATE"); final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -971,11 +973,12 @@ private ClusterState readClusterStateInParallel( boolean readTemplatesMetadata, boolean readDiscoveryNodes, boolean readClusterBlocks, + List indicesRoutingToRead, boolean readHashesOfConsistentSettings, Map clusterStateCustomToRead ) throws IOException { int totalReadTasks = - indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + ( + indicesToRead.size() + customToRead.size() + (remoteRoutingTableService.isPresent() ? indicesRoutingToRead.size() : 0) + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + ( readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size(); CountDownLatch latch = new CountDownLatch(totalReadTasks); @@ -1022,6 +1025,18 @@ private ClusterState readClusterStateInParallel( latch ); + if(remoteRoutingTableService.isPresent()) { + for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { + asyncMetadataReadActions.add( + remoteRoutingTableService.get().getAsyncIndexMetadataReadAction( + indexRouting.getUploadedFilename(), + new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), + routingTableLatchedActionListener + ) + ); + } + } + for (Map.Entry entry : customToRead.entrySet()) { asyncMetadataReadActions.add( remoteGlobalMetadataManager.getAsyncMetadataReadAction( @@ -1204,18 +1219,20 @@ private ClusterState readClusterStateInParallel( } }); - readIndexRoutingTableResults.forEach(indexRoutingTable -> { - indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable); - }); - metadataBuilder.indices(indexMetadataMap); if (readDiscoveryNodes) { clusterStateBuilder.nodes(discoveryNodesBuilder.get().localNodeId(localNodeId)); } - return clusterStateBuilder.metadata(metadataBuilder) + + clusterStateBuilder = clusterStateBuilder.metadata(metadataBuilder) .version(manifest.getStateVersion()) - .stateUUID(manifest.getStateUUID()) - .build(); + .stateUUID(manifest.getStateUUID()); + + if(remoteRoutingTableService.isPresent()) { + readIndexRoutingTableResults.forEach(indexRoutingTable -> indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable)); + clusterStateBuilder = clusterStateBuilder.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting)); + } + return clusterStateBuilder.build(); } public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral) @@ -1233,6 +1250,7 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada manifest.getTemplatesMetadata() != null, includeEphemeral && manifest.getDiscoveryNodesMetadata() != null, includeEphemeral && manifest.getClusterBlocksMetadata() != null, + includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(), includeEphemeral && manifest.getHashesOfConsistentSettings() != null, includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap() ); @@ -1261,6 +1279,13 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata updatedClusterStateCustom.put(customType, manifest.getClusterStateCustomMap().get(customType)); } } + + List updatedIndexRouting = new ArrayList<>(); + remoteRoutingTableService.ifPresent(routingTableService -> + updatedIndexRouting.addAll(routingTableService.getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(), + manifest.getIndicesRouting())) + ); + ClusterState updatedClusterState = readClusterStateInParallel( previousState, manifest, @@ -1274,6 +1299,7 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata diff.isTemplatesMetadataUpdated(), diff.isDiscoveryNodesUpdated(), diff.isClusterBlocksUpdated(), + updatedIndexRouting, diff.isHashesOfConsistentSettingsUpdated(), updatedClusterStateCustom ); @@ -1307,6 +1333,7 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata stateUUID(manifest.getStateUUID()). version(manifest.getStateVersion()). metadata(metadataBuilder). + routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)). build(); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 9a9cbfa153259..34db6bb3d5212 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -8,30 +8,62 @@ package org.opensearch.cluster.routing.remote; +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; -import org.junit.After; -import org.junit.Before; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.function.Supplier; -import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private RemoteRoutingTableService remoteRoutingTableService; + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; + private BlobContainer blobContainer; + private BlobPath basePath; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before public void setup() { @@ -43,24 +75,44 @@ public void setup() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + blobStore = mock(BlobStore.class); + blobContainer = mock(BlobContainer.class); when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); - + when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - remoteRoutingTableService = new RemoteRoutingTableService(repositoriesServiceSupplier, settings); + basePath = BlobPath.cleanPath().add("base-path"); + + remoteRoutingTableService = new RemoteRoutingTableService( + repositoriesServiceSupplier, + settings, + threadPool + ); } @After public void teardown() throws Exception { super.tearDown(); remoteRoutingTableService.close(); + threadPool.shutdown(); } + public void testFailInitializationWhenRemoteRoutingDisabled() { final Settings settings = Settings.builder().build(); - assertThrows(AssertionError.class, () -> new RemoteRoutingTableService(repositoriesServiceSupplier, settings)); + assertThrows( + AssertionError.class, + () -> new RemoteRoutingTableService( + repositoriesServiceSupplier, + settings, + new ThreadPool(settings) + ) + ); } public void testFailStartWhenRepositoryNotSet() { @@ -74,4 +126,183 @@ public void testFailStartWhenNotBlobRepository() { assertThrows(AssertionError.class, () -> remoteRoutingTableService.start()); } + public void testGetChangedIndicesRouting() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTable).build(); + + assertEquals(0, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), state.getRoutingTable()).getUpserts().size()); + + //Reversing order to check for equality without order. + IndexRoutingTable indexRouting = routingTable.getIndicesRouting().get(indexName); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index).addShard(indexRouting.getShards().get(0).replicaShards().get(0)) + .addShard(indexRouting.getShards().get(0).primaryShard()).build(); + ClusterState newState = ClusterState.builder(ClusterName.DEFAULT).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + assertEquals(0, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), newState.getRoutingTable()).getUpserts().size()); + } + + public void testIndicesRoutingDiffWhenIndexDeleted() { + + ClusterState state = createIndices(randomIntBetween(1,100)); + RoutingTable routingTable = state.routingTable(); + + List allIndices = new ArrayList<>(); + routingTable.getIndicesRouting().forEach((k,v) -> allIndices.add(k)); + + String indexNameToDelete = allIndices.get(randomIntBetween(0, allIndices.size()-1)); + RoutingTable updatedRoutingTable = RoutingTable.builder(routingTable).remove(indexNameToDelete).build(); + + assertEquals(1, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().size()); + assertEquals(indexNameToDelete, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().get(0)); + } + + public void testIndicesRoutingDiffWhenIndexDeletedAndAdded() { + + ClusterState state = createIndices(randomIntBetween(1,100)); + RoutingTable routingTable = state.routingTable(); + + List allIndices = new ArrayList<>(); + routingTable.getIndicesRouting().forEach((k,v) -> allIndices.add(k)); + + String indexNameToDelete = allIndices.get(randomIntBetween(0, allIndices.size()-1)); + RoutingTable.Builder updatedRoutingTableBuilder = RoutingTable.builder(routingTable).remove(indexNameToDelete); + + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable updatedRoutingTable = updatedRoutingTableBuilder.addAsNew(indexMetadata).build(); + + assertEquals(1, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().size()); + assertEquals(indexNameToDelete, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().get(0)); + + assertEquals(1, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getUpserts().size()); + assertTrue(RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getUpserts().containsKey(indexName)); + } + + public void testGetAsyncIndexMetadataReadAction() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1,50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format("index-routing/" + indexName); + Index index = new Index(indexName, "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BytesStreamOutput streamOutput = new BytesStreamOutput(); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(clusterState.routingTable().getIndicesRouting().get(indexName)); + remoteIndexRoutingTable.writeTo(streamOutput); + when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexMetadataReadAction( + uploadedFileName, index, listener + ); + assertNotNull(runnable); + runnable.run(); + + verify(blobContainer, times(1)).readBlob(any()); + assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + } + + public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1,50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format("index-routing/" + indexName); + Index index = new Index("incorrect-index", "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BytesStreamOutput streamOutput = new BytesStreamOutput(); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(clusterState.routingTable().getIndicesRouting().get(indexName)); + remoteIndexRoutingTable.writeTo(streamOutput); + when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexMetadataReadAction( + uploadedFileName, index, listener + ); + assertNotNull(runnable); + runnable.run(); + + verify(blobContainer, times(1)).readBlob(any()); + assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); + } + + public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1,50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format("index-routing/" + indexName); + Index index = new Index(indexName, "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexMetadataReadAction( + uploadedFileName, index, listener + ); + assertNotNull(runnable); + runnable.run(); + + verify(blobContainer, times(1)).readBlob(any()); + assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteClusterStateService.RemoteStateTransferException.class))); + } + + public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { + List updatedIndicesRouting = new ArrayList<>(); + List indicesRouting = randomUploadedIndexMetadataList(); + List updatedIndexMetadata = remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(updatedIndicesRouting, indicesRouting); + assertEquals(0, updatedIndexMetadata.size()); + } + + public void testGetUpdatedIndexRoutingTableMetadataWhenIndexIsUpdated() { + List updatedIndicesRouting = new ArrayList<>(); + List indicesRouting = randomUploadedIndexMetadataList(); + ClusterMetadataManifest.UploadedIndexMetadata expectedIndexRouting = indicesRouting.get(randomIntBetween(0, indicesRouting.size())); + updatedIndicesRouting.add(expectedIndexRouting.getIndexName()); + List updatedIndexMetadata = remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(updatedIndicesRouting, indicesRouting); + assertEquals(1, updatedIndexMetadata.size()); + assertEquals(expectedIndexRouting, updatedIndexMetadata.get(0)); + } + + private ClusterState createIndices(int numberOfIndices) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for(int i=0; i< numberOfIndices; i++) { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + routingTableBuilder.addAsNew(indexMetadata); + } + return ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTableBuilder.build()).build(); + } + + private ClusterState createClusterState(String indexName) { + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(randomInt(1000)).numberOfReplicas(randomInt(10)).build(); + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + return ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTable).build(); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index ffaa5a9f64c75..070de379c1da0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -425,7 +425,7 @@ public void testClusterMetadataManifestXContentV3() throws IOException { } } - private List randomUploadedIndexMetadataList() { + public static List randomUploadedIndexMetadataList() { final int size = randomIntBetween(1, 10); final List uploadedIndexMetadataList = new ArrayList<>(size); while (uploadedIndexMetadataList.size() < size) { @@ -434,7 +434,7 @@ private List randomUploadedIndexMetadataList() { return uploadedIndexMetadataList; } - private UploadedIndexMetadata randomUploadedIndexMetadata() { + private static UploadedIndexMetadata randomUploadedIndexMetadata() { return new UploadedIndexMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)); }