Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Weighted Routing] Add support for discovered master and remove local weights in the response #5680

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Add support for discovered cluster manager and remove local weights ([#5680](https://github.
com/opensearch-project/OpenSearch/pull/5680))


### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,32 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class WeightedRoutingIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
}

public void testPutWeightedRouting() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand Down Expand Up @@ -157,6 +172,7 @@ public void testGetWeightedRouting_WeightsNotSet() {
.setAwarenessAttribute("zone")
.get();
assertNull(weightedRoutingResponse.weights());
assertNull(weightedRoutingResponse.getDiscoveredClusterManager());
}

public void testGetWeightedRouting_WeightsAreSet() throws IOException {
Expand Down Expand Up @@ -209,8 +225,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setAwarenessAttribute("zone")
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone a
// get api to fetch local weighted routing for a node in zone a
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
Expand All @@ -219,9 +236,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add discovered_cluster_manager assertion as well .

assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone b
// get api to fetch local weighted routing for a node in zone b
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1)))
.admin()
.cluster()
Expand All @@ -230,9 +247,9 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone c
// get api to fetch local weighted routing for a node in zone c
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1)))
.admin()
.cluster()
Expand All @@ -241,7 +258,81 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

}

public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.fault_detection.leader_check.timeout", 10000 + "ms")
.put("cluster.fault_detection.leader_check.retry_count", 1)
.build();

int nodeCountPerAZ = 1;

logger.info("--> starting a dedicated cluster manager node");
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

Set<String> nodesInOneSide = Stream.of(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
.collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(clusterManager).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

// wait for leader checker to fail
Thread.sleep(13000);

// get api to fetch local weighted routing for a node in zone a or b
ClusterGetWeightedRoutingResponse weightedRoutingResponse = internalCluster().client(
randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0))
).admin().cluster().prepareGetWeightedRouting().setAwarenessAttribute("zone").setRequestLocal(true).get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertFalse(weightedRoutingResponse.getDiscoveredClusterManager());

logger.info("--> network disruption is stopped");
networkDisruption.stopDisrupting();

}

public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,37 @@
* @opensearch.internal
*/
public class ClusterGetWeightedRoutingResponse extends ActionResponse implements ToXContentObject {
private WeightedRouting weightedRouting;
private String localNodeWeight;
private static final String NODE_WEIGHT = "node_weight";
public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

private final WeightedRouting weightedRouting;

public String getLocalNodeWeight() {
return localNodeWeight;
public Boolean getDiscoveredClusterManager() {
return discoveredClusterManager;
}

private final Boolean discoveredClusterManager;

private static final String DISCOVERED_CLUSTER_MANAGER = "discovered_cluster_manager";

ClusterGetWeightedRoutingResponse() {
this.weightedRouting = null;
this.discoveredClusterManager = null;
}
Comment on lines 48 to 51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no explicit need to set them null


public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) {
this.localNodeWeight = localNodeWeight;
public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredClusterManager) {
this.discoveredClusterManager = discoveredClusterManager;
this.weightedRouting = weightedRouting;
}

ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
this.discoveredClusterManager = in.readOptionalBoolean();
} else {
this.weightedRouting = null;
this.discoveredClusterManager = null;
}
}

Expand All @@ -68,6 +79,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
}
if (discoveredClusterManager != null) {
out.writeOptionalBoolean(discoveredClusterManager);
}
}

@Override
Expand All @@ -77,8 +91,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue().toString());
}
if (localNodeWeight != null) {
builder.field(NODE_WEIGHT, localNodeWeight);
if (discoveredClusterManager != null) {
builder.field(DISCOVERED_CLUSTER_MANAGER, discoveredClusterManager);
}
}
builder.endObject();
Expand All @@ -89,37 +103,37 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
XContentParser.Token token;
String attrKey = null, attrValue = null;
String localNodeWeight = null;
Boolean discoveredClusterManager = null;
Map<String, Double> weights = new HashMap<>();

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
attrValue = parser.text();
if (attrKey != null && attrKey.equals(NODE_WEIGHT)) {
localNodeWeight = attrValue;
} else if (attrKey != null) {
if (attrKey != null) {
weights.put(attrKey, Double.parseDouble(attrValue));
}
} else if (token == XContentParser.Token.VALUE_BOOLEAN && attrKey != null && attrKey.equals(DISCOVERED_CLUSTER_MANAGER)) {
discoveredClusterManager = Boolean.parseBoolean(parser.text());
} else {
throw new OpenSearchParseException("failed to parse weighted routing response");
}
}
WeightedRouting weightedRouting = new WeightedRouting("", weights);
return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting);
return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredClusterManager);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o;
return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight);
return weightedRouting.equals(that.weightedRouting) && discoveredClusterManager.equals(that.discoveredClusterManager);
}

@Override
public int hashCode() {
return Objects.hash(weightedRouting, localNodeWeight);
return Objects.hash(weightedRouting, discoveredClusterManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;

import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.WeightedRoutingService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -89,19 +88,12 @@ protected void clusterManagerOperation(
weightedRoutingService.verifyAwarenessAttribute(request.getAwarenessAttribute());
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().custom(WeightedRoutingMetadata.TYPE);
ClusterGetWeightedRoutingResponse clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse();
String weight = null;
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (request.local()) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) {
String attrVal = localNode.getAttributes().get(request.getAwarenessAttribute());
if (weightedRouting.weights().containsKey(attrVal)) {
weight = weightedRouting.weights().get(attrVal).toString();
}
}
}
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting);
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(
weightedRouting,
state.nodes().getClusterManagerNodeId() != null
);
}
listener.onResponse(clusterGetWeightedRoutingResponse);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest
protected ClusterGetWeightedRoutingResponse createTestInstance() {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("", weights);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse(weightedRouting, true);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ public void testGetWeightedRouting_WeightsNotSetInMetadata() {
ClusterState state = clusterService.state();

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(response.getLocalNodeWeight(), null);
assertEquals(response.weights(), null);
}

Expand Down Expand Up @@ -231,7 +230,8 @@ public void testGetWeightedRoutingLocalWeight_WeightsSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals("0.0", response.getLocalNodeWeight());
assertEquals(true, response.getDiscoveredClusterManager());
assertEquals(weights, response.getWeightedRouting().weights());
}

public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() {
Expand All @@ -250,7 +250,7 @@ public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(null, response.getLocalNodeWeight());
assertEquals(null, response.getWeightedRouting());
}

@After
Expand Down