Skip to content

Commit

Permalink
create publication repos during join task
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Oct 19, 2024
1 parent 0bded88 commit c23f86e
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.remotemigration.MigrationBaseTestCase;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() {
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoTypeAttributeKey, FsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
}

public Settings.Builder remoteWithRoutingTableNodeSetting() {
// Remote Cluster with Routing table

return Settings.builder()
.put(
buildRemoteStoreNodeAttributes(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_NAME,
segmentRepoPath,
false
ReloadableFsRepository.TYPE
)
)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);

Optional<DiscoveryNode> remotePublicationDN = currentNodes.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();
if (remotePublicationDN.isPresent()) {
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(remotePublicationDN.get(), repositoriesMetadata);
}

assert nodesBuilder.isLocalNodeElectedClusterManager();

Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
Expand Down Expand Up @@ -222,10 +231,12 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
if (remoteDN.isEmpty() && node.isRemoteStoreNode()) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata);
}
if (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled()) {
// This is hit only on cases where we encounter first remote publication node
logger.info("Updating system repository now for remote publication store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata);
}

nodesChanged = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) {
* node repository metadata an exception will be thrown and the node will not be allowed to join the cluster.
*/
public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) {
if (joiningNode.isRemoteStoreNode()) {
if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) {
List<RepositoryMetadata> updatedRepositoryMetadataList = new ArrayList<>();
List<RepositoryMetadata> newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata()
.repositories();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,198 @@ public void testUpdatesClusterStateWithMultiNodeClusterAndSameRepository() throw
validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2);
}

/**
 * Runs two join rounds against the same {@link JoinTaskExecutor}.
 * <ol>
 *   <li>A join task for the cluster-manager node, whose attributes come from
 *       {@code remotePublicationNodeAttributes()}, must succeed and pass
 *       {@code validatePublicationRepositoryMetadata}.</li>
 *   <li>After persistent settings are set to the REMOTE_STORE migration direction and "mixed"
 *       compatibility mode, a node built from {@code remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)}
 *       joins; the join must succeed and pass {@code validateRepositoriesMetadata}.</li>
 * </ol>
 */
public void testUpdatesRepoRemoteNodeJoinPublicationCluster() throws Exception {
    // Collaborators: the allocation service echoes its argument, reroute completes immediately.
    final AllocationService allocation = mock(AllocationService.class);
    when(allocation.adaptAutoExpandReplicas(any())).then(invocation -> invocation.getArguments()[0]);
    final RerouteService reroute = (reason, priority, listener) -> listener.onResponse(null);
    final RemoteStoreNodeService nodeService = new RemoteStoreNodeService(new SetOnce<>(mock(RepositoriesService.class))::get, null);
    final JoinTaskExecutor executor = new JoinTaskExecutor(Settings.EMPTY, allocation, logger, reroute, nodeService);

    // Round 1: the publication-configured node is both the local node and the elected cluster-manager.
    final DiscoveryNode publicationClusterManager = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        remotePublicationNodeAttributes(),
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );
    final DiscoveryNodes.Builder initialNodes = DiscoveryNodes.builder()
        .add(publicationClusterManager)
        .localNodeId(publicationClusterManager.getId())
        .clusterManagerNodeId(publicationClusterManager.getId());
    final ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT).nodes(initialNodes).build();

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> firstRound = executor.execute(
        initialState,
        List.of(new JoinTaskExecutor.Task(publicationClusterManager, "elect leader"))
    );
    assertThat(firstRound.executionResults.entrySet(), hasSize(1));
    final ClusterStateTaskExecutor.TaskResult firstOutcome = firstRound.executionResults.values().iterator().next();
    assertTrue(firstOutcome.isSuccess());
    validatePublicationRepositoryMetadata(firstRound.resultingState, publicationClusterManager);

    // Round 2: enable migration (mixed mode, towards remote store) before the remote-store node joins.
    final Settings migrationSettings = Settings.builder()
        .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
        .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
        .build();
    FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
    final Metadata migrationMetadata = Metadata.builder().persistentSettings(migrationSettings).build();
    final ClusterState stateBeforeRemoteJoin = ClusterState.builder(firstRound.resultingState).metadata(migrationMetadata).build();

    final DiscoveryNode remoteStoreJoiner = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> secondRound = executor.execute(
        stateBeforeRemoteJoin,
        List.of(new JoinTaskExecutor.Task(remoteStoreJoiner, "test"))
    );
    assertThat(secondRound.executionResults.entrySet(), hasSize(1));
    final ClusterStateTaskExecutor.TaskResult secondOutcome = secondRound.executionResults.values().iterator().next();
    assertTrue(secondOutcome.isSuccess());
    validateRepositoriesMetadata(secondRound.resultingState, remoteStoreJoiner, publicationClusterManager);
}

