Skip to content

Commit

Permalink
Fail Stale Primary Alloc. Req. without Data (#37226)
Browse files Browse the repository at this point in the history
* Get indices shard store status before enqueuing the reallocation state update task to prevent
tasks that would fail because a node does not hold a stale copy of the shard on a best effort basis
* Closes #37098
  • Loading branch information
original-brownbear authored Jan 10, 2019
1 parent 6d81e7c commit 46237fa
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,39 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterRerouteAction extends TransportMasterNodeAction<ClusterRerouteRequest, ClusterRerouteResponse> {

private final AllocationService allocationService;
Expand Down Expand Up @@ -69,18 +86,71 @@ protected ClusterRerouteResponse newResponse() {
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state,
final ActionListener<ClusterRerouteResponse> listener) {
ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
},
listener::onFailure
);

clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
allocationService, request, logWrapper));
Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations = new HashMap<>();
for (AllocationCommand command : request.getCommands().commands()) {
if (command instanceof AllocateStalePrimaryAllocationCommand) {
final AllocateStalePrimaryAllocationCommand cmd = (AllocateStalePrimaryAllocationCommand) command;
stalePrimaryAllocations.computeIfAbsent(cmd.index(), k -> new ArrayList<>()).add(cmd);
}
}
if (stalePrimaryAllocations.isEmpty()) {
submitStateUpdate(request, listener);
} else {
verifyThenSubmitUpdate(request, listener, stalePrimaryAllocations);
}
}

private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListener<ClusterRerouteResponse> listener,
Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations) {
transportService.sendRequest(transportService.getLocalNode(), IndicesShardStoresAction.NAME,
new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
new ActionListenerResponseHandler<>(
ActionListener.wrap(
response -> {
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status =
response.getStoreStatuses();
Exception e = null;
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
final String index = entry.getKey();
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
assert indexStatus != null;
for (AbstractAllocateAllocationCommand command : entry.getValue()) {
final List<IndicesShardStoresResponse.StoreStatus> shardStatus =
indexStatus.get(command.shardId());
if (shardStatus == null || shardStatus.isEmpty()) {
e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
"No data for shard [" + command.shardId() + "] of index [" + index + "] found on any node")
);
} else if (shardStatus.stream().noneMatch(storeStatus -> {
final DiscoveryNode node = storeStatus.getNode();
final String nodeInCommand = command.node();
return nodeInCommand.equals(node.getName()) || nodeInCommand.equals(node.getId());
})) {
e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
"No data for shard [" + command.shardId() + "] of index [" + index + "] found on node ["
+ command.node() + ']'));
}
}
}
if (e == null) {
submitStateUpdate(request, listener);
} else {
listener.onFailure(e);
}
}, listener::onFailure
), IndicesShardStoresResponse::new));
}

private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)",
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
}, listener::onFailure)));
}

static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ public IndicesShardStoresResponse(ImmutableOpenMap<String, ImmutableOpenIntMap<L
}

IndicesShardStoresResponse() {
this(ImmutableOpenMap.<String, ImmutableOpenIntMap<List<StoreStatus>>>of(), Collections.<Failure>emptyList());
this(ImmutableOpenMap.of(), Collections.emptyList());
}

public IndicesShardStoresResponse(StreamInput in) throws IOException {
readFrom(in);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -50,6 +51,7 @@
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -175,15 +177,17 @@ public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exce
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));

logger.info("--> force allocation of stale copy to node that does not have shard copy");
client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0,
dataNodeWithNoShardCopy, true)).get();
Throwable iae = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0,
dataNodeWithNoShardCopy, true)).get());
assertThat(iae.getMessage(), equalTo("No data for shard [0] of index [test] found on any node"));

logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() ->
assertTrue(client().admin().cluster().prepareState().get().getState().toString(),
client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertTrue(client().admin().cluster().prepareState().get().getState().toString(),
client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned());
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test")
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
}

public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
Expand Down Expand Up @@ -261,6 +265,43 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
assertThat(newHistoryUUIds, hasSize(1));
}

public void testForceStaleReplicaToBePromotedToPrimaryOnWrongNode() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);
final String idxName = "test";
assertAcked(client().admin().indices().prepareCreate(idxName)
.setSettings(Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)).get());
ensureGreen();
createStaleReplicaScenario(master);
internalCluster().startDataOnlyNodes(2);
final int shardId = 0;
final List<String> nodeNames = new ArrayList<>(Arrays.asList(internalCluster().getNodeNames()));
nodeNames.remove(master);
client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName)
.get(shardId).forEach(status -> nodeNames.remove(status.getNode().getName()));
assertThat(nodeNames, hasSize(1));
final String nodeWithoutData = nodeNames.get(0);
Throwable iae = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, nodeWithoutData, true)).get());
assertThat(
iae.getMessage(),
equalTo("No data for shard [" + shardId + "] of index [" + idxName + "] found on node [" + nodeWithoutData + ']'));
}

public void testForceStaleReplicaToBePromotedForMissingIndex() {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
final String dataNode = internalCluster().startDataOnlyNode();
final String idxName = "test";
IndexNotFoundException ex = expectThrows(
IndexNotFoundException.class,
() -> client().admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(idxName, 0, dataNode, true)).get());
assertThat(ex.getIndex().getName(), equalTo(idxName));
}

public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException {
String node = internalCluster().startNode();
client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder()
Expand Down

0 comments on commit 46237fa

Please sign in to comment.