From a3f8432df30c69be3950ca55984032a1b2323f35 Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Thu, 5 Oct 2023 13:56:09 -0700 Subject: [PATCH] [Remote Store] Fix stats reporting for multistream downloads. (#10402) * Fix stats reporting for multistream downloads. Signed-off-by: Rishikesh1159 * rename tracker to fileTransferTracker. Signed-off-by: Rishikesh1159 --------- Signed-off-by: Rishikesh1159 --- .../opensearch/index/shard/IndexShard.java | 4 ++- .../store/RemoteSegmentStoreDirectory.java | 35 +++++++++++++++---- .../RemoteStoreReplicationSource.java | 7 ++-- .../RemoteSegmentStoreDirectoryTests.java | 13 ++++--- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4f08411c19b55..833c91c1766c8 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -160,6 +160,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; @@ -4929,9 +4930,10 @@ private void downloadSegments( final Runnable onFileSync ) throws IOException { final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); + final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker(); for (String segment : toDownloadSegments) { final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, segmentListener); + sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener); segmentListener.actionGet(); onFileSync.run(); if (targetRemoteDirectory != null) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index a97b22360716c..a067cb9c5ae61 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -487,17 +487,40 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen * @param source The source file name * @param destinationDirectory The destination directory (if multipart is not supported) * @param destinationPath The destination path (if multipart is supported) + * @param fileTransferTracker Tracker used for file transfer stats * @param fileCompletionListener The listener to notify of completion */ - public void copyTo(String source, Directory destinationDirectory, Path destinationPath, ActionListener fileCompletionListener) { + public void copyTo( + String source, + Directory destinationDirectory, + Path destinationPath, + DirectoryFileTransferTracker fileTransferTracker, + ActionListener fileCompletionListener + ) { final String blobName = getExistingRemoteFilename(source); if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { + long length = 0L; + try { + length = fileLength(source); + } catch (IOException ex) { + logger.error("Unable to fetch segment length for stats tracking", ex); + } + final long fileLength = length; + final long startTime = System.currentTimeMillis(); + fileTransferTracker.addTransferredBytesStarted(fileLength); final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer(); final Path destinationFilePath = destinationPath.resolve(source); + final ActionListener completionListener = ActionListener.wrap(response -> { + fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime); + fileCompletionListener.onResponse(response); + }, e -> { + fileTransferTracker.addTransferredBytesFailed(fileLength, startTime); + fileCompletionListener.onFailure(e); + }); final ReadContextListener readContextListener = new ReadContextListener( blobName, destinationFilePath, - fileCompletionListener, + completionListener, threadPool, remoteDataDirectory.getDownloadRateLimiter(), recoverySettings.getMaxConcurrentRemoteStoreStreams() @@ -505,12 +528,10 @@ public void copyTo(String source, Directory destinationDirectory, Path destinati blobContainer.readBlobAsync(blobName, readContextListener); } else { // Fallback to older mechanism of downloading the file - try { + ActionListener.completeWith(fileCompletionListener, () -> { destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT); - fileCompletionListener.onResponse(source); - } catch (IOException e) { - fileCompletionListener.onFailure(e); - } + return source; + }); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index e17c5293c38ac..ddbcb86269aa9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -20,6 +20,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -121,7 +122,8 @@ public void getSegmentFiles( assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; toDownloadSegments.add(fileMetadata); } - downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, listener); + final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker(); + downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener); logger.debug("Downloaded segment files from remote store {}", toDownloadSegments); } finally { indexShard.store().decRef(); @@ -138,12 +140,13 @@ private void downloadSegments( RemoteSegmentStoreDirectory remoteStoreDirectory, List toDownloadSegments, ShardPath shardPath, + DirectoryFileTransferTracker fileTransferTracker, ActionListener completionListener ) { final Path indexPath = shardPath == null ? null : shardPath.resolveIndex(); for (StoreFileMetadata storeFileMetadata : toDownloadSegments) { final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, segmentListener); + remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener); segmentListener.actionGet(); } completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments)); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index b574ccaac55e1..4a89b3c718f0b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -596,10 +596,15 @@ public void onResponse(String unused) { public void onFailure(Exception e) {} }; Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker(); + long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener); assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS)); verify(blobContainer, times(1)).readBlobAsync(contains(filename), any()); verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any()); + + // Verify stats are updated to DirectoryFileTransferTracker + assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded()); } public void testCopyFilesTo() throws Exception { @@ -619,7 +624,7 @@ public void onResponse(String unused) { public void onFailure(Exception e) {} }; Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); } @@ -643,7 +648,7 @@ public void onResponse(String unused) { @Override public void onFailure(Exception e) {} }; - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, completionListener); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener); assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); } @@ -670,7 +675,7 @@ public void onFailure(Exception e) { } }; Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, completionListener); + remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); }