Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
Anshu Agarwal committed Nov 10, 2022
1 parent 02f1fa8 commit 9e69711
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) {
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();

if (next != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(next, clusterService.state())
&& WeightedRoutingHelper.shardInWeighedAwayAZ(next.currentNodeId(), clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(failure)) {
next = shardsIt.get(shardIndex).nextOrNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,22 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.WeightedRoutingHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.NodeClosedException;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.transport.NodeNotConnectedException;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand All @@ -76,7 +71,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator}
Expand Down Expand Up @@ -452,46 +446,14 @@ ShardSearchFailure[] buildShardFailures() {
return failures;
}

private boolean isInternalFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (e instanceof NoShardAvailableActionException
|| e instanceof UnavailableShardsException
|| e instanceof NodeNotConnectedException
|| e instanceof NodeClosedException) {
return true;
}
return false;
}

private boolean shardInWeighedAwayAZ(SearchShardTarget nextShard) {
DiscoveryNode targetNode = clusterState.nodes().get(nextShard.getNodeId());
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == 0)
.map(Map.Entry::getKey);
if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) {
return true;
}
}

}
return false;

}

private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
SearchShardTarget nextShard = shardIt.nextOrNull();

if (nextShard != null && shardInWeighedAwayAZ(nextShard) && !isInternalFailure(e)) {
while (nextShard != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.getNodeId(), clusterState)
&& !WeightedRoutingHelper.isInternalFailure(e)) {
nextShard = shardIt.nextOrNull();
}
final boolean lastShard = nextShard == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
ShardRouting nextShard = shardIt.nextOrNull();

if (nextShard != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard, clusterState)
&& WeightedRoutingHelper.shardInWeighedAwayAZ(nextShard.currentNodeId(), clusterState)
&& !WeightedRoutingHelper.isInternalFailure(e)) {
nextShard = shardIt.nextOrNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ private void perform(@Nullable final Exception currentFailure) {
}
ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting != null
&& WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting, clusterService.state())
&& WeightedRoutingHelper.shardInWeighedAwayAZ(shardRouting.currentNodeId(), clusterService.state())
&& !WeightedRoutingHelper.isInternalFailure(currentFailure)) {
shardRouting = shardIt.nextOrNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.cluster.routing;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.cluster.ClusterState;
Expand All @@ -19,22 +18,21 @@
import java.util.Map;
import java.util.stream.Stream;

/**
* * WeightedRouting helper class
*/

public class WeightedRoutingHelper {

public static boolean isInternalFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (e instanceof NoShardAvailableActionException
return e instanceof NoShardAvailableActionException
|| e instanceof UnavailableShardsException
|| e instanceof NodeNotConnectedException) {
return true;
}
return false;
|| e instanceof NodeNotConnectedException;
}

public static boolean shardInWeighedAwayAZ(ShardRouting nextShard, ClusterState clusterState) {
DiscoveryNode targetNode = clusterState.nodes().get(nextShard.currentNodeId());
public static boolean shardInWeighedAwayAZ(String nodeId, ClusterState clusterState) {
DiscoveryNode targetNode = clusterState.nodes().get(nodeId);
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();

if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null) {
Expand All @@ -44,13 +42,13 @@ public static boolean shardInWeighedAwayAZ(ShardRouting nextShard, ClusterState
.stream()
.filter(entry -> entry.getValue().intValue() == 0)
.map(Map.Entry::getKey);
if (keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get())) {
return true;
}
return keys != null && targetNode.getAttributes().get("zone").equals(keys.findFirst().get());
}
}

}
return false;

}


}

0 comments on commit 9e69711

Please sign in to comment.