Skip to content

Commit

Permalink
Open Write (#13344)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcc-msft authored Aug 12, 2020
1 parent ad01bac commit f0265cb
Show file tree
Hide file tree
Showing 170 changed files with 74,491 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ public AppendBlobCreateOptions() { }
public System.Collections.Generic.IDictionary<string, string> Metadata { get { throw null; } set { } }
public System.Collections.Generic.IDictionary<string, string> Tags { get { throw null; } set { } }
}
public partial class AppendBlobOpenWriteOptions
{
public AppendBlobOpenWriteOptions() { }
public long? BufferSize { get { throw null; } set { } }
public Azure.Storage.Blobs.Models.AppendBlobRequestConditions OpenConditions { get { throw null; } set { } }
public System.IProgress<long> ProgressHandler { get { throw null; } set { } }
}
public partial class AppendBlobRequestConditions : Azure.Storage.Blobs.Models.BlobRequestConditions
{
public AppendBlobRequestConditions() { }
Expand Down Expand Up @@ -892,6 +899,13 @@ public BlobUploadOptions() { }
public System.Collections.Generic.IDictionary<string, string> Tags { get { throw null; } set { } }
public Azure.Storage.StorageTransferOptions TransferOptions { get { throw null; } set { } }
}
public partial class BlockBlobOpenWriteOptions
{
public BlockBlobOpenWriteOptions() { }
public long? BufferSize { get { throw null; } set { } }
public Azure.Storage.Blobs.Models.BlobRequestConditions OpenConditions { get { throw null; } set { } }
public System.IProgress<long> ProgressHandler { get { throw null; } set { } }
}
public partial class BlockInfo
{
internal BlockInfo() { }
Expand Down Expand Up @@ -1031,6 +1045,14 @@ internal PageBlobInfo() { }
public Azure.ETag ETag { get { throw null; } }
public System.DateTimeOffset LastModified { get { throw null; } }
}
public partial class PageBlobOpenWriteOptions
{
public PageBlobOpenWriteOptions() { }
public long? BufferSize { get { throw null; } set { } }
public Azure.Storage.Blobs.Models.PageBlobRequestConditions OpenConditions { get { throw null; } set { } }
public System.IProgress<long> ProgressHandler { get { throw null; } set { } }
public long? Size { get { throw null; } set { } }
}
public partial class PageBlobRequestConditions : Azure.Storage.Blobs.Models.BlobRequestConditions
{
public PageBlobRequestConditions() { }
Expand Down Expand Up @@ -1151,6 +1173,8 @@ public AppendBlobClient(System.Uri blobUri, Azure.Storage.StorageSharedKeyCreden
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.BlobContentInfo>> CreateIfNotExistsAsync(Azure.Storage.Blobs.Models.AppendBlobCreateOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.BlobContentInfo>> CreateIfNotExistsAsync(Azure.Storage.Blobs.Models.BlobHttpHeaders httpHeaders = null, System.Collections.Generic.IDictionary<string, string> metadata = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.IO.Stream OpenWrite(bool overwrite, Azure.Storage.Blobs.Models.AppendBlobOpenWriteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<System.IO.Stream> OpenWriteAsync(bool overwrite, Azure.Storage.Blobs.Models.AppendBlobOpenWriteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Blobs.Models.BlobInfo> Seal(Azure.Storage.Blobs.Models.AppendBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.BlobInfo>> SealAsync(Azure.Storage.Blobs.Models.AppendBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public new Azure.Storage.Blobs.Specialized.AppendBlobClient WithSnapshot(string snapshot) { throw null; }
Expand Down Expand Up @@ -1273,6 +1297,8 @@ public BlockBlobClient(System.Uri blobUri, Azure.Storage.StorageSharedKeyCredent
protected static Azure.Storage.Blobs.Specialized.BlockBlobClient CreateClient(System.Uri blobUri, Azure.Storage.Blobs.BlobClientOptions options, Azure.Core.Pipeline.HttpPipeline pipeline) { throw null; }
public virtual Azure.Response<Azure.Storage.Blobs.Models.BlockList> GetBlockList(Azure.Storage.Blobs.Models.BlockListTypes blockListTypes = Azure.Storage.Blobs.Models.BlockListTypes.All, string snapshot = null, Azure.Storage.Blobs.Models.BlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.BlockList>> GetBlockListAsync(Azure.Storage.Blobs.Models.BlockListTypes blockListTypes = Azure.Storage.Blobs.Models.BlockListTypes.All, string snapshot = null, Azure.Storage.Blobs.Models.BlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.IO.Stream OpenWrite(bool overwrite, Azure.Storage.Blobs.Models.BlockBlobOpenWriteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<System.IO.Stream> OpenWriteAsync(bool overwrite, Azure.Storage.Blobs.Models.BlockBlobOpenWriteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Blobs.Models.BlobDownloadInfo> Query(string querySqlExpression, Azure.Storage.Blobs.Models.BlobQueryOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.BlobDownloadInfo>> QueryAsync(string querySqlExpression, Azure.Storage.Blobs.Models.BlobQueryOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Blobs.Models.BlockInfo> StageBlock(string base64BlockId, System.IO.Stream content, byte[] transactionalContentHash = null, Azure.Storage.Blobs.Models.BlobRequestConditions conditions = null, System.IProgress<long> progressHandler = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -1319,6 +1345,8 @@ public PageBlobClient(System.Uri blobUri, Azure.Storage.StorageSharedKeyCredenti
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.PageRangesInfo>> GetPageRangesAsync(Azure.HttpRange? range = default(Azure.HttpRange?), string snapshot = null, Azure.Storage.Blobs.Models.PageBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Blobs.Models.PageRangesInfo> GetPageRangesDiff(Azure.HttpRange? range = default(Azure.HttpRange?), string snapshot = null, string previousSnapshot = null, Azure.Storage.Blobs.Models.PageBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.PageRangesInfo>> GetPageRangesDiffAsync(Azure.HttpRange? range = default(Azure.HttpRange?), string snapshot = null, string previousSnapshot = null, Azure.Storage.Blobs.Models.PageBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.IO.Stream OpenWrite(bool overwrite, long position, Azure.Storage.Blobs.Models.PageBlobOpenWriteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<System.IO.Stream> OpenWriteAsync(bool overwrite, long position, Azure.Storage.Blobs.Models.PageBlobOpenWriteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Blobs.Models.PageBlobInfo> Resize(long size, Azure.Storage.Blobs.Models.PageBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Blobs.Models.PageBlobInfo>> ResizeAsync(long size, Azure.Storage.Blobs.Models.PageBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Storage.Blobs.Models.CopyFromUriOperation StartCopyIncremental(System.Uri sourceUri, string snapshot, Azure.Storage.Blobs.Models.PageBlobRequestConditions conditions = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
176 changes: 175 additions & 1 deletion sdk/storage/Azure.Storage.Blobs/src/AppendBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ await AppendBlockInternal(
/// A <see cref="RequestFailedException"/> will be thrown if
/// a failure occurs.
/// </remarks>
private async Task<Response<BlobAppendInfo>> AppendBlockInternal(
internal async Task<Response<BlobAppendInfo>> AppendBlockInternal(
Stream content,
byte[] transactionalContentHash,
AppendBlobRequestConditions conditions,
Expand Down Expand Up @@ -1390,6 +1390,180 @@ private async Task<Response<BlobInfo>> SealInternal(
}
}
#endregion Seal

#region OpenWrite
/// <summary>
/// Opens a stream for writing to the blob.
/// </summary>
/// <param name="overwrite">
/// Whether an existing blob should be deleted and recreated.
/// </param>
/// <param name="options">
/// Optional parameters.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/> to propagate
/// notifications that the operation should be cancelled.
/// </param>
/// <returns>
/// A stream to write to the Append Blob.
/// </returns>
/// <remarks>
/// A <see cref="RequestFailedException"/> will be thrown if
/// a failure occurs.
/// </remarks>
#pragma warning disable AZC0015 // Unexpected client method return type.
public virtual Stream OpenWrite(
#pragma warning restore AZC0015 // Unexpected client method return type.
bool overwrite,
AppendBlobOpenWriteOptions options = default,
CancellationToken cancellationToken = default)
=> OpenWriteInternal(
overwrite: overwrite,
options: options,
async: false,
cancellationToken: cancellationToken)
.EnsureCompleted();

/// <summary>
/// Opens a stream for writing to the blob.
/// </summary>
/// <param name="overwrite">
/// Whether an existing blob should be deleted and recreated.
/// </param>
/// <param name="options">
/// Optional parameters.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/> to propagate
/// notifications that the operation should be cancelled.
/// </param>
/// <returns>
/// A stream to write to the Append Blob.
/// </returns>
/// <remarks>
/// A <see cref="RequestFailedException"/> will be thrown if
/// a failure occurs.
/// </remarks>
#pragma warning disable AZC0015 // Unexpected client method return type.
public virtual async Task<Stream> OpenWriteAsync(
#pragma warning restore AZC0015 // Unexpected client method return type.
bool overwrite,
AppendBlobOpenWriteOptions options = default,
CancellationToken cancellationToken = default)
=> await OpenWriteInternal(
overwrite: overwrite,
options: options,
async: true,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Opens a stream for writing to the blob.
/// </summary>
/// <param name="overwrite">
/// Whether an existing blob should be deleted and recreated.
/// </param>
/// <param name="options">
/// Optional parameters.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Optional <see cref="CancellationToken"/> to propagate
/// notifications that the operation should be cancelled.
/// </param>
/// <returns>
/// A stream to write to the Append Blob.
/// </returns>
/// <remarks>
/// A <see cref="RequestFailedException"/> will be thrown if
/// a failure occurs.
/// </remarks>
private async Task<Stream> OpenWriteInternal(
bool overwrite,
AppendBlobOpenWriteOptions options,
bool async,
CancellationToken cancellationToken)
{
DiagnosticScope scope = ClientDiagnostics.CreateScope($"{nameof(AppendBlobClient)}.{nameof(OpenWrite)}");

try
{
scope.Start();

long position;
ETag? etag;

if (overwrite)
{
Response<BlobContentInfo> createResponse = await CreateInternal(
httpHeaders: default,
metadata: default,
tags: default,
conditions: options?.OpenConditions,
async: async,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

position = 0;
etag = createResponse.Value.ETag;
}
else
{
try
{
Response<BlobProperties> propertiesResponse = await GetPropertiesInternal(
conditions: options?.OpenConditions,
async: async,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

position = propertiesResponse.Value.ContentLength;
etag = propertiesResponse.Value.ETag;
}
catch (RequestFailedException ex)
when (ex.ErrorCode == BlobErrorCode.BlobNotFound)
{
Response<BlobContentInfo> createResponse = await CreateInternal(
httpHeaders: default,
metadata: default,
tags: default,
conditions: options?.OpenConditions,
async: async,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

position = 0;
etag = createResponse.Value.ETag;
}
}

AppendBlobRequestConditions conditions = new AppendBlobRequestConditions
{
IfMatch = etag,
LeaseId = options?.OpenConditions?.LeaseId,
};

return new AppendBlobWriteStream(
appendBlobClient: this,
bufferSize: options?.BufferSize ?? Constants.DefaultBufferSize,
position: position,
conditions: conditions,
progressHandler: options?.ProgressHandler);
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
finally
{
scope.Dispose();
}
}
#endregion OpenWrite
}

/// <summary>
Expand Down
72 changes: 72 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/src/AppendBlobWriteStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Shared;

namespace Azure.Storage.Blobs
{
internal class AppendBlobWriteStream : StorageWriteStream
{
private readonly AppendBlobClient _appendBlobClient;
private readonly AppendBlobRequestConditions _conditions;

public AppendBlobWriteStream(
AppendBlobClient appendBlobClient,
long bufferSize,
long position,
AppendBlobRequestConditions conditions,
IProgress<long> progressHandler) : base(
position,
bufferSize,
progressHandler)
{
ValidateBufferSize(bufferSize);
_appendBlobClient = appendBlobClient;
_conditions = conditions ?? new AppendBlobRequestConditions();
}

protected override async Task AppendInternal(
bool async,
CancellationToken cancellationToken)
{
if (_buffer.Length > 0)
{
_buffer.Position = 0;

Response<BlobAppendInfo> response = await _appendBlobClient.AppendBlockInternal(
content: _buffer,
transactionalContentHash: default,
conditions: _conditions,
progressHandler: _progressHandler,
async: async,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

_conditions.IfMatch = response.Value.ETag;

_buffer.Clear();
}
}

protected override async Task FlushInternal(bool async, CancellationToken cancellationToken)
=> await AppendInternal(async, cancellationToken).ConfigureAwait(false);

protected override void ValidateBufferSize(long bufferSize)
{
if (bufferSize < 1)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), "Must be greater than or equal to 1");
}

if (bufferSize > Constants.Blob.Append.MaxAppendBlockBytes)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), $"Must less than or equal to {Constants.Blob.Append.MaxAppendBlockBytes}");
}
}
}
}
Loading

0 comments on commit f0265cb

Please sign in to comment.