Skip to content

Commit

Permalink
[Remote Store] Add total upload and download time from remote store t…
Browse files Browse the repository at this point in the history
…o nodes stats (opensearch-project#9454)

---------

Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 authored and kaushalmahi12 committed Sep 12, 2023
1 parent d70e8d0 commit fbb8f91
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,8 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,9 +78,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -90,9 +92,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -101,12 +104,13 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertTrue(cumulativeUploadsSucceeded > 0 && cumulativeUploadsSucceeded == remoteSegmentStats.getUploadBytesSucceeded());
assertTrue(cumulativeUploadsStarted > 0 && cumulativeUploadsStarted == remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertTrue(totalUploadTime > 0 && totalUploadTime == remoteSegmentStats.getTotalUploadTime());
}

/**
Expand Down Expand Up @@ -180,13 +184,16 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0;
long totalUploadTime = 0, totalDownloadTime = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -202,9 +209,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin()
.cluster()
Expand All @@ -220,9 +230,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);
totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs;
totalDownloadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs;

// Fetch nodes stats
NodesStatsResponse nodesStatsResponse = client().admin()
Expand All @@ -237,9 +250,19 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
// Ensure that total upload time has non-zero value if there has been segments uploaded from the node
if (cumulativeUploadsStarted > 0) {
assertTrue(totalUploadTime > 0);
}
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
// Ensure that total download time has non-zero value if there has been segments downloaded to the node
if (cumulativeDownloadsStarted > 0) {
assertTrue(totalDownloadTime > 0);
}
assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;
/**
* Total time spent in uploading segments to remote store
*/
private long totalUploadTime;
/**
* Total time spent in downloading segments from remote store
*/
private long totalDownloadTime;

public RemoteSegmentStats() {}

Expand All @@ -89,8 +97,10 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.CURRENT)) {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
}
}

Expand All @@ -115,9 +125,12 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
this.totalUploadTime = trackerStats.totalUploadTimeInMs;
this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs;
}

// Getter and setters. All are visible for testing
// Setters are only used for testing
public long getUploadBytesStarted() {
return uploadBytesStarted;
}
Expand Down Expand Up @@ -190,6 +203,22 @@ public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

public long getTotalUploadTime() {
return totalUploadTime;
}

public void addTotalUploadTime(long totalUploadTime) {
this.totalUploadTime += totalUploadTime;
}

public long getTotalDownloadTime() {
return totalDownloadTime;
}

public void addTotalDownloadTime(long totalDownloadTime) {
this.totalDownloadTime += totalDownloadTime;
}

/**
* Adds existing stats. Used for stats roll-ups at index or node level
*
Expand All @@ -206,6 +235,8 @@ public void add(RemoteSegmentStats existingStats) {
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
this.totalUploadTime += existingStats.getTotalUploadTime();
this.totalDownloadTime += existingStats.getTotalDownloadTime();
}
}

Expand All @@ -229,8 +260,10 @@ public void writeTo(StreamOutput out) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
}
}

Expand Down Expand Up @@ -258,6 +291,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
Expand All @@ -266,6 +300,7 @@ private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalDownloadTime));
}

static final class Fields {
Expand All @@ -287,5 +322,7 @@ static final class Fields {
static final String TOTAL_BYTES = "total_bytes";
static final String MAX = "max";
static final String MAX_BYTES = "max_bytes";
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
}
}
Loading

0 comments on commit fbb8f91

Please sign in to comment.