Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Fix stats reporting for multistream downloads. #10402

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -4929,9 +4930,10 @@ private void downloadSegments(
final Runnable onFileSync
) throws IOException {
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker();
for (String segment : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, segmentListener);
sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener);
segmentListener.actionGet();
onFileSync.run();
if (targetRemoteDirectory != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,30 +487,51 @@
* @param source The source file name
* @param destinationDirectory The destination directory (if multipart is not supported)
* @param destinationPath The destination path (if multipart is supported)
* @param fileTransferTracker Tracker used for file transfer stats
* @param fileCompletionListener The listener to notify of completion
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
*/
public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener<String> fileCompletionListener) {
public void copyTo(
String source,
Directory destinationDirectory,
Path destinationPath,
DirectoryFileTransferTracker fileTransferTracker,
ActionListener<String> fileCompletionListener
) {
final String blobName = getExistingRemoteFilename(source);
if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
long length = 0L;
try {
length = fileLength(source);
andrross marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException ex) {
logger.error("Unable to fetch segment length for stats tracking", ex);

Check warning on line 506 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L505-L506

Added lines #L505 - L506 were not covered by tests
}
final long fileLength = length;
final long startTime = System.currentTimeMillis();
andrross marked this conversation as resolved.
Show resolved Hide resolved
fileTransferTracker.addTransferredBytesStarted(fileLength);
final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer();
final Path destinationFilePath = destinationPath.resolve(source);
final ActionListener<String> completionListener = ActionListener.wrap(response -> {
fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime);
fileCompletionListener.onResponse(response);
}, e -> {
fileTransferTracker.addTransferredBytesFailed(fileLength, startTime);
fileCompletionListener.onFailure(e);
});

Check warning on line 519 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L517-L519

Added lines #L517 - L519 were not covered by tests
final ReadContextListener readContextListener = new ReadContextListener(
blobName,
destinationFilePath,
fileCompletionListener,
completionListener,
threadPool,
remoteDataDirectory.getDownloadRateLimiter(),
recoverySettings.getMaxConcurrentRemoteStoreStreams()
);
blobContainer.readBlobAsync(blobName, readContextListener);
} else {
// Fallback to older mechanism of downloading the file
try {
ActionListener.completeWith(fileCompletionListener, () -> {
destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT);
fileCompletionListener.onResponse(source);
} catch (IOException e) {
fileCompletionListener.onFailure(e);
}
return source;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.DirectoryFileTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -121,7 +122,8 @@ public void getSegmentFiles(
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegments.add(fileMetadata);
}
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener);
final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker();
downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener);
logger.debug("Downloaded segment files from remote store {}", toDownloadSegments);
} finally {
indexShard.store().decRef();
Expand All @@ -138,12 +140,13 @@ private void downloadSegments(
RemoteSegmentStoreDirectory remoteStoreDirectory,
List<StoreFileMetadata> toDownloadSegments,
ShardPath shardPath,
DirectoryFileTransferTracker fileTransferTracker,
ActionListener<GetSegmentFilesResponse> completionListener
) {
final Path indexPath = shardPath == null ? null : shardPath.resolveIndex();
for (StoreFileMetadata storeFileMetadata : toDownloadSegments) {
final PlainActionFuture<String> segmentListener = PlainActionFuture.newFuture();
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, segmentListener);
remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener);
segmentListener.actionGet();
}
completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,15 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker();
long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS));
verify(blobContainer, times(1)).readBlobAsync(contains(filename), any());
verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any());

// Verify stats are updated to DirectoryFileTransferTracker
assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded());
}

public void testCopyFilesTo() throws Exception {
Expand All @@ -619,7 +624,7 @@ public void onResponse(String unused) {
public void onFailure(Exception e) {}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -643,7 +648,7 @@ public void onResponse(String unused) {
@Override
public void onFailure(Exception e) {}
};
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand All @@ -670,7 +675,7 @@ public void onFailure(Exception e) {
}
};
Path path = createTempDir();
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener);
remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener);
assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS));
verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT));
}
Expand Down
Loading