Skip to content

Commit

Permalink
[Weighted Routing] Add support for discovered master and remove local…
Browse files Browse the repository at this point in the history
… weights in the response (opensearch-project#5680)

* Add support for discovered master and remove local weights in the weighted routing API response

Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 authored and Anshu Agarwal committed Jan 10, 2023
1 parent a3d57c9 commit e7f8392
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 38 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))
- Added Request level Durability using Remote Translog functionality ([#5757](https://github.com/opensearch-project/OpenSearch/pull/5757))
- Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,32 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class WeightedRoutingIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
}

public void testPutWeightedRouting() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand Down Expand Up @@ -157,6 +172,7 @@ public void testGetWeightedRouting_WeightsNotSet() {
.setAwarenessAttribute("zone")
.get();
assertNull(weightedRoutingResponse.weights());
assertNull(weightedRoutingResponse.getDiscoveredClusterManager());
}

public void testGetWeightedRouting_WeightsAreSet() throws IOException {
Expand Down Expand Up @@ -209,8 +225,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setAwarenessAttribute("zone")
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone a
// get api to fetch local weighted routing for a node in zone a
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
Expand All @@ -219,9 +236,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone b
// get api to fetch local weighted routing for a node in zone b
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1)))
.admin()
.cluster()
Expand All @@ -230,9 +247,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone c
// get api to fetch local weighted routing for a node in zone c
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1)))
.admin()
.cluster()
Expand All @@ -241,7 +258,81 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

}

public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.fault_detection.leader_check.timeout", 10000 + "ms")
.put("cluster.fault_detection.leader_check.retry_count", 1)
.build();

int nodeCountPerAZ = 1;

logger.info("--> starting a dedicated cluster manager node");
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

Set<String> nodesInOneSide = Stream.of(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
.collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(clusterManager).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

// wait for leader checker to fail
Thread.sleep(13000);

// get api to fetch local weighted routing for a node in zone a or b
ClusterGetWeightedRoutingResponse weightedRoutingResponse = internalCluster().client(
randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0))
).admin().cluster().prepareGetWeightedRouting().setAwarenessAttribute("zone").setRequestLocal(true).get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertFalse(weightedRoutingResponse.getDiscoveredClusterManager());

logger.info("--> network disruption is stopped");
networkDisruption.stopDisrupting();

}

public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,37 @@
* @opensearch.internal
*/
public class ClusterGetWeightedRoutingResponse extends ActionResponse implements ToXContentObject {
private WeightedRouting weightedRouting;
private String localNodeWeight;
private static final String NODE_WEIGHT = "node_weight";
public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

private final WeightedRouting weightedRouting;

public String getLocalNodeWeight() {
return localNodeWeight;
public Boolean getDiscoveredClusterManager() {
return discoveredClusterManager;
}

private final Boolean discoveredClusterManager;

private static final String DISCOVERED_CLUSTER_MANAGER = "discovered_cluster_manager";

ClusterGetWeightedRoutingResponse() {
this.weightedRouting = null;
this.discoveredClusterManager = null;
}

public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) {
this.localNodeWeight = localNodeWeight;
public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredClusterManager) {
this.discoveredClusterManager = discoveredClusterManager;
this.weightedRouting = weightedRouting;
}

ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
this.discoveredClusterManager = in.readOptionalBoolean();
} else {
this.weightedRouting = null;
this.discoveredClusterManager = null;
}
}

Expand All @@ -68,6 +79,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
}
if (discoveredClusterManager != null) {
out.writeOptionalBoolean(discoveredClusterManager);
}
}

@Override
Expand All @@ -77,8 +91,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue().toString());
}
if (localNodeWeight != null) {
builder.field(NODE_WEIGHT, localNodeWeight);
if (discoveredClusterManager != null) {
builder.field(DISCOVERED_CLUSTER_MANAGER, discoveredClusterManager);
}
}
builder.endObject();
Expand All @@ -89,37 +103,37 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
XContentParser.Token token;
String attrKey = null, attrValue = null;
String localNodeWeight = null;
Boolean discoveredClusterManager = null;
Map<String, Double> weights = new HashMap<>();

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
attrValue = parser.text();
if (attrKey != null && attrKey.equals(NODE_WEIGHT)) {
localNodeWeight = attrValue;
} else if (attrKey != null) {
if (attrKey != null) {
weights.put(attrKey, Double.parseDouble(attrValue));
}
} else if (token == XContentParser.Token.VALUE_BOOLEAN && attrKey != null && attrKey.equals(DISCOVERED_CLUSTER_MANAGER)) {
discoveredClusterManager = Boolean.parseBoolean(parser.text());
} else {
throw new OpenSearchParseException("failed to parse weighted routing response");
}
}
WeightedRouting weightedRouting = new WeightedRouting("", weights);
return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting);
return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredClusterManager);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o;
return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight);
return weightedRouting.equals(that.weightedRouting) && discoveredClusterManager.equals(that.discoveredClusterManager);
}

@Override
public int hashCode() {
return Objects.hash(weightedRouting, localNodeWeight);
return Objects.hash(weightedRouting, discoveredClusterManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;

import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.WeightedRoutingService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -89,19 +88,12 @@ protected void clusterManagerOperation(
weightedRoutingService.verifyAwarenessAttribute(request.getAwarenessAttribute());
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().custom(WeightedRoutingMetadata.TYPE);
ClusterGetWeightedRoutingResponse clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse();
String weight = null;
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (request.local()) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) {
String attrVal = localNode.getAttributes().get(request.getAwarenessAttribute());
if (weightedRouting.weights().containsKey(attrVal)) {
weight = weightedRouting.weights().get(attrVal).toString();
}
}
}
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting);
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(
weightedRouting,
state.nodes().getClusterManagerNodeId() != null
);
}
listener.onResponse(clusterGetWeightedRoutingResponse);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest
protected ClusterGetWeightedRoutingResponse createTestInstance() {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("", weights);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse(weightedRouting, true);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ public void testGetWeightedRouting_WeightsNotSetInMetadata() {
ClusterState state = clusterService.state();

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(response.getLocalNodeWeight(), null);
assertEquals(response.weights(), null);
}

Expand Down Expand Up @@ -231,7 +230,8 @@ public void testGetWeightedRoutingLocalWeight_WeightsSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals("0.0", response.getLocalNodeWeight());
assertEquals(true, response.getDiscoveredClusterManager());
assertEquals(weights, response.getWeightedRouting().weights());
}

public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() {
Expand All @@ -250,7 +250,7 @@ public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(null, response.getLocalNodeWeight());
assertEquals(null, response.getWeightedRouting());
}

@After
Expand Down

0 comments on commit e7f8392

Please sign in to comment.