Skip to content

Commit

Permalink
Introduce allocation filter to control placement of search only replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Aug 27, 2024
1 parent 6e563e5 commit 264cc15
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.allocation;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testSearchReplicaDedicatedIncludes() {
logger.info("--> starting 2 nodes");
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();

logger.info("--> creating an index with no replicas");
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen("test");
// ensure primary is not on 1 or 2,
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));

String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
ensureGreen("test");

routingTable = getRoutingTable();
logger.info(routingTable);
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
logger.info(routingTable.replicaShards());
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
}

private String getNodeName(String id) {
return getClusterState().nodes().get(id).getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -66,4 +67,15 @@ public void testUpdateFeatureFlagDisabled() {
});
assertTrue(settingsException.getMessage().contains("unknown setting"));
}

public void testFilterAllocationSettingNotRegistered() {
expectThrows(SettingsException.class, () -> {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
.execute()
.actionGet();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public List<ShardRouting> getShards() {
}

/**
* Returns a {@link List} of the search only shards in the RoutingTable
* Returns a {@link List} of the search only replicas in the RoutingTable
*
* @return a {@link List} of shards
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private ShardRouting initializeTargetRelocatingShard() {
relocatingNodeId,
currentNodeId,
primary,
searchOnly,
ShardRoutingState.INITIALIZING,
PeerRecoverySource.INSTANCE,
unassignedInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Map;
Expand Down Expand Up @@ -84,10 +85,12 @@
public class FilterAllocationDecider extends AllocationDecider {

public static final String NAME = "filter";

private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";

private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";

public static final Setting.AffixSetting<String> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".",
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
Expand All @@ -100,7 +103,12 @@ public class FilterAllocationDecider extends AllocationDecider {
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".",
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
);
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
);

private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;
private volatile DiscoveryNodeFilters clusterRequireFilters;
private volatile DiscoveryNodeFilters clusterIncludeFilters;
private volatile DiscoveryNodeFilters clusterExcludeFilters;
Expand All @@ -113,7 +121,6 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);

clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
Expand All @@ -122,6 +129,15 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
this::setCompatibilityMode
);

if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) {
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
clusterSettings.addAffixMapUpdateConsumer(
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
this::setSearchReplicaIncludeFilters,
(a, b) -> {}
);
}
}

private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
Expand Down Expand Up @@ -203,6 +219,9 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou
decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
if (decision != null) return decision;

decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation);
if (decision != null) return decision;

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

Expand Down Expand Up @@ -294,6 +313,32 @@ private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation alloc
return null;
}

private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) {
if (searchReplicaIncludeFilters != null) {
final boolean match = searchReplicaIncludeFilters.match(node);
if (match == false && routing.isSearchOnly()) {
return allocation.decision(
Decision.NO,
NAME,
"node does not match shard setting [%s] filters [%s]",
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
searchReplicaIncludeFilters
);
}
// filter will only apply to search replicas
if (routing.isSearchOnly() == false && match) {
return allocation.decision(
Decision.NO,
NAME,
"only search replicas can be allocated to node with setting [%s] filters [%s]",
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
searchReplicaIncludeFilters
);
}
}
return null;
}

private void setClusterRequireFilters(Map<String, String> filters) {
clusterRequireFilters = DiscoveryNodeFilters.trimTier(
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterRequireFilters, AND, filters)
Expand All @@ -311,4 +356,10 @@ private void setClusterExcludeFilters(Map<String, String> filters) {
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterExcludeFilters, OR, filters)
);
}

private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ public void apply(Settings value, Settings current, Settings previous) {
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
)
)
),
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
List.of(FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,4 +462,94 @@ public void testMixedModeRemoteStoreAllocation() {
decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
}

public void testSearchReplicaRoutingDedicatedIncludes() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings initialSettings = Settings.builder()
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2")
.build();

FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings);
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(
filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()
)
);
AllocationService service = new AllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
RoutingTable routingTable = state.routingTable();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);

ShardRouting searchReplica = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
true,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
);

ShardRouting regularReplica = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);

ShardRouting primary = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
true,
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);

Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
searchReplica,
state.getRoutingNodes().node("node2"),
allocation
);
assertEquals(decision.toString(), Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Type.YES, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());

Settings updatedSettings = Settings.builder()
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2")
.build();
clusterSettings.applySettings(updatedSettings);

decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Type.YES, decision.type());

decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
assertEquals(decision.toString(), Type.YES, decision.type());
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
}
}

0 comments on commit 264cc15

Please sign in to comment.