Skip to content

Commit

Permalink
Add back primary shard preference for queries
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed May 2, 2023
1 parent 18390f9 commit 44ba994
Show file tree
Hide file tree
Showing 25 changed files with 255 additions and 30 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -102,4 +103,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

- do:
count:
# we count through the primary in case there is a replica that has not yet fully recovered
preference: _primary
index: test_index

- match: {count: 2}
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
assertNotNull(mapper.mappers().getMapper("field2"));
});

assertBusy(() -> assertTrue(client().prepareGet("index", "2").get().isExists()));
assertBusy(() -> assertTrue(client().prepareGet("index", "2").setPreference("_primary").get().isExists()));

// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,13 @@ private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas)
ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();
while (status != ClusterHealthStatus.GREEN) {
// first, verify that search normal search works
SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).get();
SearchResponse searchResponse = client().prepareSearch("test")
.setPreference("_primary")
.setQuery(QueryBuilders.termQuery("field", "test"))
.execute()
.actionGet();
assertHitCount(searchResponse, 1);

Client client = client();
searchResponse = client.prepareSearch("test")
.setPreference(preference + Integer.toString(counter++))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public void testMatchedWithShould() throws Exception {
.should(queryStringQuery("dolor").queryName("dolor"))
.should(queryStringQuery("elit").queryName("elit"))
)
.setPreference("_primary")
.get();

assertHitCount(searchResponse, 2L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testConsistentHitsWithSameSeed() throws Exception {
for (int o = 0; o < outerIters; o++) {
final int seed = randomInt();
String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!!
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary)
while (preference.startsWith("_")) {
preference = randomRealisticUnicodeOfLengthBetween(1, 10);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public void testStopOneNodePreferenceWithRedState() throws IOException {
internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get();
String[] preferences = new String[] {
"_primary",
"_local",
"_primary_first",
"_prefer_nodes:somenode",
"_prefer_nodes:server2",
"_prefer_nodes:somenode,server2" };
Expand Down Expand Up @@ -140,13 +142,32 @@ public void testSimplePreference() {
client().prepareIndex("test").setSource("field1", "value1").get();
refresh();

SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").get();
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").get();
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ public void testProfileMatchesRegular() throws Exception {
.setProfile(false)
.addSort("id.keyword", SortOrder.ASC)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference("_primary")
.setRequestCache(false);

SearchRequestBuilder profile = client().prepareSearch("test")
.setQuery(q)
.setProfile(true)
.addSort("id.keyword", SortOrder.ASC)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference("_primary")
.setRequestCache(false);

MultiSearchResponse.Item[] responses = client().prepareMultiSearch().add(vanilla).add(profile).get().getResponses();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testSearchRandomPreference() throws InterruptedException, ExecutionE
int iters = scaledRandomIntBetween(10, 20);
for (int i = 0; i < iters; i++) {
String randomPreference = randomUnicodeOfLengthBetween(0, 4);
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary)
while (randomPreference.startsWith("_")) {
randomPreference = randomUnicodeOfLengthBetween(0, 4);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public ClusterSearchShardsRequest routing(String... routings) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public ClusterSearchShardsRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public ClusterSearchShardsRequestBuilder setRouting(String... routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public ClusterSearchShardsRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public GetRequest routing(String routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public GetRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public GetRequestBuilder setRouting(String routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public GetRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ public ActionRequestValidationException validate() {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public MultiGetRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public int shardId() {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public MultiGetShardRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ public SearchRequest routing(String... routings) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public SearchRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public SearchRequestBuilder setRouting(String... routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public SearchRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public int shardId() {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public MultiTermVectorsShardRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ public String preference() {

/**
* Sets the preference to execute the search. Defaults to randomize across
* shards. Can be set to {@code _local} to prefer local shards or a custom value,
* which guarantees that the same order will be used across different
* shards. Can be set to {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order will be used across different
* requests.
*/
public TermVectorsRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public TermVectorsRequestBuilder setRouting(String routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public TermVectorsRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -82,11 +83,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ShardId shardId;

final ShardRouting primary;
final List<ShardRouting> primaryAsList;
final List<ShardRouting> replicas;
final List<ShardRouting> shards;
final List<ShardRouting> activeShards;
final List<ShardRouting> assignedShards;
final Set<String> allAllocationIds;
static final List<ShardRouting> NO_SHARDS = Collections.emptyList();
final boolean allShardsStarted;

private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = emptyMap();
Expand Down Expand Up @@ -148,6 +151,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
this.allShardsStarted = allShardsStarted;
this.primary = primary;
if (primary != null) {
this.primaryAsList = Collections.singletonList(primary);
} else {
this.primaryAsList = Collections.emptyList();
}
this.replicas = Collections.unmodifiableList(replicas);
this.activeShards = Collections.unmodifiableList(activeShards);
this.assignedShards = Collections.unmodifiableList(assignedShards);
Expand Down Expand Up @@ -574,6 +582,84 @@ public ShardIterator primaryShardIt() {
return new PlainShardIterator(shardId, Collections.emptyList());
}

/**
* Returns true if no primaries are active or initializing for this shard
*/
private boolean noPrimariesActive() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) {
return true;
}
return false;
}

public ShardIterator primaryActiveInitializingShardIt() {
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
return primaryShardIt();
}

public ShardIterator primaryFirstActiveInitializingShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
ordered.add(shardRouting);
if (shardRouting.primary()) {
// switch, its the matching node id
ordered.set(ordered.size() - 1, ordered.get(0));
ordered.set(0, shardRouting);
}
}
// no need to worry about primary first here..., its temporal
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator replicaActiveInitializingShardIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator replicaFirstActiveInitializingShardsIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}

ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion with the active replicas
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.add(replica);
}
}

// Add the primary shard
ordered.add(primary);

// Add initializing shards last
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
int seed = shuffler.nextSeed();
Expand Down
Loading

0 comments on commit 44ba994

Please sign in to comment.