Skip to content

Commit

Permalink
Avoid overshooting watermarks during relocation (elastic#46079)
Browse files Browse the repository at this point in the history
Today the `DiskThresholdDecider` attempts to account for already-relocating
shards when deciding how to allocate or relocate a shard. Its goal is to stop
relocating shards onto a node before that node exceeds the low watermark, and
to stop relocating shards away from a node as soon as the node drops below the
high watermark.

The decider handles multiple data paths by only accounting for relocating
shards that affect the appropriate data path. However, this mechanism does not
correctly account for _new_ relocating shards, which are unwittingly ignored.
This means that we may evict far too many shards from a node above the high
watermark, and may relocate far too many shards onto a node causing it to blow
right past the low watermark and potentially other watermarks too.

There are in fact two distinct issues that this PR fixes. New incoming shards
have an unknown data path until the `ClusterInfoService` refreshes its
statistics. New outgoing shards have a known data path, but we fail to account
for the change of the corresponding `ShardRouting` from `STARTED` to
`RELOCATING`, meaning that we fail to find the correct data path and treat the
path as unknown here too.

This PR also reworks the `MockDiskUsagesIT` test to avoid using fake data paths
for all shards. With the changes here, the data paths are handled in tests as
they are in production, except that their sizes are fake.

Fixes elastic#45177
  • Loading branch information
DaveCTurner authored and jkakavas committed Sep 2, 2019
1 parent 3d9b333 commit 7ac8396
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -88,7 +87,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();

public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
Expand Down Expand Up @@ -275,6 +274,11 @@ private void maybeRefresh() {
}
}

// allow tests to adjust the node stats on receipt
List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
return nodeStats;
}

/**
* Refreshes the ClusterInfo in a blocking fashion
*/
Expand All @@ -284,12 +288,13 @@ public final ClusterInfo refresh() {
}
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
leastAvailableSpaceUsages = newLeastAvaiableUsages.build();
mostAvailableSpaceUsages = newMostAvaiableUsages.build();
public void onResponse(NodesStatsResponse nodesStatsResponse) {
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()),
leastAvailableUsagesBuilder, mostAvailableUsagesBuilder);
leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
}

@Override
Expand Down Expand Up @@ -402,7 +407,7 @@ static void fillDiskUsagePerNode(Logger logger, List<NodeStats> nodeStatsArray,
if (leastAvailablePath == null) {
assert mostAvailablePath == null;
mostAvailablePath = leastAvailablePath = info;
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
leastAvailablePath = info;
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
mostAvailablePath = info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public void onNewInfo(ClusterInfo info) {
.collect(Collectors.toSet());

if (indicesToAutoRelease.isEmpty() == false) {
logger.info("releasing read-only block on indices " + indicesToAutoRelease
+ " since they are now allocated to nodes with sufficient disk space");
updateIndicesReadOnly(indicesToAutoRelease, listener, false);
} else {
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,36 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (dataPath.equals(actualPath)) {
if (routing.initializing() && routing.relocatingNodeId() != null) {
totalSize += getExpectedShardSize(routing, allocation, 0);
} else if (subtractShardsMovingAway && routing.relocating()) {

for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
continue;
}

final String actualPath = clusterInfo.getDataPath(routing);
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space
if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0);
}
}

if (subtractShardsMovingAway) {
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (actualPath == null) {
// we might know the path of this shard from before when it was relocating
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0);
}
}
}

return totalSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -1002,4 +1001,20 @@ public void logShardStates(ClusterState state) {
rn.shardsWithState(RELOCATING),
rn.shardsWithState(STARTED));
}

/**
* ClusterInfo that always reports /dev/null for the shards' data paths.
*/
static class DevNullClusterInfo extends ClusterInfo {
DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
}

@Override
public String getDataPath(ShardRouting shardRouting) {
return "/dev/null";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down
Loading

0 comments on commit 7ac8396

Please sign in to comment.