Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ALLOC: Fail Stale Primary Alloc. Req. without Data #37226

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,39 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterRerouteAction extends TransportMasterNodeAction<ClusterRerouteRequest, ClusterRerouteResponse> {

private final AllocationService allocationService;
Expand Down Expand Up @@ -69,18 +86,71 @@ protected ClusterRerouteResponse newResponse() {
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state,
final ActionListener<ClusterRerouteResponse> listener) {
ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
},
listener::onFailure
);

clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
allocationService, request, logWrapper));
Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations = new HashMap<>();
for (AllocationCommand command : request.getCommands().commands()) {
if (command instanceof AllocateStalePrimaryAllocationCommand) {
final AllocateStalePrimaryAllocationCommand cmd = (AllocateStalePrimaryAllocationCommand) command;
stalePrimaryAllocations.computeIfAbsent(cmd.index(), k -> new ArrayList<>()).add(cmd);
}
}
if (stalePrimaryAllocations.isEmpty()) {
submitStateUpdate(request, listener);
} else {
verifyThenSubmitUpdate(request, listener, stalePrimaryAllocations);
}
}

private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListener<ClusterRerouteResponse> listener,
Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations) {
transportService.sendRequest(transportService.getLocalNode(), IndicesShardStoresAction.NAME,
new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
new ActionListenerResponseHandler<>(
ActionListener.wrap(
response -> {
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status =
response.getStoreStatuses();
Exception e = null;
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
final String index = entry.getKey();
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
assert indexStatus != null;
for (AbstractAllocateAllocationCommand command : entry.getValue()) {
final List<IndicesShardStoresResponse.StoreStatus> shardStatus =
indexStatus.get(command.shardId());
if (shardStatus == null || shardStatus.isEmpty()) {
e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
"No data for shard [" + command.shardId() + "] of index [" + index + "] found on any node")
);
} else if (shardStatus.stream().noneMatch(storeStatus -> {
final DiscoveryNode node = storeStatus.getNode();
final String nodeInCommand = command.node();
return nodeInCommand.equals(node.getName()) || nodeInCommand.equals(node.getId());
})) {
e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
"No data for shard [" + command.shardId() + "] of index [" + index + "] found on node ["
+ command.node() + ']'));
}
}
}
if (e == null) {
submitStateUpdate(request, listener);
} else {
listener.onFailure(e);
}
}, listener::onFailure
), IndicesShardStoresResponse::new));
}

private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)",
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
}, listener::onFailure)));
}

static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ public IndicesShardStoresResponse(ImmutableOpenMap<String, ImmutableOpenIntMap<L
}

IndicesShardStoresResponse() {
this(ImmutableOpenMap.<String, ImmutableOpenIntMap<List<StoreStatus>>>of(), Collections.<Failure>emptyList());
this(ImmutableOpenMap.of(), Collections.emptyList());
}

public IndicesShardStoresResponse(StreamInput in) throws IOException {
readFrom(in);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -50,6 +51,7 @@
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -175,15 +177,17 @@ public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exce
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));

logger.info("--> force allocation of stale copy to node that does not have shard copy");
client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0,
dataNodeWithNoShardCopy, true)).get();
Throwable iae = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0,
dataNodeWithNoShardCopy, true)).get());
assertThat(iae.getMessage(), equalTo("No data for shard [0] of index [test] found on any node"));

logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() ->
assertTrue(client().admin().cluster().prepareState().get().getState().toString(),
client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertTrue(client().admin().cluster().prepareState().get().getState().toString(),
client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned());
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test")
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
}

public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
Expand Down Expand Up @@ -261,6 +265,43 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
assertThat(newHistoryUUIds, hasSize(1));
}

public void testForceStaleReplicaToBePromotedToPrimaryOnWrongNode() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);
final String idxName = "test";
assertAcked(client().admin().indices().prepareCreate(idxName)
.setSettings(Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)).get());
ensureGreen();
createStaleReplicaScenario(master);
internalCluster().startDataOnlyNodes(2);
final int shardId = 0;
final List<String> nodeNames = new ArrayList<>(Arrays.asList(internalCluster().getNodeNames()));
nodeNames.remove(master);
client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName)
.get(shardId).forEach(status -> nodeNames.remove(status.getNode().getName()));
assertThat(nodeNames, hasSize(1));
final String nodeWithoutData = nodeNames.get(0);
Throwable iae = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, nodeWithoutData, true)).get());
assertThat(
iae.getMessage(),
equalTo("No data for shard [" + shardId + "] of index [" + idxName + "] found on node [" + nodeWithoutData + ']'));
}

public void testForceStaleReplicaToBePromotedForMissingIndex() {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
final String dataNode = internalCluster().startDataOnlyNode();
final String idxName = "test";
IndexNotFoundException ex = expectThrows(
IndexNotFoundException.class,
() -> client().admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(idxName, 0, dataNode, true)).get());
assertThat(ex.getIndex().getName(), equalTo(idxName));
}

public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException {
String node = internalCluster().startNode();
client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder()
Expand Down