/**
 * Mirror image of the "remote node joins publication cluster" scenario: the cluster-manager is built from
 * {@code remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)} and the node that joins afterwards is built
 * from {@code remotePublicationNodeAttributes()}.
 * <p>
 * Both join rounds must report exactly one successful task result, and the state produced by the second
 * round must pass {@code validateRepositoriesMetadata}.
 */
public void testUpdatesRepoPublicationNodeJoinRemoteCluster() throws Exception {
    final AllocationService allocationService = mock(AllocationService.class);
    // adaptAutoExpandReplicas simply hands back the cluster state it was given.
    when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
    final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
    final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(
        new SetOnce<>(mock(RepositoriesService.class))::get,
        null
    );

    final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
        Settings.EMPTY,
        allocationService,
        logger,
        rerouteService,
        remoteStoreNodeService
    );

    final DiscoveryNode clusterManagerNode = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
        .nodes(
            DiscoveryNodes.builder()
                .add(clusterManagerNode)
                .localNodeId(clusterManagerNode.getId())
                .clusterManagerNodeId(clusterManagerNode.getId())
        )
        .build();

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
        clusterState,
        List.of(new JoinTaskExecutor.Task(clusterManagerNode, "elect leader"))
    );
    // The second round is built on top of this result, so fail here if the first round did not succeed
    // rather than later with a less obvious repository mismatch.
    assertThat(result.executionResults.entrySet(), hasSize(1));
    assertTrue(result.executionResults.values().iterator().next().isSuccess());

    final Settings settings = Settings.builder()
        .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
        .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
        .build();
    final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
    FeatureFlags.initializeFeatureFlags(nodeSettings);
    Metadata metadata = Metadata.builder().persistentSettings(settings).build();

    ClusterState currentState = ClusterState.builder(result.resultingState).metadata(metadata).build();

    final DiscoveryNode remotePublicationNode = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        remotePublicationNodeAttributes(),
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> resultAfterRemoteNodeJoin = joinTaskExecutor.execute(
        currentState,
        List.of(new JoinTaskExecutor.Task(remotePublicationNode, "test"))
    );
    assertThat(resultAfterRemoteNodeJoin.executionResults.entrySet(), hasSize(1));
    final ClusterStateTaskExecutor.TaskResult taskResult1 = resultAfterRemoteNodeJoin.executionResults.values().iterator().next();
    assertTrue(taskResult1.isSuccess());
    validateRepositoriesMetadata(resultAfterRemoteNodeJoin.resultingState, clusterManagerNode, remotePublicationNode);
}

/**
 * Joins a second node to a cluster whose cluster-manager shares the same attribute map, obtained from
 * {@code remotePublicationNodeAttributes()}. The initial cluster state carries a {@link RepositoriesMetadata}
 * custom backed by an empty list.
 * <p>
 * Expects a single successful task result and a resulting state that passes
 * {@code validatePublicationRepositoryMetadata}.
 */
public void testUpdatesClusterStateWithMultiplePublicationNodeJoin() throws Exception {
    // Both nodes below are created from this one attribute map.
    final Map<String, String> publicationAttributes = remotePublicationNodeAttributes();

    final AllocationService allocation = mock(AllocationService.class);
    when(allocation.adaptAutoExpandReplicas(any())).then(invocation -> invocation.getArguments()[0]);
    final RerouteService reroute = (reason, priority, listener) -> listener.onResponse(null);
    final RemoteStoreNodeService nodeService = new RemoteStoreNodeService(new SetOnce<>(mock(RepositoriesService.class))::get, null);
    final JoinTaskExecutor executor = new JoinTaskExecutor(Settings.EMPTY, allocation, logger, reroute, nodeService);

    final DiscoveryNode electedClusterManager = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        publicationAttributes,
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    // Start from a state that already has a (still empty) repositories custom.
    final List<RepositoryMetadata> noRepositories = new ArrayList<>();
    final DiscoveryNodes.Builder existingNodes = DiscoveryNodes.builder()
        .add(electedClusterManager)
        .localNodeId(electedClusterManager.getId())
        .clusterManagerNodeId(electedClusterManager.getId());
    final ClusterState startingState = ClusterState.builder(ClusterName.DEFAULT)
        .nodes(existingNodes)
        .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(noRepositories)))
        .build();

    final DiscoveryNode secondPublicationNode = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        publicationAttributes,
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> joinOutcome = executor.execute(
        startingState,
        List.of(new JoinTaskExecutor.Task(secondPublicationNode, "test"))
    );

    assertThat(joinOutcome.executionResults.entrySet(), hasSize(1));
    final ClusterStateTaskExecutor.TaskResult onlyResult = joinOutcome.executionResults.values().iterator().next();
    assertTrue(onlyResult.isSuccess());
    validatePublicationRepositoryMetadata(joinOutcome.resultingState, electedClusterManager);
}

