Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ALLOC: Fail Stale Primary Alloc. Req. without Data #37226

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,40 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterRerouteAction extends TransportMasterNodeAction<ClusterRerouteRequest, ClusterRerouteResponse> {

private final AllocationService allocationService;
Expand Down Expand Up @@ -69,18 +87,82 @@ protected ClusterRerouteResponse newResponse() {
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state,
final ActionListener<ClusterRerouteResponse> listener) {
ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
// Gather all stale primary allocation commands into a map indexed by the index name they correspond to
// so we can check if the nodes they correspond to actually have any data for the shard
Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations = null;
for (AllocationCommand command : request.getCommands().commands()) {
if (command instanceof AllocateStalePrimaryAllocationCommand) {
if (stalePrimaryAllocations == null) {
stalePrimaryAllocations = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can afford to make this HashMap eagerly and avoid the noise in the loop.

}
listener.onResponse(response);
},
listener::onFailure
);
final AllocateStalePrimaryAllocationCommand cmd = (AllocateStalePrimaryAllocationCommand) command;
stalePrimaryAllocations.computeIfAbsent(cmd.index(), k -> new ArrayList<>()).add(cmd);
}
}
if (stalePrimaryAllocations == null) {
// We don't have any stale primary allocations, we simply execute the state update task for the requested allocations
submitStateUpdate(request, listener);
} else {
// We get the index shard store status for indices that we want to allocate stale primaries on first to fail requests
// where there's no data for a given shard on a given node.
verifyThenSubmitUpdate(request, listener, stalePrimaryAllocations);
}
}

private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListener<ClusterRerouteResponse> listener,
Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations) {
transportService.sendRequest(transportService.getLocalNode(), IndicesShardStoresAction.NAME,
new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
new ActionListenerResponseHandler<>(
ActionListener.wrap(
response -> {
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status =
response.getStoreStatuses();
Exception e = null;
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
final String index = entry.getKey();
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
if (indexStatus == null) {
e = ExceptionsHelper.useOrSuppress(e, new IndexNotFoundException(index));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a test that hits this branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no ... my bad this one is dead code. The logic in the index shard store status request already checks the index exists.
I added an assertion for this now and a test that makes sure it's not tripped when the index doesn't exist :)

} else {
for (AbstractAllocateAllocationCommand command : entry.getValue()) {
final List<IndicesShardStoresResponse.StoreStatus> shardStatus =
indexStatus.get(command.shardId());
if (shardStatus == null) {
e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
"No data for shard [" + command.shardId() + "] of index [" + index + "] found on any node")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a test that hits this branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in ea98277 :)

);
} else if (shardStatus.stream().noneMatch(storeStatus -> {
final DiscoveryNode node = storeStatus.getNode();
final String nodeInCommand = command.node();
return nodeInCommand.equals(node.getName()) || nodeInCommand.equals(node.getId());
})) {
e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
"No data for shard [" + command.shardId() + "] of index [" + index + "] found on node ["
+ command.node() + ']'));
}
}
}
}
if (e == null) {
submitStateUpdate(request, listener);
} else {
listener.onFailure(e);
}
}, listener::onFailure
), IndicesShardStoresResponse::new));
}

clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
allocationService, request, logWrapper));
private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)",
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
ActionListener.wrap(
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
}, listener::onFailure)));
}

static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ public IndicesShardStoresResponse(ImmutableOpenMap<String, ImmutableOpenIntMap<L
}

IndicesShardStoresResponse() {
this(ImmutableOpenMap.<String, ImmutableOpenIntMap<List<StoreStatus>>>of(), Collections.<Failure>emptyList());
this(ImmutableOpenMap.of(), Collections.emptyList());
}

public IndicesShardStoresResponse(StreamInput in) throws IOException {
readFrom(in);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
Expand Down Expand Up @@ -135,7 +134,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale
assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
});

try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for these two noisy cleanups that snuck into this file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should back them out for the sake of future git annotate users :)

store.removeCorruptionMarker();
}

Expand Down Expand Up @@ -172,7 +171,7 @@ public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus
assertThat(indexHealthStatus, is(healthStatus));
}

private int indexDocs(String indexName, Object ... source) throws InterruptedException, ExecutionException {
private int indexDocs(String indexName, Object ... source) throws InterruptedException {
// index some docs in several segments
int numDocs = 0;
for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,18 @@ public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exce
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));

logger.info("--> force allocation of stale copy to node that does not have shard copy");
client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0,
dataNodeWithNoShardCopy, true)).get();
Throwable iae = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand("test", 0,
dataNodeWithNoShardCopy, true)).get());
assertThat(iae.getMessage(), equalTo("No data for shard [0] of index [test] found on node [" + dataNodeWithNoShardCopy + ']'));

logger.info("--> wait until shard is failed and becomes unassigned again");
assertBusy(() ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we no longer want this to be an assertBusy() - we expect that the previous reroute didn't do anything, so it should still be in this state from beforehand.

assertTrue(client().admin().cluster().prepareState().get().getState().toString(),
client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test")
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
.getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
}

public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
Expand Down