Skip to content

Commit

Permalink
create publication repos during join task execution (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#16383)

* create publication repos during join task

Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv authored Oct 28, 2024
1 parent 6f1b59e commit 72559bf
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.remotemigration.MigrationBaseTestCase;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() {
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoTypeAttributeKey, FsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
}

public Settings.Builder remoteWithRoutingTableNodeSetting() {
// Remote Cluster with Routing table

return Settings.builder()
.put(
buildRemoteStoreNodeAttributes(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_NAME,
segmentRepoPath,
false
ReloadableFsRepository.TYPE
)
)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
Expand All @@ -57,6 +58,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -185,11 +187,30 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
Optional<DiscoveryNode> remotePublicationDN = currentNodes.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();
RepositoriesMetadata existingRepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE);
Map<String, RepositoryMetadata> repositories = new LinkedHashMap<>();
if (existingRepositoriesMetadata != null) {
existingRepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remoteDN.isPresent()) {
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remoteDN.get(),
existingRepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remotePublicationDN.isPresent()) {
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remotePublicationDN.get(),
existingRepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}

assert nodesBuilder.isLocalNodeElectedClusterManager();

Expand Down Expand Up @@ -219,15 +240,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);

if (remoteDN.isEmpty() && node.isRemoteStoreNode()) {
if ((remoteDN.isEmpty() && node.isRemoteStoreNode())
|| (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
existingRepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand All @@ -241,7 +263,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
}
results.success(joinTask);
}

RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values()));
if (nodesChanged) {
rerouteService.reroute(
"post-join reroute",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) {
* node repository metadata an exception will be thrown and the node will not be allowed to join the cluster.
*/
public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) {
if (joiningNode.isRemoteStoreNode()) {
if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) {
List<RepositoryMetadata> updatedRepositoryMetadataList = new ArrayList<>();
List<RepositoryMetadata> newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata()
.repositories();
Expand Down
Loading

0 comments on commit 72559bf

Please sign in to comment.