From ef77db6a28d0c24e373c2df9b93d6c4b917070b3 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Tue, 9 Apr 2024 19:20:06 +0530 Subject: [PATCH] Prefer replicas on remote nodes Signed-off-by: Lakshya Taragi --- .../cluster/routing/RoutingNodes.java | 43 ++++++- .../allocation/FailedShardsRoutingTests.java | 114 ++++++++++++++++++ 2 files changed, 152 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 938a603c459c9..27056559d3ddf 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -44,6 +44,7 @@ import org.opensearch.common.Randomness; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.core.Assertions; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; @@ -61,12 +62,15 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Queue; +import java.util.Random; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore; + /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing @@ -418,6 +422,29 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) { .orElse(null); } + /** + * Returns one active replica shard on a remote node for the given shard id or null if + * no such replica is found. + *

+ * Since we aim to continue moving forward during remote store migration, replicas already migrated to remote nodes + * are preferred for primary promotion + */ + public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) { + List replicaShardsOnRemote = assignedShards(shardId).stream() + .filter(shr -> !shr.primary() && shr.active()) + .filter((shr) -> { + RoutingNode nd = node(shr.currentNodeId()); + return (nd != null && nd.node().isRemoteStoreNode()); + }) + .collect(Collectors.toList()); + ShardRouting replicaShard = null; + if (replicaShardsOnRemote.isEmpty() == false) { + Random rand = Randomness.get(); + replicaShard = replicaShardsOnRemote.get(rand.nextInt(replicaShardsOnRemote.size())); + } + return replicaShard; + } + /** * Returns true iff all replicas are active for the given shard routing. Otherwise false */ @@ -735,11 +762,17 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists( RoutingChangesObserver routingChangesObserver ) { assert failedShard.primary(); - ShardRouting activeReplica; - if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) { - activeReplica = activeReplicaWithOldestVersion(failedShard.shardId()); - } else { - activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + ShardRouting activeReplica = null; + if (isMigratingToRemoteStore(new ClusterSettings(metadata.settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { + // we might not find any replica on remote node + activeReplica = activeReplicaOnRemoteNode(failedShard.shardId()); + } + if (activeReplica == null) { + if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) { + activeReplica = activeReplicaWithOldestVersion(failedShard.shardId()); + } else { + activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + } } if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index db4cedbbbe7b5..02173f43938b7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -48,12 +48,15 @@ import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.test.VersionUtils; import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; @@ -61,6 +64,9 @@ import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -812,4 +818,112 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { } } } + + public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build()); + AllocationService allocation = createAllocationService(Settings.builder().build()); + + // segment replication enabled + Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + + // remote store migration metadata settings + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(4)) + .persistentSettings( + Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED.mode) + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE.direction) + .build() + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0); + + // add a remote node and start primary shard + DiscoveryNode remoteNode1 = RemoteStoreMigrationAllocationDeciderTests.getRemoteNode(); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(remoteNode1)).build(); + clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build(); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4)); + + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4)); + + // add remote and non-remote nodes and start replica shards + DiscoveryNode remoteNode2 = RemoteStoreMigrationAllocationDeciderTests.getRemoteNode(); + DiscoveryNode remoteNode3 = RemoteStoreMigrationAllocationDeciderTests.getRemoteNode(); + DiscoveryNode nonRemoteNode1 = RemoteStoreMigrationAllocationDeciderTests.getNonRemoteNode(); + DiscoveryNode nonRemoteNode2 = RemoteStoreMigrationAllocationDeciderTests.getNonRemoteNode(); + List replicaShardNodes = List.of(remoteNode2, remoteNode3, nonRemoteNode1, nonRemoteNode2); + + for (int i = 0; i < 4; i++) { + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(replicaShardNodes.get(i))) + .build(); + + clusterState = allocation.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + i)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1))); + + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + (i + 1))); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1))); + } + + // fail primary shard + ShardRouting primaryShard0 = clusterState.routingTable().index("test").shard(0).primaryShard(); + ClusterState newState = allocation.applyFailedShard(clusterState, primaryShard0, randomBoolean()); + assertNotEquals(clusterState, newState); + clusterState = newState; + + // verify that promoted replica exists on a remote node + assertEquals(4, clusterState.getRoutingNodes().shardsWithState(STARTED).size()); + ShardRouting primaryShardRouting1 = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertNotEquals(primaryShard0, primaryShardRouting1); + assertTrue( + primaryShardRouting1.currentNodeId().equals(remoteNode2.getId()) + || primaryShardRouting1.currentNodeId().equals(remoteNode3.getId()) + ); + + // fail primary shard again + newState = allocation.applyFailedShard(clusterState, primaryShardRouting1, randomBoolean()); + assertNotEquals(clusterState, newState); + clusterState = newState; + + // verify that promoted replica again exists on a remote node + assertEquals(3, clusterState.getRoutingNodes().shardsWithState(STARTED).size()); + ShardRouting primaryShardRouting2 = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertNotEquals(primaryShardRouting1, primaryShardRouting2); + assertTrue( + primaryShardRouting2.currentNodeId().equals(remoteNode2.getId()) + || primaryShardRouting2.currentNodeId().equals(remoteNode3.getId()) + ); + assertNotEquals(primaryShardRouting1.currentNodeId(), primaryShardRouting2.currentNodeId()); + + ShardRouting expectedCandidateForSegRep = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId); + + // fail primary shard again + newState = allocation.applyFailedShard(clusterState, primaryShardRouting2, randomBoolean()); + assertNotEquals(clusterState, newState); + clusterState = newState; + + // verify that promoted replica exists on a non-remote node + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(STARTED).size()); + ShardRouting primaryShardRouting3 = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertNotEquals(primaryShardRouting2, primaryShardRouting3); + assertTrue( + primaryShardRouting3.currentNodeId().equals(nonRemoteNode1.getId()) + || primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId()) + ); + assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId()); + } }