Skip to content

Commit

Permalink
[Backport 2.x] Introduce allocation filter to control placement of se…
Browse files Browse the repository at this point in the history
…arch only replicas (#15702)
  • Loading branch information
mch2 authored Sep 5, 2024
1 parent 017f7d4 commit b44242d
Show file tree
Hide file tree
Showing 9 changed files with 546 additions and 198 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.allocation;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testSearchReplicaDedicatedIncludes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen("test");
// ensure primary is not on node 0 or 1,
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));

String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node, search replica should relocate to that node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
ensureGreen("test");

routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
}

public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
.execute()
.actionGet();

logger.info("--> creating an index with no replicas");
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureYellowAndNoInitializingShards("test");
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(2, routingTable.searchOnlyReplicas().size());
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
.stream()
.filter(ShardRouting::assignedToNode)
.collect(Collectors.toList());
assertEquals(1, assignedSearchShards.size());
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
}

private String getNodeName(String id) {
return getClusterState().nodes().get(id).getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -52,4 +53,15 @@ public void testUpdateFeatureFlagDisabled() {
});
assertTrue(exception.getMessage().contains("unknown setting"));
}

public void testFilterAllocationSettingNotRegistered() {
expectThrows(IllegalArgumentException.class, () -> {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
.execute()
.actionGet();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
Expand All @@ -84,6 +85,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -376,6 +378,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
}
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.decider;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeFilters;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Map;

import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR;

/**
* This allocation decider is similar to FilterAllocationDecider but provides
* the option to filter specifically for search replicas.
* The filter behaves similar to an include for any defined node attribute.
* A search replica can be allocated to only nodes with one of the specified attributes while
* other shard types will be rejected from nodes with any othe attributes.
* @opensearch.internal
*/
public class SearchReplicaAllocationDecider extends AllocationDecider {

public static final String NAME = "filter";
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
);

private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;

private volatile RemoteStoreNodeService.Direction migrationDirection;
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;

public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
clusterSettings.addAffixMapUpdateConsumer(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
this::setSearchReplicaIncludeFilters,
(a, b) -> {}
);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
if (searchReplicaIncludeFilters != null) {
final boolean match = searchReplicaIncludeFilters.match(node);
if (match == false && shardRouting.isSearchOnly()) {
return allocation.decision(
Decision.NO,
NAME,
"node does not match shard setting [%s] filters [%s]",
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
searchReplicaIncludeFilters
);
}
// filter will only apply to search replicas
if (shardRouting.isSearchOnly() == false && match) {
return allocation.decision(
Decision.NO,
NAME,
"only search replicas can be allocated to node with setting [%s] filters [%s]",
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
searchReplicaIncludeFilters
);
}
}
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterApplierService;
Expand Down Expand Up @@ -816,6 +817,8 @@ public void apply(Settings value, Settings current, Settings previous) {
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
)
)
),
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
Expand All @@ -157,7 +155,6 @@
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY;
Expand Down Expand Up @@ -2254,71 +2251,6 @@ public void testIndexCreationWithIndexStoreTypeRemoteStoreThrowsException() {
);
}

public void testDefaultSearchReplicasSetting() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build());
Settings templateSettings = Settings.EMPTY;
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
request.settings(requestSettings.build());
Settings indexSettings = aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
templateSettings,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
);
assertFalse(INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(indexSettings));
}

public void testSearchReplicasValidationWithSegmentReplication() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build());
Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2);
request.settings(requestSettings.build());
Settings indexSettings = aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
templateSettings,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
);
assertEquals("2", indexSettings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS));
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE));
}

public void testSearchReplicasValidationWithDocumentReplication() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build());
Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2);
request.settings(requestSettings.build());

IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
templateSettings,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
)
);
assertEquals("To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", exception.getMessage());
}

public void testCreateIndexWithContextDisabled() throws Exception {
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test").context(new Context(randomAlphaOfLength(5)));
withTemporaryClusterService((clusterService, threadPool) -> {
Expand Down
Loading

0 comments on commit b44242d

Please sign in to comment.