From b7993ae26c3445534bbf3bc8506c06ff3396c345 Mon Sep 17 00:00:00 2001 From: Jocelyn Schreppler Date: Thu, 19 Oct 2023 10:57:09 -0400 Subject: [PATCH 1/5] block --- .../tests/StartTransferUploadTests.cs | 330 +++++++++++------- 1 file changed, 196 insertions(+), 134 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs index f71c78d0961ea..75817d6dd393d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs @@ -4,6 +4,7 @@ extern alias DMBlobs; using System; using System.Collections.Generic; +using System.Drawing; using System.IO; using System.Linq; using System.Threading; @@ -15,6 +16,7 @@ using Azure.Storage.Blobs.Tests; using DMBlobs::Azure.Storage.DataMovement.Blobs; using Microsoft.CodeAnalysis; +using Microsoft.Extensions.Options; using NUnit.Framework; namespace Azure.Storage.DataMovement.Tests @@ -66,87 +68,115 @@ internal DataTransferOptions CopySingleUploadOptions(DataTransferOptions options /// /// /// - private async Task UploadBlockBlobsAndVerify( - BlobContainerClient container, - long size = Constants.KB, - int waitTimeInSec = 30, + //private async Task UploadBlockBlobsAndVerify( + // BlobContainerClient container, + // long size = Constants.KB, + // int waitTimeInSec = 30, + // TransferManagerOptions transferManagerOptions = default, + // int blobCount = 1, + // List blobNames = default, + // List options = default) + //{ + // using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); + + // // Populate blobNames list for number of blobs to be created + // if (blobNames == default || blobNames?.Count == 0) + // { + // blobNames ??= new List(); + // for (int i = 0; i < blobCount; i++) + // { + // blobNames.Add(GetNewBlobName()); + // } + // } + // else + // { + // // If blobNames is popluated make sure these number of blobs match + // Assert.AreEqual(blobCount, blobNames.Count); + // } + + // // Populate Options and TestRaisedOptions + // List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); + + // transferManagerOptions ??= new TransferManagerOptions() + // { + // ErrorHandling = DataTransferErrorMode.ContinueOnFailure + // }; + + // List uploadedBlobInfo = new List(blobCount); + + // // Initialize TransferManager + // TransferManager transferManager = new TransferManager(transferManagerOptions); + + // // Set up blob to upload + // for (int i = 0; i < blobCount; i++) + // { + // using Stream originalStream = await CreateLimitedMemoryStream(size); + // string localSourceFile = Path.Combine(testDirectory.DirectoryPath, GetNewBlobName()); + // // create a new file and copy contents of stream into it, and then close the FileStream + // // so the StagedUploadAsync call is not prevented from reading using its FileStream. + // using (FileStream fileStream = File.OpenWrite(localSourceFile)) + // { + // await originalStream.CopyToAsync(fileStream); + // } + + // // Set up destination client + // BlockBlobClient destClient = container.GetBlockBlobClient(blobNames[i]); + // StorageResourceItem destinationResource = new BlockBlobStorageResource(destClient); + + // // Act + // StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); + // DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource, options[i]); + + // uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( + // sourceFile: localSourceFile, + // destinationClient: destClient, + // eventsRaised: eventRaisedList[i], + // dataTransfer: transfer)); + // } + + // for (int i = 0; i < blobCount; i++) + // { + // // Assert + // Assert.NotNull(uploadedBlobInfo[i].DataTransfer); + // CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); + // await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); + // Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); + + // // Verify Upload + // await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); + // using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) + // { + // await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); + // } + // } + //} + + private async Task UploadBlockBlobAndVerify( + string filePath, + BlockBlobClient blob, + int size = Constants.KB, TransferManagerOptions transferManagerOptions = default, - int blobCount = 1, - List blobNames = default, - List options = default) + DataTransferOptions options = default, + CancellationToken cancellationToken = default) { - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // Populate blobNames list for number of blobs to be created - if (blobNames == default || blobNames?.Count == 0) + using (Stream fs = File.OpenWrite(filePath)) { - blobNames ??= new List(); - for (int i = 0; i < blobCount; i++) - { - blobNames.Add(GetNewBlobName()); - } - } - else - { - // If blobNames is popluated make sure these number of blobs match - Assert.AreEqual(blobCount, blobNames.Count); + await new MemoryStream(GetRandomBuffer(size)).CopyToAsync(fs); } - // Populate Options and TestRaisedOptions - List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); + LocalFileStorageResource sourceResource = new(filePath); + BlockBlobStorageResource destResource = new(blob); - transferManagerOptions ??= new TransferManagerOptions() + await new TransferValidator() { - ErrorHandling = DataTransferErrorMode.ContinueOnFailure - }; - - List uploadedBlobInfo = new List(blobCount); - - // Initialize TransferManager - TransferManager transferManager = new TransferManager(transferManagerOptions); - - // Set up blob to upload - for (int i = 0; i < blobCount; i++) - { - using Stream originalStream = await CreateLimitedMemoryStream(size); - string localSourceFile = Path.Combine(testDirectory.DirectoryPath, GetNewBlobName()); - // create a new file and copy contents of stream into it, and then close the FileStream - // so the StagedUploadAsync call is not prevented from reading using its FileStream. - using (FileStream fileStream = File.OpenWrite(localSourceFile)) - { - await originalStream.CopyToAsync(fileStream); - } - - // Set up destination client - BlockBlobClient destClient = container.GetBlockBlobClient(blobNames[i]); - StorageResourceItem destinationResource = new BlockBlobStorageResource(destClient); - - // Act - StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource, options[i]); - - uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - sourceFile: localSourceFile, - destinationClient: destClient, - eventsRaised: eventRaisedList[i], - dataTransfer: transfer)); - } - - for (int i = 0; i < blobCount; i++) - { - // Assert - Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - - // Verify Upload - await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - { - await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - } - } + TransferManager = new TransferManager(transferManagerOptions) + }.TransferAndVerifyAsync( + sourceResource, + destResource, + async cToken => await blob.OpenReadAsync(cancellationToken: cToken), + cToken => Task.FromResult(File.OpenRead(filePath) as Stream), + options, + cancellationToken); } [RecordedTest] @@ -156,7 +186,9 @@ public async Task LocalToBlockBlob() await using DisposingContainer testContainer = await GetTestContainerAsync(); using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify(testContainer.Container); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName())); } [RecordedTest] @@ -178,12 +210,11 @@ public async Task LocalToBlockBlob_EventHandler() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); - - List optionsList = new List() { options }; - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - blobCount: optionsList.Count, - options: optionsList); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + options: options); // Assert Assert.IsTrue(progressSeen); @@ -192,7 +223,7 @@ await UploadBlockBlobsAndVerify( [RecordedTest] public async Task LocalToBlockBlobSize_SmallChunk() { - long fileSize = Constants.KB; + int fileSize = Constants.KB; int waitTimeInSec = 25; DataTransferOptions options = new DataTransferOptions() { @@ -202,13 +233,16 @@ public async Task LocalToBlockBlobSize_SmallChunk() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); - List optionsList = new List() { options }; + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -217,12 +251,13 @@ public async Task LocalToBlockBlob_Overwrite_Exists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); - string blobName = GetNewBlobName(); - string localSourceFile = Path.GetTempFileName(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; + // Create blob - BlockBlobClient destClient = await CreateBlockBlob(testContainer.Container, localSourceFile, blobName, size); + BlockBlobClient destClient = testContainer.Container.GetBlockBlobClient(GetNewBlobName()); + await destClient.UploadAsync(new MemoryStream(GetRandomBuffer(size))); // Act // Create options bag to overwrite any existing destination. @@ -230,16 +265,15 @@ public async Task LocalToBlockBlob_Overwrite_Exists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - List blobNames = new List() { blobName }; - // Start transfer and await for completion. - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + destClient, size: size, - waitTimeInSec: waitTimeInSec, - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -248,6 +282,7 @@ public async Task LocalToBlockBlob_Overwrite_NotExists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; @@ -257,14 +292,15 @@ public async Task LocalToBlockBlob_Overwrite_NotExists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - // Start transfer and await for completion. - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -375,11 +411,14 @@ public async Task LocalToBlockBlob_SmallSize(long fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - size: fileSize, - waitTimeInSec: waitTimeInSec); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -389,15 +428,19 @@ await UploadBlockBlobsAndVerify( [TestCase(500 * Constants.MB, 200)] [TestCase(700 * Constants.MB, 200)] [TestCase(Constants.GB, 1500)] - public async Task LocalToBlockBlob_LargeSize(long fileSize, int waitTimeInSec) + public async Task LocalToBlockBlob_LargeSize(int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec); + cancellationToken: cts.Token); } [RecordedTest] @@ -406,10 +449,11 @@ await UploadBlockBlobsAndVerify( [TestCase(1, 4 * Constants.KB, 60)] [TestCase(2, 4 * Constants.KB, 60)] [TestCase(4, 16 * Constants.KB, 60)] - public async Task LocalToBlockBlob_SmallConcurrency(int concurrency, long size, int waitTimeInSec) + public async Task LocalToBlockBlob_SmallConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -422,14 +466,16 @@ public async Task LocalToBlockBlob_SmallConcurrency(int concurrency, long size, InitialTransferSize = 512, MaximumTransferChunkSize = 512, }; - List optionsList = new List { options }; - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, transferManagerOptions: managerOptions, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -444,6 +490,7 @@ public async Task LocalToBlockBlob_LargeConcurrency(int concurrency, int size, i { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -451,11 +498,14 @@ public async Task LocalToBlockBlob_LargeConcurrency(int concurrency, int size, i MaximumConcurrency = concurrency, }; - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - transferManagerOptions: managerOptions); + transferManagerOptions: managerOptions, + cancellationToken: cts.Token); } [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/33003")] @@ -467,16 +517,22 @@ await UploadBlockBlobsAndVerify( [TestCase(32, Constants.KB, 30)] [TestCase(2, 2 * Constants.KB, 30)] [TestCase(6, 2 * Constants.KB, 30)] - public async Task LocalToBlockBlob_SmallMultiple(int blobCount, long fileSize, int waitTimeInSec) + public async Task LocalToBlockBlob_SmallMultiple(int blobCount, int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - size: fileSize, - waitTimeInSec: waitTimeInSec, - blobCount: blobCount); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -486,16 +542,22 @@ await UploadBlockBlobsAndVerify( [TestCase(6, 257 * Constants.MB, 400)] [TestCase(2, Constants.GB, 1000)] [TestCase(3, Constants.GB, 2000)] - public async Task LocalToBlockBlob_LargeMultiple(int blobCount, long fileSize, int waitTimeInSec) + public async Task LocalToBlockBlob_LargeMultiple(int blobCount, int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - size: fileSize, - waitTimeInSec: waitTimeInSec, - blobCount: blobCount); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } #endregion SingleUpload Block Blob From 70cb5c8b973ae171a357d1f6c00cf240fa17b481 Mon Sep 17 00:00:00 2001 From: Jocelyn Schreppler Date: Thu, 19 Oct 2023 14:35:19 -0400 Subject: [PATCH 2/5] append --- .../tests/StartTransferUploadTests.cs | 571 ++++++------------ 1 file changed, 200 insertions(+), 371 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs index 75817d6dd393d..9053075b46e59 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs @@ -58,99 +58,6 @@ internal DataTransferOptions CopySingleUploadOptions(DataTransferOptions options } #region SingleUpload Block Blob - /// - /// Upload and verify the contents of the blob - /// - /// By default in this function an event arguement will be added to the options event handler - /// to detect when the upload has finished. - /// - /// - /// - /// - /// - //private async Task UploadBlockBlobsAndVerify( - // BlobContainerClient container, - // long size = Constants.KB, - // int waitTimeInSec = 30, - // TransferManagerOptions transferManagerOptions = default, - // int blobCount = 1, - // List blobNames = default, - // List options = default) - //{ - // using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // // Populate blobNames list for number of blobs to be created - // if (blobNames == default || blobNames?.Count == 0) - // { - // blobNames ??= new List(); - // for (int i = 0; i < blobCount; i++) - // { - // blobNames.Add(GetNewBlobName()); - // } - // } - // else - // { - // // If blobNames is popluated make sure these number of blobs match - // Assert.AreEqual(blobCount, blobNames.Count); - // } - - // // Populate Options and TestRaisedOptions - // List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); - - // transferManagerOptions ??= new TransferManagerOptions() - // { - // ErrorHandling = DataTransferErrorMode.ContinueOnFailure - // }; - - // List uploadedBlobInfo = new List(blobCount); - - // // Initialize TransferManager - // TransferManager transferManager = new TransferManager(transferManagerOptions); - - // // Set up blob to upload - // for (int i = 0; i < blobCount; i++) - // { - // using Stream originalStream = await CreateLimitedMemoryStream(size); - // string localSourceFile = Path.Combine(testDirectory.DirectoryPath, GetNewBlobName()); - // // create a new file and copy contents of stream into it, and then close the FileStream - // // so the StagedUploadAsync call is not prevented from reading using its FileStream. - // using (FileStream fileStream = File.OpenWrite(localSourceFile)) - // { - // await originalStream.CopyToAsync(fileStream); - // } - - // // Set up destination client - // BlockBlobClient destClient = container.GetBlockBlobClient(blobNames[i]); - // StorageResourceItem destinationResource = new BlockBlobStorageResource(destClient); - - // // Act - // StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - // DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource, options[i]); - - // uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - // sourceFile: localSourceFile, - // destinationClient: destClient, - // eventsRaised: eventRaisedList[i], - // dataTransfer: transfer)); - // } - - // for (int i = 0; i < blobCount; i++) - // { - // // Assert - // Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - // CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - // await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - // Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - - // // Verify Upload - // await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - // using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - // { - // await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - // } - // } - //} - private async Task UploadBlockBlobAndVerify( string filePath, BlockBlobClient blob, @@ -161,7 +68,7 @@ private async Task UploadBlockBlobAndVerify( { using (Stream fs = File.OpenWrite(filePath)) { - await new MemoryStream(GetRandomBuffer(size)).CopyToAsync(fs); + await new MemoryStream(GetRandomBuffer(size)).CopyToAsync(fs, cancellationToken); } LocalFileStorageResource sourceResource = new(filePath); @@ -562,98 +469,32 @@ await UploadBlockBlobAndVerify( #endregion SingleUpload Block Blob #region SingleUpload Page Blob - /// - /// Upload and verify the contents of the blob - /// - /// By default in this function an event arguement will be added to the options event handler - /// to detect when the upload has finished. - /// - /// - /// - /// - /// - private async Task UploadPageBlobsAndVerify( - BlobContainerClient container, + private async Task UploadPageBlobAndVerify( + string filePath, + PageBlobClient blob, long size = Constants.KB, - int waitTimeInSec = 30, - int blobCount = 1, TransferManagerOptions transferManagerOptions = default, - List blobNames = default, - List options = default) + DataTransferOptions options = default, + CancellationToken cancellationToken = default) { - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // Populate blobNames list for number of blobs to be created - if (blobNames == default || blobNames?.Count == 0) - { - blobNames ??= new List(); - for (int i = 0; i < blobCount; i++) - { - blobNames.Add(GetNewBlobName()); - } - } - else + using (Stream fs = File.OpenWrite(filePath)) { - // If blobNames is popluated make sure these number of blobs match - Assert.AreEqual(blobCount, blobNames.Count); + await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, cancellationToken); } - // Populate Options and TestRaisedOptions - List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); - - transferManagerOptions ??= new TransferManagerOptions() - { - ErrorHandling = DataTransferErrorMode.ContinueOnFailure - }; - - List uploadedBlobInfo = new List(blobCount); - - // Initialize BlobDataController - TransferManager blobDataController = new TransferManager(transferManagerOptions); - - // Set up blob to upload - for (int i = 0; i < blobCount; i++) - { - using Stream originalStream = await CreateLimitedMemoryStream(size); - string localSourceFile = Path.GetTempFileName(); - // create a new file and copy contents of stream into it, and then close the FileStream - // so the StagedUploadAsync call is not prevented from reading using its FileStream. - using (FileStream fileStream = File.Create(localSourceFile)) - { - await originalStream.CopyToAsync(fileStream); - } - - // Set up destination client - PageBlobClient destClient = container.GetPageBlobClient(blobNames[i]); - StorageResourceItem destinationResource = new PageBlobStorageResource(destClient); - - // Act - StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - DataTransfer transfer = await blobDataController.StartTransferAsync(sourceResource, destinationResource, options[i]); - - uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - localSourceFile, - destClient, - eventRaisedList[i], - transfer)); - } + LocalFileStorageResource sourceResource = new(filePath); + PageBlobStorageResource destResource = new(blob); - for (int i = 0; i < blobCount; i++) + await new TransferValidator() { - // Assert - Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - Assert.AreEqual(DataTransferState.Completed, uploadedBlobInfo[i].DataTransfer.TransferStatus.State); - - // Verify Upload - await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - { - await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - } - } + TransferManager = new TransferManager(transferManagerOptions) + }.TransferAndVerifyAsync( + sourceResource, + destResource, + async cToken => await blob.OpenReadAsync(cancellationToken: cToken), + cToken => Task.FromResult(File.OpenRead(filePath) as Stream), + options, + cancellationToken); } [RecordedTest] @@ -661,7 +502,10 @@ public async Task LocalToPageBlob() { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); - await UploadPageBlobsAndVerify(testContainer.Container); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName())); } [RecordedTest] @@ -670,12 +514,15 @@ public async Task LocalToPageBlob_Overwrite_Exists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); string blobName = GetNewBlobName(); - string localSourceFile = Path.GetTempFileName(); int size = Constants.KB; int waitTimeInSec = 10; + // Create blob - PageBlobClient destClient = await CreatePageBlob(testContainer.Container, localSourceFile, blobName, size); + PageBlobClient destClient = testContainer.Container.GetPageBlobClient(blobName); + await destClient.CreateAsync(size); + await destClient.UploadPagesAsync(new MemoryStream(GetRandomBuffer(size)), 0L); // Act // Create options bag to overwrite any existing destination. @@ -683,17 +530,16 @@ public async Task LocalToPageBlob_Overwrite_Exists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - List blobNames = new List() { blobName }; - TransferManager transferManager = new TransferManager(); // Start transfer and await for completion. - await UploadPageBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + destClient, size: size, - waitTimeInSec: waitTimeInSec, - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -702,6 +548,7 @@ public async Task LocalToPageBlob_Overwrite_NotExists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; @@ -711,15 +558,16 @@ public async Task LocalToPageBlob_Overwrite_NotExists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - TransferManager transferManager = new TransferManager(); // Start transfer and await for completion. - await UploadPageBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -829,14 +677,16 @@ public async Task LocalToPageBlob_SmallSize(long fileSize, int waitTimeInSec) DataTransferOptions options = new DataTransferOptions(); // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - PageBlobClient destClient = testContainer.Container.GetPageBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -851,14 +701,16 @@ public async Task LocalToPageBlob_LargeSize(long fileSize, int waitTimeInSec) DataTransferOptions options = new DataTransferOptions(); // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - PageBlobClient destClient = testContainer.Container.GetPageBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [RecordedTest] @@ -869,8 +721,8 @@ await UploadPageBlobsAndVerify( public async Task LocalToPageBlob_SmallConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -883,13 +735,15 @@ public async Task LocalToPageBlob_SmallConcurrency(int concurrency, int size, in InitialTransferSize = 512, MaximumTransferChunkSize = 512, }; - List optionsList = new List { options }; - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -905,8 +759,8 @@ await UploadPageBlobsAndVerify( public async Task LocalToPageBlob_LargeConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -914,10 +768,13 @@ public async Task LocalToPageBlob_LargeConcurrency(int concurrency, int size, in MaximumConcurrency = concurrency, }; - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/33003")] @@ -927,16 +784,22 @@ await UploadPageBlobsAndVerify( [TestCase(6, Constants.KB, 10)] [TestCase(2, 2 * Constants.KB, 10)] [TestCase(6, 2 * Constants.KB, 10)] - public async Task LocalToPageBlob_SmallMultiple(int blobCount, long fileSize, int waitTimeInSec) + public async Task LocalToPageBlob_SmallMultiple(int blobCount, int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -952,12 +815,18 @@ public async Task LocalToPageBlob_LargeMultiple(int blobCount, long fileSize, in { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [RecordedTest] @@ -972,109 +841,43 @@ public async Task LocalToPageBlob_SmallChunks() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - List optionsList = new List() { options }; - await UploadPageBlobsAndVerify( - container: testContainer.Container, + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - blobCount: optionsList.Count, - options: optionsList); + options: options); } #endregion SingleUpload Page Blob #region SingleUpload Append Blob - /// - /// Upload and verify the contents of the blob - /// - /// By default in this function an event arguement will be added to the options event handler - /// to detect when the upload has finished. - /// - /// - /// - /// - /// - private async Task UploadAppendBlobsAndVerify( - BlobContainerClient container, + private async Task UploadAppendBlobAndVerify( + string filePath, + AppendBlobClient blob, long size = Constants.KB, - int waitTimeInSec = 30, - int blobCount = 1, TransferManagerOptions transferManagerOptions = default, - List blobNames = default, - List options = default) + DataTransferOptions options = default, + CancellationToken cancellationToken = default) { - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // Populate blobNames list for number of blobs to be created - if (blobNames == default || blobNames?.Count == 0) - { - blobNames ??= new List(); - for (int i = 0; i < blobCount; i++) - { - blobNames.Add(GetNewBlobName()); - } - } - else + using (Stream fs = File.OpenWrite(filePath)) { - // If blobNames is popluated make sure these number of blobs match - Assert.AreEqual(blobCount, blobNames.Count); + await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, cancellationToken); } - // Populate Options and TestRaisedOptions - List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); - - transferManagerOptions ??= new TransferManagerOptions() - { - ErrorHandling = DataTransferErrorMode.ContinueOnFailure - }; - - List uploadedBlobInfo = new List(blobCount); - - // Initialize BlobDataController - TransferManager blobDataController = new TransferManager(transferManagerOptions); - - // Set up blob to upload - for (int i = 0; i < blobCount; i++) - { - using Stream originalStream = await CreateLimitedMemoryStream(size); - string localSourceFile = Path.GetTempFileName(); - // create a new file and copy contents of stream into it, and then close the FileStream - // so the StagedUploadAsync call is not prevented from reading using its FileStream. - using (FileStream fileStream = File.Create(localSourceFile)) - { - await originalStream.CopyToAsync(fileStream); - } - - // Set up destination client - AppendBlobClient destClient = container.GetAppendBlobClient(blobNames[i]); - StorageResourceItem destinationResource = new AppendBlobStorageResource(destClient); - - // Act - StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - DataTransfer transfer = await blobDataController.StartTransferAsync(sourceResource, destinationResource, options[i]); - - uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - localSourceFile, - destClient, - eventRaisedList[i], - transfer)); - } + LocalFileStorageResource sourceResource = new(filePath); + AppendBlobStorageResource destResource = new(blob); - for (int i = 0; i < blobCount; i++) + await new TransferValidator() { - // Assert - Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - Assert.AreEqual(DataTransferState.Completed, uploadedBlobInfo[i].DataTransfer.TransferStatus.State); - - // Verify Upload - await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - { - await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - } - } + TransferManager = new TransferManager(transferManagerOptions) + }.TransferAndVerifyAsync( + sourceResource, + destResource, + async cToken => await blob.OpenReadAsync(cancellationToken: cToken), + cToken => Task.FromResult(File.OpenRead(filePath) as Stream), + options, + cancellationToken); } [RecordedTest] @@ -1082,8 +885,11 @@ public async Task LocalToAppendBlob() { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify(testContainer.Container); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName())); } [RecordedTest] @@ -1100,13 +906,16 @@ public async Task LocalToAppend_SmallChunk() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(publicAccessType: PublicAccessType.BlobContainer); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - List optionsList = new List() { options }; - await UploadAppendBlobsAndVerify( - testContainer.Container, - waitTimeInSec: waitTimeInSec, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), + options: options, size: size, - options: optionsList).ConfigureAwait(false); + cancellationToken: cts.Token).ConfigureAwait(false); } [RecordedTest] @@ -1115,12 +924,12 @@ public async Task LocalToAppendBlob_Overwrite_Exists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); - string blobName = GetNewBlobName(); - string localSourceFile = Path.GetTempFileName(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; // Create blob - AppendBlobClient destClient = await CreateAppendBlob(testContainer.Container, localSourceFile, blobName, size); + AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(GetNewBlobName()); + await (await CreateLimitedMemoryStream(size)).CopyToAsync(await destClient.OpenWriteAsync(true)); // Act // Create options bag to overwrite any existing destination. @@ -1128,16 +937,16 @@ public async Task LocalToAppendBlob_Overwrite_Exists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - List blobNames = new List() { blobName }; // Start transfer and await for completion. - await UploadAppendBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + destClient, size: size, - waitTimeInSec: waitTimeInSec, - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -1146,6 +955,7 @@ public async Task LocalToAppendBlob_Overwrite_NotExists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; @@ -1155,14 +965,16 @@ public async Task LocalToAppendBlob_Overwrite_NotExists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; // Start transfer and await for completion. - await UploadAppendBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -1270,14 +1082,16 @@ public async Task LocalToAppendBlob_Failure_Exists() public async Task LocalToAppendBlob_SmallSize(long fileSize, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -1291,14 +1105,16 @@ await UploadAppendBlobsAndVerify( public async Task LocalToAppendBlob_LargeSize(long fileSize, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [RecordedTest] @@ -1310,18 +1126,14 @@ await UploadAppendBlobsAndVerify( public async Task LocalToAppendBlob_SmallConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); DataTransferOptions options = new DataTransferOptions() { InitialTransferSize = 512, MaximumTransferChunkSize = 512 }; - List optionsList = new List { options }; - - List blobNames = new List() { blobName }; TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -1329,13 +1141,14 @@ public async Task LocalToAppendBlob_SmallConcurrency(int concurrency, int size, MaximumConcurrency = concurrency, }; - await UploadAppendBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobNames.Count(), - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -1353,6 +1166,7 @@ public async Task LocalToAppendBlob_LargeConcurrency(int concurrency, int size, { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -1360,11 +1174,14 @@ public async Task LocalToAppendBlob_LargeConcurrency(int concurrency, int size, MaximumConcurrency = concurrency, }; - await UploadAppendBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - transferManagerOptions: managerOptions); + transferManagerOptions: managerOptions, + cancellationToken: cts.Token); } [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/33003")] @@ -1380,12 +1197,18 @@ public async Task LocalToAppendBlob_SmallMultiple(int blobCount, long fileSize, { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -1406,12 +1229,18 @@ public async Task LocalToAppendBlob_LargeMultiple(int blobCount, long fileSize, { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } #endregion SingleUpload Append Blob From e3f6b22ed1d93e5191241adf1718e2d6447d11ad Mon Sep 17 00:00:00 2001 From: Jocelyn Schreppler Date: Thu, 19 Oct 2023 14:50:24 -0400 Subject: [PATCH 3/5] net462 fix --- .../tests/StartTransferUploadTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs index 9053075b46e59..037d05ba0cced 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs @@ -68,7 +68,7 @@ private async Task UploadBlockBlobAndVerify( { using (Stream fs = File.OpenWrite(filePath)) { - await new MemoryStream(GetRandomBuffer(size)).CopyToAsync(fs, cancellationToken); + await new MemoryStream(GetRandomBuffer(size)).CopyToAsync(fs, bufferSize: Constants.KB, cancellationToken); } LocalFileStorageResource sourceResource = new(filePath); @@ -479,7 +479,7 @@ private async Task UploadPageBlobAndVerify( { using (Stream fs = File.OpenWrite(filePath)) { - await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, cancellationToken); + await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, bufferSize: Constants.KB, cancellationToken); } LocalFileStorageResource sourceResource = new(filePath); @@ -862,7 +862,7 @@ private async Task UploadAppendBlobAndVerify( { using (Stream fs = File.OpenWrite(filePath)) { - await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, cancellationToken); + await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, bufferSize: Constants.KB, cancellationToken); } LocalFileStorageResource sourceResource = new(filePath); From b0ba0407b5b4b65917b8b7323edbc18251fbe399 Mon Sep 17 00:00:00 2001 From: Jocelyn Schreppler Date: Fri, 20 Oct 2023 15:31:40 -0400 Subject: [PATCH 4/5] default timeouts --- .../tests/Shared/TransferValidator.cs | 14 ++++++++++++++ .../tests/StartTransferUploadTests.cs | 4 ---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs index 70493661580b7..62f3f3bd0f6eb 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs @@ -38,6 +38,13 @@ public async Task TransferAndVerifyAsync( options ??= new DataTransferOptions(); TestEventsRaised testEventsRaised = new TestEventsRaised(options); + if (cancellationToken == default) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + cancellationToken = cts.Token; + } + DataTransfer transfer = await TransferManager.StartTransferAsync( sourceResource, destinationResource, @@ -74,6 +81,13 @@ public async Task TransferAndVerifyAsync( DataTransferOptions options = default, CancellationToken cancellationToken = default) { + if (cancellationToken == default) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + cancellationToken = cts.Token; + } + DataTransfer transfer = await TransferManager.StartTransferAsync( sourceResource, destinationResource, diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs index 037d05ba0cced..c01fd5e46c5d3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs @@ -3,8 +3,6 @@ extern alias DMBlobs; using System; -using System.Collections.Generic; -using System.Drawing; using System.IO; using System.Linq; using System.Threading; @@ -15,8 +13,6 @@ using Azure.Storage.Blobs.Specialized; using Azure.Storage.Blobs.Tests; using DMBlobs::Azure.Storage.DataMovement.Blobs; -using Microsoft.CodeAnalysis; -using Microsoft.Extensions.Options; using NUnit.Framework; namespace Azure.Storage.DataMovement.Tests From cd5874523c1bc48aeaba3ee16cfa3992ad25baa6 Mon Sep 17 00:00:00 2001 From: Jocelyn Schreppler Date: Mon, 23 Oct 2023 10:51:30 -0400 Subject: [PATCH 5/5] test proxy --- sdk/storage/Azure.Storage.DataMovement/assets.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/assets.json b/sdk/storage/Azure.Storage.DataMovement/assets.json index 4727d3510fdfc..f8bf73ec1b3bc 100644 --- a/sdk/storage/Azure.Storage.DataMovement/assets.json +++ b/sdk/storage/Azure.Storage.DataMovement/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "net", "TagPrefix": "net/storage/Azure.Storage.DataMovement", - "Tag": "net/storage/Azure.Storage.DataMovement_0682ba876f" + "Tag": "net/storage/Azure.Storage.DataMovement_d8993077de" }