From 6fbd941087d8790a449538d3de0bc57f7a200ba3 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com> Date: Tue, 23 Apr 2024 15:59:52 +0530 Subject: [PATCH] Prefer remote replicas for primary promotion during migration (#13136) Signed-off-by: Lakshya Taragi (cherry picked from commit efa06feeabd336e3d95764d072b116250c01d8f5) --- .../RemoteDualReplicationIT.java | 135 +++++++++++++++++ .../cluster/routing/RoutingNodes.java | 32 +++- .../remotestore/RemoteStoreNodeService.java | 14 ++ .../allocation/FailedShardsRoutingTests.java | 140 ++++++++++++++++++ 4 files changed, 316 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java index e316bae5d8ebc..18f07910403d4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java @@ -439,6 +439,141 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { ); } + /* + Scenario: + - Starts 1 docrep backed data node + - Creates an index with 0 replica + - Starts 1 remote backed data node + - Moves primary copy from docrep to remote through _cluster/reroute + - Starts 1 more remote backed data node + - Expands index to 2 replicas, one each on new remote node and docrep node + - Stops remote enabled node hosting the primary + - Ensures remote replica gets promoted to primary + - Ensures doc count is same after failover + - Indexes some more docs to ensure working of failed-over primary + */ + public void testFailoverRemotePrimaryToRemoteReplica() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 1 docrep data node"); + String docrepNodeName = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0); + + logger.info("---> Creating index with 0 replica"); + createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(FAILOVER_REMOTE_TO_REMOTE); + initDocRepToRemoteMigration(); + + logger.info("---> Starting 1 remote enabled data node"); + addRemote = true; + String remoteNodeName1 = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + assertEquals( + internalCluster().client() + .admin() + .cluster() + .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME) + .get() + .repositories() + .size(), + 2 + ); + + logger.info("---> Starting doc ingestion in parallel thread"); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE); + asyncIndexingService.startIndexing(); + + logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1); + assertAcked( + internalCluster().client() + .admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1)) + .get() + ); + ensureGreen(FAILOVER_REMOTE_TO_REMOTE); + assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1); + + logger.info("---> Starting 1 more remote enabled data node"); + String remoteNodeName2 = internalCluster().startDataOnlyNode(); + internalCluster().validateClusterFormed(); + + logger.info("---> Expanding index to 2 replica copies, on docrepNode and remoteNode2"); + assertAcked( + internalCluster().client() + .admin() + .indices() + .prepareUpdateSettings() + .setIndices(FAILOVER_REMOTE_TO_REMOTE) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build()) + .get() + ); + ensureGreen(FAILOVER_REMOTE_TO_REMOTE); + + logger.info("---> Stopping indexing thread"); + asyncIndexingService.stopIndexing(); + + refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE); + Map shardStatsMap = internalCluster().client() + .admin() + .indices() + .prepareStats(FAILOVER_REMOTE_TO_REMOTE) + .setDocs(true) + .get() + .asMap(); + DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + long initialPrimaryDocCount = 0; + for (ShardRouting shardRouting : shardStatsMap.keySet()) { + if (shardRouting.primary()) { + assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount(); + } + } + int firstBatch = (int) asyncIndexingService.getIndexedDocs(); + assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, firstBatch, 0); + + logger.info("---> Stop remote store enabled node hosting the primary"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1)); + ensureStableCluster(3); + ensureYellow(FAILOVER_REMOTE_TO_REMOTE); + DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + + waitUntil(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + String nodeId = clusterState.getRoutingTable().index(FAILOVER_REMOTE_TO_REMOTE).shard(0).primaryShard().currentNodeId(); + if (nodeId == null) { + return false; + } else { + assertEquals(finalNodes.get(nodeId).getName(), remoteNodeName2); + return finalNodes.get(nodeId).isRemoteStoreNode(); + } + }); + + shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap(); + long primaryDocCountAfterFailover = 0; + for (ShardRouting shardRouting : shardStatsMap.keySet()) { + if (shardRouting.primary()) { + assertTrue(finalNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()); + primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount(); + } + } + assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover); + + logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs"); + int secondBatch = randomIntBetween(1, 10); + logger.info("---> Indexing {} more docs", secondBatch); + indexBulk(FAILOVER_REMOTE_TO_REMOTE, secondBatch); + refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE); + + shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap(); + assertEquals(2, shardStatsMap.size()); + shardStatsMap.forEach( + (shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); } + ); + } + /* Scenario: - Starts 1 docrep backed data node diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 35752027e46ea..3926eb0be707b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -67,6 +67,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore; + /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing @@ -418,6 +420,20 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) { .orElse(null); } + /** + * Returns one active replica shard on a remote node for the given shard id or null if + * no such replica is found. + *

+ * Since we aim to continue moving forward during remote store migration, replicas already migrated to remote nodes + * are preferred for primary promotion + */ + public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) { + return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active()).filter((shr) -> { + RoutingNode nd = node(shr.currentNodeId()); + return (nd != null && nd.node().isRemoteStoreNode()); + }).findFirst().orElse(null); + } + /** * Returns true iff all replicas are active for the given shard routing. Otherwise false */ @@ -735,11 +751,17 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists( RoutingChangesObserver routingChangesObserver ) { assert failedShard.primary(); - ShardRouting activeReplica; - if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) { - activeReplica = activeReplicaWithOldestVersion(failedShard.shardId()); - } else { - activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + ShardRouting activeReplica = null; + if (isMigratingToRemoteStore(metadata)) { + // we might not find any replica on remote node + activeReplica = activeReplicaOnRemoteNode(failedShard.shardId()); + } + if (activeReplica == null) { + if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) { + activeReplica = activeReplicaWithOldestVersion(failedShard.shardId()); + } else { + activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + } } if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index 94b11086ba865..adfb751421db7 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -238,4 +239,17 @@ public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings) return (isMixedMode && isRemoteStoreMigrationDirection); } + + /** + * To check if the cluster is undergoing remote store migration using clusterState metadata + * @return + * true For REMOTE_STORE migration direction and MIXED compatibility mode, + * false otherwise + */ + public static boolean isMigratingToRemoteStore(Metadata metadata) { + boolean isMixedMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()).equals(CompatibilityMode.MIXED); + boolean isRemoteStoreMigrationDirection = MIGRATION_DIRECTION_SETTING.get(metadata.settings()).equals(Direction.REMOTE_STORE); + + return (isMixedMode && isRemoteStoreMigrationDirection); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index db4cedbbbe7b5..04e37e7d958d0 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; @@ -47,13 +48,18 @@ import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.test.VersionUtils; import java.util.ArrayList; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; @@ -61,6 +67,10 @@ import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -812,4 +822,134 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { } } } + + public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build()); + AllocationService allocation = createAllocationService(Settings.builder().build()); + + // segment replication enabled + Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + + // remote store migration metadata settings + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(4)) + .persistentSettings( + Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED.mode) + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE.direction) + .build() + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0); + + // add a remote node and start primary shard + Map remoteStoreNodeAttributes = Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE" + ); + DiscoveryNode remoteNode1 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(remoteNode1)).build(); + clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build(); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4)); + + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4)); + + // add remote and non-remote nodes and start replica shards + DiscoveryNode remoteNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNode remoteNode3 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNode nonRemoteNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode nonRemoteNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + List replicaShardNodes = List.of(remoteNode2, remoteNode3, nonRemoteNode1, nonRemoteNode2); + + for (int i = 0; i < 4; i++) { + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(replicaShardNodes.get(i))) + .build(); + + clusterState = allocation.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + i)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1))); + + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + (i + 1))); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1))); + } + + // fail primary shard + ShardRouting primaryShard0 = clusterState.routingTable().index("test").shard(0).primaryShard(); + ClusterState newState = allocation.applyFailedShard(clusterState, primaryShard0, randomBoolean()); + assertNotEquals(clusterState, newState); + clusterState = newState; + + // verify that promoted replica exists on a remote node + assertEquals(4, clusterState.getRoutingNodes().shardsWithState(STARTED).size()); + ShardRouting primaryShardRouting1 = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertNotEquals(primaryShard0, primaryShardRouting1); + assertTrue( + primaryShardRouting1.currentNodeId().equals(remoteNode2.getId()) + || primaryShardRouting1.currentNodeId().equals(remoteNode3.getId()) + ); + + // fail primary shard again + newState = allocation.applyFailedShard(clusterState, primaryShardRouting1, randomBoolean()); + assertNotEquals(clusterState, newState); + clusterState = newState; + + // verify that promoted replica again exists on a remote node + assertEquals(3, clusterState.getRoutingNodes().shardsWithState(STARTED).size()); + ShardRouting primaryShardRouting2 = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertNotEquals(primaryShardRouting1, primaryShardRouting2); + assertTrue( + primaryShardRouting2.currentNodeId().equals(remoteNode2.getId()) + || primaryShardRouting2.currentNodeId().equals(remoteNode3.getId()) + ); + assertNotEquals(primaryShardRouting1.currentNodeId(), primaryShardRouting2.currentNodeId()); + + ShardRouting expectedCandidateForSegRep = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId); + + // fail primary shard again + newState = allocation.applyFailedShard(clusterState, primaryShardRouting2, randomBoolean()); + assertNotEquals(clusterState, newState); + clusterState = newState; + + // verify that promoted replica exists on a non-remote node + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(STARTED).size()); + ShardRouting primaryShardRouting3 = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertNotEquals(primaryShardRouting2, primaryShardRouting3); + assertTrue( + primaryShardRouting3.currentNodeId().equals(nonRemoteNode1.getId()) + || primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId()) + ); + assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId()); + } }