Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add total upload and download time from remote store to nodes stats #9454

Merged
merged 12 commits into from
Aug 25, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393) [#9454](https://github.com/opensearch-project/OpenSearch/pull/9454))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,8 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,9 +78,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -90,9 +92,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -104,9 +107,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
}

/**
Expand Down Expand Up @@ -180,13 +184,16 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0, totalDownloadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -202,9 +209,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
.cluster()
Expand All @@ -220,9 +230,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -237,9 +250,11 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;
/**
* Total time spent in uploading segments to remote store
*/
private long totalUploadTime;
/**
* Total time spent in downloading segments from remote store
*/
private long totalDownloadTime;

public RemoteSegmentStats() {}

Expand All @@ -91,6 +99,8 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -115,9 +125,12 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
this.totalUploadTime = trackerStats.totalUploadTimeInMs;
this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs;
}

// Getter and setters. All are visible for testing
// Setters are only used for testing
public long getUploadBytesStarted() {
return uploadBytesStarted;
}
Expand Down Expand Up @@ -190,6 +203,22 @@ public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

public long getTotalUploadTime() {
return totalUploadTime;
}

public void addTotalUploadTime(long totalUploadTime) {
this.totalUploadTime += totalUploadTime;
}

public long getTotalDownloadTime() {
return totalDownloadTime;
}

public void addTotalDownloadTime(long totalDownloadTime) {
this.totalDownloadTime += totalDownloadTime;
}

shourya035 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Adds existing stats. Used for stats roll-ups at index or node level
*
Expand All @@ -206,6 +235,8 @@ public void add(RemoteSegmentStats existingStats) {
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
this.totalUploadTime += existingStats.getTotalUploadTime();
this.totalDownloadTime += existingStats.getTotalDownloadTime();
}
}

Expand All @@ -231,6 +262,8 @@ public void writeTo(StreamOutput out) throws IOException {
*/
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
}
}

Expand Down Expand Up @@ -258,6 +291,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
Expand All @@ -266,6 +300,7 @@ private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalDownloadTime));
}

static final class Fields {
Expand All @@ -287,5 +322,7 @@ static final class Fields {
static final String TOTAL_BYTES = "total_bytes";
static final String MAX = "max";
static final String MAX_BYTES = "max_bytes";
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
*/
private volatile long totalUploadsSucceeded;

/**
* Cumulative sum of time taken in remote refresh (in milliseconds) [Tracked per file]
*/
private volatile long totalUploadTimeInMs;

/**
* Cumulative sum of rejection counts for this shard.
*/
Expand Down Expand Up @@ -508,7 +513,7 @@
return uploadTimeMsMovingAverageReference.get().getAverage();
}

public void addUploadTimeMs(long timeMs) {
public void addTimeForCompletedUploadSync(long timeMs) {
synchronized (uploadTimeMsMutex) {
this.uploadTimeMsMovingAverageReference.get().record(timeMs);
}
Expand All @@ -525,6 +530,14 @@
}
}

public void addTotalUploadTimeInMs(long fileUploadTimeInMs) {
this.totalUploadTimeInMs += fileUploadTimeInMs;
}

public long getTotalUploadTimeInMs() {
return totalUploadTimeInMs;
}
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

public DirectoryFileTransferTracker getDirectoryFileTransferTracker() {
return directoryFileTransferTracker;
}
Expand All @@ -550,6 +563,7 @@
uploadBytesPerSecMovingAverageReference.get().getAverage(),
uploadTimeMsMovingAverageReference.get().getAverage(),
getBytesLag(),
totalUploadTimeInMs,
directoryFileTransferTracker.stats()
);
}
Expand Down Expand Up @@ -578,6 +592,7 @@
public final long lastSuccessfulRemoteRefreshBytes;
public final double uploadBytesMovingAverage;
public final double uploadBytesPerSecMovingAverage;
public final long totalUploadTimeInMs;
public final double uploadTimeMovingAverage;
public final long bytesLag;
public final DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats;
Expand All @@ -602,6 +617,7 @@
double uploadBytesPerSecMovingAverage,
double uploadTimeMovingAverage,
long bytesLag,
long totalUploadTimeInMs,
DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats
) {
this.shardId = shardId;
Expand All @@ -623,6 +639,7 @@
this.uploadBytesPerSecMovingAverage = uploadBytesPerSecMovingAverage;
this.uploadTimeMovingAverage = uploadTimeMovingAverage;
this.bytesLag = bytesLag;
this.totalUploadTimeInMs = totalUploadTimeInMs;
this.directoryFileTransferTrackerStats = directoryFileTransferTrackerStats;
}

Expand All @@ -647,6 +664,7 @@
this.uploadBytesPerSecMovingAverage = in.readDouble();
this.uploadTimeMovingAverage = in.readDouble();
this.bytesLag = in.readLong();
this.totalUploadTimeInMs = in.readLong();
this.directoryFileTransferTrackerStats = in.readOptionalWriteable(DirectoryFileTransferTracker.Stats::new);
} catch (IOException e) {
throw e;
Expand Down Expand Up @@ -674,6 +692,7 @@
out.writeDouble(uploadBytesPerSecMovingAverage);
out.writeDouble(uploadTimeMovingAverage);
out.writeLong(bytesLag);
out.writeLong(totalUploadTimeInMs);
out.writeOptionalWriteable(directoryFileTransferTrackerStats);
}

Expand Down Expand Up @@ -702,6 +721,7 @@
&& Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0
&& Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0
&& this.bytesLag == other.bytesLag
&& this.totalUploadTimeInMs == other.totalUploadTimeInMs
&& this.directoryFileTransferTrackerStats.equals(other.directoryFileTransferTrackerStats);
}

Expand All @@ -727,6 +747,7 @@
uploadBytesPerSecMovingAverage,
uploadTimeMovingAverage,
bytesLag,
totalUploadTimeInMs,

Check warning on line 750 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L750

Added line #L750 was not covered by tests
directoryFileTransferTrackerStats
);
}
Expand Down
Loading
Loading