diff --git a/sdk/storage/Azure.Storage.DataMovement/assets.json b/sdk/storage/Azure.Storage.DataMovement/assets.json index 4727d3510fdfc..f8bf73ec1b3bc 100644 --- a/sdk/storage/Azure.Storage.DataMovement/assets.json +++ b/sdk/storage/Azure.Storage.DataMovement/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "net", "TagPrefix": "net/storage/Azure.Storage.DataMovement", - "Tag": "net/storage/Azure.Storage.DataMovement_0682ba876f" + "Tag": "net/storage/Azure.Storage.DataMovement_d8993077de" } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs index 70493661580b7..62f3f3bd0f6eb 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/TransferValidator.cs @@ -38,6 +38,13 @@ public async Task TransferAndVerifyAsync( options ??= new DataTransferOptions(); TestEventsRaised testEventsRaised = new TestEventsRaised(options); + if (cancellationToken == default) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + cancellationToken = cts.Token; + } + DataTransfer transfer = await TransferManager.StartTransferAsync( sourceResource, destinationResource, @@ -74,6 +81,13 @@ public async Task TransferAndVerifyAsync( DataTransferOptions options = default, CancellationToken cancellationToken = default) { + if (cancellationToken == default) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + cancellationToken = cts.Token; + } + DataTransfer transfer = await TransferManager.StartTransferAsync( sourceResource, destinationResource, diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs index f71c78d0961ea..c01fd5e46c5d3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/StartTransferUploadTests.cs @@ -3,7 +3,6 @@ extern alias DMBlobs; using System; -using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; @@ -14,7 +13,6 @@ using Azure.Storage.Blobs.Specialized; using Azure.Storage.Blobs.Tests; using DMBlobs::Azure.Storage.DataMovement.Blobs; -using Microsoft.CodeAnalysis; using NUnit.Framework; namespace Azure.Storage.DataMovement.Tests @@ -56,97 +54,32 @@ internal DataTransferOptions CopySingleUploadOptions(DataTransferOptions options } #region SingleUpload Block Blob - /// - /// Upload and verify the contents of the blob - /// - /// By default in this function an event arguement will be added to the options event handler - /// to detect when the upload has finished. - /// - /// - /// - /// - /// - private async Task UploadBlockBlobsAndVerify( - BlobContainerClient container, - long size = Constants.KB, - int waitTimeInSec = 30, + private async Task UploadBlockBlobAndVerify( + string filePath, + BlockBlobClient blob, + int size = Constants.KB, TransferManagerOptions transferManagerOptions = default, - int blobCount = 1, - List blobNames = default, - List options = default) + DataTransferOptions options = default, + CancellationToken cancellationToken = default) { - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // Populate blobNames list for number of blobs to be created - if (blobNames == default || blobNames?.Count == 0) - { - blobNames ??= new List(); - for (int i = 0; i < blobCount; i++) - { - blobNames.Add(GetNewBlobName()); - } - } - else + using (Stream fs = File.OpenWrite(filePath)) { - // If blobNames is popluated make sure these number of blobs match - Assert.AreEqual(blobCount, blobNames.Count); + await new MemoryStream(GetRandomBuffer(size)).CopyToAsync(fs, bufferSize: Constants.KB, cancellationToken); } - // Populate Options and TestRaisedOptions - List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); + LocalFileStorageResource sourceResource = new(filePath); + BlockBlobStorageResource destResource = new(blob); - transferManagerOptions ??= new TransferManagerOptions() + await new TransferValidator() { - ErrorHandling = DataTransferErrorMode.ContinueOnFailure - }; - - List uploadedBlobInfo = new List(blobCount); - - // Initialize TransferManager - TransferManager transferManager = new TransferManager(transferManagerOptions); - - // Set up blob to upload - for (int i = 0; i < blobCount; i++) - { - using Stream originalStream = await CreateLimitedMemoryStream(size); - string localSourceFile = Path.Combine(testDirectory.DirectoryPath, GetNewBlobName()); - // create a new file and copy contents of stream into it, and then close the FileStream - // so the StagedUploadAsync call is not prevented from reading using its FileStream. - using (FileStream fileStream = File.OpenWrite(localSourceFile)) - { - await originalStream.CopyToAsync(fileStream); - } - - // Set up destination client - BlockBlobClient destClient = container.GetBlockBlobClient(blobNames[i]); - StorageResourceItem destinationResource = new BlockBlobStorageResource(destClient); - - // Act - StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - DataTransfer transfer = await transferManager.StartTransferAsync(sourceResource, destinationResource, options[i]); - - uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - sourceFile: localSourceFile, - destinationClient: destClient, - eventsRaised: eventRaisedList[i], - dataTransfer: transfer)); - } - - for (int i = 0; i < blobCount; i++) - { - // Assert - Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - - // Verify Upload - await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - { - await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - } - } + TransferManager = new TransferManager(transferManagerOptions) + }.TransferAndVerifyAsync( + sourceResource, + destResource, + async cToken => await blob.OpenReadAsync(cancellationToken: cToken), + cToken => Task.FromResult(File.OpenRead(filePath) as Stream), + options, + cancellationToken); } [RecordedTest] @@ -156,7 +89,9 @@ public async Task LocalToBlockBlob() await using DisposingContainer testContainer = await GetTestContainerAsync(); using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify(testContainer.Container); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName())); } [RecordedTest] @@ -178,12 +113,11 @@ public async Task LocalToBlockBlob_EventHandler() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); - - List optionsList = new List() { options }; - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - blobCount: optionsList.Count, - options: optionsList); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + options: options); // Assert Assert.IsTrue(progressSeen); @@ -192,7 +126,7 @@ await UploadBlockBlobsAndVerify( [RecordedTest] public async Task LocalToBlockBlobSize_SmallChunk() { - long fileSize = Constants.KB; + int fileSize = Constants.KB; int waitTimeInSec = 25; DataTransferOptions options = new DataTransferOptions() { @@ -202,13 +136,16 @@ public async Task LocalToBlockBlobSize_SmallChunk() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); - List optionsList = new List() { options }; + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -217,12 +154,13 @@ public async Task LocalToBlockBlob_Overwrite_Exists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); - string blobName = GetNewBlobName(); - string localSourceFile = Path.GetTempFileName(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; + // Create blob - BlockBlobClient destClient = await CreateBlockBlob(testContainer.Container, localSourceFile, blobName, size); + BlockBlobClient destClient = testContainer.Container.GetBlockBlobClient(GetNewBlobName()); + await destClient.UploadAsync(new MemoryStream(GetRandomBuffer(size))); // Act // Create options bag to overwrite any existing destination. @@ -230,16 +168,15 @@ public async Task LocalToBlockBlob_Overwrite_Exists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - List blobNames = new List() { blobName }; - // Start transfer and await for completion. - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + destClient, size: size, - waitTimeInSec: waitTimeInSec, - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -248,6 +185,7 @@ public async Task LocalToBlockBlob_Overwrite_NotExists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; @@ -257,14 +195,15 @@ public async Task LocalToBlockBlob_Overwrite_NotExists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - // Start transfer and await for completion. - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -375,11 +314,14 @@ public async Task LocalToBlockBlob_SmallSize(long fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - size: fileSize, - waitTimeInSec: waitTimeInSec); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -389,15 +331,19 @@ await UploadBlockBlobsAndVerify( [TestCase(500 * Constants.MB, 200)] [TestCase(700 * Constants.MB, 200)] [TestCase(Constants.GB, 1500)] - public async Task LocalToBlockBlob_LargeSize(long fileSize, int waitTimeInSec) + public async Task LocalToBlockBlob_LargeSize(int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec); + cancellationToken: cts.Token); } [RecordedTest] @@ -406,10 +352,11 @@ await UploadBlockBlobsAndVerify( [TestCase(1, 4 * Constants.KB, 60)] [TestCase(2, 4 * Constants.KB, 60)] [TestCase(4, 16 * Constants.KB, 60)] - public async Task LocalToBlockBlob_SmallConcurrency(int concurrency, long size, int waitTimeInSec) + public async Task LocalToBlockBlob_SmallConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -422,14 +369,16 @@ public async Task LocalToBlockBlob_SmallConcurrency(int concurrency, long size, InitialTransferSize = 512, MaximumTransferChunkSize = 512, }; - List optionsList = new List { options }; - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, transferManagerOptions: managerOptions, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -444,6 +393,7 @@ public async Task LocalToBlockBlob_LargeConcurrency(int concurrency, int size, i { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -451,11 +401,14 @@ public async Task LocalToBlockBlob_LargeConcurrency(int concurrency, int size, i MaximumConcurrency = concurrency, }; - await UploadBlockBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - transferManagerOptions: managerOptions); + transferManagerOptions: managerOptions, + cancellationToken: cts.Token); } [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/33003")] @@ -467,16 +420,22 @@ await UploadBlockBlobsAndVerify( [TestCase(32, Constants.KB, 30)] [TestCase(2, 2 * Constants.KB, 30)] [TestCase(6, 2 * Constants.KB, 30)] - public async Task LocalToBlockBlob_SmallMultiple(int blobCount, long fileSize, int waitTimeInSec) + public async Task LocalToBlockBlob_SmallMultiple(int blobCount, int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - size: fileSize, - waitTimeInSec: waitTimeInSec, - blobCount: blobCount); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -486,112 +445,52 @@ await UploadBlockBlobsAndVerify( [TestCase(6, 257 * Constants.MB, 400)] [TestCase(2, Constants.GB, 1000)] [TestCase(3, Constants.GB, 2000)] - public async Task LocalToBlockBlob_LargeMultiple(int blobCount, long fileSize, int waitTimeInSec) + public async Task LocalToBlockBlob_LargeMultiple(int blobCount, int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadBlockBlobsAndVerify( - container: testContainer.Container, - size: fileSize, - waitTimeInSec: waitTimeInSec, - blobCount: blobCount); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadBlockBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetBlockBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } #endregion SingleUpload Block Blob #region SingleUpload Page Blob - /// - /// Upload and verify the contents of the blob - /// - /// By default in this function an event arguement will be added to the options event handler - /// to detect when the upload has finished. - /// - /// - /// - /// - /// - private async Task UploadPageBlobsAndVerify( - BlobContainerClient container, + private async Task UploadPageBlobAndVerify( + string filePath, + PageBlobClient blob, long size = Constants.KB, - int waitTimeInSec = 30, - int blobCount = 1, TransferManagerOptions transferManagerOptions = default, - List blobNames = default, - List options = default) + DataTransferOptions options = default, + CancellationToken cancellationToken = default) { - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // Populate blobNames list for number of blobs to be created - if (blobNames == default || blobNames?.Count == 0) + using (Stream fs = File.OpenWrite(filePath)) { - blobNames ??= new List(); - for (int i = 0; i < blobCount; i++) - { - blobNames.Add(GetNewBlobName()); - } + await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, bufferSize: Constants.KB, cancellationToken); } - else - { - // If blobNames is popluated make sure these number of blobs match - Assert.AreEqual(blobCount, blobNames.Count); - } - - // Populate Options and TestRaisedOptions - List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); - - transferManagerOptions ??= new TransferManagerOptions() - { - ErrorHandling = DataTransferErrorMode.ContinueOnFailure - }; - - List uploadedBlobInfo = new List(blobCount); - // Initialize BlobDataController - TransferManager blobDataController = new TransferManager(transferManagerOptions); + LocalFileStorageResource sourceResource = new(filePath); + PageBlobStorageResource destResource = new(blob); - // Set up blob to upload - for (int i = 0; i < blobCount; i++) + await new TransferValidator() { - using Stream originalStream = await CreateLimitedMemoryStream(size); - string localSourceFile = Path.GetTempFileName(); - // create a new file and copy contents of stream into it, and then close the FileStream - // so the StagedUploadAsync call is not prevented from reading using its FileStream. - using (FileStream fileStream = File.Create(localSourceFile)) - { - await originalStream.CopyToAsync(fileStream); - } - - // Set up destination client - PageBlobClient destClient = container.GetPageBlobClient(blobNames[i]); - StorageResourceItem destinationResource = new PageBlobStorageResource(destClient); - - // Act - StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - DataTransfer transfer = await blobDataController.StartTransferAsync(sourceResource, destinationResource, options[i]); - - uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - localSourceFile, - destClient, - eventRaisedList[i], - transfer)); - } - - for (int i = 0; i < blobCount; i++) - { - // Assert - Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - Assert.AreEqual(DataTransferState.Completed, uploadedBlobInfo[i].DataTransfer.TransferStatus.State); - - // Verify Upload - await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - { - await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - } - } + TransferManager = new TransferManager(transferManagerOptions) + }.TransferAndVerifyAsync( + sourceResource, + destResource, + async cToken => await blob.OpenReadAsync(cancellationToken: cToken), + cToken => Task.FromResult(File.OpenRead(filePath) as Stream), + options, + cancellationToken); } [RecordedTest] @@ -599,7 +498,10 @@ public async Task LocalToPageBlob() { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); - await UploadPageBlobsAndVerify(testContainer.Container); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName())); } [RecordedTest] @@ -608,12 +510,15 @@ public async Task LocalToPageBlob_Overwrite_Exists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); string blobName = GetNewBlobName(); - string localSourceFile = Path.GetTempFileName(); int size = Constants.KB; int waitTimeInSec = 10; + // Create blob - PageBlobClient destClient = await CreatePageBlob(testContainer.Container, localSourceFile, blobName, size); + PageBlobClient destClient = testContainer.Container.GetPageBlobClient(blobName); + await destClient.CreateAsync(size); + await destClient.UploadPagesAsync(new MemoryStream(GetRandomBuffer(size)), 0L); // Act // Create options bag to overwrite any existing destination. @@ -621,17 +526,16 @@ public async Task LocalToPageBlob_Overwrite_Exists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - List blobNames = new List() { blobName }; - TransferManager transferManager = new TransferManager(); // Start transfer and await for completion. - await UploadPageBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + destClient, size: size, - waitTimeInSec: waitTimeInSec, - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -640,6 +544,7 @@ public async Task LocalToPageBlob_Overwrite_NotExists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; @@ -649,15 +554,16 @@ public async Task LocalToPageBlob_Overwrite_NotExists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - TransferManager transferManager = new TransferManager(); // Start transfer and await for completion. - await UploadPageBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -767,14 +673,16 @@ public async Task LocalToPageBlob_SmallSize(long fileSize, int waitTimeInSec) DataTransferOptions options = new DataTransferOptions(); // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - PageBlobClient destClient = testContainer.Container.GetPageBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -789,14 +697,16 @@ public async Task LocalToPageBlob_LargeSize(long fileSize, int waitTimeInSec) DataTransferOptions options = new DataTransferOptions(); // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - PageBlobClient destClient = testContainer.Container.GetPageBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [RecordedTest] @@ -807,8 +717,8 @@ await UploadPageBlobsAndVerify( public async Task LocalToPageBlob_SmallConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -821,13 +731,15 @@ public async Task LocalToPageBlob_SmallConcurrency(int concurrency, int size, in InitialTransferSize = 512, MaximumTransferChunkSize = 512, }; - List optionsList = new List { options }; - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -843,8 +755,8 @@ await UploadPageBlobsAndVerify( public async Task LocalToPageBlob_LargeConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -852,10 +764,13 @@ public async Task LocalToPageBlob_LargeConcurrency(int concurrency, int size, in MaximumConcurrency = concurrency, }; - await UploadPageBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/33003")] @@ -865,16 +780,22 @@ await UploadPageBlobsAndVerify( [TestCase(6, Constants.KB, 10)] [TestCase(2, 2 * Constants.KB, 10)] [TestCase(6, 2 * Constants.KB, 10)] - public async Task LocalToPageBlob_SmallMultiple(int blobCount, long fileSize, int waitTimeInSec) + public async Task LocalToPageBlob_SmallMultiple(int blobCount, int fileSize, int waitTimeInSec) { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -890,12 +811,18 @@ public async Task LocalToPageBlob_LargeMultiple(int blobCount, long fileSize, in { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadPageBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [RecordedTest] @@ -910,109 +837,43 @@ public async Task LocalToPageBlob_SmallChunks() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - List optionsList = new List() { options }; - await UploadPageBlobsAndVerify( - container: testContainer.Container, + await UploadPageBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetPageBlobClient(GetNewBlobName()), size: size, - blobCount: optionsList.Count, - options: optionsList); + options: options); } #endregion SingleUpload Page Blob #region SingleUpload Append Blob - /// - /// Upload and verify the contents of the blob - /// - /// By default in this function an event arguement will be added to the options event handler - /// to detect when the upload has finished. - /// - /// - /// - /// - /// - private async Task UploadAppendBlobsAndVerify( - BlobContainerClient container, + private async Task UploadAppendBlobAndVerify( + string filePath, + AppendBlobClient blob, long size = Constants.KB, - int waitTimeInSec = 30, - int blobCount = 1, TransferManagerOptions transferManagerOptions = default, - List blobNames = default, - List options = default) + DataTransferOptions options = default, + CancellationToken cancellationToken = default) { - using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory(); - - // Populate blobNames list for number of blobs to be created - if (blobNames == default || blobNames?.Count == 0) + using (Stream fs = File.OpenWrite(filePath)) { - blobNames ??= new List(); - for (int i = 0; i < blobCount; i++) - { - blobNames.Add(GetNewBlobName()); - } + await (await CreateLimitedMemoryStream(size)).CopyToAsync(fs, bufferSize: Constants.KB, cancellationToken); } - else - { - // If blobNames is popluated make sure these number of blobs match - Assert.AreEqual(blobCount, blobNames.Count); - } - - // Populate Options and TestRaisedOptions - List eventRaisedList = TestEventsRaised.PopulateTestOptions(blobCount, ref options); - transferManagerOptions ??= new TransferManagerOptions() - { - ErrorHandling = DataTransferErrorMode.ContinueOnFailure - }; - - List uploadedBlobInfo = new List(blobCount); + LocalFileStorageResource sourceResource = new(filePath); + AppendBlobStorageResource destResource = new(blob); - // Initialize BlobDataController - TransferManager blobDataController = new TransferManager(transferManagerOptions); - - // Set up blob to upload - for (int i = 0; i < blobCount; i++) + await new TransferValidator() { - using Stream originalStream = await CreateLimitedMemoryStream(size); - string localSourceFile = Path.GetTempFileName(); - // create a new file and copy contents of stream into it, and then close the FileStream - // so the StagedUploadAsync call is not prevented from reading using its FileStream. - using (FileStream fileStream = File.Create(localSourceFile)) - { - await originalStream.CopyToAsync(fileStream); - } - - // Set up destination client - AppendBlobClient destClient = container.GetAppendBlobClient(blobNames[i]); - StorageResourceItem destinationResource = new AppendBlobStorageResource(destClient); - - // Act - StorageResourceItem sourceResource = new LocalFileStorageResource(localSourceFile); - DataTransfer transfer = await blobDataController.StartTransferAsync(sourceResource, destinationResource, options[i]); - - uploadedBlobInfo.Add(new VerifyUploadBlobContentInfo( - localSourceFile, - destClient, - eventRaisedList[i], - transfer)); - } - - for (int i = 0; i < blobCount; i++) - { - // Assert - Assert.NotNull(uploadedBlobInfo[i].DataTransfer); - CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(waitTimeInSec)); - await uploadedBlobInfo[i].DataTransfer.WaitForCompletionAsync(tokenSource.Token); - Assert.IsTrue(uploadedBlobInfo[i].DataTransfer.HasCompleted); - Assert.AreEqual(DataTransferState.Completed, uploadedBlobInfo[i].DataTransfer.TransferStatus.State); - - // Verify Upload - await uploadedBlobInfo[i].EventsRaised.AssertSingleCompletedCheck(); - using (FileStream fileStream = File.OpenRead(uploadedBlobInfo[i].LocalPath)) - { - await DownloadAndAssertAsync(fileStream, uploadedBlobInfo[i].DestinationClient); - } - } + TransferManager = new TransferManager(transferManagerOptions) + }.TransferAndVerifyAsync( + sourceResource, + destResource, + async cToken => await blob.OpenReadAsync(cancellationToken: cToken), + cToken => Task.FromResult(File.OpenRead(filePath) as Stream), + options, + cancellationToken); } [RecordedTest] @@ -1020,8 +881,11 @@ public async Task LocalToAppendBlob() { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify(testContainer.Container); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName())); } [RecordedTest] @@ -1038,13 +902,16 @@ public async Task LocalToAppend_SmallChunk() // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(publicAccessType: PublicAccessType.BlobContainer); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - List optionsList = new List() { options }; - await UploadAppendBlobsAndVerify( - testContainer.Container, - waitTimeInSec: waitTimeInSec, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), + options: options, size: size, - options: optionsList).ConfigureAwait(false); + cancellationToken: cts.Token).ConfigureAwait(false); } [RecordedTest] @@ -1053,12 +920,12 @@ public async Task LocalToAppendBlob_Overwrite_Exists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); - string blobName = GetNewBlobName(); - string localSourceFile = Path.GetTempFileName(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; // Create blob - AppendBlobClient destClient = await CreateAppendBlob(testContainer.Container, localSourceFile, blobName, size); + AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(GetNewBlobName()); + await (await CreateLimitedMemoryStream(size)).CopyToAsync(await destClient.OpenWriteAsync(true)); // Act // Create options bag to overwrite any existing destination. @@ -1066,16 +933,16 @@ public async Task LocalToAppendBlob_Overwrite_Exists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; - List blobNames = new List() { blobName }; // Start transfer and await for completion. - await UploadAppendBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + destClient, size: size, - waitTimeInSec: waitTimeInSec, - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -1084,6 +951,7 @@ public async Task LocalToAppendBlob_Overwrite_NotExists() // Arrange // Create source local file for checking, and source blob await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); int size = Constants.KB; int waitTimeInSec = 10; @@ -1093,14 +961,16 @@ public async Task LocalToAppendBlob_Overwrite_NotExists() { CreationPreference = StorageResourceCreationPreference.OverwriteIfExists, }; - List optionsList = new List() { options }; // Start transfer and await for completion. - await UploadAppendBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [RecordedTest] @@ -1208,14 +1078,16 @@ public async Task LocalToAppendBlob_Failure_Exists() public async Task LocalToAppendBlob_SmallSize(long fileSize, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -1229,14 +1101,16 @@ await UploadAppendBlobsAndVerify( public async Task LocalToAppendBlob_LargeSize(long fileSize, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container); + cancellationToken: cts.Token); } [RecordedTest] @@ -1248,18 +1122,14 @@ await UploadAppendBlobsAndVerify( public async Task LocalToAppendBlob_SmallConcurrency(int concurrency, int size, int waitTimeInSec) { // Arrange - var blobName = GetNewBlobName(); await using DisposingContainer testContainer = await GetTestContainerAsync(); - AppendBlobClient destClient = testContainer.Container.GetAppendBlobClient(blobName); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); DataTransferOptions options = new DataTransferOptions() { InitialTransferSize = 512, MaximumTransferChunkSize = 512 }; - List optionsList = new List { options }; - - List blobNames = new List() { blobName }; TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -1267,13 +1137,14 @@ public async Task LocalToAppendBlob_SmallConcurrency(int concurrency, int size, MaximumConcurrency = concurrency, }; - await UploadAppendBlobsAndVerify( + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobNames.Count(), - blobNames: blobNames, - options: optionsList); + options: options, + cancellationToken: cts.Token); } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -1291,6 +1162,7 @@ public async Task LocalToAppendBlob_LargeConcurrency(int concurrency, int size, { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); TransferManagerOptions managerOptions = new TransferManagerOptions() { @@ -1298,11 +1170,14 @@ public async Task LocalToAppendBlob_LargeConcurrency(int concurrency, int size, MaximumConcurrency = concurrency, }; - await UploadAppendBlobsAndVerify( - container: testContainer.Container, + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), size: size, - waitTimeInSec: waitTimeInSec, - transferManagerOptions: managerOptions); + transferManagerOptions: managerOptions, + cancellationToken: cts.Token); } [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/33003")] @@ -1318,12 +1193,18 @@ public async Task LocalToAppendBlob_SmallMultiple(int blobCount, long fileSize, { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } [Ignore("These tests currently take 40+ mins for little additional coverage")] @@ -1344,12 +1225,18 @@ public async Task LocalToAppendBlob_LargeMultiple(int blobCount, long fileSize, { // Arrange await using DisposingContainer testContainer = await GetTestContainerAsync(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); - await UploadAppendBlobsAndVerify( - size: fileSize, - waitTimeInSec: waitTimeInSec, - container: testContainer.Container, - blobCount: blobCount); + CancellationTokenSource cts = new(); + cts.CancelAfter(TimeSpan.FromSeconds(waitTimeInSec)); + foreach (var _ in Enumerable.Range(0, blobCount)) + { + await UploadAppendBlobAndVerify( + Path.Combine(localDirectory.DirectoryPath, GetNewBlobName()), + testContainer.Container.GetAppendBlobClient(GetNewBlobName()), + size: fileSize, + cancellationToken: cts.Token); + } } #endregion SingleUpload Append Blob