Skip to content

Commit

Permalink
[Storage][DataMovement] Cleanup after recent changes to checkpointer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Oct 25, 2023
1 parent 92d30a9 commit c9ec533
Show file tree
Hide file tree
Showing 58 changed files with 161 additions and 427 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,12 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new BlobSourceCheckpointData(BlobType.Append);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Append,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,9 @@
<Compile Include="$(AzureStorageDataMovementSharedSources)Errors.DataMovement.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)DataMovementConstants.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)DataTransferStatusInternal.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)JobPlanExtensions.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)ResponseExtensions.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)LocalTransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)StorageResourceItemInternal.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)StorageResourceContainerInternal.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)TransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageDataMovementSharedSources)\JobPlan\*" LinkBase="Shared\DataMovement\JobPlan" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageBlobsSharedSources)BlobErrors.cs" LinkBase="Shared\Blobs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Text;
using Azure.Core;
using Azure.Storage.Blobs.Models;
using static Azure.Storage.DataMovement.JobPlanExtensions;
using Metadata = System.Collections.Generic.IDictionary<string, string>;
using Tags = System.Collections.Generic.IDictionary<string, string>;

Expand Down Expand Up @@ -73,7 +72,7 @@ public BlobDestinationCheckpointData(
_cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : Array.Empty<byte>();
}

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
Argument.AssertNotNull(stream, nameof(stream));

Expand All @@ -87,31 +86,31 @@ public override void Serialize(Stream stream)
writer.Write((byte)BlobType);

// ContentType offset/length
WriteVariableLengthFieldInfo(writer, _contentTypeBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentTypeBytes.Length, ref currentVariableLengthIndex);

// ContentEncoding offset/length
WriteVariableLengthFieldInfo(writer, _contentEncodingBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentEncodingBytes.Length, ref currentVariableLengthIndex);

// ContentLanguage offset/length
WriteVariableLengthFieldInfo(writer, _contentLanguageBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentLanguageBytes.Length, ref currentVariableLengthIndex);

// ContentDisposition offset/length
WriteVariableLengthFieldInfo(writer, _contentDispositionBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentDispositionBytes.Length, ref currentVariableLengthIndex);

// CacheControl offset/length
WriteVariableLengthFieldInfo(writer, _cacheControlBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_cacheControlBytes.Length, ref currentVariableLengthIndex);

// AccessTier
writer.Write((byte)AccessTier.ToJobPlanAccessTier());

// Metadata offset/length
WriteVariableLengthFieldInfo(writer, _metadataBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_metadataBytes.Length, ref currentVariableLengthIndex);

// Tags offset/length
WriteVariableLengthFieldInfo(writer, _tagsBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_tagsBytes.Length, ref currentVariableLengthIndex);

// CpkScope offset/length
WriteVariableLengthFieldInfo(writer, _cpkScopeBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_cpkScopeBytes.Length, ref currentVariableLengthIndex);

writer.Write(_contentTypeBytes);
writer.Write(_contentEncodingBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public BlobSourceCheckpointData(BlobType blobType)

public override int Length => DataMovementBlobConstants.SourceCheckpointData.DataSize;

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
Argument.AssertNotNull(stream, nameof(stream));
BinaryWriter writer = new BinaryWriter(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ protected override async IAsyncEnumerable<StorageResource> GetStorageResourcesAs
}
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
// Source blob type does not matter for container
return new BlobSourceCheckpointData(BlobType.Block);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
_options?.BlobType ?? BlobType.Block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,12 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new BlobSourceCheckpointData(BlobType.Block);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new BlobSourceCheckpointData(BlobType.Page);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Page,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<Compile Include="$(AzureStorageSharedTestSources)\RandomExtensions.cs" LinkBase="Shared\Storage" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageDataMovementTestSharedSources)CheckpointerTesting.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)DisposingLocalDirectory.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TransferUtility.cs" LinkBase="Shared\DataMovement" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,6 @@ private enum StorageResourceType
Local
}

private static string ToResourceId(StorageResourceType type)
{
return type switch
{
StorageResourceType.BlockBlob => "BlockBlob",
StorageResourceType.PageBlob => "PageBlob",
StorageResourceType.AppendBlob => "AppendBlob",
_ => throw new NotImplementedException(),
};
}

