Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch CRUD API #427

Merged
merged 18 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/Batch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents a batch of requests that will be performed atomically against the Azure Cosmos DB service.
/// </summary>
#if PREVIEW
public
#else
internal
#endif
abstract class Batch
{
/// <summary>
/// Adds an operation to create an item into the batch.
/// </summary>
/// <param name="item">A JSON serializable object that must contain an id property.<see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public abstract Batch CreateItem<T>(
T item,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to create an item into the batch.
/// </summary>
/// <param name="streamPayload">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch CreateItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to read an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch ReadItem(
string id,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to upsert an item into the batch.
/// </summary>
/// <param name="item">A JSON serializable object that must contain an id property. <see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public abstract Batch UpsertItem<T>(
T item,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to upsert an item into the batch.
/// </summary>
/// <param name="streamPayload">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch UpsertItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to replace an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="item">A JSON serializable object that must contain an id property. <see cref="CosmosSerializer"/> to implement a custom serializer.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
/// <typeparam name="T">The type of item to be created.</typeparam>
public abstract Batch ReplaceItem<T>(
string id,
T item,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to replace an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="streamPayload">
/// A <see cref="Stream"/> containing the payload of the item.
/// The stream must have a UTF-8 encoded JSON object which contains an id property.
/// </param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch ReplaceItemStream(
string id,
Stream streamPayload,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Adds an operation to delete an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public abstract Batch DeleteItem(
string id,
BatchItemRequestOptions requestOptions = null);

/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="BatchResponse"/> which contains the completion status and results of each operation.</returns>
public abstract Task<BatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default(CancellationToken));
}
}
254 changes: 254 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

internal class BatchCore : Batch
{
private readonly PartitionKey partitionKey;

private readonly ContainerCore container;

private List<ItemBatchOperation> operations;

/// <summary>
/// Initializes a new instance of the <see cref="BatchCore"/> class.
/// </summary>
/// <param name="container">Container that has items on which batch operations are to be performed.</param>
/// <param name="partitionKey">The partition key for all items in the batch. <see cref="PartitionKey"/>.</param>
internal BatchCore(
ContainerCore container,
PartitionKey partitionKey)
{
this.container = container;
this.partitionKey = partitionKey;
this.operations = new List<ItemBatchOperation>();
}

public override Batch CreateItem<T>(
T item,
BatchItemRequestOptions requestOptions = null)
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));

return this;
}

public override Batch CreateItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null)
{
if (streamPayload == null)
{
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
requestOptions: requestOptions));

return this;
}

public override Batch ReadItem(
string id,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Read,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));

return this;
}

public override Batch UpsertItem<T>(
T item,
BatchItemRequestOptions requestOptions = null)
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));

return this;
}

public override Batch UpsertItemStream(
Stream streamPayload,
BatchItemRequestOptions requestOptions = null)
{
if (streamPayload == null)
{
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
requestOptions: requestOptions));

return this;
}

public override Batch ReplaceItem<T>(
string id,
T item,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

if (item == null)
{
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
resource: item,
requestOptions: requestOptions));

return this;
}

public override Batch ReplaceItemStream(
string id,
Stream streamPayload,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

if (streamPayload == null)
{
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
resourceStream: streamPayload,
requestOptions: requestOptions));

return this;
}

public override Batch DeleteItem(
string id,
BatchItemRequestOptions requestOptions = null)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}

this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Delete,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));

return this;
}

public override Task<BatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ExecuteAsync(
Constants.MaxDirectModeBatchRequestBodySizeInBytes,
Constants.MaxOperationsInDirectModeBatchRequest,
requestOptions: null,
cancellationToken: cancellationToken);
}

/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch. Used only for EPK routing.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="BatchResponse"/> which contains the completion status and results of each operation.</returns>
public virtual Task<BatchResponse> ExecuteAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ExecuteAsync(
Constants.MaxDirectModeBatchRequestBodySizeInBytes,
Constants.MaxOperationsInDirectModeBatchRequest,
requestOptions,
cancellationToken);
}

/// <summary>
/// Adds an operation to patch an item into the batch.
/// </summary>
/// <param name="id">The cosmos item id.</param>
/// <param name="patchStream">A <see cref="Stream"/> containing the patch specification.</param>
/// <param name="requestOptions">(Optional) The options for the item request. <see cref="BatchItemRequestOptions"/>.</param>
/// <returns>The <see cref="Batch"/> instance with the operation added.</returns>
public virtual Batch PatchItemStream(
string id,
Stream patchStream,
BatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
operationType: OperationType.Patch,
operationIndex: this.operations.Count,
id: id,
resourceStream: patchStream,
requestOptions: requestOptions));

return this;
}

internal Task<BatchResponse> ExecuteAsync(
int maxServerRequestBodyLength,
int maxServerRequestOperationCount,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
BatchExecutor executor = new BatchExecutor(this.container, this.partitionKey, this.operations, requestOptions, maxServerRequestBodyLength, maxServerRequestOperationCount);
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(cancellationToken);
}
}
}
Loading