
Commit

Merge branch 'main' into refactor-extensions-yml
Signed-off-by: Sarat Vemulapalli <[email protected]>
saratvemulapalli committed Feb 1, 2023
2 parents 35d526d + 6176ddc commit 4644f41
Showing 14 changed files with 267 additions and 8 deletions.
@@ -21,12 +21,16 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.aggregations.Aggregations;
@@ -869,4 +873,112 @@ public void testPreferenceSearchWithWeightedRouting() throws Exception {
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());
}

/**
* Shard search requests are served by the data node in an az with weight set as 0
* when shard copies are not available in other azs (with fail open enabled).
* This is tested by setting up a 3 node cluster with one data node per az.
* Weighted shard routing weight is set as 0 for az-c.
* Indices are created with one replica copy and network disruption is introduced,
* which makes the data node in zone-a unresponsive.
* Since each shard has only two copies, there can be a few shards with no copy in zone-b.
* Assertions make sure such shard search requests are served by the data node in zone-c.
* Also asserts on fail open stats, which capture the number of times fail open is executed.
* @throws Exception if the test setup or search fails
*/
public void testWeightedRoutingFailOpenStats() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);
WeightedRoutingStats.getInstance().resetFailOpenCount();

logger.info("--> creating network partition disruption");
final String clusterManagerNode1 = internalCluster().getClusterManagerName();
Set<String> nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.UNRESPONSIVE
);
internalCluster().setDisruptionScheme(networkDisruption);

DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();

Map<String, String> nodeIDMap = new HashMap<>();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

List<ShardRouting> shardInNodeA = internalCluster().clusterService()
.state()
.getRoutingNodes()
.node(nodeIDMap.get(nodeMap.get("a").get(0)))
.shardsWithState(ShardRoutingState.STARTED);

List<ShardRouting> shardInNodeC = internalCluster().clusterService()
.state()
.getRoutingNodes()
.node(nodeIDMap.get(nodeMap.get("c").get(0)))
.shardsWithState(ShardRoutingState.STARTED);

// fail open will be executed for shards on the zone-a data node whose replicas are on the zone-c data node
Set<ShardId> result = new HashSet<>();
int failOpenShardCount = 0;
for (ShardRouting shardRouting : shardInNodeA) {
result.add(shardRouting.shardId());
}
for (ShardRouting shardRouting : shardInNodeC) {
if (result.contains(shardRouting.shardId())) {
failOpenShardCount++;
}
}

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

Set<String> hitNodes = new HashSet<>();
logger.info("--> making search requests");

Future<SearchResponse> response = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch("test")
.setSize(100)
.setQuery(QueryBuilders.matchAllQuery())
.execute();

logger.info("--> network disruption is stopped");
networkDisruption.stopDisrupting();

try {
SearchResponse searchResponse = response.get();
assertEquals(0, searchResponse.getFailedShards());
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
} catch (Exception e) {
fail("search should not fail: " + e.getMessage());
}
assertSearchInAZ("b");
assertSearchInAZ("c");
assertNoSearchInAZ("a");

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().addMetric("weighted_routing").execute().actionGet();
Map<String, NodeStats> stats = nodeStats.getNodesMap();
NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0)));
assertEquals(failOpenShardCount, nodeStatsC.getWeightedRoutingStats().getFailOpenCount());
WeightedRoutingStats.getInstance().resetFailOpenCount();
}

}
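For reference, a minimal sketch of reading the same counter through the nodes stats API, using only the client calls exercised in the test above (nodeId is assumed to be supplied by the caller):

// Hedged sketch: request the "weighted_routing" metric and read the per-node
// fail-open counter, mirroring the assertion at the end of the test.
NodesStatsResponse stats = client().admin()
.cluster()
.prepareNodesStats()
.addMetric("weighted_routing")
.execute()
.actionGet();
int failOpenCount = stats.getNodesMap()
.get(nodeId) // nodeId: id of the target data node (assumed provided by the caller)
.getWeightedRoutingStats()
.getFailOpenCount();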
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/Version.java
@@ -89,7 +89,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {

// UNRELEASED
public static final Version V_2_4_2 = new Version(2040299, org.apache.lucene.util.Version.LUCENE_9_4_2);
-public static final Version V_2_6_0 = new Version(2060099, org.apache.lucene.util.Version.LUCENE_9_4_2);
+public static final Version V_2_6_0 = new Version(2060099, org.apache.lucene.util.Version.LUCENE_9_5_0);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_5_0);
public static final Version CURRENT = V_3_0_0;
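As an aside, a small sketch of how these integer ids decode, assuming the usual packing of major/minor/revision/build into a single int (consistent with the constants above):

int id = 2060099; // V_2_6_0 above
int major = id / 1_000_000; // 2
int minor = (id / 10_000) % 100; // 6
int revision = (id / 100) % 100; // 0
int build = id % 100; // 99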

@@ -36,6 +36,7 @@
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
@@ -126,6 +127,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private ClusterManagerThrottlingStats clusterManagerThrottlingStats;

@Nullable
private WeightedRoutingStats weightedRoutingStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
@@ -162,6 +166,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
clusterManagerThrottlingStats = null;
}
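// Only read the new field when the sender is on 2.6.0 or later; older nodes
// never write it, and reading it unconditionally would desync the stream.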
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
weightedRoutingStats = in.readOptionalWriteable(WeightedRoutingStats::new);
} else {
weightedRoutingStats = null;
}
}

public NodeStats(
@@ -184,7 +193,8 @@ public NodeStats(
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats,
-@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats
+@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
+@Nullable WeightedRoutingStats weightedRoutingStats
) {
super(node);
this.timestamp = timestamp;
@@ -206,6 +216,7 @@ public NodeStats(
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.searchBackpressureStats = searchBackpressureStats;
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
this.weightedRoutingStats = weightedRoutingStats;
}

public long getTimestamp() {
@@ -325,6 +336,10 @@ public ClusterManagerThrottlingStats getClusterManagerThrottlingStats() {
return clusterManagerThrottlingStats;
}

public WeightedRoutingStats getWeightedRoutingStats() {
return weightedRoutingStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -356,6 +371,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeOptionalWriteable(clusterManagerThrottlingStats);
}
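// Symmetric gate on the write path: skip the new field for receivers older
// than 2.6.0, which would not know how to read it.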
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeOptionalWriteable(weightedRoutingStats);
}
}

@Override
@@ -434,6 +452,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getClusterManagerThrottlingStats() != null) {
getClusterManagerThrottlingStats().toXContent(builder, params);
}
if (getWeightedRoutingStats() != null) {
getWeightedRoutingStats().toXContent(builder, params);
}

return builder;
}
}
@@ -208,7 +208,8 @@ public enum Metric {
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure"),
-CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling");
+CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
+WEIGHTED_ROUTING_STATS("weighted_routing");

private String metricName;

@@ -120,7 +120,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
-NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics)
+NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
+NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics)
);
}

@@ -164,6 +164,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
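// Cluster stats only needs shard-level indices stats, so every optional
// node-level metric above (including the new weighted routing stats) is disabled.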
List<ShardStats> shardsStats = new ArrayList<>();
@@ -103,6 +103,7 @@ public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterStat
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
}
next = shardIt.nextOrNull();
@@ -125,6 +126,7 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
}
next = shardsIt.nextOrNull();
@@ -150,4 +152,8 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI
}
return false;
}

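// Returns the process-wide singleton, so fail-open events from any search
// request accumulate into a single per-node counter.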
public WeightedRoutingStats getWeightedRoutingStats() {
return WeightedRoutingStats.getInstance();
}
}
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Captures weighted shard routing stats per node. See {@link WeightedRoutingService} for more details.
*
* @opensearch.internal
*/
public class WeightedRoutingStats implements ToXContentFragment, Writeable {
// number of times fail open has been executed for search requests
private AtomicInteger failOpenCount;

private static final WeightedRoutingStats INSTANCE = new WeightedRoutingStats();

public static WeightedRoutingStats getInstance() {
return INSTANCE;
}

private WeightedRoutingStats() {
failOpenCount = new AtomicInteger(0);
}

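// Wire constructor: instances read from a stream are transient copies used for
// transport serialization and are distinct from the node-local singleton.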
public WeightedRoutingStats(StreamInput in) throws IOException {
failOpenCount = new AtomicInteger(in.readInt());
}

public void updateFailOpenCount() {
failOpenCount.getAndIncrement();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("weighted_routing");
builder.startObject("stats");
builder.field("fail_open_count", getFailOpenCount());
builder.endObject();
builder.endObject();
return builder;
}

public int getFailOpenCount() {
return failOpenCount.get();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(failOpenCount.get());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingStats that = (WeightedRoutingStats) o;
// AtomicInteger does not override equals(), so compare the underlying values
return failOpenCount.get() == that.failOpenCount.get();
}

@Override
public int hashCode() {
return Objects.hash(failOpenCount.get());
}

public void resetFailOpenCount() {
failOpenCount.set(0);
}
}
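A brief usage sketch of the class above; the fragment shape in the trailing comment follows directly from toXContent:

WeightedRoutingStats stats = WeightedRoutingStats.getInstance();
stats.updateFailOpenCount(); // a fail open was executed
int count = stats.getFailOpenCount(); // -> 1
stats.resetFailOpenCount(); // -> 0
// Emitted XContent fragment: "weighted_routing": { "stats": { "fail_open_count": <count> } }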
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
@@ -32,6 +32,7 @@

package org.opensearch.node;

import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.Build;
import org.opensearch.Version;
@@ -177,7 +178,8 @@ public NodeStats stats(
boolean indexingPressure,
boolean shardIndexingPressure,
boolean searchBackpressure,
-boolean clusterManagerThrottling
+boolean clusterManagerThrottling,
+boolean weightedRoutingStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -201,7 +203,8 @@ public NodeStats stats(
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
-clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null
+clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null,
+weightedRoutingStats ? WeightedRoutingStats.getInstance() : null
);
}
