Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ac pr fs stats 2 #18

Open
wants to merge 4 commits into
base: ac-pr-final-1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public enum Metric {
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
NODES_PERFORMANCE_STATS("nodes_performance_stats");
GLOBAL_PERFORMANCE_STATS("performance_stats");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.NODES_PERFORMANCE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.GLOBAL_PERFORMANCE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Settings related to admission control
PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,
PerformanceTrackerSettings.GLOBAL_IO_WINDOW_DURATION_SETTING,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* MovingAverage is used to calculate the moving average of last 'n' observations of double type.
*
* @opensearch.internal
*/
public class DoubleMovingAverage {
private final int windowSize;
private final double[] observations;

private volatile long count = 0;
private volatile double sum = 0.0;
private volatile double average = 0.0;

public DoubleMovingAverage(int windowSize) {
checkWindowSize(windowSize);
this.windowSize = windowSize;
this.observations = new double[windowSize];
}

/**
* Used for changing the window size of {@code MovingAverage}.
*
* @param newWindowSize new window size.
* @return copy of original object with updated size.
*/
public DoubleMovingAverage copyWithSize(int newWindowSize) {
DoubleMovingAverage copy = new DoubleMovingAverage(newWindowSize);
// Start is inclusive, but end is exclusive
long start, end = count;
if (isReady() == false) {
start = 0;
} else {
start = end - windowSize;
}
// If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value
if (end - start > newWindowSize) {
start = end - newWindowSize;
}
for (int i = (int) start; i < end; i++) {
copy.record(observations[i % observations.length]);
}
return copy;
}

private void checkWindowSize(int size) {
if (size <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}
}

/**
* Records a new observation and evicts the n-th last observation.
*/
public synchronized double record(double value) {
double delta = value - observations[(int) (count % observations.length)];
observations[(int) (count % observations.length)] = value;

count++;
sum += delta;
average = sum / (double) Math.min(count, observations.length);
return average;
}

public double getAverage() {
return average;
}

public long getCount() {
return count;
}

public boolean isReady() {
return count >= windowSize;
}
}
96 changes: 94 additions & 2 deletions server/src/main/java/org/opensearch/monitor/fs/FsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.monitor.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -41,6 +43,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.throttling.tracker.AverageCpuUsageTracker;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -235,6 +238,12 @@ public static class DeviceStats implements Writeable, ToXContentFragment {
final long previousWritesCompleted;
final long currentSectorsWritten;
final long previousSectorsWritten;
final long currentIOTime;
final long previousIOTime;
final double currentReadTime;
final double previousReadTime;
final double currentWriteTime;
final double previousWriteTime;

public DeviceStats(
final int majorDeviceNumber,
Expand All @@ -244,6 +253,9 @@ public DeviceStats(
final long currentSectorsRead,
final long currentWritesCompleted,
final long currentSectorsWritten,
final long currentIOTime,
final double currentReadTime,
final double currentWriteTime,
final DeviceStats previousDeviceStats
) {
this(
Expand All @@ -257,7 +269,13 @@ public DeviceStats(
currentSectorsRead,
previousDeviceStats != null ? previousDeviceStats.currentSectorsRead : -1,
currentWritesCompleted,
previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1
previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1,
currentIOTime,
previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1,
currentReadTime,
previousDeviceStats != null ? previousDeviceStats.previousReadTime : -1.0,
currentWriteTime,
previousDeviceStats != null ? previousDeviceStats.previousWriteTime : -1.0
);
}

Expand All @@ -272,7 +290,13 @@ private DeviceStats(
final long currentSectorsRead,
final long previousSectorsRead,
final long currentWritesCompleted,
final long previousWritesCompleted
final long previousWritesCompleted,
final long currentIOTime,
final long previousIOTime,
final double currentReadTime,
final double previousReadTime,
final double currentWriteTime,
final double previousWriteTime
) {
this.majorDeviceNumber = majorDeviceNumber;
this.minorDeviceNumber = minorDeviceNumber;
Expand All @@ -285,6 +309,12 @@ private DeviceStats(
this.previousSectorsRead = previousSectorsRead;
this.currentSectorsWritten = currentSectorsWritten;
this.previousSectorsWritten = previousSectorsWritten;
this.currentIOTime = currentIOTime;
this.previousIOTime = previousIOTime;
this.currentReadTime = currentReadTime;
this.previousReadTime = previousReadTime;
this.currentWriteTime = currentWriteTime;
this.previousWriteTime = previousWriteTime;
}

public DeviceStats(StreamInput in) throws IOException {
Expand All @@ -299,6 +329,12 @@ public DeviceStats(StreamInput in) throws IOException {
previousSectorsRead = in.readLong();
currentSectorsWritten = in.readLong();
previousSectorsWritten = in.readLong();
currentIOTime = in.readLong();
previousIOTime = in.readLong();
currentReadTime = in.readDouble();
previousReadTime = in.readDouble();
currentWriteTime = in.readDouble();
previousWriteTime = in.readDouble();
}

@Override
Expand All @@ -314,6 +350,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(previousSectorsRead);
out.writeLong(currentSectorsWritten);
out.writeLong(previousSectorsWritten);
out.writeLong(currentIOTime);
out.writeLong(previousIOTime);
out.writeDouble(currentReadTime);
out.writeDouble(currentWriteTime);
out.writeDouble(previousWriteTime);
}

public long operations() {
Expand All @@ -334,18 +375,56 @@ public long writeOperations() {
return (currentWritesCompleted - previousWritesCompleted);
}

public long currentReadOperations() {
return currentReadsCompleted;
}

public long currentWriteOpetations() {
return currentWritesCompleted;
}

public long readKilobytes() {
if (previousSectorsRead == -1) return -1;

return (currentSectorsRead - previousSectorsRead) / 2;
}

public long getCurrentReadKilobytes() {
return currentSectorsRead / 2;
}

public long getCurrentWriteKilobytes() {
return currentSectorsWritten / 2;
}

public long writeKilobytes() {
if (previousSectorsWritten == -1) return -1;

return (currentSectorsWritten - previousSectorsWritten) / 2;
}

public long ioTimeInMillis() {
if (previousIOTime == -1) return -1;

return (currentIOTime - previousIOTime);
}

public long getCurrentIOTime() {
return this.currentIOTime;
}

public double getCurrentReadTime() {
return this.currentReadTime;
}

public double getCurrentWriteTime() {
return this.currentWriteTime;
}

public String getDeviceName() {
return this.deviceName;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("device_name", deviceName);
Expand All @@ -354,6 +433,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(IoStats.WRITE_OPERATIONS, writeOperations());
builder.field(IoStats.READ_KILOBYTES, readKilobytes());
builder.field(IoStats.WRITE_KILOBYTES, writeKilobytes());
builder.field(IoStats.IO_TIME_MILLIS, ioTimeInMillis());
return builder;
}

Expand All @@ -371,13 +451,15 @@ public static class IoStats implements Writeable, ToXContentFragment {
private static final String WRITE_OPERATIONS = "write_operations";
private static final String READ_KILOBYTES = "read_kilobytes";
private static final String WRITE_KILOBYTES = "write_kilobytes";
private static final String IO_TIME_MILLIS = "io_time_in_millis";

final DeviceStats[] devicesStats;
final long totalOperations;
final long totalReadOperations;
final long totalWriteOperations;
final long totalReadKilobytes;
final long totalWriteKilobytes;
final long totalIOTimeInMillis;

public IoStats(final DeviceStats[] devicesStats) {
this.devicesStats = devicesStats;
Expand All @@ -386,18 +468,21 @@ public IoStats(final DeviceStats[] devicesStats) {
long totalWriteOperations = 0;
long totalReadKilobytes = 0;
long totalWriteKilobytes = 0;
long totalIOTimeInMillis = 0;
for (DeviceStats deviceStats : devicesStats) {
totalOperations += deviceStats.operations() != -1 ? deviceStats.operations() : 0;
totalReadOperations += deviceStats.readOperations() != -1 ? deviceStats.readOperations() : 0;
totalWriteOperations += deviceStats.writeOperations() != -1 ? deviceStats.writeOperations() : 0;
totalReadKilobytes += deviceStats.readKilobytes() != -1 ? deviceStats.readKilobytes() : 0;
totalWriteKilobytes += deviceStats.writeKilobytes() != -1 ? deviceStats.writeKilobytes() : 0;
totalIOTimeInMillis += deviceStats.ioTimeInMillis() != -1 ? deviceStats.ioTimeInMillis() : 0;
}
this.totalOperations = totalOperations;
this.totalReadOperations = totalReadOperations;
this.totalWriteOperations = totalWriteOperations;
this.totalReadKilobytes = totalReadKilobytes;
this.totalWriteKilobytes = totalWriteKilobytes;
this.totalIOTimeInMillis = totalIOTimeInMillis;
}

public IoStats(StreamInput in) throws IOException {
Expand All @@ -412,6 +497,7 @@ public IoStats(StreamInput in) throws IOException {
this.totalWriteOperations = in.readLong();
this.totalReadKilobytes = in.readLong();
this.totalWriteKilobytes = in.readLong();
this.totalIOTimeInMillis = in.readLong();
}

@Override
Expand All @@ -425,6 +511,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalWriteOperations);
out.writeLong(totalReadKilobytes);
out.writeLong(totalWriteKilobytes);
out.writeLong(totalIOTimeInMillis);
}

public DeviceStats[] getDevicesStats() {
Expand All @@ -451,6 +538,10 @@ public long getTotalWriteKilobytes() {
return totalWriteKilobytes;
}

public long getTotalIOTimeMillis() {
return totalIOTimeInMillis;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (devicesStats.length > 0) {
Expand All @@ -468,6 +559,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(WRITE_OPERATIONS, totalWriteOperations);
builder.field(READ_KILOBYTES, totalReadKilobytes);
builder.field(WRITE_KILOBYTES, totalWriteKilobytes);
builder.field(IO_TIME_MILLIS, totalIOTimeInMillis);
builder.endObject();
}
return builder;
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers,
final long sectorsRead = Long.parseLong(fields[5]);
final long writesCompleted = Long.parseLong(fields[7]);
final long sectorsWritten = Long.parseLong(fields[9]);
final double readTime = Double.parseDouble(fields[6]);
final double writeTime = Double.parseDouble(fields[10]);
final long ioTime = Long.parseLong(fields[12]);
final FsInfo.DeviceStats deviceStats = new FsInfo.DeviceStats(
majorDeviceNumber,
minorDeviceNumber,
Expand All @@ -131,6 +134,9 @@ final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers,
sectorsRead,
writesCompleted,
sectorsWritten,
ioTime,
readTime,
writeTime,
deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber))
);
devicesStats.add(deviceStats);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,8 @@ protected Node(
performanceCollectorService,
threadPool,
settings,
clusterService.getClusterSettings()
clusterService.getClusterSettings(),
monitorService.fsService()
);

final AliasValidator aliasValidator = new AliasValidator();
Expand Down
Loading