Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch CRUD API #427

Merged
merged 18 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/Batch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

/// <summary>
/// Represents a batch of requests that will be performed atomically against the Azure Cosmos DB service.
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public class Batch
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly PartitionKey partitionKey;

private readonly ContainerCore container;

private List<ItemBatchOperation> operations;

/// <summary>
/// Initializes a new instance of the <see cref="Batch"/> class.
/// </summary>
/// <param name="container">Container that has items on which batch operations are to be performed.</param>
/// <param name="partitionKey">The partition key for all items in the batch. <see cref="PartitionKey"/>.</param>
internal Batch(ContainerCore container, PartitionKey partitionKey)
{
this.container = container;
this.partitionKey = partitionKey;
this.operations = new List<ItemBatchOperation>();
}

/// <summary>
/// Adds an operation to create an item into the batch.
/// </summary>
/// <param name="item">A JSON serializable object that must contain an id property.<see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public virtual Batch CreateItem<T>(T item, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation<T>(
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to create an item into the batch.
/// </summary>
/// <param name="resourceStream">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch CreateItemStream(Stream resourceStream, BatchItemRequestOptions requestOptions = null)
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resourceStream: resourceStream,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to read an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch ReadItem(string id, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Read,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to upsert an item into the batch.
/// </summary>
/// <param name="item">A JSON serializable object that must contain an id property. <see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public virtual Batch UpsertItem<T>(T item, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation<T>(
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to upsert an item into the batch.
/// </summary>
/// <param name="resourceStream">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch UpsertItemStream(Stream resourceStream, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resourceStream: resourceStream,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to replace an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="item">A JSON serializable object that must contain an id property. <see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public virtual Batch ReplaceItem<T>(string id, T item, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
resource: item,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to replace an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="resourceStream">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch ReplaceItemStream(string id, Stream resourceStream, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
resourceStream: resourceStream,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Adds an operation to delete an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch DeleteItem(string id, BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Delete,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="BatchResponse"/> which contains the completion status and results of each operation.</returns>
public virtual Task<BatchResponse> ExecuteAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return this.ExecuteAsync(requestOptions: null, cancellationToken: cancellationToken);
}

/// <summary>
/// Adds an operation to patch an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="patchStream">A <see cref="Stream"/> containing the patch specification.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
internal virtual Batch PatchItemStream(string id, Stream patchStream, BatchItemRequestOptions requestOptions = null)
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Patch,
operationIndex: this.operations.Count,
id: id,
resourceStream: patchStream,
requestOptions: requestOptions));

return this;
}

/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="BatchResponse"/> which contains the completion status and results of each operation.</returns>
internal virtual Task<BatchResponse> ExecuteAsync(RequestOptions requestOptions, CancellationToken cancellationToken = default(CancellationToken))
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
{
BatchExecUtils.GetServerRequestLimits(out int maxServerRequestBodyLength, out int maxServerRequestOperationCount);
return this.ExecuteAsync(maxServerRequestBodyLength, maxServerRequestOperationCount, requestOptions, cancellationToken);
}

internal Task<BatchResponse> ExecuteAsync(
int maxServerRequestBodyLength,
int maxServerRequestOperationCount,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
BatchExecutor executor = new BatchExecutor(this.container, this.partitionKey, this.operations, requestOptions, maxServerRequestBodyLength, maxServerRequestOperationCount);
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(cancellationToken);
}
}
}
150 changes: 150 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchExecUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

