Service Layer changes for Recommission API (#4320)
* Recommission API service level changes

Signed-off-by: pranikum <[email protected]>
pranikum authored Oct 6, 2022
1 parent e354461 commit 3186935
Showing 3 changed files with 136 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -28,7 +28,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added precommit support for windows ([#4676](https://github.com/opensearch-project/OpenSearch/pull/4676))
- Added release notes for 1.3.6 ([#4681](https://github.com/opensearch-project/OpenSearch/pull/4681))
- Added precommit support for MacOS ([#4682](https://github.com/opensearch-project/OpenSearch/pull/4682))

- Recommission API changes for service layer ([#4320](https://github.com/opensearch-project/OpenSearch/pull/4320))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -14,6 +14,7 @@
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateUpdateTask;
@@ -481,4 +482,53 @@ public void onFailure(Exception e) {
}
};
}

public void startRecommissionAction(final ActionListener<AcknowledgedResponse> listener) {
/*
 * For abandoned decommission requests, we cannot be certain that the voting configuration exclusion
 * list was ever restored, so exclusions may still be set even after recommission (which is unexpected).
 * By definition, OpenSearch clusters should have no voting configuration exclusions in normal operation:
 * once the excluded nodes have stopped, the exclusions are cleared with DELETE /_cluster/voting_config_exclusions.
 * It is therefore safe to remove any remaining exclusions here. Users should make a conscious choice
 * before decommissioning an awareness attribute.
 */
decommissionController.clearVotingConfigExclusion(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.info("successfully cleared voting config exclusion for deleting the decommission.");
deleteDecommissionState(listener);
}

@Override
public void onFailure(Exception e) {
logger.error("Failure in clearing voting config during delete_decommission request.", e);
listener.onFailure(e);
}
}, false);
}

void deleteDecommissionState(ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
logger.info("Deleting the decommission attribute from the cluster state");
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(metadata);
mdBuilder.removeCustom(DecommissionAttributeMetadata.TYPE);
return ClusterState.builder(currentState).metadata(mdBuilder).build();
}

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to clear decommission attribute. [{}]", source), e);
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// Cluster state processed for deleting the decommission attribute.
assert newState.metadata().decommissionAttributeMetadata() == null;
listener.onResponse(new AcknowledgedResponse(true));
}
});
}
}
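
For orientation, here is a minimal sketch of how a caller might drive the new service-layer entry point. The wrapper class, its constructor wiring, and the fully qualified DecommissionService import are assumptions for illustration; only startRecommissionAction and AcknowledgedResponse come from this diff, and the REST/transport layer for recommission is not part of this commit.

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.decommission.DecommissionService; // package assumed from the surrounding context

// Hypothetical caller; not part of this commit.
public class RecommissionCaller {

    private final DecommissionService decommissionService;

    public RecommissionCaller(DecommissionService decommissionService) {
        this.decommissionService = decommissionService;
    }

    public void recommission(ActionListener<AcknowledgedResponse> listener) {
        // Clears any leftover voting config exclusions first, then removes the decommission
        // attribute from the cluster state and acknowledges (or fails) on the listener.
        decommissionService.startRecommissionAction(listener);
    }
}

The listener-based signature mirrors the existing decommission flow, so callers handle the acknowledgement and failure paths asynchronously rather than blocking on the cluster state update.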
@@ -11,9 +11,13 @@
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
@@ -30,6 +34,7 @@
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.util.Collections;
@@ -201,6 +206,86 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testClearClusterDecommissionState() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2");
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
DecommissionStatus.SUCCESSFUL
);
ClusterState state = ClusterState.builder(new ClusterName("test"))
.metadata(Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build())
.build();

ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse decommissionResponse) {
DecommissionAttributeMetadata metadata = clusterService.state().metadata().custom(DecommissionAttributeMetadata.TYPE);
assertNull(metadata);
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("on failure shouldn't have been called");
countDownLatch.countDown();
}
};

this.decommissionService.deleteDecommissionState(listener);

// Decommission Attribute should be removed.
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDeleteDecommissionAttributeClearVotingExclusion() {
TransportService mockTransportService = Mockito.mock(TransportService.class);
Mockito.when(mockTransportService.getLocalNode()).thenReturn(Mockito.mock(DiscoveryNode.class));
DecommissionService decommissionService = new DecommissionService(
Settings.EMPTY,
clusterSettings,
clusterService,
mockTransportService,
threadPool,
allocationService
);
decommissionService.startRecommissionAction(Mockito.mock(ActionListener.class));

ArgumentCaptor<ClearVotingConfigExclusionsRequest> clearVotingConfigExclusionsRequestArgumentCaptor = ArgumentCaptor.forClass(
ClearVotingConfigExclusionsRequest.class
);
Mockito.verify(mockTransportService)
.sendRequest(
Mockito.any(DiscoveryNode.class),
Mockito.anyString(),
clearVotingConfigExclusionsRequestArgumentCaptor.capture(),
Mockito.any(TransportResponseHandler.class)
);

ClearVotingConfigExclusionsRequest request = clearVotingConfigExclusionsRequestArgumentCaptor.getValue();
assertFalse(request.getWaitForRemoval());
}

public void testClusterUpdateTaskForDeletingDecommission() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ActionListener<AcknowledgedResponse> listener = new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
assertTrue(response.isAcknowledged());
assertNull(clusterService.state().metadata().decommissionAttributeMetadata());
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("On Failure shouldn't have been called");
countDownLatch.countDown();
}
};
decommissionService.deleteDecommissionState(listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) {
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes());
org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone))));
