Skip to content

Commit

Permalink
Fail weight update when decommission ongoing and fail decommission when attribute not weighed away (#4839)
Browse files Browse the repository at this point in the history

* Add checks for decommission before setting weights

Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN authored Oct 20, 2022
1 parent 3af46ae commit 3344738
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
### Deprecated
### Removed
- Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.DecommissioningFailedException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
Expand All @@ -37,6 +41,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -102,6 +107,17 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx

ensureStableCluster(6);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
Expand Down Expand Up @@ -162,4 +178,57 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
// as by then all nodes should have joined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

/**
 * Verifies the ordering contract between weighted routing and decommission: a decommission
 * request is rejected while no weights exist for the attribute, and is still rejected when
 * weights exist but the target attribute value has not been weighed away (weight != 0.0).
 */
public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception {
    Settings awarenessSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
        .build();
    // Bring up 3 cluster-manager-eligible nodes and 3 data nodes, all zone-aware.
    internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(awarenessSettings).build());
    internalCluster().startDataOnlyNodes(3, Settings.builder().put(awarenessSettings).build());
    ensureStableCluster(6);

    ClusterHealthResponse healthResponse = client().admin()
        .cluster()
        .prepareHealth()
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForGreenStatus()
        .setWaitForNodes(Integer.toString(6))
        .execute()
        .actionGet();
    assertFalse(healthResponse.isTimedOut());

    DecommissionAttribute attributeToDecommission = new DecommissionAttribute("zone", "c");
    DecommissionRequest request = new DecommissionRequest(attributeToDecommission);

    // Phase 1 — no weights set at all: the action must fail with the "no weights" message.
    assertBusy(() -> {
        DecommissioningFailedException failure = expectThrows(
            DecommissioningFailedException.class,
            () -> client().execute(DecommissionAction.INSTANCE, request).actionGet()
        );
        assertTrue(
            failure.getMessage()
                .contains("no weights are set to the attribute. Please set appropriate weights before triggering decommission action")
        );
    });

    logger.info("--> setting shard routing weights for weighted round robin");
    // Phase 2 — weights exist, but zone "c" keeps a non-zero weight, so it is not weighed away.
    WeightedRouting weightedRouting = new WeightedRouting("zone", Map.of("a", 1.0, "b", 1.0, "c", 1.0));
    ClusterPutWeightedRoutingResponse putWeightsResponse = client().admin()
        .cluster()
        .prepareWeightedRouting()
        .setWeightedRouting(weightedRouting)
        .get();
    assertTrue(putWeightsResponse.isAcknowledged());

    assertBusy(() -> {
        DecommissioningFailedException failure = expectThrows(
            DecommissioningFailedException.class,
            () -> client().execute(DecommissionAction.INSTANCE, request).actionGet()
        );
        assertTrue(failure.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]"));
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -129,6 +131,8 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
// ensure attribute is weighed away
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString());
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -413,6 +417,30 @@ private static void validateAwarenessAttribute(
}
}

/**
 * Ensures the attribute value being decommissioned has already been weighed away via
 * weighted routing. Three preconditions are checked in order: weighted routing metadata
 * must exist, it must target the same attribute name, and the weight assigned to the
 * decommissioned attribute value must be exactly 0.0.
 *
 * @throws DecommissioningFailedException if any of the preconditions is not met
 */
private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) {
    final WeightedRoutingMetadata routingWeightsMetadata = state.metadata().weightedRoutingMetadata();
    if (routingWeightsMetadata == null) {
        // No weights have ever been configured — decommission must not proceed.
        throw new DecommissioningFailedException(
            decommissionAttribute,
            "no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
        );
    }
    final WeightedRouting routingWeights = routingWeightsMetadata.getWeightedRouting();
    if (routingWeights.attributeName().equals(decommissionAttribute.attributeName()) == false) {
        // Weights exist, but for a different awareness attribute.
        throw new DecommissioningFailedException(
            decommissionAttribute,
            "no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]"
        );
    }
    final Double weightForDecommissionedValue = routingWeights.weights().get(decommissionAttribute.attributeValue());
    // Either no weight entry for this value, or a non-zero weight — both mean "not weighed away".
    if (weightForDecommissionedValue == null || weightForDecommissionedValue.equals(0.0) == false) {
        throw new DecommissioningFailedException(
            decommissionAttribute,
            "weight for decommissioned attribute is expected to be [0.0] but found [" + weightForDecommissionedValue + "]"
        );
    }
}