public void testNodeJoinInMixedMode() {
List<Version> versions = allOpenSearchVersions();
assert versions.size() >= 2 : "test requires at least two open search versions";
Expand Down Expand Up @@ -1191,7 +1383,9 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode

final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE);
assertTrue(repositoriesMetadata.repositories().size() == expectedRepositories);
if (repositoriesMetadata.repositories().size() == 2 || repositoriesMetadata.repositories().size() == 3) {
if (repositoriesMetadata.repositories().size() == 2
|| repositoriesMetadata.repositories().size() == 3
|| repositoriesMetadata.repositories().size() == 4) {
final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(existingNode, SEGMENT_REPO);
final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(existingNode, TRANSLOG_REPO);
for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) {
Expand All @@ -1212,6 +1406,43 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode
}
}

/**
 * Asserts that the repositories custom of {@code updatedState} holds exactly two repositories and that each
 * one equals (ignoring generations) the metadata built for {@code existingNode} under
 * {@code CLUSTER_STATE_REPO} or {@code ROUTING_TABLE_REPO}.
 *
 * @param updatedState cluster state produced by the join task executor
 * @param existingNode node whose attributes are used to build the expected repository metadata
 * @throws Exception propagated from {@code buildRepositoryMetadata}
 */
private void validatePublicationRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode) throws Exception {
    final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE);
    // assertEquals reports the actual count on failure, unlike assertTrue(size == 2).
    assertEquals("unexpected number of publication repositories", 2, repositoriesMetadata.repositories().size());
    final RepositoryMetadata clusterStateRepoMetadata = buildRepositoryMetadata(existingNode, CLUSTER_STATE_REPO);
    final RepositoryMetadata routingTableRepoMetadata = buildRepositoryMetadata(existingNode, ROUTING_TABLE_REPO);
    for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) {
        if (repositoryMetadata.name().equals(clusterStateRepoMetadata.name())) {
            assertTrue(clusterStateRepoMetadata.equalsIgnoreGenerations(repositoryMetadata));
        } else if (repositoryMetadata.name().equals(routingTableRepoMetadata.name())) {
            assertTrue(routingTableRepoMetadata.equalsIgnoreGenerations(repositoryMetadata));
        } else {
            // Without this branch two unrelated repositories would satisfy the size check and pass unnoticed.
            fail("unexpected repository [" + repositoryMetadata.name() + "] in publication repositories metadata");
        }
    }
}

/**
 * Checks the repositories custom of {@code updatedState} after a remote-store node and a publication node
 * have both been seen: exactly four repositories must be present, and each one whose name matches an
 * expected repository must equal it when generations are ignored.
 * <p>
 * Segment, translog and cluster-state expectations are built from {@code remoteNode}; the routing-table
 * expectation is built from {@code publicationNode}.
 *
 * @param updatedState    cluster state produced by the join task executor
 * @param remoteNode      node used to build the segment, translog and cluster-state expectations
 * @param publicationNode node used to build the routing-table expectation
 * @throws Exception propagated from {@code buildRepositoryMetadata}
 */
private void validateRepositoriesMetadata(ClusterState updatedState, DiscoveryNode remoteNode, DiscoveryNode publicationNode)
    throws Exception {
    final RepositoriesMetadata actual = updatedState.metadata().custom(RepositoriesMetadata.TYPE);
    assertEquals(4, actual.repositories().size());

    final RepositoryMetadata expectedSegment = buildRepositoryMetadata(remoteNode, SEGMENT_REPO);
    final RepositoryMetadata expectedTranslog = buildRepositoryMetadata(remoteNode, TRANSLOG_REPO);
    final RepositoryMetadata expectedClusterState = buildRepositoryMetadata(remoteNode, CLUSTER_STATE_REPO);
    final RepositoryMetadata expectedRoutingTable = buildRepositoryMetadata(publicationNode, ROUTING_TABLE_REPO);

    for (RepositoryMetadata candidate : actual.repositories()) {
        final String candidateName = candidate.name();
        if (candidateName.equals(expectedSegment.name())) {
            assertTrue(expectedSegment.equalsIgnoreGenerations(candidate));
            continue;
        }
        if (candidateName.equals(expectedTranslog.name())) {
            assertTrue(expectedTranslog.equalsIgnoreGenerations(candidate));
            continue;
        }
        if (candidateName.equals(expectedClusterState.name())) {
            assertTrue(expectedClusterState.equalsIgnoreGenerations(candidate));
            continue;
        }
        if (candidateName.equals(expectedRoutingTable.name())) {
            // Here the comparison is invoked on the actual repository, with the expectation as argument.
            assertTrue(candidate.equalsIgnoreGenerations(expectedRoutingTable));
        }
    }
}

private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
return new DiscoveryNode(
randomAlphaOfLength(10),
Expand Down

0 comments on commit c23f86e

Please sign in to comment.