/// <summary>
/// Util methods for batch requests.
/// </summary>
internal static class BatchExecUtils
{
/// <summary>
/// Converts a Stream to a Memory{byte} wrapping a byte array honoring a provided maximum length for the returned Memory.
/// </summary>
/// <param name="stream">Stream to be converted to bytes.</param>
/// <param name="maximumLength">Desired maximum length of the Memory{byte}.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> to cancel the operation.</param>
/// <returns>A Memory{byte} with length at most maximumLength.</returns>
/// <remarks>Throws RequestEntityTooLargeException if the input stream has more bytes than maximumLength.</remarks>
public static async Task<Memory<byte>> StreamToMemoryAsync(Stream stream, int maximumLength, CancellationToken cancellationToken)
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
{
if (stream.CanSeek)
{
if (stream.Length > maximumLength)
{
throw new RequestEntityTooLargeException(RMResources.RequestTooLarge);
}

// Some derived implementations of MemoryStream (such as versions of RecyclableMemoryStream prior to 1.2.2 that we may be using)
// return an incorrect response from TryGetBuffer. Use TryGetBuffer only on the MemoryStream type and not derived types.
MemoryStream memStream = stream as MemoryStream;
if (memStream != null
&& memStream.GetType() == typeof(MemoryStream)
&& memStream.TryGetBuffer(out ArraySegment<byte> memBuffer))
{
return memBuffer;
}

byte[] bytes = new byte[stream.Length];
int sum = 0;
int count;
while ((count = await stream.ReadAsync(bytes, sum, bytes.Length - sum, cancellationToken)) > 0)
{
sum += count;
}

return bytes;
}
else
{
int bufferSize = 81920; // Using the same buffer size as the Stream.DefaultCopyBufferSize
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
byte[] buffer = new byte[bufferSize];

using (MemoryStream memoryStream = new MemoryStream(bufferSize)) // using bufferSize as initial capacity as well
{
int sum = 0;
int count;
while ((count = await stream.ReadAsync(buffer, 0, bufferSize, cancellationToken)) > 0)
{
sum += count;
if (sum > maximumLength)
{
throw new RequestEntityTooLargeException(RMResources.RequestTooLarge);
}

#pragma warning disable VSTHRD103 // Call async methods when in an async method
memoryStream.Write(buffer, 0, count);
#pragma warning restore VSTHRD103 // Call async methods when in an async method
}

return new Memory<byte>(memoryStream.GetBuffer(), 0, (int)memoryStream.Length);
}
}
}

public static void GetServerRequestLimits(out int maxServerRequestBodyLength, out int maxServerRequestOperationCount)
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
{
maxServerRequestBodyLength = Constants.MaxDirectModeBatchRequestBodySizeInBytes;
maxServerRequestOperationCount = Constants.MaxOperationsInDirectModeBatchRequest;
}

public static ResponseMessage Validate(
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions,
int? maxOperationCount = null)
{
string errorMessage = null;

if (operations.Count == 0)
{
errorMessage = ClientResources.BatchNoOperations;
}

if (maxOperationCount.HasValue && operations.Count > maxOperationCount.Value)
{
errorMessage = ClientResources.BatchTooLarge;
}

if (errorMessage == null && batchOptions != null)
{
if (batchOptions.IfMatchEtag != null || batchOptions.IfNoneMatchEtag != null)
{
errorMessage = ClientResources.BatchRequestOptionNotSupported;
}
}

if (errorMessage == null)
{
foreach (ItemBatchOperation operation in operations)
{
if (operation.RequestOptions != null
&& operation.RequestOptions.Properties != null
&& (operation.RequestOptions.Properties.TryGetValue(WFConstants.BackendHeaders.EffectivePartitionKey, out object epkObj)
| operation.RequestOptions.Properties.TryGetValue(WFConstants.BackendHeaders.EffectivePartitionKeyString, out object epkStrObj)))
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
{
byte[] epk = epkObj as byte[];
string epkStr = epkStrObj as string;
if (epk == null || epkStr == null)
{
errorMessage = string.Format(
ClientResources.EpkPropertiesPairingExpected,
WFConstants.BackendHeaders.EffectivePartitionKey,
WFConstants.BackendHeaders.EffectivePartitionKeyString);
}

if (operation.PartitionKey != null)
{
errorMessage = ClientResources.PKAndEpkSetTogether;
}
}
}
}

if (errorMessage != null)
{
return new ResponseMessage(HttpStatusCode.BadRequest, errorMessage: errorMessage);
}

return new ResponseMessage(HttpStatusCode.OK);
}
}
}
Loading