Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weighted round-robin scheduling policy for shard coordination traffic… #4241

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
162cbeb
Weighted round-robin scheduling policy for shard coordination traffic…
Aug 17, 2022
8513c4f
Add caching layer for wrr shard routing and moved wrr routing call to…
Aug 26, 2022
4cdbee3
Integrate ARS with weighted round robin,
Aug 27, 2022
7a49e5c
Remove ARS and add tests for zone with undefined weight
Sep 1, 2022
03c4d23
Add changelog for the commit
Sep 1, 2022
7c64537
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 1, 2022
154a3b9
Fix java doc, add test of WeightedRoundRobinRouting metadata
Sep 1, 2022
3afb4e3
Fix minor change related to shuffling wrr shard routings
Sep 1, 2022
30e17bc
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 2, 2022
3aa6fbb
Add default weight 1 for zones with undefined weight
Sep 7, 2022
9ae063c
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 7, 2022
1112366
Inject WRRShardsCache on node start
Sep 12, 2022
cd3e16c
Remove extra new lines
Sep 12, 2022
0c7dbd9
Fix import
Sep 12, 2022
438fe9e
Add size for shard routing cache
Sep 12, 2022
694be4d
Invalidate shard routing cache on close
Sep 12, 2022
9236b8d
Refactor code
Sep 13, 2022
93e7587
Add test for Weighted routing iterator and some refactoring changes
Sep 13, 2022
a9e30d1
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 13, 2022
17dc58c
Update metadata minimal supported version
Sep 13, 2022
ed0cc1b
Add cluster setting for default weight
Sep 14, 2022
5a5cb11
Fix tests due to the change
Sep 14, 2022
1c33964
Fix cache concurrency issue
Sep 14, 2022
f8638f2
Spotless check fix
Sep 14, 2022
4016d27
Fix weighted round robin logic case when there is an entity with weig…
Sep 15, 2022
c98606d
Changes weight data type to double
Sep 15, 2022
1bd83f3
Fix test
Sep 15, 2022
8a72dc0
Empty commit
Sep 15, 2022
bff61b9
Empty commit
Sep 15, 2022
9102616
Fix spotless check
Sep 15, 2022
a291634
Create in-memory cache for shard routings
Sep 16, 2022
bd6c9e7
Fix put operation for weighted shard routings
Sep 16, 2022
d9368ab
Add tests for shard routing in-memory store
Sep 17, 2022
a3f0290
Merge branch 'main' into feature/wrr-shard-routing-core
Sep 17, 2022
01239bf
Add java docs and some code refactoring
Sep 17, 2022
dbbb263
Add null check for discovery nodes and single mutex for weighted shar…
Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Weighted round-robin scheduling policy for shard coordination traffic ([#4241](https://github.com/opensearch-project/OpenSearch/pull/4241))

### Deprecated

Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.WeightedRoundRobinRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -191,6 +192,12 @@ public static List<Entry> getNamedWriteables() {
ComposableIndexTemplateMetadata::readDiffFrom
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(
entries,
WeightedRoundRobinRoutingMetadata.TYPE,
WeightedRoundRobinRoutingMetadata::new,
WeightedRoundRobinRoutingMetadata::readDiffFrom
);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down Expand Up @@ -274,6 +281,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
DataStreamMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(WeightedRoundRobinRoutingMetadata.TYPE),
WeightedRoundRobinRoutingMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractNamedDiffable;
import org.opensearch.cluster.NamedDiff;
import org.opensearch.cluster.routing.WRRWeights;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/**
* Contains metadata for weighted round-robin shard routing weights
*
* @opensearch.internal
*/
public class WeightedRoundRobinRoutingMetadata extends AbstractNamedDiffable<Metadata.Custom> implements Metadata.Custom {
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger logger = LogManager.getLogger(WeightedRoundRobinRoutingMetadata.class);
public static final String TYPE = "wrr_shard_routing";
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
private WRRWeights wrrWeight;

public WRRWeights getWrrWeight() {
return wrrWeight;
}

public WeightedRoundRobinRoutingMetadata setWrrWeight(WRRWeights wrrWeight) {
this.wrrWeight = wrrWeight;
return this;
}

public WeightedRoundRobinRoutingMetadata(StreamInput in) throws IOException {
this.wrrWeight = new WRRWeights(in);
}

public WeightedRoundRobinRoutingMetadata(WRRWeights wrrWeight) {
this.wrrWeight = wrrWeight;
}

@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.API_AND_GATEWAY;
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_2_3_0;

}

@Override
public void writeTo(StreamOutput out) throws IOException {
wrrWeight.writeTo(out);
}

public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Metadata.Custom.class, TYPE, in);
}

public static WeightedRoundRobinRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Object attrValue;
String attributeName = null;
Map<String, Object> weights = new HashMap<>();
WRRWeights wrrWeight = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
String awarenessField = null;

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
awarenessField = parser.currentName();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException("failed to parse wrr metadata [{}], expected object", awarenessField);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
attributeName = parser.currentName();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException("failed to parse wrr metadata [{}], expected object", attributeName);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
attrValue = parser.text();
weights.put(attrKey, attrValue);
} else {
throw new OpenSearchParseException("failed to parse wrr metadata attribute [{}], unknown type", attributeName);
}
}
}
} else {
throw new OpenSearchParseException("failed to parse wrr metadata attribute [{}]", attributeName);
}
}
wrrWeight = new WRRWeights(attributeName, weights);
return new WeightedRoundRobinRoutingMetadata(wrrWeight);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoundRobinRoutingMetadata that = (WeightedRoundRobinRoutingMetadata) o;
return wrrWeight.equals(that.wrrWeight);
}

