Skip to content

Commit

Permalink
[Remote Store] Fix stats reporting for multistream downloads. (#10402)
Browse files Browse the repository at this point in the history
* Fix stats reporting for multistream downloads.

Signed-off-by: Rishikesh1159 <[email protected]>

* rename tracker to fileTransferTracker.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Oct 5, 2023
1 parent 10bae20 commit a3f8432
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -4929,9 +4930,10 @@ private void downloadSegments(
final Runnable onFileSync
) throws IOException {
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker();
for (String segment : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, segmentListener);
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener);
segmentListener.actionGet();
onFileSync.run();
if (targetRemoteDirectory != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,30 +487,51 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
* @param source The source file name
* @param destinationDirectory The destination directory (if multipart is not supported)
* @param destinationPath The destination path (if multipart is supported)
* @param fileTransferTracker Tracker used for file transfer stats
* @param fileCompletionListener The listener to notify of completion
*/
public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener<String> fileCompletionListener) {
public void copyTo(
String source,
Directory destinationDirectory,
Path destinationPath,
DirectoryFileTransferTracker fileTransferTracker,
ActionListener<String> fileCompletionListener
) {
final String blobName = getExistingRemoteFilename(source);
if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
long length = 0L;
try {
length = fileLength(source);
} catch (IOException ex) {
logger.error("Unable to fetch segment length for stats tracking", ex);
}
final long fileLength = length;
final long startTime = System.currentTimeMillis();
fileTransferTracker.addTransferredBytesStarted(fileLength);
final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
final Path destinationFilePath = destinationPath.resolve(source);
final ActionListener<String> completionListener = ActionListener.wrap(response -> {
fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime);
fileCompletionListener.onResponse(response);
}, e -> {
fileTransferTracker.addTransferredBytesFailed(fileLength, startTime);
fileCompletionListener.onFailure(e);
});
final ReadContextListener readContextListener = new ReadContextListener(
blobName,
destinationFilePath,
fileCompletionListener,
completionListener,
threadPool,
remoteDataDirectory.getDownloadRateLimiter(),
recoverySettings.getMaxConcurrentRemoteStoreStreams()
);
blobContainer.readBlobAsync(blobName, readContextListener);
} else {
// Fallback to older mechanism of downloading the file
try {
ActionListener.completeWith(fileCompletionListener, () -> {
destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT);
fileCompletionListener.onResponse(source);
} catch (IOException e) {
fileCompletionListener.onFailure(e);
}
return source;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -121,7 +122,8 @@ public void getSegmentFiles(
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegments.add(fileMetadata);
}
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener);
final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker();
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener);
logger.debug("Downloaded segment files from remote store {}", toDownloadSegments);
} finally {
indexShard.store().decRef();
Expand All @@ -138,12 +140,13 @@ private void downloadSegments(
RemoteSegmentStoreDirectory remoteStoreDirectory,
List<StoreFileMetadata> toDownloadSegments,
ShardPath shardPath,
DirectoryFileTransferTracker fileTransferTracker,
ActionListener<GetSegmentFilesResponse> completionListener
) {
final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
for (StoreFileMetadata storeFileMetadata : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, segmentListener);
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener);
segmentListener.actionGet();
}
completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,15 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker();
long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS));
verify(blobContainer, times(1)).readBlobAsync(contains(filename), any());
verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any());

// Verify stats are updated to DirectoryFileTransferTracker
assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded());
}

public void testCopyFilesTo() throws Exception {
Expand All @@ -619,7 +624,7 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -643,7 +648,7 @@ public void onResponse(String unused) {
@Override
public void onFailure(Exception e) {}
};
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -670,7 +675,7 @@ public void onFailure(Exception e) {
}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand Down

0 comments on commit a3f8432

Please sign in to comment.