Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add total upload and download time from remote store to nodes stats #9454

Merged
merged 12 commits into from
Aug 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true))
.get();
RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertTrue(cumulativeUploadsSucceeded > 0 && cumulativeUploadsSucceeded == remoteSegmentStats.getUploadBytesSucceeded());
assertTrue(cumulativeUploadsStarted > 0 && cumulativeUploadsStarted == remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
assertTrue(totalUploadTime > 0 && totalUploadTime == remoteSegmentStats.getTotalUploadTime());
}

/**
Expand Down Expand Up @@ -253,7 +253,15 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag());
// Ensure that total upload time has a non-zero value if segments have been uploaded from the node
if (cumulativeUploadsStarted > 0) {
assertTrue(totalUploadTime > 0);
}
assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime());
// Ensure that total download time has a non-zero value if segments have been downloaded to the node
if (cumulativeDownloadsStarted > 0) {
assertTrue(totalDownloadTime > 0);
}
assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible, we would need to ensure that all field addition PRs are backported at once.
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.CURRENT)) {
totalRefreshBytesLag = in.readLong();
totalUploadTime = in.readLong();
totalDownloadTime = in.readLong();
Expand Down Expand Up @@ -260,7 +260,7 @@ public void writeTo(StreamOutput out) throws IOException {
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible, we would need to ensure that all field addition PRs are backported at once.
*/
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeLong(totalRefreshBytesLag);
out.writeLong(totalUploadTime);
out.writeLong(totalDownloadTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@
/**
* Cumulative sum of size in bytes of segment files for which upload has started during remote refresh.
*/
private volatile long uploadBytesStarted;
private final AtomicLong uploadBytesStarted = new AtomicLong();

/**
* Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh.
*/
private volatile long uploadBytesFailed;
private final AtomicLong uploadBytesFailed = new AtomicLong();

/**
* Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh.
*/
private volatile long uploadBytesSucceeded;
private final AtomicLong uploadBytesSucceeded = new AtomicLong();

/**
* Cumulative sum of count of remote refreshes that have started.
Expand All @@ -126,7 +126,7 @@
/**
* Cumulative sum of time taken in remote refresh (in milliseconds) [Tracked per file]
*/
private volatile long totalUploadTimeInMs;
private AtomicLong totalUploadTimeInMs = new AtomicLong();

/**
* Cumulative sum of rejection counts for this shard.
Expand Down Expand Up @@ -321,31 +321,31 @@
}

public long getUploadBytesStarted() {
return uploadBytesStarted;
return uploadBytesStarted.get();
}

public void addUploadBytesStarted(long size) {
uploadBytesStarted += size;
uploadBytesStarted.getAndAdd(size);
}

public long getUploadBytesFailed() {
return uploadBytesFailed;
return uploadBytesFailed.get();
}

public void addUploadBytesFailed(long size) {
uploadBytesFailed += size;
uploadBytesFailed.getAndAdd(size);
}

public long getUploadBytesSucceeded() {
return uploadBytesSucceeded;
return uploadBytesSucceeded.get();
}

public void addUploadBytesSucceeded(long size) {
uploadBytesSucceeded += size;
uploadBytesSucceeded.getAndAdd(size);
}

public long getInflightUploadBytes() {
return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded;
return uploadBytesStarted.get() - uploadBytesFailed.get() - uploadBytesSucceeded.get();
}

public long getTotalUploadsStarted() {
Expand Down Expand Up @@ -531,11 +531,11 @@
}

public void addTotalUploadTimeInMs(long fileUploadTimeInMs) {
this.totalUploadTimeInMs += fileUploadTimeInMs;
this.totalUploadTimeInMs.addAndGet(fileUploadTimeInMs);
}

public long getTotalUploadTimeInMs() {
return totalUploadTimeInMs;
return totalUploadTimeInMs.get();
}

public DirectoryFileTransferTracker getDirectoryFileTransferTracker() {
Expand All @@ -550,9 +550,9 @@
timeMsLag,
localRefreshSeqNo,
remoteRefreshSeqNo,
uploadBytesStarted,
uploadBytesSucceeded,
uploadBytesFailed,
uploadBytesStarted.get(),
uploadBytesSucceeded.get(),
uploadBytesFailed.get(),
totalUploadsStarted,
totalUploadsSucceeded,
totalUploadsFailed,
Expand All @@ -563,7 +563,7 @@
uploadBytesPerSecMovingAverageReference.get().getAverage(),
uploadTimeMsMovingAverageReference.get().getAverage(),
getBytesLag(),
totalUploadTimeInMs,
totalUploadTimeInMs.get(),
directoryFileTransferTracker.stats()
);
}
Expand Down Expand Up @@ -747,7 +747,7 @@
uploadBytesPerSecMovingAverage,
uploadTimeMovingAverage,
bytesLag,
totalUploadTimeInMs,

Check warning on line 750 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L750

Added line #L750 was not covered by tests
directoryFileTransferTrackerStats
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
private long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final UploadListener statsListener;

public RemoteStoreRefreshListener(
IndexShard indexShard,
Expand Down Expand Up @@ -117,33 +116,6 @@
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.statsListener = new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addToLatestUploadedFiles(file);
long fileUploadTime = System.currentTimeMillis() - uploadStartTime;
// Clamp the recorded upload time to a minimum of 1 millisecond
fileUploadTime = fileUploadTime > 0 ? fileUploadTime : 1;
segmentTracker.addTotalUploadTimeInMs(fileUploadTime);
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
}
};
}

@Override
Expand Down Expand Up @@ -380,6 +352,32 @@
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size());

for (String src : filteredFiles) {
// Create a new listener for each file here, so that every upload tracks its own start time and the stats increment operations are thread-safe
UploadListener statsListener = new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addTotalUploadTimeInMs(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addTotalUploadTimeInMs(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

Check warning on line 379 in server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java#L377-L379

Added lines #L377 - L379 were not covered by tests
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
};
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
Expand Down
Loading
Loading