Skip to content

Commit

Permalink
Batch CRUD API (#427)
Browse files Browse the repository at this point in the history
* Batch CRUD API

* Renames due to merge

* Remove Cosmos from Batch/BatchOperationResult/BatchResponse, use clientContext instead of client in BatchExecutor

* Add BatchItemRequestOptions so we can avoid having unsupported options; minor: getActivityIds to internal, use serializer from clientContext instead of clientOptions

* Changes for merge: move Cosmos removals; PartitionKey is struct and use of IsEffectivePartitionKeyRouting

* Remove no longer used resx string

* Ignore rate limiting test as emulator likely runs without rate limit in SDK test run

* Dummy

* Batch as abstract with BatchCore impl, arg checks for public methods, exceptions for input errors

* Missed file

* Move batch public contract under PREVIEW pre-processor directive

* Fixing clean-up
  • Loading branch information
abhijitpai authored and kirankumarkolli committed Jul 16, 2019
1 parent 63de89e commit af6a83a
Show file tree
Hide file tree
Showing 29 changed files with 5,092 additions and 148 deletions.
125 changes: 125 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/Batch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents a batch of requests that will be performed atomically against the Azure Cosmos DB service.
/// </summary>
#if PREVIEW
public
#else
internal
#endif
abstract class Batch
{
/// <summary>
/// Adds an operation to create an item into the batch.
/// </summary>
/// <param name="item">A JSON serializable object that must contain an id property.<see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public abstract Batch CreateItem<T>(
T item,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to create an item into the batch.
/// </summary>
/// <param name="streamPayload">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch CreateItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to read an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch ReadItem(
string id,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to upsert an item into the batch.
/// </summary>
/// <param name="item">A JSON serializable object that must contain an id property. <see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public abstract Batch UpsertItem<T>(
T item,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to upsert an item into the batch.
/// </summary>
/// <param name="streamPayload">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch UpsertItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to replace an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="item">A JSON serializable object that must contain an id property. <see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public abstract Batch ReplaceItem<T>(
string id,
T item,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to replace an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="streamPayload">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch ReplaceItemStream(
string id,
Stream streamPayload,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to delete an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch DeleteItem(
string id,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="BatchResponse"/> which contains the completion status and results of each operation.</returns>
public abstract Task<BatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default(CancellationToken));
}
}
254 changes: 254 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

internal class BatchCore : Batch
{
private readonly PartitionKey partitionKey;

private readonly ContainerCore container;

private List<ItemBatchOperation> operations;

/// <summary>
/// Initializes a new instance of the <see cref="BatchCore"/> class.
/// </summary>
/// <param name="container">Container that has items on which batch operations are to be performed.</param>
/// <param name="partitionKey">The partition key for all items in the batch. <see cref="PartitionKey"/>.</param>
internal BatchCore(
ContainerCore container,
PartitionKey partitionKey)
{
this.container = container;
this.partitionKey = partitionKey;
this.operations = new List<ItemBatchOperation>();
}

public override Batch CreateItem<T>(
T item,
BatchItemRequestOptions requestOptions = null)
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));

return this;
}

public override Batch CreateItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null)
{
if (streamPayload == null)
{
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
requestOptions: requestOptions));

return this;
}

public override Batch ReadItem(
string id,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Read,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));

return this;
}

public override Batch UpsertItem<T>(
T item,
BatchItemRequestOptions requestOptions = null)
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));

return this;
}

public override Batch UpsertItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null)
{
if (streamPayload == null)
{
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
requestOptions: requestOptions));

return this;
}

public override Batch ReplaceItem<T>(
string id,
T item,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

if (item == null)
{
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
resource: item,
requestOptions: requestOptions));

return this;
}

public override Batch ReplaceItemStream(
string id,
Stream streamPayload,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

if (streamPayload == null)
{
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
resourceStream: streamPayload,
requestOptions: requestOptions));

return this;
}

public override Batch DeleteItem(
string id,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Delete,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));

return this;
}

public override Task<BatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ExecuteAsync(
Constants.MaxDirectModeBatchRequestBodySizeInBytes,
Constants.MaxOperationsInDirectModeBatchRequest,
requestOptions: null,
cancellationToken: cancellationToken);
}

/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch. Used only for EPK routing.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="BatchResponse"/> which contains the completion status and results of each operation.</returns>
public virtual Task<BatchResponse> ExecuteAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ExecuteAsync(
Constants.MaxDirectModeBatchRequestBodySizeInBytes,
Constants.MaxOperationsInDirectModeBatchRequest,
requestOptions,
cancellationToken);
}

/// <summary>
/// Adds an operation to patch an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="patchStream">A <see cref="Stream"/> containing the patch specification.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch PatchItemStream(
string id,
Stream patchStream,
BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Patch,
operationIndex: this.operations.Count,
id: id,
resourceStream: patchStream,
requestOptions: requestOptions));

return this;
}

internal Task<BatchResponse> ExecuteAsync(
int maxServerRequestBodyLength,
int maxServerRequestOperationCount,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
BatchExecutor executor = new BatchExecutor(this.container, this.partitionKey, this.operations, requestOptions, maxServerRequestBodyLength, maxServerRequestOperationCount);
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(cancellationToken);
}
}
}
Loading

0 comments on commit af6a83a

Please sign in to comment.