From 0d05c92be831d896c9d12231bdbd0b72bbf784ae Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Sun, 20 Oct 2024 23:34:48 +0530 Subject: [PATCH] fix the existing repo lookup --- .../coordination/JoinTaskExecutor.java | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 1d5ffd5fbd940..5b5a781f1f65c 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; @@ -57,6 +58,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -185,19 +187,29 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // for every set of node join task which we can optimize to not compute if cluster state already has // repository information. Optional remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get()); - RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( - dn, - currentState.getMetadata().custom(RepositoriesMetadata.TYPE) - ); - Optional remotePublicationDN = currentNodes.getNodes() .values() .stream() .filter(DiscoveryNode::isRemoteStatePublicationEnabled) .findFirst(); + RepositoriesMetadata existingrepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE); + Map repositories = new LinkedHashMap<>(); + if (existingrepositoriesMetadata != null) { + existingrepositoriesMetadata.repositories().forEach(r -> repositories.put(r.name(), r)); + } + if (remoteDN.isPresent()) { + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remoteDN.get(), + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.put(r.name(), r)); + } if (remotePublicationDN.isPresent()) { - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(remotePublicationDN.get(), repositoriesMetadata); + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remotePublicationDN.get(), + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.put(r.name(), r)); } assert nodesBuilder.isLocalNodeElectedClusterManager(); @@ -231,12 +243,20 @@ public ClusterTasksResult execute(ClusterState currentState, List jo if (remoteDN.isEmpty() && node.isRemoteStoreNode()) { // This is hit only on cases where we encounter first remote node logger.info("Updating system repository now for remote store"); - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata); + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + node, + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.put(r.name(), r)); } if (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled()) { // This is hit only on cases where we encounter first remote publication node logger.info("Updating system repository now for remote publication store"); - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata); + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + node, + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.put(r.name(), r)); } nodesChanged = true; @@ -252,7 +272,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo } results.success(joinTask); } - + RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values())); if (nodesChanged) { rerouteService.reroute( "post-join reroute",