private static void ensureEligibleRequest(
DecommissionAttributeMetadata decommissionAttributeMetadata,
DecommissionAttribute requestedDecommissionAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
Expand Down Expand Up @@ -68,6 +70,8 @@ public void registerWeightedRoutingMetadata(
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
// verify currently no decommission action is ongoing
ensureNoOngoingDecommissionAction(currentState);
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
Expand Down Expand Up @@ -154,4 +158,15 @@ public void verifyAwarenessAttribute(String attributeName) {
throw validationException;
}
}

/**
 * Rejects a weight update while a decommission action is in flight. Updating weights is
 * permitted only when no decommission metadata exists in the cluster state, or when the
 * previous decommission attempt ended in {@code FAILED}.
 *
 * @throws IllegalStateException if a decommission action is ongoing
 */
public void ensureNoOngoingDecommissionAction(ClusterState state) {
    final DecommissionAttributeMetadata decommissionMetadata = state.metadata().decommissionAttributeMetadata();
    if (decommissionMetadata == null || decommissionMetadata.status().equals(DecommissionStatus.FAILED)) {
        return; // nothing ongoing, or the last attempt failed — weights may be updated
    }
    throw new IllegalStateException(
        "a decommission action is ongoing with status [" + decommissionMetadata.status().status() + "], cannot update weight during this state"
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -169,6 +172,56 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

/**
 * Decommission must not start when weights exist but the target attribute value still
 * carries a non-zero weight (here zone_1 has weight 1.0).
 */
public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException {
    // zone_1 is the decommission target but keeps weight 1.0 — only zone_3 is weighed away.
    setWeightedRoutingWeights(Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0));
    final CountDownLatch latch = new CountDownLatch(1);
    ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
        @Override
        public void onResponse(DecommissionResponse decommissionResponse) {
            fail("on response shouldn't have been called");
        }

        @Override
        public void onFailure(Exception e) {
            // The service is expected to reject the request with the weight-mismatch message.
            assertTrue(e instanceof DecommissioningFailedException);
            assertThat(
                e.getMessage(),
                Matchers.containsString("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")
            );
            latch.countDown();
        }
    };
    decommissionService.startDecommissionAction(new DecommissionAttribute("zone", "zone_1"), listener);
    assertTrue(latch.await(30, TimeUnit.SECONDS));
}

/**
 * Decommission must not start when no weighted routing weights have been configured at all.
 */
public void testDecommissionNotStartedWithoutWeighingAwayAttribute_2() throws InterruptedException {
    // Note: no weights are installed before triggering the action.
    final CountDownLatch latch = new CountDownLatch(1);
    ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
        @Override
        public void onResponse(DecommissionResponse decommissionResponse) {
            fail("on response shouldn't have been called");
        }

        @Override
        public void onFailure(Exception e) {
            // The service is expected to reject the request with the "no weights" message.
            assertTrue(e instanceof DecommissioningFailedException);
            assertThat(
                e.getMessage(),
                Matchers.containsString(
                    "no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
                )
            );
            latch.countDown();
        }
    };
    decommissionService.startDecommissionAction(new DecommissionAttribute("zone", "zone_1"), listener);
    assertTrue(latch.await(30, TimeUnit.SECONDS));
}

@SuppressWarnings("unchecked")
public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -286,6 +339,17 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

/**
 * Installs weighted-routing metadata for the "zone" attribute into the cluster state held
 * by {@code clusterService}, mapping each attribute value to the weight supplied.
 */
private void setWeightedRoutingWeights(Map<String, Double> weights) {
    final ClusterState currentState = clusterService.state();
    final WeightedRoutingMetadata routingMetadata = new WeightedRoutingMetadata(new WeightedRouting("zone", weights));
    final Metadata updatedMetadata = Metadata.builder(currentState.metadata())
        .putCustom(WeightedRoutingMetadata.TYPE, routingMetadata)
        .build();
    final ClusterState updatedState = ClusterState.builder(currentState).metadata(updatedMetadata).build();
    // Publish the modified state through the test utility so observers see the new weights.
    ClusterServiceUtils.setState(clusterService, ClusterState.builder(updatedState));
}

private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) {
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes());
org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone))));
Expand Down
Loading

0 comments on commit 3344738

Please sign in to comment.