Skip to content

Commit

Permalink
[Storage][DataMovement] Fix perf test cleanup + other minor fixes (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Oct 9, 2024
1 parent a2665e9 commit cc82fce
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ public abstract class DirectoryTransferTest<TOptions> : PerfTest<TOptions> where
protected BlobServiceClient BlobServiceClient { get; }
protected LocalFilesStorageResourceProvider LocalFileResourceProvider { get; }
protected BlobsStorageResourceProvider BlobResourceProvider { get; }
private TimeSpan _transferTimeout;

public DirectoryTransferTest(TOptions options) : base(options)
{
Random = new Random();
BlobServiceClient = new BlobServiceClient(PerfTestEnvironment.Instance.BlobStorageEndpoint, PerfTestEnvironment.Instance.Credential);
LocalFileResourceProvider = new LocalFilesStorageResourceProvider();
BlobResourceProvider = new BlobsStorageResourceProvider(PerfTestEnvironment.Instance.Credential);
_transferTimeout = TimeSpan.FromSeconds(5 + (Options.Count * Options.Size) / (1 * 1024 * 1024));
}

protected string CreateLocalDirectory(bool populate = false)
Expand Down Expand Up @@ -91,8 +93,24 @@ protected async Task RunAndVerifyTransferAsync(
DataTransfer transfer = await transferManager.StartTransferAsync(
source, destination, options, cancellationToken);

await transfer.WaitForCompletionAsync(cancellationToken);
if (!transfer.TransferStatus.HasCompletedSuccessfully)
// The test runs for a specified duration and then cancels the token.
// When canceled, pause the currently running transfer so it can be cleaned up.
cancellationToken.Register(async () =>
{
// Don't pass cancellation token since its already cancelled.
await transfer.PauseAsync();
});

// The cancellation token we specify for WaitForCompletion should not
// be the one passed to the test as we don't want this code to exit until
// the transfer is complete or paused so it can be properly cleaned up.
// However, we pass a token with a generous time to prevent the transfer
// from hanging forever if there is an issue.
CancellationTokenSource ctx = new(_transferTimeout);
await transfer.WaitForCompletionAsync(ctx.Token);

if (!transfer.TransferStatus.HasCompletedSuccessfully &&
transfer.TransferStatus.State != DataTransferState.Paused)
{
throw new Exception("A failure occurred during the transfer.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public async virtual Task InvokeFailedArg(Exception ex)
{
if (ex is not OperationCanceledException &&
ex is not TaskCanceledException &&
ex.InnerException is not TaskCanceledException &&
!ex.Message.Contains("The request was canceled."))
{
if (ex is RequestFailedException requestFailedException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ protected internal override Task<StorageResourceReadStreamResult> ReadStreamAsyn
long? length = default,
CancellationToken cancellationToken = default)
{
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
FileStream stream = new FileStream(_uri.LocalPath, FileMode.Open, FileAccess.Read);
stream.Position = position;
return Task.FromResult(new StorageResourceReadStreamResult(stream));
Expand Down
38 changes: 20 additions & 18 deletions sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,9 @@ private async Task InitialUploadCall(
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
cancellationToken: _cancellationToken).ConfigureAwait(false);

using Stream stream = result.Content;
await _destinationResource.CopyFromStreamAsync(
using (Stream stream = result.Content)
{
await _destinationResource.CopyFromStreamAsync(
stream: stream,
overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists,
streamLength: blockSize,
Expand All @@ -319,6 +320,7 @@ await _destinationResource.CopyFromStreamAsync(
SourceProperties = sourceProperties
},
cancellationToken: _cancellationToken).ConfigureAwait(false);
}

// Report bytes written before completion
ReportBytesWritten(blockSize);
Expand All @@ -328,19 +330,19 @@ await _destinationResource.CopyFromStreamAsync(
}
else
{
Stream slicedStream = Stream.Null;
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
position: 0,
length: blockSize,
cancellationToken: _cancellationToken).ConfigureAwait(false);
using (Stream stream = result.Content)

using (Stream contentStream = result.Content)
using (Stream slicedStream = await GetOffsetPartitionInternal(
contentStream,
0L,
blockSize,
UploadArrayPool,
_cancellationToken).ConfigureAwait(false))
{
slicedStream = await GetOffsetPartitionInternal(
stream,
0L,
blockSize,
UploadArrayPool,
_cancellationToken).ConfigureAwait(false);
await _destinationResource.CopyFromStreamAsync(
stream: slicedStream,
streamLength: blockSize,
Expand Down Expand Up @@ -394,19 +396,19 @@ internal async Task StageBlockInternal(
{
try
{
Stream slicedStream = Stream.Null;
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
position: offset,
length: blockLength,
cancellationToken: _cancellationToken).ConfigureAwait(false);
using (Stream stream = result.Content)

using (Stream contentStream = result.Content)
using (Stream slicedStream = await GetOffsetPartitionInternal(
contentStream,
offset,
blockLength,
UploadArrayPool,
_cancellationToken).ConfigureAwait(false))
{
slicedStream = await GetOffsetPartitionInternal(
stream,
offset,
blockLength,
UploadArrayPool,
_cancellationToken).ConfigureAwait(false);
await _destinationResource.CopyFromStreamAsync(
stream: slicedStream,
streamLength: blockLength,
Expand Down

0 comments on commit cc82fce

Please sign in to comment.