Skip to content

Commit

Permalink
[Storage][DataMovement] More fixes to resuming transfers (Azure#39271)
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored and matthohn-msft committed Oct 27, 2023
1 parent 8ef35f3 commit b1bcded
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,20 @@ public async Task<StorageResource> RehydrateAsync(
#endregion

private static ResourceType GetType(string typeId, bool isContainer)
=> typeId switch
{
if (isContainer)
{
"BlockBlob" => isContainer ? ResourceType.BlobContainer : ResourceType.BlockBlob,
"PageBlob" => isContainer ? ResourceType.BlobContainer : ResourceType.PageBlob,
"AppendBlob" => isContainer ? ResourceType.BlobContainer : ResourceType.AppendBlob,
return ResourceType.BlobContainer;
}

return typeId switch
{
"BlockBlob" => ResourceType.BlockBlob,
"PageBlob" => ResourceType.PageBlob,
"AppendBlob" => ResourceType.AppendBlob,
_ => ResourceType.Unknown
};
}

private static ArgumentException BadResourceTypeException(ResourceType resourceType)
=> new ArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,26 @@ internal static async Task<DataTransferProperties> GetDataTransferPropertiesAsyn
header = JobPlanHeader.Deserialize(stream);
}

(string sourceResourceId, string destResourceId) = await checkpointer.GetResourceIdsAsync(
string sourceTypeId = default;
string destinationTypeId = default;
// Only need to get type ids for single transfers
if (!header.IsContainer)
{
(sourceTypeId, destinationTypeId) = await checkpointer.GetResourceIdsAsync(
transferId,
cancellationToken).ConfigureAwait(false);

bool isContainer =
(await checkpointer.CurrentJobPartCountAsync(transferId, cancellationToken).ConfigureAwait(false)) > 1;
}

return new DataTransferProperties
{
TransferId = transferId,
SourceTypeId = sourceResourceId,
SourceTypeId = sourceTypeId,
SourceUri = new Uri(header.ParentSourcePath),
SourceProviderId = header.SourceProviderId,
DestinationTypeId = destResourceId,
DestinationTypeId = destinationTypeId,
DestinationUri = new Uri(header.ParentDestinationPath),
DestinationProviderId = header.DestinationProviderId,
IsContainer = isContainer,
IsContainer = header.IsContainer,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class DataTransferProperties
public virtual string TransferId { get; internal set; }

/// <summary>
/// Contains the Source Scheme of the Storage Resource to rehydrate the StorageResource from.
/// Contains the type id for the source resource to use during rehydration.
/// Will be null if <see cref="IsContainer"/> is true.
/// </summary>
public virtual string SourceTypeId { get; internal set; }

Expand All @@ -36,7 +37,8 @@ public class DataTransferProperties
public virtual string SourceProviderId { get; internal set; }

/// <summary>
/// Contains the Source Scheme of the Storage Resource to rehydrate the StorageResource from.
/// Contains the type id for the destination resource to use during rehydration.
/// Will be null if <see cref="IsContainer"/> is true.
/// </summary>
public virtual string DestinationTypeId { get; internal set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ internal static class JobPlanFile
internal const int OperationTypeIndex = CrateTimeIndex + LongSizeInBytes;
internal const int SourceProviderIdIndex = OperationTypeIndex + OneByte;
internal const int DestinationProviderIdIndex = SourceProviderIdIndex + ProviderIdNumBytes;
internal const int EnumerationCompleteIndex = DestinationProviderIdIndex + ProviderIdNumBytes;
internal const int IsContainerIndex = DestinationProviderIdIndex + ProviderIdNumBytes;
internal const int EnumerationCompleteIndex = IsContainerIndex + OneByte;
internal const int JobStatusIndex = EnumerationCompleteIndex + OneByte;
internal const int ParentSourcePathOffsetIndex = JobStatusIndex + IntSizeInBytes;
internal const int ParentSourcePathLengthIndex = ParentSourcePathOffsetIndex + IntSizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public static async Task<StreamToUriJobPart> ToJobPartAsync(

// Apply credentials to the saved transfer job path
string childSourcePath = header.SourcePath;
string childSourceName = childSourcePath.Substring(sourceResource.Uri.GetPath().Length + 1);
string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
string childDestinationPath = header.DestinationPath;
string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
DataTransferStatus jobPartStatus = header.AtomicJobStatus;
Expand Down Expand Up @@ -163,7 +163,7 @@ public static async Task<UriToStreamJobPart> ToJobPartAsync(
string childSourcePath = header.SourcePath;
string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1);
string childDestinationPath = header.DestinationPath;
string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.GetPath().Length + 1);
string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1);
DataTransferStatus jobPartStatus = header.AtomicJobStatus;
UriToStreamJobPart jobPart = await UriToStreamJobPart.CreateJobPartAsync(
job: baseJob,
Expand Down Expand Up @@ -257,11 +257,8 @@ internal static void VerifyJobPartPlanHeader(this JobPartInternal jobPart, JobPa
throw Errors.MismatchTransferId(jobPart._dataTransfer.Id, header.TransferId);
}

// Check source path'
// Remove any query or SAS that could be attach to the Uri
UriBuilder sourceUriBuilder = new UriBuilder(jobPart._sourceResource.Uri.AbsoluteUri);
sourceUriBuilder.Query = "";
string passedSourcePath = sourceUriBuilder.Uri.AbsoluteUri;
// Check source path
string passedSourcePath = jobPart._sourceResource.Uri.ToSanitizedString();

// We only check if it starts with the path because if we're passed a container
// then we only need to check if the prefix matches
Expand All @@ -271,10 +268,7 @@ internal static void VerifyJobPartPlanHeader(this JobPartInternal jobPart, JobPa
}

// Check destination path
// Remove any query or SAS that could be attach to the Uri
UriBuilder destinationUriBuilder = new UriBuilder(jobPart._destinationResource.Uri.AbsoluteUri);
destinationUriBuilder.Query = "";
string passedDestinationPath = destinationUriBuilder.Uri.AbsoluteUri;
string passedDestinationPath = jobPart._destinationResource.Uri.ToSanitizedString();

// We only check if it starts with the path because if we're passed a container
// then we only need to check if the prefix matches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ internal class JobPlanHeader
/// </summary>
public string DestinationProviderId;

/// <summary>
/// Whether the transfer is of a container or not.
/// </summary>
public bool IsContainer;

/// <summary>
/// Whether or not the enumeration of the parent container has completed.
/// </summary>
Expand Down Expand Up @@ -67,6 +72,7 @@ public JobPlanHeader(
JobPlanOperation operationType,
string sourceProviderId,
string destinationProviderId,
bool isContainer,
bool enumerationComplete,
DataTransferStatus jobStatus,
string parentSourcePath,
Expand Down Expand Up @@ -95,6 +101,7 @@ public JobPlanHeader(
OperationType = operationType;
SourceProviderId = sourceProviderId;
DestinationProviderId = destinationProviderId;
IsContainer = isContainer;
EnumerationComplete = enumerationComplete;
JobStatus = jobStatus;
ParentSourcePath = parentSourcePath;
Expand Down Expand Up @@ -127,6 +134,9 @@ public void Serialize(Stream stream)
// DestinationProviderId
WritePaddedString(writer, DestinationProviderId, DataMovementConstants.JobPlanFile.ProviderIdNumBytes);

// IsContainer
writer.Write(Convert.ToByte(IsContainer));

// EnumerationComplete
writer.Write(Convert.ToByte(EnumerationComplete));

Expand Down Expand Up @@ -180,6 +190,10 @@ public static JobPlanHeader Deserialize(Stream stream)
// DestinationProviderId
string destProviderId = ReadPaddedString(reader, DataMovementConstants.JobPlanFile.ProviderIdNumBytes);

// IsContainer
byte isContainerByte = reader.ReadByte();
bool isContainer = Convert.ToBoolean(isContainerByte);

// EnumerationComplete
byte enumerationCompleteByte = reader.ReadByte();
bool enumerationComplete = Convert.ToBoolean(enumerationCompleteByte);
Expand Down Expand Up @@ -224,6 +238,7 @@ public static JobPlanHeader Deserialize(Stream stream)
operationType,
sourceProviderId,
destProviderId,
isContainer,
enumerationComplete,
jobPlanStatus.ToDataTransferStatus(),
parentSourcePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ public override async Task AddNewJobAsync(
throw Errors.CollisionTransferIdCheckpointer(transferId);
}

bool isContainer = source is StorageResourceContainer;
JobPlanHeader header = new(
DataMovementConstants.JobPlanFile.SchemaVersion,
transferId,
DateTimeOffset.UtcNow,
GetOperationType(source, destination),
source.ProviderId,
destination.ProviderId,
isContainer,
false, /* enumerationComplete */
new DataTransferStatusInternal(),
source.Uri.ToSanitizedString(),
Expand Down
22 changes: 17 additions & 5 deletions sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ public async Task GetResumableTransfers_LocalCheckpointer()
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "local", SourceTypeId = "LocalFile", SourceUri = new Uri(parentLocalUri1, "file1"), DestinationProviderId = "blob", DestinationTypeId = "BlockBlob", DestinationUri = new Uri(parentRemoteUri, "file1"), IsContainer = false },
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "blob", SourceTypeId = "BlockBlob", SourceUri = new Uri(parentRemoteUri, "file2/"), DestinationProviderId = "local", DestinationTypeId = "LocalFile", DestinationUri = new Uri(parentLocalUri1, "file2/"), IsContainer = false },
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "blob", SourceTypeId = "BlockBlob", SourceUri = new Uri(parentRemoteUri, "file3"), DestinationProviderId = "blob", DestinationTypeId = "BlockBlob", DestinationUri = new Uri(parentRemoteUri, "file3"), IsContainer = false },
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "blob", SourceTypeId = "BlockBlob", SourceUri = parentRemoteUri, DestinationProviderId = "local", DestinationTypeId = "LocalFile", DestinationUri = parentLocalUri1, IsContainer = true },
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "local", SourceTypeId = "LocalFile", SourceUri = parentLocalUri2, DestinationProviderId = "blob", DestinationTypeId = "AppendBlob", DestinationUri = parentRemoteUri, IsContainer = true },
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "blob", SourceTypeId = default, SourceUri = parentRemoteUri, DestinationProviderId = "local", DestinationTypeId = default, DestinationUri = parentLocalUri1, IsContainer = true },
new DataTransferProperties { TransferId = Guid.NewGuid().ToString(), SourceProviderId = "local", SourceTypeId = default, SourceUri = parentLocalUri2, DestinationProviderId = "blob", DestinationTypeId = default, DestinationUri = parentRemoteUri, IsContainer = true },
};

