Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Fix stats reporting for multistream downloads. #10402

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -4929,9 +4930,10 @@ private void downloadSegments(
final Runnable onFileSync
) throws IOException {
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
final DirectoryFileTransferTracker tracker = store.getDirectoryFileTransferTracker();
for (String segment : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, segmentListener);
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, tracker, segmentListener);
segmentListener.actionGet();
onFileSync.run();
if (targetRemoteDirectory != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,28 +489,48 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
* @param destinationPath The destination path (if multipart is supported)
* @param fileCompletionListener The listener to notify of completion
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
*/
public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener<String> fileCompletionListener) {
public void copyTo(
String source,
Directory destinationDirectory,
Path destinationPath,
DirectoryFileTransferTracker tracker,
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
ActionListener<String> fileCompletionListener
) {
final String blobName = getExistingRemoteFilename(source);
if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
long length = 0L;
try {
length = fileLength(source);
andrross marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException ex) {
logger.error("Unable to fetch segment length for stats tracking", ex);
}
final long fileLength = length;
final long startTime = System.currentTimeMillis();
andrross marked this conversation as resolved.
Show resolved Hide resolved
tracker.addTransferredBytesStarted(fileLength);
final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
final Path destinationFilePath = destinationPath.resolve(source);
final ActionListener<String> completionListener = ActionListener.wrap(response -> {
tracker.addTransferredBytesSucceeded(fileLength, startTime);
fileCompletionListener.onResponse(response);
}, e -> {
tracker.addTransferredBytesFailed(fileLength, startTime);
fileCompletionListener.onFailure(e);
});
final ReadContextListener readContextListener = new ReadContextListener(
blobName,
destinationFilePath,
fileCompletionListener,
completionListener,
threadPool,
remoteDataDirectory.getDownloadRateLimiter(),
recoverySettings.getMaxConcurrentRemoteStoreStreams()
);
blobContainer.readBlobAsync(blobName, readContextListener);
} else {
// Fallback to older mechanism of downloading the file
try {
ActionListener.completeWith(fileCompletionListener, () -> {
destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT);
fileCompletionListener.onResponse(source);
} catch (IOException e) {
fileCompletionListener.onFailure(e);
}
return source;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -121,7 +122,8 @@ public void getSegmentFiles(
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegments.add(fileMetadata);
}
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener);
final DirectoryFileTransferTracker transferTracker = indexShard.store().getDirectoryFileTransferTracker();
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, transferTracker, listener);
logger.debug("Downloaded segment files from remote store {}", toDownloadSegments);
} finally {
indexShard.store().decRef();
Expand All @@ -138,12 +140,13 @@ private void downloadSegments(
RemoteSegmentStoreDirectory remoteStoreDirectory,
List<StoreFileMetadata> toDownloadSegments,
ShardPath shardPath,
DirectoryFileTransferTracker tracker,
ActionListener<GetSegmentFilesResponse> completionListener
) {
final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
for (StoreFileMetadata storeFileMetadata : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, segmentListener);
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, tracker, segmentListener);
segmentListener.actionGet();
}
completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,15 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker();
long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS));
verify(blobContainer, times(1)).readBlobAsync(contains(filename), any());
verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any());

// Verify stats are updated to DirectoryFileTransferTracker
assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded());
}

public void testCopyFilesTo() throws Exception {
Expand All @@ -619,7 +624,7 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -643,7 +648,7 @@ public void onResponse(String unused) {
@Override
public void onFailure(Exception e) {}
};
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -670,7 +675,7 @@ public void onFailure(Exception e) {
}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand Down
Loading