Skip to content

Commit

Permalink
Add changes for graceful node decommission (#4586)
Browse files Browse the repository at this point in the history
* Add changes for graceful node decommission

Signed-off-by: pranikum <[email protected]>
  • Loading branch information
pranikum authored Oct 25, 2022
1 parent 8fda187 commit 1a40bd5
Show file tree
Hide file tree
Showing 14 changed files with 374 additions and 45 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add dev help in gradle check CI failures ([4872](https://github.com/opensearch-project/OpenSearch/pull/4872))
- Copy `build.sh` over from opensearch-build ([#4887](https://github.com/opensearch-project/OpenSearch/pull/4887))
- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843))
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,38 +120,53 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
// Set the timeout to 0 to do immediate Decommission
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get();
assertTrue(decommissionResponse.isAcknowledged());

logger.info("--> Received decommissioning nodes in zone {}", 'c');
// Keep some delay for scheduler to invoke decommission flow
Thread.sleep(500);

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

logger.info("--> Received LANGUID event");

// assert that decommission status is successful
GetDecommissionStateResponse response = client().execute(
GetDecommissionStateResponse response = client(clusterManagerNodes.get(0)).execute(
GetDecommissionStateAction.INSTANCE,
new GetDecommissionStateRequest(decommissionAttribute.attributeName())
).get();
assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue());
assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL);
assertEquals(DecommissionStatus.SUCCESSFUL, response.getDecommissionStatus());

logger.info("--> Decommission status is successful");
ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();
assertEquals(4, clusterState.nodes().getSize());

logger.info("--> Got cluster state with 4 nodes.");
// assert status on nodes that are part of cluster currently
Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt();
DiscoveryNode clusterManagerNodeAfterDecommission = null;
while (discoveryNodeIterator.hasNext()) {
// assert no node has decommissioned attribute
DiscoveryNode node = discoveryNodeIterator.next();
assertNotEquals(node.getAttributes().get("zone"), "c");

if (node.isClusterManagerNode()) {
clusterManagerNodeAfterDecommission = node;
}
// assert all the nodes has status as SUCCESSFUL
ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName());
assertEquals(
localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.SUCCESSFUL
);
}
assertNotNull("Cluster Manager not found after decommission", clusterManagerNodeAfterDecommission);
logger.info("--> Cluster Manager node found after decommission");

// assert status on decommissioned node
// Here we will verify that until it got kicked out, it received appropriate status updates
Expand All @@ -163,16 +178,18 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.IN_PROGRESS
);
logger.info("--> Verified the decommissioned node Has in progress state.");

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

client(clusterManagerNodeAfterDecommission.getName()).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
logger.info("--> Got LANGUID event");
// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute(
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodeAfterDecommission.getName()).execute(
DeleteDecommissionStateAction.INSTANCE,
new DeleteDecommissionStateRequest()
).get();
assertTrue(deleteDecommissionStateResponse.isAcknowledged());
logger.info("--> Deleting decommission done.");

// will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes)
// as by then all nodes should have joined the cluster
Expand Down Expand Up @@ -201,6 +218,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception

DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
assertBusy(() -> {
DecommissioningFailedException ex = expectThrows(
DecommissioningFailedException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

Expand All @@ -28,8 +29,15 @@
*/
public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> {

// Default time allowed for a decommissioned node to drain in-flight work before removal.
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

// Attribute (name/value pair, e.g. zone=c) identifying the nodes to decommission.
private DecommissionAttribute decommissionAttribute;

// Draining timeout applied before decommission; defaults to DEFAULT_NODE_DRAINING_TIMEOUT (120s).
private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// Holder for the no_delay param: when true, the draining timeout is skipped (set to zero).
private boolean noDelay = false;

public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
Expand All @@ -39,12 +47,14 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
/**
 * Deserializes a request from the transport stream: the decommission attribute
 * followed by the draining delay timeout.
 * NOTE(review): the noDelay flag itself is not part of the wire format — a zero
 * delayTimeout is what carries the "no delay" effect across nodes. Confirm intended.
 */
public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
decommissionAttribute.writeTo(out);
// Only the timeout is serialized; noDelay is not written — NOTE(review): receivers
// can only observe "no delay" via a zero timeout. Confirm this is intended.
out.writeTimeValue(delayTimeout);
}

/**
Expand All @@ -65,6 +75,19 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

/**
 * @return the node-draining timeout configured on this request
 */
public TimeValue getDelayTimeout() {
    return delayTimeout;
}

/**
 * Sets the no_delay flag. When {@code true}, the draining timeout is cleared so the
 * decommission happens immediately.
 *
 * Fix: the timeout is now zeroed only when {@code noDelay} is {@code true}; previously
 * {@code setNoDelay(false)} also wiped the default draining timeout, silently
 * disabling graceful draining.
 *
 * @param noDelay whether to skip the graceful node-draining delay
 */
public void setNoDelay(boolean noDelay) {
    if (noDelay) {
        this.delayTimeout = TimeValue.ZERO;
    }
    this.noDelay = noDelay;
}

/**
 * @return {@code true} when the request asked to skip the graceful draining delay
 */
public boolean isNoDelay() {
    return this.noDelay;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -74,6 +97,14 @@ public ActionRequestValidationException validate() {
if (decommissionAttribute.attributeValue() == null || Strings.isEmpty(decommissionAttribute.attributeValue())) {
validationException = addValidationError("attribute value is missing", validationException);
}
// This validation should not fail since we are not allowing delay timeout to be set externally.
// Still keeping it for double check.
if (noDelay && delayTimeout.getSeconds() > 0) {
final String validationMessage = "Invalid decommission request. no_delay is true and delay_timeout is set to "
+ delayTimeout.getSeconds()
+ "] Seconds";
validationException = addValidationError(validationMessage, validationException);
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
/**
 * Cluster-manager-side execution of the decommission action: logs the requested
 * attribute and delegates the whole request (attribute plus draining options) to
 * the decommission service.
 *
 * @param request  the decommission request to act on
 * @param state    the current cluster state
 * @param listener notified with the {@link DecommissionResponse} outcome
 */
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
decommissionService.startDecommissionAction(request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

/**
* Contains metadata about decommission attribute
Expand Down Expand Up @@ -88,11 +89,14 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
// We don't expect that INIT will be new status, as it is registered only when starting the decommission action
switch (newStatus) {
case DRAINING:
validateStatus(Set.of(DecommissionStatus.INIT), newStatus);
break;
case IN_PROGRESS:
validateStatus(DecommissionStatus.INIT, newStatus);
validateStatus(Set.of(DecommissionStatus.DRAINING, DecommissionStatus.INIT), newStatus);
break;
case SUCCESSFUL:
validateStatus(DecommissionStatus.IN_PROGRESS, newStatus);
validateStatus(Set.of(DecommissionStatus.IN_PROGRESS), newStatus);
break;
default:
throw new IllegalArgumentException(
Expand All @@ -101,17 +105,17 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
}

private void validateStatus(DecommissionStatus expected, DecommissionStatus next) {
if (status.equals(expected) == false) {
private void validateStatus(Set<DecommissionStatus> expectedStatuses, DecommissionStatus next) {
if (expectedStatuses.contains(status) == false) {
assert false : "can't move decommission status to ["
+ next
+ "]. current status: ["
+ status
+ "] (expected ["
+ expected
+ "] (allowed statuses ["
+ expectedStatuses
+ "])";
throw new IllegalStateException(
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])"
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expectedStatuses + "])"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand All @@ -32,14 +36,17 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -271,4 +278,61 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}

/**
 * Logs the number of open HTTP server connections on each node being decommissioned,
 * keyed by node id. Purely informational; a null/empty response is logged and skipped.
 *
 * @param nodesStatsResponse stats (HTTP metric) for the decommissioning nodes; may be null
 */
private void logActiveConnections(NodesStatsResponse nodesStatsResponse) {
    if (nodesStatsResponse == null || nodesStatsResponse.getNodes() == null) {
        logger.info("Node stats response received is null/empty.");
        return;
    }

    final Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
    // For-each instead of the index-based loop; also guard against a node entry that
    // reported no HTTP stats so a null HttpStats can't NPE (-1 marks "unknown").
    for (final NodeStats nodeStats : nodesStatsResponse.getNodes()) {
        final DiscoveryNode node = nodeStats.getNode();
        final HttpStats httpStats = nodeStats.getHttp();
        nodeActiveConnectionMap.put(node.getId(), httpStats == null ? -1L : httpStats.getServerOpen());
    }
    logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap);
}

/**
 * Best-effort dump of active HTTP connections on the nodes being decommissioned:
 * sends a NodesStats request (HTTP metric only) via the local node and logs the
 * per-node open-connection counts. Failures are logged and otherwise ignored;
 * a null or empty node set is a no-op.
 *
 * @param decommissionedNodes the nodes being decommissioned; may be null or empty
 */
void getActiveRequestCountOnDecommissionedNodes(Set<DiscoveryNode> decommissionedNodes) {
    if (decommissionedNodes == null || decommissionedNodes.isEmpty()) {
        return;
    }
    // A non-empty set yields at least one id, so the previous `nodes.length == 0`
    // re-check after this line was redundant and has been removed.
    final String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new);

    final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes);
    nodesStatsRequest.clear();
    // Only the HTTP metric is needed to report open server connections.
    nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName());

    transportService.sendRequest(
        transportService.getLocalNode(),
        NodesStatsAction.NAME,
        nodesStatsRequest,
        new TransportResponseHandler<NodesStatsResponse>() {
            @Override
            public void handleResponse(NodesStatsResponse response) {
                logActiveConnections(response);
            }

            @Override
            public void handleException(TransportException exp) {
                // Best-effort: connection dump failures must not fail the decommission flow.
                logger.error("Failure occurred while dumping connection for decommission nodes - ", exp.unwrapCause());
            }

            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }

            @Override
            public NodesStatsResponse read(StreamInput in) throws IOException {
                return new NodesStatsResponse(in);
            }
        }
    );
}
}
Loading

0 comments on commit 1a40bd5

Please sign in to comment.