private static string ToProviderId(StorageResourceType type)
{
return type switch
Expand Down Expand Up @@ -91,12 +80,9 @@ private static byte[] GetBytes(BlobCheckpointData checkpointData)
}

private static Mock<DataTransferProperties> GetProperties(
string checkpointerPath,
string transferId,
string sourcePath,
string destinationPath,
string sourceResourceId,
string destinationResourceId,
string sourceProviderId,
string destinationProviderId,
bool isContainer,
Expand All @@ -105,11 +91,8 @@ private static Mock<DataTransferProperties> GetProperties(
{
var mock = new Mock<DataTransferProperties>(MockBehavior.Strict);
mock.Setup(p => p.TransferId).Returns(transferId);
mock.Setup(p => p.Checkpointer).Returns(new TransferCheckpointStoreOptions(checkpointerPath));
mock.Setup(p => p.SourceUri).Returns(new Uri(sourcePath));
mock.Setup(p => p.DestinationUri).Returns(new Uri(destinationPath));
mock.Setup(p => p.SourceTypeId).Returns(sourceResourceId);
mock.Setup(p => p.DestinationTypeId).Returns(destinationResourceId);
mock.Setup(p => p.SourceProviderId).Returns(sourceProviderId);
mock.Setup(p => p.DestinationProviderId).Returns(destinationProviderId);
mock.Setup(p => p.SourceCheckpointData).Returns(GetBytes(sourceCheckpointData));
Expand All @@ -122,7 +105,6 @@ private static Mock<DataTransferProperties> GetProperties(
public async Task RehydrateBlockBlob(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -132,12 +114,9 @@ public async Task RehydrateBlockBlob(
StorageResourceType destinationType = StorageResourceType.BlockBlob;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -155,7 +134,6 @@ public async Task RehydrateBlockBlob(
[Test]
public async Task RehydrateBlockBlob_Options()
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -165,12 +143,9 @@ public async Task RehydrateBlockBlob_Options()

BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Block, AccessTier.Cool);
DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -195,7 +170,6 @@ public async Task RehydrateBlockBlob_Options()
public async Task RehydratePageBlob(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -205,12 +179,9 @@ public async Task RehydratePageBlob(
StorageResourceType destinationType = StorageResourceType.PageBlob;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -228,7 +199,6 @@ public async Task RehydratePageBlob(
[Test]
public async Task RehydratePageBlob_Options()
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -238,12 +208,9 @@ public async Task RehydratePageBlob_Options()

BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Page, AccessTier.P30);
DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -268,7 +235,6 @@ public async Task RehydratePageBlob_Options()
public async Task RehydrateAppendBlob(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -278,12 +244,9 @@ public async Task RehydrateAppendBlob(
StorageResourceType destinationType = StorageResourceType.AppendBlob;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -301,7 +264,6 @@ public async Task RehydrateAppendBlob(
[Test]
public async Task RehydrateAppendBlob_Options()
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -311,12 +273,9 @@ public async Task RehydrateAppendBlob_Options()

BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Append, accessTier: default);
DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -341,7 +300,6 @@ public async Task RehydrateAppendBlob_Options()
public async Task RehydrateBlobContainer(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
List<string> sourcePaths = new List<string>();
string sourceParentPath = "https://storageaccount.blob.core.windows.net/sourcecontainer";
Expand All @@ -361,12 +319,9 @@ public async Task RehydrateBlobContainer(
string originalPath = isSource ? sourceParentPath : destinationParentPath;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourceParentPath,
destinationParentPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ protected override async IAsyncEnumerable<StorageResource> GetStorageResourcesAs
}
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new ShareFileSourceCheckpointData();
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new ShareFileDestinationCheckpointData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class ShareFileDestinationCheckpointData : StorageResourceCheckpointDat
{
public override int Length => 0;

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class ShareFileSourceCheckpointData : StorageResourceCheckpointData
{
public override int Length => 0;

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ protected override async Task<StorageResourceReadStreamResult> ReadStreamAsync(
return response.Value.ToStorageResourceReadStreamResult();
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new ShareFileSourceCheckpointData();
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new ShareFileDestinationCheckpointData();
}
Expand Down
Loading

0 comments on commit c9ec533

Please sign in to comment.