// Add a transfer for each expected result
Expand Down Expand Up @@ -322,7 +322,8 @@ private void AddTransferFromDataTransferProperties(
parentSourcePath: properties.SourceUri.AbsoluteUri,
parentDestinationPath: properties.DestinationUri.AbsoluteUri,
sourceProviderId: properties.SourceProviderId,
destinationProviderId: properties.DestinationProviderId);
destinationProviderId: properties.DestinationProviderId,
isContainer: properties.IsContainer);

if (properties.IsContainer)
{
Expand All @@ -343,15 +344,18 @@ private void AddTransferFromDataTransferProperties(
destinationPaths.Add(properties.DestinationUri + $"file{i}");
}

// Because type ID is null on container transfers, derive a type from provider id
string sourceTypeId = GetTypeIdForProvider(properties.SourceProviderId);
string destinationTypeId = GetTypeIdForProvider(properties.DestinationProviderId);
factory.CreateStubJobPartPlanFilesAsync(
checkpointerPath,
properties.TransferId,
numParts, /* jobPartCount */
InProgressStatus,
sourcePaths,
destinationPaths,
sourceResourceId: properties.SourceTypeId,
destinationResourceId: properties.DestinationTypeId);
sourceResourceId: sourceTypeId,
destinationResourceId: destinationTypeId);
}
else
{
Expand All @@ -378,5 +382,13 @@ private void AssertTransferProperties(DataTransferProperties expected, DataTrans
Assert.AreEqual(expected.DestinationUri.AbsoluteUri.TrimEnd('\\', '/'), actual.DestinationUri.AbsoluteUri.TrimEnd('\\', '/'));
Assert.AreEqual(expected.IsContainer, actual.IsContainer);
}

private string GetTypeIdForProvider(string providerId)
=> providerId switch
{
"blob" => "BlockBlob",
"local" => "LocalFile",
_ => "Unknown"
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public void Ctor()
Assert.AreEqual(DefaultJobPlanOperation, header.OperationType);
Assert.AreEqual(DefaultSourceProviderId, header.SourceProviderId);
Assert.AreEqual(DefaultDestinationProviderId, header.DestinationProviderId);
Assert.AreEqual(false, header.EnumerationComplete);
Assert.IsFalse(header.IsContainer);
Assert.IsFalse(header.EnumerationComplete);
Assert.AreEqual(DefaultJobStatus, header.JobStatus);
Assert.AreEqual(DefaultSourcePath, header.ParentSourcePath);
Assert.AreEqual(DefaultDestinationPath, header.ParentDestinationPath);
Expand Down Expand Up @@ -81,7 +82,8 @@ private void DeserializeAndVerify(Stream stream, string version)
Assert.AreEqual(DefaultJobPlanOperation, deserialized.OperationType);
Assert.AreEqual(DefaultSourceProviderId, deserialized.SourceProviderId);
Assert.AreEqual(DefaultDestinationProviderId, deserialized.DestinationProviderId);
Assert.AreEqual(false, deserialized.EnumerationComplete);
Assert.IsFalse(deserialized.IsContainer);
Assert.IsFalse(deserialized.EnumerationComplete);
Assert.AreEqual(DefaultJobStatus, deserialized.JobStatus);
Assert.AreEqual(DefaultSourcePath, deserialized.ParentSourcePath);
Assert.AreEqual(DefaultDestinationPath, deserialized.ParentDestinationPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ internal void CreateStubJobPlanFile(
string parentDestinationPath = _testDestinationPath,
string sourceProviderId = _testSourceProviderId,
string destinationProviderId = _testDestinationProviderId,
bool isContainer = false,
DataTransferStatus status = default)
{
status ??= new DataTransferStatus();
Expand All @@ -162,6 +163,7 @@ internal void CreateStubJobPlanFile(
JobPlanOperation.ServiceToService,
sourceProviderId,
destinationProviderId,
isContainer,
false, /* enumerationComplete */
status,
parentSourcePath,
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ internal static JobPlanHeader CreateDefaultJobHeader(
JobPlanOperation operationType = DefaultJobPlanOperation,
string sourceProviderId = DefaultSourceProviderId,
string destinationProviderId = DefaultDestinationProviderId,
bool isContainer = false,
bool enumerationComplete = false,
DataTransferStatus jobStatus = default,
string parentSourcePath = DefaultSourcePath,
Expand All @@ -203,6 +204,7 @@ internal static JobPlanHeader CreateDefaultJobHeader(
operationType,
sourceProviderId,
destinationProviderId,
isContainer,
enumerationComplete,
jobStatus,
parentSourcePath,
Expand Down

0 comments on commit b1bcded

Please sign in to comment.