Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching avg total bytes and avg free bytes inside ClusterInfo #14851

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.cluster;

import org.opensearch.Version;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -68,6 +69,8 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
Expand Down Expand Up @@ -97,6 +100,7 @@ public ClusterInfo(
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -117,6 +121,39 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeFileCacheStats = Map.of();
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
* @param usages Map of nodeId to DiskUsage for all known nodes
*/
private void calculateAvgFreeAndTotalBytes(final Map<String, DiskUsage> usages) {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
if (usages == null || usages.isEmpty()) {
this.avgTotalBytes = 0;
this.avgFreeByte = 0;
return;
}

long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}

this.avgTotalBytes = totalBytes / usages.size();
this.avgFreeByte = freeBytes / usages.size();
}

/**
 * Returns the average free bytes stored in {@code avgFreeByte}.
 * @return the stored average free bytes
 */
public long getAvgFreeByte() {
    return this.avgFreeByte;
}

/**
 * Returns the average total bytes stored in {@code avgTotalBytes}.
 * @return the stored average total bytes
 */
public long getAvgTotalBytes() {
    return this.avgTotalBytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ public static long sizeOfRelocatingShards(

// Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards
final List<ShardRouting> initializingShards = node.shardsWithState(ShardRoutingState.INITIALIZING);
initializingShards.removeIf(shardRouting -> reservedSpace.containsShardId(shardRouting.shardId()));
for (ShardRouting routing : initializingShards) {
if (routing.relocatingNodeId() == null) {
if (routing.relocatingNodeId() == null || reservedSpace.containsShardId(routing.shardId())) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
Expand Down Expand Up @@ -230,7 +229,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
// and take the size into account
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
final DiskUsageWithRelocations usage = getDiskUsage(
node,
allocation,
usages,
clusterInfo.getAvgFreeByte(),
clusterInfo.getAvgTotalBytes(),
false
);
// First, check that the node is currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
// Cache the used disk percentage for displaying disk percentages consistent with documentation
Expand Down Expand Up @@ -492,7 +498,14 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl

// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
final DiskUsageWithRelocations usage = getDiskUsage(
node,
allocation,
usages,
clusterInfo.getAvgFreeByte(),
clusterInfo.getAvgTotalBytes(),
true
);
final String dataPath = clusterInfo.getDataPath(shardRouting);
// If this node is already above the high threshold, the shard cannot remain (get it off!)
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
Expand Down Expand Up @@ -581,13 +594,15 @@ private DiskUsageWithRelocations getDiskUsage(
RoutingNode node,
RoutingAllocation allocation,
final Map<String, DiskUsage> usages,
final long avgFreeBytes,
final long avgTotalBytes,
boolean subtractLeavingShards
) {
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
RS146BIJAY marked this conversation as resolved.
Show resolved Hide resolved
usage = new DiskUsage(node.nodeId(), node.node().getName(), "_na_", avgTotalBytes, avgFreeBytes);
if (logger.isDebugEnabled()) {
logger.debug(
"unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
Expand Down Expand Up @@ -619,26 +634,6 @@ private DiskUsageWithRelocations getDiskUsage(
return diskUsageWithRelocations;
}

/**
 * Builds a {@link DiskUsage} for the given {@link RoutingNode} whose total and free
 * bytes are the averages over every entry in the disk usage map. The path of the
 * returned usage is {@code "_na_"}; when the map is empty both byte values are zero.
 * @param node Node to return an averaged DiskUsage object for
 * @param usages Map of nodeId to DiskUsage for all known nodes
 * @return DiskUsage representing given node using the average disk usage
 */
DiskUsage averageUsage(RoutingNode node, final Map<String, DiskUsage> usages) {
    final int nodeCount = usages.size();
    long averagedTotal = 0;
    long averagedFree = 0;

    // With no entries there is nothing to average, so the zero defaults are kept
    if (nodeCount != 0) {
        long sumOfTotal = 0;
        long sumOfFree = 0;
        for (final DiskUsage nodeUsage : usages.values()) {
            sumOfTotal += nodeUsage.getTotalBytes();
            sumOfFree += nodeUsage.getFreeBytes();
        }
        averagedTotal = sumOfTotal / nodeCount;
        averagedFree = sumOfFree / nodeCount;
    }

    return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", averagedTotal, averagedFree);
}

/**
* Given the DiskUsage for a node and the size of the shard, return the
* percentage of free disk if the shard were to be allocated to the node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,19 +863,6 @@ public void testUnknownDiskUsage() {
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
}

// Checks that averageUsage reports the mean total and free bytes of the supplied usages
// for a node that has no entry of its own in the map.
public void testAverageUsage() {
    final RoutingNode routingNode = new RoutingNode("node1", newNode("node1"));
    final DiskThresholdDecider thresholdDecider = makeDecider(Settings.EMPTY);

    final Map<String, DiskUsage> usagesByNode = new HashMap<>();
    usagesByNode.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
    usagesByNode.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used

    final DiskUsage averaged = thresholdDecider.averageUsage(routingNode, usagesByNode);
    // (100 + 100) / 2 total, (50 + 0) / 2 free
    assertThat(averaged.getTotalBytes(), equalTo(100L));
    assertThat(averaged.getFreeBytes(), equalTo(25L));
}

public void testFreeDiskPercentageAfterShardAssigned() {
DiskThresholdDecider decider = makeDecider(Settings.EMPTY);

Expand Down
Loading