@Override
public int hashCode() {
return wrrWeight.hashCode();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(wrrWeight, builder);
return builder;
}

public static void toXContent(WRRWeights wrrWeight, XContentBuilder builder) throws IOException {
builder.startObject("awareness");
builder.startObject(wrrWeight.attributeName());
for (Map.Entry<String, Object> entry : wrrWeight.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
}

@Override
public String toString() {
return Strings.toString(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,77 @@ public ShardIterator activeInitializingShardsRankedIt(
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted round-robin scheduling
* policy.
* @param wrrWeight Weighted round-robin weight entity
* @param nodes discovered nodes in the cluster
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWRR(WRRWeights wrrWeight, DiscoveryNodes nodes, WRRShardsCache cache) {
final int seed = shuffler.nextSeed();
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
List<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this size relevant ? If the weights are 4:3:2 , we will be adding lot more than shard count .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah thats true. But we can start with this initial capacity since we can expect minimum of these many shards and list can resize if more shards are added

List<ShardRouting> orderedActiveShards;
if (cache.getCache().get(new WRRShardsCache.Key(shardId)) != null) {
orderedActiveShards = cache.getCache().get(new WRRShardsCache.Key(shardId));
} else {
orderedActiveShards = getShardsWRR(activeShards, wrrWeight, nodes);
cache.getCache().put(new WRRShardsCache.Key(shardId), orderedActiveShards);
}
ordered.addAll(orderedActiveShards);

if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getShardsWRR(allInitializingShards, wrrWeight, nodes);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

/**
*
* @param shards shards to be ordered using weighted round-robin scheduling policy
* @param wrrWeight weights to be considered for routing
* @param nodes discovered nodes in the cluster
* @return list of shards ordered using weighted round-robin scheduling.
*/
private List<ShardRouting> getShardsWRR(List<ShardRouting> shards, WRRWeights wrrWeight, DiscoveryNodes nodes) {
List<WeightedRoundRobin.Entity<ShardRouting>> weightedShards = calculateShardWeight(shards, wrrWeight, nodes);
WeightedRoundRobin<ShardRouting> wrr = new WeightedRoundRobin<>(weightedShards);
List<WeightedRoundRobin.Entity<ShardRouting>> wrrOrderedActiveShards = wrr.orderEntities();
List<ShardRouting> orderedActiveShards = new ArrayList<>(activeShards.size());
for (WeightedRoundRobin.Entity<ShardRouting> shardRouting : wrrOrderedActiveShards) {
orderedActiveShards.add(shardRouting.getTarget());
}
return orderedActiveShards;
}

/**
*
* @param shards associate weights to shards
* @param wrrWeight weights to be used for association
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit add a description, make this package private and please add tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add a description. The method is already private. Do you mean we should move it? Tests added in OperationRoutingTests.java do call these methods. Do you think we should have separate tests as well?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes these are critical operations and we might need more tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests.

* @param nodes discovered nodes in the cluster
* @return list of entity containing shard routing and associated weight.
*/
private List<WeightedRoundRobin.Entity<ShardRouting>> calculateShardWeight(
List<ShardRouting> shards,
WRRWeights wrrWeight,
DiscoveryNodes nodes
) {
List<WeightedRoundRobin.Entity<ShardRouting>> weightedShards = new ArrayList<>();
for (ShardRouting shard : shards) {
DiscoveryNode node = nodes.get(shard.currentNodeId());
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
String attVal = node.getAttributes().get(wrrWeight.attributeName());
// If weight for a zone is not defined, not considering shards from that zone
if (wrrWeight.weights().get(attVal) == null) {
continue;
}
Double weight = Double.parseDouble(wrrWeight.weights().get(attVal).toString());
weightedShards.add(new WeightedRoundRobin.Entity<>(weight, shard));
}
return weightedShards;
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.WeightedRoundRobinRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -79,6 +81,33 @@ public class OperationRouting {
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;

// reads value from cluster setting
private volatile boolean useWeightedRoundRobin;
/**
* Reads value from cluster setting and cluster state to determine if weighted round-robin
* search routing is enabled
* This is true if useWeightedRoundRobin=true and weights are set in cluster metadata.
*/
private volatile boolean isWeightedRoundRobinEnabled;

private volatile WRRWeights wrrWeights;

public WRRShardsCache getWrrShardsCache() {
return wrrShardsCache;
}

private WRRShardsCache wrrShardsCache;

public ClusterService getClusterService() {
return clusterService;
}

public void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
}

private ClusterService clusterService;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
this.ignoreAwarenessAttr = clusterSettings.get(IGNORE_AWARENESS_ATTRIBUTES_SETTING);
Expand All @@ -90,6 +119,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);

}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -116,6 +146,10 @@ public boolean ignoreAwarenessAttributes() {
return this.awarenessAttributes.isEmpty() || this.ignoreAwarenessAttr;
}

public WRRWeights getWrrWeights() {
return wrrWeights;
}

public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
return shards(clusterState, index, id, routing).shardsIt();
}
Expand All @@ -139,6 +173,7 @@ public ShardIterator getShards(

public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) {
final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(index, shardId);
setWeightedRoundRobinAttributes(clusterState, getClusterService());
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
return preferenceActiveShardIterator(
indexShard,
clusterState.nodes().getLocalNodeId(),
Expand Down Expand Up @@ -168,6 +203,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
setWeightedRoundRobinAttributes(clusterState, getClusterService());
for (IndexShardRoutingTable shard : shards) {
ShardIterator iterator = preferenceActiveShardIterator(
shard,
Expand All @@ -184,6 +220,20 @@ public GroupShardsIterator<ShardIterator> searchShards(
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
}

private void setWeightedRoundRobinAttributes(ClusterState clusterState, ClusterService clusterService) {
WeightedRoundRobinRoutingMetadata weightedRoundRobinRoutingMetadata = clusterState.metadata()
.custom(WeightedRoundRobinRoutingMetadata.TYPE);
this.isWeightedRoundRobinEnabled = weightedRoundRobinRoutingMetadata != null ? true : false;
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
if (this.isWeightedRoundRobinEnabled) {
this.wrrWeights = weightedRoundRobinRoutingMetadata.getWrrWeight();
this.wrrShardsCache = getWrrShardsCache() != null ? getWrrShardsCache() : new WRRShardsCache(clusterService);
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
}
}

private boolean isWeightedRoundRobinEnabled() {
return isWeightedRoundRobinEnabled;
}

public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
final IndexShardRoutingTable shard = clusterState.routingTable().shardRoutingTable(shardId);
return shard.activeInitializingShardsRandomIt();
Expand Down Expand Up @@ -227,6 +277,7 @@ private ShardIterator preferenceActiveShardIterator(
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
) {

anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
}
Expand Down Expand Up @@ -300,7 +351,9 @@ private ShardIterator shardRoutings(
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
) {
if (ignoreAwarenessAttributes()) {
if (isWeightedRoundRobinEnabled()) {
return indexShard.activeInitializingShardsWRR(getWrrWeights(), nodes, wrrShardsCache);
} else if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
Expand Down
Loading