Skip to content

Commit

Permalink
Merge branch 'main' into publication-setting-change
Browse files Browse the repository at this point in the history
Signed-off-by: shwetathareja <[email protected]>
  • Loading branch information
shwetathareja committed Sep 5, 2024
2 parents 736276a + 729e40d commit ef8d91e
Show file tree
Hide file tree
Showing 39 changed files with 1,286 additions and 287 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.allocation;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testSearchReplicaDedicatedIncludes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen("test");
// ensure primary is not on node 0 or 1,
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));

String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node, search replica should relocate to that node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
ensureGreen("test");

routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
}

public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
.execute()
.actionGet();

logger.info("--> creating an index with no replicas");
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureYellowAndNoInitializingShards("test");
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(2, routingTable.searchOnlyReplicas().size());
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
.stream()
.filter(ShardRouting::assignedToNode)
.collect(Collectors.toList());
assertEquals(1, assignedSearchShards.size());
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
}

private String getNodeName(String id) {
return getClusterState().nodes().get(id).getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -53,4 +54,15 @@ public void testUpdateFeatureFlagDisabled() {
});
assertTrue(settingsException.getMessage().contains("unknown setting"));
}

public void testFilterAllocationSettingNotRegistered() {
expectThrows(SettingsException.class, () -> {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
.execute()
.actionGet();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

package org.opensearch.indices.settings;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio
// add back a node
internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);

}

public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
Expand Down Expand Up @@ -175,6 +177,39 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

public void testSearchReplicaRoutingPreference() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
internalCluster().startClusterManagerOnlyNode();
String primaryNodeName = internalCluster().startDataOnlyNode();
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.build()
);
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
// add 2 nodes for the replicas
internalCluster().startDataOnlyNodes(2);
ensureGreen(TEST_INDEX);

assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

// set preference to search replica here - we default to this when there are
// search replicas but tests will randomize this value if unset
SearchResponse response = client().prepareSearch(TEST_INDEX)
.setPreference(Preference.SEARCH_REPLICA.type())
.setQuery(QueryBuilders.matchAllQuery())
.get();

String nodeId = response.getHits().getAt(0).getShard().getNodeId();
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
}

/**
* Helper to assert counts of active shards for each type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase {
public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

Expand Down Expand Up @@ -276,9 +276,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

Expand Down Expand Up @@ -312,6 +314,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
includeIndicesStatsByLevel = in.readBoolean();
}
}
Expand All @@ -128,7 +128,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeBoolean(includeIndicesStatsByLevel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
Expand All @@ -85,6 +86,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -379,6 +381,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
}
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,15 +647,11 @@ public ShardIterator replicaActiveInitializingShardIt() {
return new PlainShardIterator(shardId, Collections.emptyList());
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
return filterAndOrderShards(replica -> true);
}

public ShardIterator searchReplicaActiveInitializingShardIt() {
return filterAndOrderShards(ShardRouting::isSearchOnly);
}

/**
Expand Down Expand Up @@ -686,6 +682,20 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() {
return new PlainShardIterator(shardId, ordered);
}

private ShardIterator filterAndOrderShards(Predicate<ShardRouting> filter) {
LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (filter.test(replica)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator on active and initializing shards residing on the provided nodeId.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class OperationRouting {
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;
private final boolean isReaderWriterSplitEnabled;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand Down Expand Up @@ -254,6 +256,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
preference = Preference.PRIMARY_FIRST.type();
}

if (isReaderWriterSplitEnabled) {
if (preference == null || preference.isEmpty()) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
preference = Preference.SEARCH_REPLICA.type();
}
}
}

ShardIterator iterator = preferenceActiveShardIterator(
shard,
clusterState.nodes().getLocalNodeId(),
Expand Down Expand Up @@ -366,6 +376,8 @@ private ShardIterator preferenceActiveShardIterator(
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case SEARCH_REPLICA:
return indexShard.searchReplicaActiveInitializingShardIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES:
Expand Down
Loading

0 comments on commit ef8d91e

Please sign in to comment.