From 44ba994f3f73b7b1c9015c179635c241952a9984 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Tue, 2 May 2023 16:11:41 -0700 Subject: [PATCH] Add back primary shard preference for queries Signed-off-by: Kunal Kotwani --- CHANGELOG.md | 3 +- .../test/bulk/20_list_of_strings.yml | 2 + .../coordination/RareClusterStateIT.java | 2 +- .../basic/SearchWhileCreatingIndexIT.java | 7 +- .../fetch/subphase/MatchedQueriesIT.java | 1 + .../functionscore/RandomScoreFunctionIT.java | 2 +- .../search/preference/SearchPreferenceIT.java | 27 +++++- .../search/profile/query/QueryProfilerIT.java | 2 + .../search/simple/SimpleSearchIT.java | 2 +- .../shards/ClusterSearchShardsRequest.java | 3 +- .../ClusterSearchShardsRequestBuilder.java | 3 +- .../org/opensearch/action/get/GetRequest.java | 3 +- .../action/get/GetRequestBuilder.java | 3 +- .../action/get/MultiGetRequest.java | 3 +- .../action/get/MultiGetShardRequest.java | 3 +- .../action/search/SearchRequest.java | 3 +- .../action/search/SearchRequestBuilder.java | 3 +- .../MultiTermVectorsShardRequest.java | 3 +- .../termvectors/TermVectorsRequest.java | 4 +- .../TermVectorsRequestBuilder.java | 3 +- .../routing/IndexShardRoutingTable.java | 86 +++++++++++++++++++ .../cluster/routing/OperationRouting.java | 8 ++ .../cluster/routing/Preference.java | 30 +++++++ .../structure/RoutingIteratorTests.java | 76 ++++++++++++++-- .../test/client/RandomizingClient.java | 3 +- 25 files changed, 255 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2eb99d2fe96f..a9647e39e5605 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866)) +- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -102,4 +103,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/20_list_of_strings.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/20_list_of_strings.yml index cb3553abbd435..4c81aad35bbb3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/20_list_of_strings.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/20_list_of_strings.yml @@ -11,6 +11,8 @@ - do: count: + # we count through the primary in case there is a replica that has not yet fully recovered + preference: _primary index: test_index - match: {count: 2} diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java index 3b99ce73adf5b..0da039d29ba0b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java @@ -394,7 +394,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { assertNotNull(mapper.mappers().getMapper("field2")); }); - assertBusy(() -> assertTrue(client().prepareGet("index", "2").get().isExists())); + assertBusy(() -> assertTrue(client().prepareGet("index", "2").setPreference("_primary").get().isExists())); // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled diff --git a/server/src/internalClusterTest/java/org/opensearch/search/basic/SearchWhileCreatingIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/search/basic/SearchWhileCreatingIndexIT.java index 1d8512e101f78..6099c5342a9d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/basic/SearchWhileCreatingIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/basic/SearchWhileCreatingIndexIT.java @@ -88,8 +88,13 @@ private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus(); while (status != ClusterHealthStatus.GREEN) { // first, verify that search normal search works - SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).get(); + SearchResponse searchResponse = client().prepareSearch("test") + .setPreference("_primary") + .setQuery(QueryBuilders.termQuery("field", "test")) + .execute() + .actionGet(); assertHitCount(searchResponse, 1); + Client client = client(); searchResponse = client.prepareSearch("test") .setPreference(preference + Integer.toString(counter++)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/fetch/subphase/MatchedQueriesIT.java b/server/src/internalClusterTest/java/org/opensearch/search/fetch/subphase/MatchedQueriesIT.java index 488c253535827..9a3505a6e0abe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/fetch/subphase/MatchedQueriesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/fetch/subphase/MatchedQueriesIT.java @@ -328,6 +328,7 @@ public void testMatchedWithShould() throws Exception { .should(queryStringQuery("dolor").queryName("dolor")) .should(queryStringQuery("elit").queryName("elit")) ) + .setPreference("_primary") .get(); assertHitCount(searchResponse, 2L); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/RandomScoreFunctionIT.java b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/RandomScoreFunctionIT.java index 0701e96b71f38..2176b93079d02 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/RandomScoreFunctionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/RandomScoreFunctionIT.java @@ -117,7 +117,7 @@ public void testConsistentHitsWithSameSeed() throws Exception { for (int o = 0; o < outerIters; o++) { final int seed = randomInt(); String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!! - // randomPreference should not start with '_' (reserved for known preference types (e.g. _shards) + // randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary) while (preference.startsWith("_")) { preference = randomRealisticUnicodeOfLengthBetween(1, 10); } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java b/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java index c69555d00170b..a9d40c03839c6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java @@ -89,7 +89,9 @@ public void testStopOneNodePreferenceWithRedState() throws IOException { internalCluster().stopRandomDataNode(); client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get(); String[] preferences = new String[] { + "_primary", "_local", + "_primary_first", "_prefer_nodes:somenode", "_prefer_nodes:server2", "_prefer_nodes:somenode,server2" }; @@ -140,13 +142,32 @@ public void testSimplePreference() { client().prepareIndex("test").setSource("field1", "value1").get(); refresh(); - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get(); + SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").get(); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").get(); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java index 8601e2b6d6be9..538c71b17b2fd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java @@ -148,6 +148,7 @@ public void testProfileMatchesRegular() throws Exception { .setProfile(false) .addSort("id.keyword", SortOrder.ASC) .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreference("_primary") .setRequestCache(false); SearchRequestBuilder profile = client().prepareSearch("test") @@ -155,6 +156,7 @@ public void testProfileMatchesRegular() throws Exception { .setProfile(true) .addSort("id.keyword", SortOrder.ASC) .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreference("_primary") .setRequestCache(false); MultiSearchResponse.Item[] responses = client().prepareMultiSearch().add(vanilla).add(profile).get().getResponses(); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java index 2010d39c6cd6c..fdbdb7fa94ed1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java @@ -99,7 +99,7 @@ public void testSearchRandomPreference() throws InterruptedException, ExecutionE int iters = scaledRandomIntBetween(10, 20); for (int i = 0; i < iters; i++) { String randomPreference = randomUnicodeOfLengthBetween(0, 4); - // randomPreference should not start with '_' (reserved for known preference types (e.g. _shards) + // randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary) while (randomPreference.startsWith("_")) { randomPreference = randomUnicodeOfLengthBetween(0, 4); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java index 3ab19cc595d98..fb3da57db7291 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java @@ -152,7 +152,8 @@ public ClusterSearchShardsRequest routing(String... routings) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public ClusterSearchShardsRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java index 53940e47bb0df..674a2c2c36221 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java @@ -76,7 +76,8 @@ public ClusterSearchShardsRequestBuilder setRouting(String... routing) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public ClusterSearchShardsRequestBuilder setPreference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/get/GetRequest.java b/server/src/main/java/org/opensearch/action/get/GetRequest.java index 64148f070cc16..33b4979c41b37 100644 --- a/server/src/main/java/org/opensearch/action/get/GetRequest.java +++ b/server/src/main/java/org/opensearch/action/get/GetRequest.java @@ -150,7 +150,8 @@ public GetRequest routing(String routing) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public GetRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/get/GetRequestBuilder.java b/server/src/main/java/org/opensearch/action/get/GetRequestBuilder.java index 078c32adeb7c9..c1004a990747d 100644 --- a/server/src/main/java/org/opensearch/action/get/GetRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/get/GetRequestBuilder.java @@ -73,7 +73,8 @@ public GetRequestBuilder setRouting(String routing) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public GetRequestBuilder setPreference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/get/MultiGetRequest.java b/server/src/main/java/org/opensearch/action/get/MultiGetRequest.java index e73eb81a26a05..dee4db4f18a0c 100644 --- a/server/src/main/java/org/opensearch/action/get/MultiGetRequest.java +++ b/server/src/main/java/org/opensearch/action/get/MultiGetRequest.java @@ -316,7 +316,8 @@ public ActionRequestValidationException validate() { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public MultiGetRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/get/MultiGetShardRequest.java b/server/src/main/java/org/opensearch/action/get/MultiGetShardRequest.java index 9f137b9fdedf2..22d710c38a8c9 100644 --- a/server/src/main/java/org/opensearch/action/get/MultiGetShardRequest.java +++ b/server/src/main/java/org/opensearch/action/get/MultiGetShardRequest.java @@ -94,7 +94,8 @@ public int shardId() { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public MultiGetShardRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index b16f5613ec895..5c664bf4cc7a0 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -432,7 +432,8 @@ public SearchRequest routing(String... routings) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public SearchRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java index 8e3d9d3968fc6..861e1df0203d7 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java @@ -151,7 +151,8 @@ public SearchRequestBuilder setRouting(String... routing) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public SearchRequestBuilder setPreference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsShardRequest.java b/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsShardRequest.java index d8c0c918cb821..e936dd3a658a3 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsShardRequest.java +++ b/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsShardRequest.java @@ -86,7 +86,8 @@ public int shardId() { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public MultiTermVectorsShardRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java b/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java index 5cc99c815110e..2a170dec776a4 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java +++ b/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java @@ -336,8 +336,8 @@ public String preference() { /** * Sets the preference to execute the search. Defaults to randomize across - * shards. Can be set to {@code _local} to prefer local shards or a custom value, - * which guarantees that the same order will be used across different + * shards. Can be set to {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order will be used across different * requests. */ public TermVectorsRequest preference(String preference) { diff --git a/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequestBuilder.java b/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequestBuilder.java index 93502a02f656a..02cfff1a6682b 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequestBuilder.java @@ -96,7 +96,8 @@ public TermVectorsRequestBuilder setRouting(String routing) { /** * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to - * {@code _local} to prefer local shards or a custom value, which guarantees that the same order + * {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards, + * or a custom value, which guarantees that the same order * will be used across different requests. */ public TermVectorsRequestBuilder setPreference(String preference) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 711a750ade712..eb86f5e46bba6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -54,6 +54,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -82,11 +83,13 @@ public class IndexShardRoutingTable implements Iterable { final ShardId shardId; final ShardRouting primary; + final List primaryAsList; final List replicas; final List shards; final List activeShards; final List assignedShards; final Set allAllocationIds; + static final List NO_SHARDS = Collections.emptyList(); final boolean allShardsStarted; private volatile Map activeShardsByAttributes = emptyMap(); @@ -148,6 +151,11 @@ public class IndexShardRoutingTable implements Iterable { } this.allShardsStarted = allShardsStarted; this.primary = primary; + if (primary != null) { + this.primaryAsList = Collections.singletonList(primary); + } else { + this.primaryAsList = Collections.emptyList(); + } this.replicas = Collections.unmodifiableList(replicas); this.activeShards = Collections.unmodifiableList(activeShards); this.assignedShards = Collections.unmodifiableList(assignedShards); @@ -574,6 +582,84 @@ public ShardIterator primaryShardIt() { return new PlainShardIterator(shardId, Collections.emptyList()); } + /** + * Returns true if no primaries are active or initializing for this shard + */ + private boolean noPrimariesActive() { + if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) { + return true; + } + return false; + } + + public ShardIterator primaryActiveInitializingShardIt() { + if (noPrimariesActive()) { + return new PlainShardIterator(shardId, NO_SHARDS); + } + return primaryShardIt(); + } + + public ShardIterator primaryFirstActiveInitializingShardsIt() { + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + // fill it in a randomized fashion + for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) { + ordered.add(shardRouting); + if (shardRouting.primary()) { + // switch, its the matching node id + ordered.set(ordered.size() - 1, ordered.get(0)); + ordered.set(0, shardRouting); + } + } + // no need to worry about primary first here..., its temporal + if (!allInitializingShards.isEmpty()) { + ordered.addAll(allInitializingShards); + } + return new PlainShardIterator(shardId, ordered); + } + + public ShardIterator replicaActiveInitializingShardIt() { + // If the primaries are unassigned, return an empty list (there aren't + // any replicas to query anyway) + if (noPrimariesActive()) { + return new PlainShardIterator(shardId, NO_SHARDS); + } + + LinkedList ordered = new LinkedList<>(); + for (ShardRouting replica : shuffler.shuffle(replicas)) { + if (replica.active()) { + ordered.addFirst(replica); + } else if (replica.initializing()) { + ordered.addLast(replica); + } + } + return new PlainShardIterator(shardId, ordered); + } + + public ShardIterator replicaFirstActiveInitializingShardsIt() { + // If the primaries are unassigned, return an empty list (there aren't + // any replicas to query anyway) + if (noPrimariesActive()) { + return new PlainShardIterator(shardId, NO_SHARDS); + } + + ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); + // fill it in a randomized fashion with the active replicas + for (ShardRouting replica : shuffler.shuffle(replicas)) { + if (replica.active()) { + ordered.add(replica); + } + } + + // Add the primary shard + ordered.add(primary); + + // Add initializing shards last + if (!allInitializingShards.isEmpty()) { + ordered.addAll(allInitializingShards); + } + return new PlainShardIterator(shardId, ordered); + } + public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) { ArrayList ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); int seed = shuffler.nextSeed(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 566eefe2c4f1a..c7026f6c1bfb1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -342,6 +342,14 @@ private ShardIterator preferenceActiveShardIterator( return indexShard.preferNodeActiveInitializingShardsIt(nodesIds); case LOCAL: return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId)); + case PRIMARY: + return indexShard.primaryActiveInitializingShardIt(); + case REPLICA: + return indexShard.replicaActiveInitializingShardIt(); + case PRIMARY_FIRST: + return indexShard.primaryFirstActiveInitializingShardsIt(); + case REPLICA_FIRST: + return indexShard.replicaFirstActiveInitializingShardsIt(); case ONLY_LOCAL: return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId); case ONLY_NODES: diff --git a/server/src/main/java/org/opensearch/cluster/routing/Preference.java b/server/src/main/java/org/opensearch/cluster/routing/Preference.java index f6f0b62e7905b..a1ea01afa118f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/Preference.java +++ b/server/src/main/java/org/opensearch/cluster/routing/Preference.java @@ -53,6 +53,26 @@ public enum Preference { */ LOCAL("_local"), + /** + * Route to primary shards + */ + PRIMARY("_primary"), + + /** + * Route to replica shards + */ + REPLICA("_replica"), + + /** + * Route to primary shards first + */ + PRIMARY_FIRST("_primary_first"), + + /** + * Route to replica shards first + */ + REPLICA_FIRST("_replica_first"), + /** * Route to the local shard only */ @@ -92,6 +112,16 @@ public static Preference parse(String preference) { return PREFER_NODES; case "_local": return LOCAL; + case "_primary": + return PRIMARY; + case "_replica": + return REPLICA; + case "_primary_first": + case "_primaryFirst": + return PRIMARY_FIRST; + case "_replica_first": + case "_replicaFirst": + return REPLICA_FIRST; case "_only_local": case "_onlyLocal": return ONLY_LOCAL; diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 59ea0dfca559a..cd645b08a119c 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -75,6 +75,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; public class RoutingIteratorTests extends OpenSearchAllocationTestCase { public void testEmptyIterator() { @@ -477,7 +478,10 @@ public void testShardsAndPreferNodeRouting() { } } - public void testReplicaShardPreferenceIters() throws Exception { + public void testReplicaShardPreferenceIters() { + AllocationService strategy = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); OperationRouting operationRouting = new OperationRouting( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) @@ -489,19 +493,73 @@ public void testReplicaShardPreferenceIters() throws Exception { RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); - final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metadata(metadata) .routingTable(routingTable) + .build(); + + clusterState = ClusterState.builder(clusterState) .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1")) .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); - String[] removedPreferences = { "_primary", "_primary_first", "_replica", "_replica_first" }; - for (String pref : removedPreferences) { - expectThrows( - IllegalArgumentException.class, - () -> operationRouting.searchShards(clusterState, new String[] { "test" }, null, pref) - ); - } + clusterState = strategy.reroute(clusterState, "reroute"); // Move replicas to initializing + + // When replicas haven't initialized, it comes back with the primary first, then initializing replicas + GroupShardsIterator shardIterators = operationRouting.searchShards( + clusterState, + new String[] { "test" }, + null, + "_replica_first" + ); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + ShardIterator iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard + ShardRouting routing = iter.nextOrNull(); + assertNotNull(routing); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertTrue(routing.primary()); // replicas haven't initialized yet, so primary is first + assertTrue(routing.started()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + assertTrue(routing.initializing()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + assertTrue(routing.initializing()); + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + + shardIterators = operationRouting.searchShards(clusterState, new String[] { "test" }, null, "_replica"); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(2)); // two potential replicas for the shard + routing = iter.nextOrNull(); + assertNotNull(routing); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + + shardIterators = operationRouting.searchShards(clusterState, new String[] { "test" }, null, "_replica_first"); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard + routing = iter.nextOrNull(); + assertNotNull(routing); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertFalse(routing.primary()); + // finally the primary + routing = iter.nextOrNull(); + assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1))); + assertTrue(routing.primary()); } public void testWeightedRoutingWithDifferentWeights() { diff --git a/test/framework/src/main/java/org/opensearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/opensearch/test/client/RandomizingClient.java index e0e916c6da0f1..7ec9950d87c63 100644 --- a/test/framework/src/main/java/org/opensearch/test/client/RandomizingClient.java +++ b/test/framework/src/main/java/org/opensearch/test/client/RandomizingClient.java @@ -42,6 +42,7 @@ import org.opensearch.common.unit.TimeValue; import java.util.Arrays; +import java.util.EnumSet; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -61,7 +62,7 @@ public RandomizingClient(Client client, Random random) { // given that they return `size*num_shards` hits instead of `size` defaultSearchType = RandomPicks.randomFrom(random, Arrays.asList(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH)); if (random.nextInt(10) == 0) { - defaultPreference = Preference.LOCAL.type(); + defaultPreference = RandomPicks.randomFrom(random, EnumSet.of(Preference.PRIMARY_FIRST, Preference.LOCAL)).type(); } else if (random.nextInt(10) == 0) { String s = TestUtil.randomRealisticUnicodeString(random, 1, 10); defaultPreference = s.startsWith("_") ? null : s; // '_' is a reserved character