From e3b5015dd50ad2ce45655fe1ec7a2f368ac0db02 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 10 Jan 2023 12:28:52 +0530 Subject: [PATCH] [Weighted Shard Routing] Fail open requests on search shard failures (#5072) * Fail open requests on search shard failures ( Signed-off-by: Anshu Agarwal --- CHANGELOG.md | 2 + .../search/SearchWeightedRoutingIT.java | 643 +++++++++++++++++- ...TransportFieldCapabilitiesIndexAction.java | 10 +- .../search/AbstractSearchAsyncAction.java | 4 +- .../action/search/SearchShardIterator.java | 2 +- .../broadcast/TransportBroadcastAction.java | 4 +- .../shard/TransportSingleShardAction.java | 7 +- .../metadata/WeightedRoutingMetadata.java | 1 + .../routing/FailAwareWeightedRouting.java | 156 +++++ .../routing/IndexShardRoutingTable.java | 42 +- .../cluster/routing/OperationRouting.java | 17 +- .../common/settings/ClusterSettings.java | 1 + .../FailAwareWeightedRoutingTests.java | 266 ++++++++ .../routing/OperationRoutingTests.java | 1 + .../structure/RoutingIteratorTests.java | 135 +++- 15 files changed, 1239 insertions(+), 52 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 2701cb9598ac1..244290f52eb91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + # CHANGELOG All notable changes to this project are documented in this file. @@ -15,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040)) - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211)) +- Support to fail open requests on search shard failures with weighted traffic routing ([#5072](https://github.com/opensearch-project/OpenSearch/pull/5072)) - Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366)) - Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495)) - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 80e7f22c47c58..b0afbc6983c95 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -14,38 +14,60 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.get.MultiGetRequest; +import org.opensearch.action.get.MultiGetResponse; +import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.search.stats.SearchStats; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase { + @Override - protected int numberOfReplicas() { - return 2; + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class); } public void testSearchWithWRRShardRouting() throws IOException { Settings commonSettings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put("cluster.routing.weighted.fail_open", false) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -102,7 +124,8 @@ public void testSearchWithWRRShardRouting() throws IOException { hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); } } - // search should not go to nodes in zone c + // search should not go to nodes in zone c with weight zero in case + // shard copies are available in other zones assertThat(hitNodes.size(), lessThanOrEqualTo(4)); DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); List nodeIdsFromZoneWithWeightZero = new ArrayList<>(); @@ -159,4 +182,618 @@ public void testSearchWithWRRShardRouting() throws IOException { } } + private Map> setupCluster(int nodeCountPerAZ, Settings commonSettings) { + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + Map> nodeMap = new HashMap<>(); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + nodeMap.put("a", nodes_in_zone_a); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + nodeMap.put("b", nodes_in_zone_b); + + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + nodeMap.put("c", nodes_in_zone_c); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + return nodeMap; + + } + + private void setUpIndexing(int numShards, int numReplicas) { + assertAcked( + prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", numShards).put("index.number_of_replicas", numReplicas) + ) + ); + ensureGreen(); + + logger.info("--> creating indices for test"); + for (int i = 0; i < 100; i++) { + client().prepareIndex("test").setId("" + i).setSource("field_" + i, "value_" + i).get(); + } + refresh("test"); + } + + private void setShardRoutingWeights(Map weights) { + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + } + + /** + * Shard routing request fail without fail-open if there are no healthy nodes in active az to serve request + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Data nodes in zone a and b are stopped, + * assertions are put to check that search requests fail. + * @throws Exception throws Exception + */ + public void testShardRoutingByStoppingDataNodes_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + + // Make Search Requests + Future[] responses = new Future[50]; + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount > 0); + logger.info("--> failed request count is [()]", failedCount); + assertNoSearchInAZ("c"); + } + + /** + * Shard routing request with fail open enabled is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs (with fail open enabled) + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Data nodes in zone a and b are stopped, + * assertions are put to make sure shard search requests do not fail. + * @throws IOException throws exception + */ + public void testShardRoutingByStoppingDataNodes_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 1; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> data nodes in zone a and b are stopped"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("a").get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeMap.get("b").get(0))); + ensureStableCluster(2); + + Set hitNodes = new HashSet<>(); + + // Make Search Requests + Future[] responses = new Future[50]; + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().smartClient().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).execute(); + } + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(0, searchResponse.getFailedShards()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + failedCount++; + } + } + + Assert.assertTrue(failedCount == 0); + assertSearchInAZ("c"); + } + + /** + * Shard routing request with fail open disabled is not served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes data node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are not served by data node in zone c. + * @throws IOException throws exception + */ + public void testShardRoutingWithNetworkDisruption_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + int failedShardCount = 0; + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + failedShardCount += searchResponse.getFailedShards(); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + Assert.assertTrue(failedShardCount > 0); + // assert that no search request goes to az with weight zero + assertNoSearchInAZ("c"); + } + + /** + * Shard routing request is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs.(with fail open enabled) + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes data node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are served by data node in zone c. + * @throws IOException throws exception + */ + public void testShardRoutingWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertSearchInAZ("b"); + assertSearchInAZ("c"); + assertNoSearchInAZ("a"); + } + + private void assertNoSearchInAZ(String az) { + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + String dataNodeId = null; + + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals(az)) { + dataNodeId = node.getId(); + break; + } + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + if (stat.getNode().getId().equals(dataNodeId)) { + assertEquals(0, searchStats.getQueryCount()); + assertEquals(0, searchStats.getFetchCount()); + } + } + } + } + + private void assertSearchInAZ(String az) { + ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); + String dataNodeId = null; + + for (Iterator it = dataNodes.valuesIt(); it.hasNext();) { + DiscoveryNode node = it.next(); + if (node.getAttributes().get("zone").equals(az)) { + dataNodeId = node.getId(); + break; + } + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().isDataNode()) { + if (stat.getNode().getId().equals(dataNodeId)) { + Assert.assertTrue(searchStats.getFetchCount() > 0L); + Assert.assertTrue(searchStats.getQueryCount() > 0L); + } + } + } + } + + /** + * Shard routing request is served by data nodes in az with weight set as 0, + * in case shard copies are not available in other azs. + * This is tested by setting up a 3 node cluster with one data node per az. + * Weighted shard routing weight is set as 0 for az-c. + * Indices are created with one replica copy and network disruption is introduced, + * which makes node in zone-a unresponsive. + * Since there are two copies of a shard, there can be few shards for which copy doesn't exist in zone b. + * Assertions are put to make sure such shard search requests are served by data node in zone c. + * @throws IOException throws exception + */ + public void testSearchAggregationWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + assertAcked( + prepareCreate("index").setMapping("f", "type=keyword") + .setSettings(Settings.builder().put("index" + ".number_of_shards", 10).put("index" + ".number_of_replicas", 1)) + ); + + int numDocs = 10; + List docs = new ArrayList<>(); + for (int i = 0; i < numDocs; ++i) { + docs.add(client().prepareIndex("index").setSource("f", Integer.toString(i / 3))); + } + indexRandom(true, docs); + ensureGreen(); + refresh("index"); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[51]; + int size = 17; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("index") + .setSize(20) + .addAggregation(terms("f").field("f")) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + Aggregations aggregations = searchResponse.getAggregations(); + assertNotNull(aggregations); + Terms terms = aggregations.get("f"); + assertEquals(0, searchResponse.getFailedShards()); + assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount()); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + assertSearchInAZ("b"); + assertSearchInAZ("c"); + assertNoSearchInAZ("a"); + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + + /** + * MultiGet with fail open enabled. No request failure on network disruption + * @throws IOException throws exception + */ + public void testMultiGetWithNetworkDisruption_FailOpenEnabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + int index1, index2; + for (int i = 0; i < 50; i++) { + index1 = randomIntBetween(0, 9); + index2 = randomIntBetween(0, 9); + responses[i] = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + assertThat(multiGetResponse.getResponses()[0].isFailed(), equalTo(false)); + assertThat(multiGetResponse.getResponses()[1].isFailed(), equalTo(false)); + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + + /** + * MultiGet with fail open disabled. Assert that some requests do fail. + * @throws IOException throws exception + */ + public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0), nodeMap.get("c").get(0)) + .collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + int index1, index2; + for (int i = 0; i < 50; i++) { + index1 = randomIntBetween(0, 9); + index2 = randomIntBetween(0, 9); + responses[i] = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "" + index1)) + .add(new MultiGetRequest.Item("test", "" + index2)) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + int failedCount = 0; + for (int i = 0; i < 50; i++) { + try { + MultiGetResponse multiGetResponse = responses[i].get(); + assertThat(multiGetResponse.getResponses().length, equalTo(2)); + if (multiGetResponse.getResponses()[0].isFailed() || multiGetResponse.getResponses()[1].isFailed()) { + failedCount++; + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + Assert.assertTrue(failedCount > 0); + + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 60, + TimeUnit.SECONDS + ); + } + } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 7d9ab4ff93f59..d57bbed59fe3e 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -261,16 +262,17 @@ private void onFailure(ShardRouting shardRouting, Exception e) { tryNext(e, false); } - private ShardRouting nextRoutingOrNull() { + private ShardRouting nextRoutingOrNull(Exception failure) { if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } - ShardRouting next = shardsIt.get(shardIndex).nextOrNull(); + ShardRouting next = FailAwareWeightedRouting.getInstance().findNext(shardsIt.get(shardIndex), clusterService.state(), failure); + if (next != null) { return next; } moveToNextShard(); - return nextRoutingOrNull(); + return nextRoutingOrNull(failure); } private void moveToNextShard() { @@ -278,7 +280,7 @@ private void moveToNextShard() { } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { - ShardRouting shardRouting = nextRoutingOrNull(); + ShardRouting shardRouting = nextRoutingOrNull(lastFailure); if (shardRouting == null) { if (canMatchShard == false) { listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false)); diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 0876bf93a557b..1a37406e19f14 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -43,6 +43,7 @@ import org.opensearch.action.ShardOperationFailedException; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; @@ -449,7 +450,8 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard onShardFailure(shardIndex, shard, e); - final SearchShardTarget nextShard = shardIt.nextOrNull(); + SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e); + final boolean lastShard = nextShard == null; logger.debug( () -> new ParameterizedMessage( diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java b/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java index 72951f60c286e..45e4c1a54eeba 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardIterator.java @@ -119,7 +119,7 @@ public String getClusterAlias() { return clusterAlias; } - SearchShardTarget nextOrNull() { + public SearchShardTarget nextOrNull() { final String nodeId = targetNodesIterator.nextOrNull(); if (nodeId != null) { return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices); diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index a69853dc6a3c0..10645c744b2f3 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -250,7 +251,8 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int // we set the shard failure always, even if its the first in the replication group, and the next one // will work (it will just override it...) setFailure(shardIt, shardIndex, e); - ShardRouting nextShard = shardIt.nextOrNull(); + ShardRouting nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), e); + if (nextShard != null) { if (e != null) { if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index df39bd29493dd..d8c4913e595a4 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.FailAwareWeightedRouting; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardsIterator; import org.opensearch.cluster.service.ClusterService; @@ -244,7 +245,8 @@ private void perform(@Nullable final Exception currentFailure) { lastFailure = currentFailure; this.lastFailure = currentFailure; } - final ShardRouting shardRouting = shardIt.nextOrNull(); + ShardRouting shardRouting = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterService.state(), currentFailure); + if (shardRouting == null) { Exception failure = lastFailure; if (failure == null || isShardNotAvailableException(failure)) { @@ -273,6 +275,7 @@ private void perform(@Nullable final Exception currentFailure) { ); } final Writeable.Reader reader = getResponseReader(); + ShardRouting finalShardRouting = shardRouting; transportService.sendRequest( node, transportShardAction, @@ -296,7 +299,7 @@ public void handleResponse(final Response response) { @Override public void handleException(TransportException exp) { - onFailure(shardRouting, exp); + onFailure(finalShardRouting, exp); } } ); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 320f75a9f2ada..c7136785606f6 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -39,6 +39,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable internalErrorRestStatusList = List.of( + RestStatus.INTERNAL_SERVER_ERROR, + RestStatus.BAD_GATEWAY, + RestStatus.SERVICE_UNAVAILABLE, + RestStatus.GATEWAY_TIMEOUT + ); + + public static FailAwareWeightedRouting getInstance() { + return INSTANCE; + + } + + /** + * * + * @return true if exception is due to cluster availability issues + */ + private boolean isInternalFailure(Exception exception) { + if (exception instanceof OpenSearchException) { + // checking for 5xx failures + return internalErrorRestStatusList.contains(((OpenSearchException) exception).status()); + } + return false; + } + + /** + * This function checks if the shard is present in data node with weighted routing weight set to 0, + * In such cases we fail open, if shard search request for the shard from other shard copies fail with non + * retryable exception. + * + * @param nodeId the node with the shard copy + * @return true if the node has attribute value with shard routing weight set to zero, else false + */ + private boolean isWeighedAway(String nodeId, ClusterState clusterState) { + DiscoveryNode node = clusterState.nodes().get(nodeId); + WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata(); + if (weightedRoutingMetadata != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting != null && weightedRouting.isSet()) { + // Fetch weighted routing attributes with weight set as zero + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT) + .map(Map.Entry::getKey); + + for (Object key : keys.toArray()) { + if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) { + return true; + } + } + } + } + return false; + } + + /** + * This function returns next shard copy to retry search request in case of failure from previous copy returned + * by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + * + * @param shardIt Shard Iterator containing order in which shard copies for a shard need to be requested + * @return the next shard copy + */ + public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) { + SearchShardTarget next = shardIt.nextOrNull(); + while (next != null && isWeighedAway(next.getNodeId(), clusterState)) { + SearchShardTarget nextShard = next; + if (canFailOpen(nextShard.getShardId(), exception, clusterState)) { + logger.info( + () -> new ParameterizedMessage("{}: Fail open executed due to exception {}", nextShard.getShardId(), exception) + ); + break; + } + next = shardIt.nextOrNull(); + } + return next; + } + + /** + * This function returns next shard copy to retry search request in case of failure from previous copy returned + * by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + * + * @param shardsIt Shard Iterator containing order in which shard copies for a shard need to be requested + * @return the next shard copy + */ + public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) { + ShardRouting next = shardsIt.nextOrNull(); + + while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) { + ShardRouting nextShard = next; + if (canFailOpen(nextShard.shardId(), exception, clusterState)) { + logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception {}", nextShard.shardId(), exception)); + break; + } + next = shardsIt.nextOrNull(); + } + return next; + } + + /** + * * + * @return true if can fail open ie request shard copies present in nodes with weighted shard + * routing weight set to zero + */ + private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) { + return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); + } + + private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) { + List shards = clusterState.routingTable().shardRoutingTable(shardId).shards(); + for (ShardRouting shardRouting : shards) { + if (!shardRouting.active()) { + return true; + } + } + return false; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 9026e7068e9fe..207570c1d56b2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -32,6 +32,9 @@ package org.opensearch.cluster.routing; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; @@ -57,6 +60,8 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; @@ -89,6 +94,8 @@ public class IndexShardRoutingTable implements Iterable { private volatile Map> activeShardsByWeight = emptyMap(); private volatile Map> initializingShardsByWeight = emptyMap(); + private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class); + /** * The initializing list, including ones that are initializing on a target node because of relocation. * If we can come up with a better variable name, it would be nice... @@ -305,19 +312,50 @@ public ShardIterator activeInitializingShardsRankedIt( * * @param weightedRouting entity * @param nodes discovered nodes in the cluster + * @param isFailOpenEnabled if true, shards search requests in case of failures are tried on shard copies present + * in node attribute value with weight zero * @return an iterator over active and initializing shards, ordered by weighted round-robin * scheduling policy. Making sure that initializing shards are the last to iterate through. */ - public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { + public ShardIterator activeInitializingShardsWeightedIt( + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight, + boolean isFailOpenEnabled + ) { final int seed = shuffler.nextSeed(); List ordered = new ArrayList<>(); List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); + List orderedListWithDistinctShards; ordered.addAll(shuffler.shuffle(orderedActiveShards, seed)); if (!allInitializingShards.isEmpty()) { List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); ordered.addAll(orderedInitializingShards); } - return new PlainShardIterator(shardId, ordered); + + // append shards for attribute value with weight zero, so that shard search requests can be tried on + // shard copies in case of request failure from other attribute values. + if (isFailOpenEnabled) { + try { + Stream keys = weightedRouting.weights() + .entrySet() + .stream() + .filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT) + .map(Map.Entry::getKey); + keys.forEach(key -> { + ShardIterator iterator = onlyNodeSelectorActiveInitializingShardsIt(weightedRouting.attributeName() + ":" + key, nodes); + while (iterator.remaining() > 0) { + ordered.add(iterator.nextOrNull()); + } + }); + } catch (IllegalArgumentException e) { + // this exception is thrown by {@link onlyNodeSelectorActiveInitializingShardsIt} in case count of shard + // copies found is zero + logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); + } + } + orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList()); + return new PlainShardIterator(shardId, orderedListWithDistinctShards); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index a4b4cc961fade..d7df1a2c2181b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -83,10 +83,18 @@ public class OperationRouting { Setting.Property.Dynamic, Setting.Property.NodeScope ); + + public static final Setting WEIGHTED_ROUTING_FAILOPEN_ENABLED = Setting.boolSetting( + "cluster.routing.weighted.fail_open", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); private volatile List awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; private volatile double weightedRoutingDefaultWeight; + private volatile boolean isFailOpenEnabled; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -98,9 +106,11 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { ); this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); + this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); + clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -115,6 +125,10 @@ void setWeightedRoutingDefaultWeight(double weightedRoutingDefaultWeight) { this.weightedRoutingDefaultWeight = weightedRoutingDefaultWeight; } + void setFailOpenEnabled(boolean isFailOpenEnabled) { + this.isFailOpenEnabled = isFailOpenEnabled; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -328,7 +342,8 @@ private ShardIterator shardRoutings( return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, - getWeightedRoutingDefaultWeight() + getWeightedRoutingDefaultWeight(), + isFailOpenEnabled ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ea27cb82b8c0f..55e88d2d09d8c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -540,6 +540,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING, OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, + OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java new file mode 100644 index 0000000000000..c9c616dab0dbc --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java @@ -0,0 +1,266 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.Version; +import org.opensearch.action.OriginalIndicesTests; +import org.opensearch.action.search.SearchShardIterator; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.NodeNotConnectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; + +public class FailAwareWeightedRoutingTests extends OpenSearchTestCase { + + private ClusterState setUpCluster() { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + + // set up nodes + DiscoveryNode nodeA = new DiscoveryNode( + "node_zone_a", + buildNewFakeTransportAddress(), + singletonMap("zone", "a"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + DiscoveryNode nodeB = new DiscoveryNode( + "node_zone_b", + buildNewFakeTransportAddress(), + singletonMap("zone", "b"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + DiscoveryNode nodeC = new DiscoveryNode( + "node_zone_c", + buildNewFakeTransportAddress(), + singletonMap("zone", "c"), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + + nodeBuilder.add(nodeA); + nodeBuilder.add(nodeB); + nodeBuilder.add(nodeC); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + + // set up weighted routing weights + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + + return clusterState; + + } + + public void testFindNextWithoutFailOpen() throws IOException { + + ClusterState clusterState = setUpCluster(); + + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException()); + assertNull(next); + } + + public void testFindNextWithFailOpenDueTo5xx() throws IOException { + + ClusterState clusterState = setUpCluster(); + + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // Node in zone b is disconnected + DiscoveryNode node = clusterState.nodes().get("node_zone_b"); + // fail open is executed and shard present in node with weighted routing weight zero is returned + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new NodeNotConnectedException(node, "Node is not " + "connected")); + assertNotNull(next); + assertEquals("node_zone_c", next.getNodeId()); + } + + public void testFindNextWithFailOpenDueToUnassignedShard() throws IOException { + + ClusterState clusterState = setUpCluster(); + + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", true, ShardRoutingState.STARTED); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED); + + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + searchShardIterator.nextOrNull(); + searchShardIterator.nextOrNull(); + + // since there is an unassigned shard in the cluster, fail open is executed and shard present in node with + // weighted routing weight zero is returned + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException()); + assertNotNull(next); + assertEquals("node_zone_c", next.getNodeId()); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index d64402a74fba2..ffdb2d39fb817 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -904,6 +904,7 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep Settings setting = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) .build(); threadPool = new TestThreadPool("testThatOnlyNodesSupport"); diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 8f5aa1b764551..4196b8882fa8f 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -550,22 +550,20 @@ public void testWeightedRoutingWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); weights = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 1.0); weightedRouting = new WeightedRouting("zone", weights); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(3, shardIterator.size()); weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); @@ -573,21 +571,21 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(1, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); - weights = Map.of("zone1", 0.0, "zone2", 0.0, "zone3", 0.0); + weights = Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0); weightedRouting = new WeightedRouting("zone", weights); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); - assertEquals(0, shardIterator.size()); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + assertEquals(3, shardIterator.size()); + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); } finally { terminate(threadPool); } @@ -646,14 +644,12 @@ public void testWeightedRoutingInMemoryStore() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with same WeightedRouting instance assertNotNull( @@ -662,13 +658,11 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with new instance of WeightedRouting but same weights Map weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); @@ -679,13 +673,11 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { - shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); - } + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); // Make iterator call with different weights Map weights2 = Map.of("zone1", 1.0, "zone2", 0.0, "zone3", 1.0); @@ -696,13 +688,82 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); assertEquals(2, shardIterator.size()); - while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + + } finally { + terminate(threadPool); + } + } + + /** + * Test to validate that shard routing state is maintained across requests + */ + public void testWeightedRoutingShardState() { + TestThreadPool threadPool = null; + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + Map weights = Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + Map requestCount = new HashMap<>(); + + for (int i = 0; i < 5; i++) { + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + + assertEquals(3, shardIterator.size()); + ShardRouting shardRouting; shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); - assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + requestCount.put(shardRouting.currentNodeId(), requestCount.getOrDefault(shardRouting.currentNodeId(), 0) + 1); } + assertEquals(3, requestCount.get("node1").intValue()); + assertEquals(2, requestCount.get("node2").intValue()); + } finally { terminate(threadPool); }