diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs index 21dda95a7c..499eab03d3 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs @@ -152,22 +152,21 @@ public virtual bool TryAdd(ItemBatchOperation operation) try { PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken); - - if (result.IsSplit()) - { - foreach (ItemBatchOperation operationToRetry in result.Operations) - { - await this.retrier(operationToRetry, cancellationToken); - } - - return; - } - using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.cosmosSerializer)) { foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations) { BatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex]; + if (!response.IsSuccessStatusCode) + { + Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken); + if (shouldRetry.ShouldRetry) + { + await this.retrier(itemBatchOperation, cancellationToken); + continue; + } + } + itemBatchOperation.Context.Complete(this, response); } } diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs index 0440cd27c3..499cbcf49a 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs @@ -25,7 +25,7 @@ namespace Microsoft.Azure.Cosmos /// internal class BatchAsyncContainerExecutor : IDisposable { - private const int DefaultDispatchTimer = 10; + private const int DefaultDispatchTimerInSeconds = 1; private const int MinimumDispatchTimerInSeconds = 1; private readonly ContainerCore cosmosContainer; @@ -36,13 +36,21 @@ internal class BatchAsyncContainerExecutor : IDisposable private readonly ConcurrentDictionary streamersByPartitionKeyRange = new ConcurrentDictionary(); private readonly ConcurrentDictionary limitersByPartitionkeyRange = new ConcurrentDictionary(); private readonly TimerPool timerPool; + private readonly RetryOptions retryOptions; + + /// + /// For unit testing. + /// + internal BatchAsyncContainerExecutor() + { + } public BatchAsyncContainerExecutor( ContainerCore cosmosContainer, CosmosClientContext cosmosClientContext, int maxServerRequestOperationCount, int maxServerRequestBodyLength, - int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimer) + int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimerInSeconds) { if (cosmosContainer == null) { @@ -70,9 +78,10 @@ public BatchAsyncContainerExecutor( this.maxServerRequestOperationCount = maxServerRequestOperationCount; this.dispatchTimerInSeconds = dispatchTimerInSeconds; this.timerPool = new TimerPool(BatchAsyncContainerExecutor.MinimumDispatchTimerInSeconds); + this.retryOptions = cosmosClientContext.ClientOptions.GetConnectionPolicy().RetryOptions; } - public async Task AddAsync( + public virtual async Task AddAsync( ItemBatchOperation operation, ItemRequestOptions itemRequestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -86,10 +95,10 @@ public async Task AddAsync( string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false); BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId); - ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId); + ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions)); operation.AttachContext(context); streamer.Add(operation); - return await context.Task; + return await context.OperationTask; } public void Dispose() @@ -107,7 +116,7 @@ public void Dispose() this.timerPool.Dispose(); } - internal async Task ValidateOperationAsync( + internal virtual async Task ValidateOperationAsync( ItemBatchOperation operation, ItemRequestOptions itemRequestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -135,6 +144,14 @@ internal async Task ValidateOperationAsync( } } + private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions) + { + return new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy( + retryOptions.MaxRetryAttemptsOnThrottledRequests, + retryOptions.MaxRetryWaitTimeInSeconds)); + } + private static bool ValidateOperationEPK( ItemBatchOperation operation, ItemRequestOptions itemRequestOptions) diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs index 3611886715..079f8ead7b 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs @@ -23,5 +23,22 @@ class BatchItemRequestOptions : RequestOptions /// /// public IndexingDirective? IndexingDirective { get; set; } + + internal static BatchItemRequestOptions FromItemRequestOptions(ItemRequestOptions itemRequestOptions) + { + if (itemRequestOptions == null) + { + return null; + } + + RequestOptions requestOptions = itemRequestOptions as RequestOptions; + BatchItemRequestOptions batchItemRequestOptions = new BatchItemRequestOptions(); + batchItemRequestOptions.IndexingDirective = itemRequestOptions.IndexingDirective; + batchItemRequestOptions.IfMatchEtag = requestOptions.IfMatchEtag; + batchItemRequestOptions.IfNoneMatchEtag = requestOptions.IfNoneMatchEtag; + batchItemRequestOptions.Properties = requestOptions.Properties; + batchItemRequestOptions.IsEffectivePartitionKeyRouting = requestOptions.IsEffectivePartitionKeyRouting; + return batchItemRequestOptions; + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs index 26a1719881..0d1343fe1a 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs @@ -176,6 +176,16 @@ private static Result ReadOperationResult(ref RowReader reader, out BatchOperati return Result.Success; } + + internal ResponseMessage ToResponseMessage() + { + ResponseMessage responseMessage = new ResponseMessage(this.StatusCode); + responseMessage.Headers.SubStatusCode = this.SubStatusCode; + responseMessage.Headers.ETag = this.ETag; + responseMessage.Headers.RetryAfter = this.RetryAfter; + responseMessage.Content = this.ResourceStream; + return responseMessage; + } } /// diff --git a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs index e1d264fdbc..833729d2a6 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs @@ -5,8 +5,10 @@ namespace Microsoft.Azure.Cosmos { using System; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Documents; /// /// Context for a particular Batch operation. @@ -17,13 +19,35 @@ internal class ItemBatchOperationContext : IDisposable public BatchAsyncBatcher CurrentBatcher { get; set; } - public Task Task => this.taskCompletionSource.Task; + public Task OperationTask => this.taskCompletionSource.Task; + + private readonly IDocumentClientRetryPolicy retryPolicy; private TaskCompletionSource taskCompletionSource = new TaskCompletionSource(); - public ItemBatchOperationContext(string partitionKeyRangeId) + public ItemBatchOperationContext( + string partitionKeyRangeId, + IDocumentClientRetryPolicy retryPolicy = null) { this.PartitionKeyRangeId = partitionKeyRangeId; + this.retryPolicy = retryPolicy; + } + + /// + /// Based on the Retry Policy, if a failed response should retry. + /// + public Task ShouldRetryAsync( + BatchOperationResult batchOperationResult, + CancellationToken cancellationToken) + { + if (this.retryPolicy == null + || batchOperationResult.IsSuccessStatusCode) + { + return Task.FromResult(ShouldRetryResult.NoRetry()); + } + + ResponseMessage responseMessage = batchOperationResult.ToResponseMessage(); + return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken); } public void Complete( diff --git a/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs b/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs new file mode 100644 index 0000000000..f0cbd6dd10 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs @@ -0,0 +1,96 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + + /// + /// Used only in the context of Bulk Stream operations. + /// + /// + /// + internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy + { + private const int MaxRetries = 1; + + private readonly IDocumentClientRetryPolicy nextRetryPolicy; + + private int retriesAttempted; + + public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy) + { + this.nextRetryPolicy = nextRetryPolicy; + } + + public Task ShouldRetryAsync( + Exception exception, + CancellationToken cancellationToken) + { + DocumentClientException clientException = exception as DocumentClientException; + + ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal( + clientException?.StatusCode, + clientException?.GetSubStatus(), + clientException?.ResourceAddress); + + if (shouldRetryResult != null) + { + return Task.FromResult(shouldRetryResult); + } + + if (this.nextRetryPolicy == null) + { + return Task.FromResult(ShouldRetryResult.NoRetry()); + } + + return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken); + } + + public Task ShouldRetryAsync( + ResponseMessage cosmosResponseMessage, + CancellationToken cancellationToken) + { + ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode, + cosmosResponseMessage?.Headers.SubStatusCode, + cosmosResponseMessage?.GetResourceAddress()); + if (shouldRetryResult != null) + { + return Task.FromResult(shouldRetryResult); + } + + if (this.nextRetryPolicy == null) + { + return Task.FromResult(ShouldRetryResult.NoRetry()); + } + + return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken); + } + + public void OnBeforeSendRequest(DocumentServiceRequest request) + { + this.nextRetryPolicy.OnBeforeSendRequest(request); + } + + private ShouldRetryResult ShouldRetryInternal( + HttpStatusCode? statusCode, + SubStatusCodes? subStatusCode, + string resourceIdOrFullName) + { + if (statusCode == HttpStatusCode.Gone + && subStatusCode == SubStatusCodes.PartitionKeyRangeGone + && this.retriesAttempted < MaxRetries) + { + this.retriesAttempted++; + return ShouldRetryResult.RetryAfter(TimeSpan.Zero); + } + + return null; + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 745820ade3..559044d54b 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -340,6 +340,16 @@ public CosmosSerializer Serializer } } + /// + /// Allows optimistic batching of requests to service. Setting this option might impact the latency of the operations. Hence this option is recommended for non-latency sensitive scenarios only. + /// +#if PREVIEW + public +#else + internal +#endif + bool AllowBulkExecution { get; set; } + /// /// A JSON serializer used by the CosmosClient to serialize or de-serialize cosmos request/responses. /// The default serializer is always used for all system owned types like DatabaseProperties. diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index 5080861c84..7c8a00e852 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -334,6 +334,23 @@ public CosmosClientBuilder WithCustomSerializer(CosmosSerializer cosmosJsonSeria return this; } + /// + /// Allows optimistic batching of requests to service. Setting this option might impact the latency of the operations. Hence this option is recommended for non-latency sensitive scenarios only. + /// + /// Whether is enabled. + /// The object + /// +#if PREVIEW + public +#else + internal +#endif + CosmosClientBuilder WithBulkexecution(bool enabled) + { + this.clientOptions.AllowBulkExecution = enabled; + return this; + } + /// /// The event handler to be invoked before the request is sent. /// diff --git a/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs b/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs index 7696aad351..c21dda602f 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs @@ -88,6 +88,45 @@ internal override void ValidateResource(string resourceId) this.DocumentClient.ValidateResource(resourceId); } + internal override Task ProcessResourceOperationStreamAsync( + Uri resourceUri, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerCore cosmosContainerCore, + PartitionKey? partitionKey, + string itemId, + Stream streamPayload, + Action requestEnricher, + CancellationToken cancellationToken) + { + if (this.IsBulkOperationSupported(resourceType, operationType)) + { + return this.ProcessResourceOperationAsBulkStreamAsync( + resourceUri: resourceUri, + resourceType: resourceType, + operationType: operationType, + requestOptions: requestOptions, + cosmosContainerCore: cosmosContainerCore, + partitionKey: partitionKey, + itemId: itemId, + streamPayload: streamPayload, + requestEnricher: requestEnricher, + cancellationToken: cancellationToken); + } + + return this.ProcessResourceOperationStreamAsync( + resourceUri: resourceUri, + resourceType: resourceType, + operationType: operationType, + requestOptions: requestOptions, + cosmosContainerCore: cosmosContainerCore, + partitionKey: partitionKey, + streamPayload: streamPayload, + requestEnricher: requestEnricher, + cancellationToken: cancellationToken); + } + internal override Task ProcessResourceOperationStreamAsync( Uri resourceUri, ResourceType resourceType, @@ -153,5 +192,41 @@ internal override async Task GetCachedContainerPropertiesAs throw new CosmosException(ex.ToCosmosResponseMessage(null), ex.Message, ex.Error); } } + + private async Task ProcessResourceOperationAsBulkStreamAsync( + Uri resourceUri, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerCore cosmosContainerCore, + PartitionKey? partitionKey, + string itemId, + Stream streamPayload, + Action requestEnricher, + CancellationToken cancellationToken) + { + ItemRequestOptions itemRequestOptions = requestOptions as ItemRequestOptions; + BatchItemRequestOptions batchItemRequestOptions = BatchItemRequestOptions.FromItemRequestOptions(itemRequestOptions); + ItemBatchOperation itemBatchOperation = new ItemBatchOperation(operationType, /* index */ 0, partitionKey, itemId, streamPayload, batchItemRequestOptions); + BatchOperationResult batchOperationResult = await cosmosContainerCore.BatchExecutor.AddAsync(itemBatchOperation, itemRequestOptions, cancellationToken); + return batchOperationResult.ToResponseMessage(); + } + + private bool IsBulkOperationSupported( + ResourceType resourceType, + OperationType operationType) + { + if (!this.ClientOptions.AllowBulkExecution) + { + return false; + } + + return resourceType == ResourceType.Document + && (operationType == OperationType.Create + || operationType == OperationType.Upsert + || operationType == OperationType.Read + || operationType == OperationType.Delete + || operationType == OperationType.Replace); + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index 7b10d93bce..640614aa1e 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -497,6 +497,7 @@ internal async Task ProcessItemStreamAsync( requestOptions, this, partitionKey, + itemId, streamPayload, null, cancellationToken); diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs index d21eebc43d..2de7bbe418 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos using System.IO; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Handlers; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Cosmos.Scripts; using Microsoft.Azure.Documents; @@ -44,6 +45,7 @@ internal ContainerCore( this.Scripts = new ScriptsCore(this, this.ClientContext); this.cachedUriSegmentWithoutId = this.GetResourceSegmentUriWithoutId(); this.queryClient = queryClient ?? new CosmosQueryClientCore(this.ClientContext, this); + this.BatchExecutor = this.InitializeBatchExecutorForContainer(); } public override string Id { get; } @@ -54,6 +56,8 @@ internal ContainerCore( internal virtual CosmosClientContext ClientContext { get; } + internal virtual BatchAsyncContainerExecutor BatchExecutor { get; } + public override Conflicts Conflicts { get; } public override Scripts.Scripts Scripts { get; } @@ -260,6 +264,15 @@ internal virtual Task GetRoutingMapAsync(CancellationToken .Unwrap(); } + internal virtual BatchAsyncContainerExecutor InitializeBatchExecutorForContainer() + { + return new BatchAsyncContainerExecutor( + this, + this.ClientContext, + Constants.MaxOperationsInDirectModeBatchRequest, + Constants.MaxDirectModeBatchRequestBodySizeInBytes); + } + private Task ReplaceStreamInternalAsync( Stream streamPayload, ContainerRequestOptions requestOptions = null, diff --git a/Microsoft.Azure.Cosmos/src/Resource/CosmosClientContext.cs b/Microsoft.Azure.Cosmos/src/Resource/CosmosClientContext.cs index a03cea8a93..f9c567194c 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/CosmosClientContext.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/CosmosClientContext.cs @@ -57,6 +57,22 @@ internal abstract Task GetCachedContainerPropertiesAsync( string containerUri, CancellationToken cancellationToken = default(CancellationToken)); + /// + /// This is a wrapper around ExecUtil method. This allows the calls to be mocked so logic done + /// in a resource can be unit tested. + /// + internal abstract Task ProcessResourceOperationStreamAsync( + Uri resourceUri, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerCore cosmosContainerCore, + PartitionKey? partitionKey, + string itemId, + Stream streamPayload, + Action requestEnricher, + CancellationToken cancellationToken); + /// /// This is a wrapper around ExecUtil method. This allows the calls to be mocked so logic done /// in a resource can be unit tested. diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/BatchAsyncContainerExecutorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/BatchAsyncContainerExecutorTests.cs index 3fbee77ab8..5de0f820e5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/BatchAsyncContainerExecutorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/BatchAsyncContainerExecutorTests.cs @@ -13,9 +13,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] -#pragma warning disable CA1001 // Types that own disposable fields should be disposable public class BatchAsyncContainerExecutorTests -#pragma warning restore CA1001 // Types that own disposable fields should be disposable { private static CosmosSerializer cosmosDefaultJsonSerializer = new CosmosJsonDotNetSerializer(); private CosmosClient cosmosClient; @@ -99,7 +97,6 @@ private static ItemBatchOperation CreateItem(string id) private class MyDocument { - //[JsonProperty("id")] public string id { get; set; } public string Status { get; set; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs new file mode 100644 index 0000000000..e6b4f46d70 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs @@ -0,0 +1,370 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Net; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class CosmosItemBulkTests + { + private static CosmosSerializer cosmosDefaultJsonSerializer = new CosmosJsonDotNetSerializer(); + + private Container container; + private Database database; + + [TestInitialize] + public async Task TestInitialize() + { + CosmosClientOptions clientOptions = new CosmosClientOptions(); + clientOptions.AllowBulkExecution = true; + CosmosClient client = TestCommon.CreateCosmosClient(clientOptions); + + DatabaseResponse response = await client.CreateDatabaseIfNotExistsAsync(Guid.NewGuid().ToString()); + this.database = response.Database; + + ContainerResponse containerResponse = await this.database.CreateContainerAsync(Guid.NewGuid().ToString(), "/Status", 10000); + this.container = containerResponse; + } + + [TestCleanup] + public async Task Cleanup() + { + await this.database.DeleteAsync(); + } + + [TestMethod] + public async Task CreateItemStream_WithBulk() + { + List> tasks = new List>(); + for (int i = 0; i < 100; i++) + { + tasks.Add(ExecuteCreateStreamAsync(this.container, CreateItem(i.ToString()))); + } + + await Task.WhenAll(tasks); + + for (int i = 0; i < 100; i++) + { + Task task = tasks[i]; + ResponseMessage result = await task; + Assert.AreEqual(HttpStatusCode.Created, result.StatusCode); + + MyDocument document = cosmosDefaultJsonSerializer.FromStream(result.Content); + Assert.AreEqual(i.ToString(), document.id); + } + } + + [TestMethod] + public async Task CreateItemAsync_WithBulk() + { + List>> tasks = new List>>(); + for (int i = 0; i < 100; i++) + { + tasks.Add(ExecuteCreateAsync(this.container, CreateItem(i.ToString()))); + } + + await Task.WhenAll(tasks); + + for (int i = 0; i < 100; i++) + { + Task> task = tasks[i]; + ItemResponse result = await task; + Assert.AreEqual(HttpStatusCode.Created, result.StatusCode); + } + } + + [TestMethod] + public async Task UpsertItemStream_WithBulk() + { + List> tasks = new List>(); + for (int i = 0; i < 100; i++) + { + tasks.Add(ExecuteUpsertStreamAsync(this.container, CreateItem(i.ToString()))); + } + + await Task.WhenAll(tasks); + + for (int i = 0; i < 100; i++) + { + Task task = tasks[i]; + ResponseMessage result = await task; + Assert.AreEqual(HttpStatusCode.Created, result.StatusCode); + + MyDocument document = cosmosDefaultJsonSerializer.FromStream(result.Content); + Assert.AreEqual(i.ToString(), document.id); + } + } + + [TestMethod] + public async Task UpsertItem_WithBulk() + { + List>> tasks = new List>>(); + for (int i = 0; i < 100; i++) + { + tasks.Add(ExecuteUpsertAsync(this.container, CreateItem(i.ToString()))); + } + + await Task.WhenAll(tasks); + + for (int i = 0; i < 100; i++) + { + Task> task = tasks[i]; + ItemResponse result = await task; + Assert.AreEqual(HttpStatusCode.Created, result.StatusCode); + } + } + + [TestMethod] + public async Task DeleteItemStream_WithBulk() + { + List createdDocuments = new List(); + // Create the items + List> tasks = new List>(); + for (int i = 0; i < 100; i++) + { + MyDocument createdDocument = CreateItem(i.ToString()); + createdDocuments.Add(createdDocument); + tasks.Add(ExecuteCreateStreamAsync(this.container, createdDocument)); + } + + await Task.WhenAll(tasks); + + List> deleteTasks = new List>(); + // Delete the items + foreach (MyDocument createdDocument in createdDocuments) + { + deleteTasks.Add(ExecuteDeleteStreamAsync(this.container, createdDocument)); + } + + await Task.WhenAll(deleteTasks); + for (int i = 0; i < 100; i++) + { + Task task = deleteTasks[i]; + ResponseMessage result = await task; + Assert.AreEqual(HttpStatusCode.NoContent, result.StatusCode); + } + } + + [TestMethod] + public async Task DeleteItem_WithBulk() + { + List createdDocuments = new List(); + // Create the items + List>> tasks = new List>>(); + for (int i = 0; i < 100; i++) + { + MyDocument createdDocument = CreateItem(i.ToString()); + createdDocuments.Add(createdDocument); + tasks.Add(ExecuteCreateAsync(this.container, createdDocument)); + } + + await Task.WhenAll(tasks); + + List>> deleteTasks = new List>>(); + // Delete the items + foreach (MyDocument createdDocument in createdDocuments) + { + deleteTasks.Add(ExecuteDeleteAsync(this.container, createdDocument)); + } + + await Task.WhenAll(deleteTasks); + for (int i = 0; i < 100; i++) + { + Task> task = deleteTasks[i]; + ItemResponse result = await task; + Assert.AreEqual(HttpStatusCode.NoContent, result.StatusCode); + } + } + + [TestMethod] + public async Task ReadItemStream_WithBulk() + { + List createdDocuments = new List(); + // Create the items + List> tasks = new List>(); + for (int i = 0; i < 100; i++) + { + MyDocument createdDocument = CreateItem(i.ToString()); + createdDocuments.Add(createdDocument); + tasks.Add(ExecuteCreateStreamAsync(this.container, createdDocument)); + } + + await Task.WhenAll(tasks); + + List> readTasks = new List>(); + // Delete the items + foreach (MyDocument createdDocument in createdDocuments) + { + readTasks.Add(ExecuteReadStreamAsync(this.container, createdDocument)); + } + + await Task.WhenAll(readTasks); + for (int i = 0; i < 100; i++) + { + Task task = readTasks[i]; + ResponseMessage result = await task; + Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); + } + } + + [TestMethod] + public async Task ReadItem_WithBulk() + { + List createdDocuments = new List(); + // Create the items + List>> tasks = new List>>(); + for (int i = 0; i < 100; i++) + { + MyDocument createdDocument = CreateItem(i.ToString()); + createdDocuments.Add(createdDocument); + tasks.Add(ExecuteCreateAsync(this.container, createdDocument)); + } + + await Task.WhenAll(tasks); + + List>> readTasks = new List>>(); + // Delete the items + foreach (MyDocument createdDocument in createdDocuments) + { + readTasks.Add(ExecuteReadAsync(this.container, createdDocument)); + } + + await Task.WhenAll(readTasks); + for (int i = 0; i < 100; i++) + { + Task> task = readTasks[i]; + ItemResponse result = await task; + Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); + } + } + + [TestMethod] + public async Task ReplaceItemStream_WithBulk() + { + List createdDocuments = new List(); + // Create the items + List> tasks = new List>(); + for (int i = 0; i < 100; i++) + { + MyDocument createdDocument = CreateItem(i.ToString()); + createdDocuments.Add(createdDocument); + tasks.Add(ExecuteCreateStreamAsync(this.container, createdDocument)); + } + + await Task.WhenAll(tasks); + + List> replaceTasks = new List>(); + // Replace the items + foreach (MyDocument createdDocument in createdDocuments) + { + replaceTasks.Add(ExecuteReplaceStreamAsync(this.container, createdDocument)); + } + + await Task.WhenAll(replaceTasks); + for (int i = 0; i < 100; i++) + { + Task task = replaceTasks[i]; + ResponseMessage result = await task; + Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); + } + } + + [TestMethod] + public async Task ReplaceItem_WithBulk() + { + List createdDocuments = new List(); + // Create the items + List>> tasks = new List>>(); + for (int i = 0; i < 100; i++) + { + MyDocument createdDocument = CreateItem(i.ToString()); + createdDocuments.Add(createdDocument); + tasks.Add(ExecuteCreateAsync(this.container, createdDocument)); + } + + await Task.WhenAll(tasks); + + List>> replaceTasks = new List>>(); + // Replace the items + foreach (MyDocument createdDocument in createdDocuments) + { + replaceTasks.Add(ExecuteReplaceAsync(this.container, createdDocument)); + } + + await Task.WhenAll(replaceTasks); + for (int i = 0; i < 100; i++) + { + Task> task = replaceTasks[i]; + ItemResponse result = await task; + Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); + } + } + + private static Task> ExecuteCreateAsync(Container container, MyDocument item) + { + return container.CreateItemAsync(item, new PartitionKey(item.Status)); + } + + private static Task> ExecuteUpsertAsync(Container container, MyDocument item) + { + return container.UpsertItemAsync(item, new PartitionKey(item.Status)); + } + + private static Task> ExecuteReplaceAsync(Container container, MyDocument item) + { + return container.ReplaceItemAsync(item, item.id, new PartitionKey(item.Status)); + } + + private static Task> ExecuteDeleteAsync(Container container, MyDocument item) + { + return container.DeleteItemAsync(item.id, new PartitionKey(item.Status)); + } + + private static Task> ExecuteReadAsync(Container container, MyDocument item) + { + return container.ReadItemAsync(item.id, new PartitionKey(item.Status)); + } + + private static Task ExecuteCreateStreamAsync(Container container, MyDocument item) + { + return container.CreateItemStreamAsync(cosmosDefaultJsonSerializer.ToStream(item), new PartitionKey(item.Status)); + } + + private static Task ExecuteUpsertStreamAsync(Container container, MyDocument item) + { + return container.UpsertItemStreamAsync(cosmosDefaultJsonSerializer.ToStream(item), new PartitionKey(item.Status)); + } + + private static Task ExecuteReplaceStreamAsync(Container container, MyDocument item) + { + return container.ReplaceItemStreamAsync(cosmosDefaultJsonSerializer.ToStream(item), item.id, new PartitionKey(item.Status)); + } + + private static Task ExecuteDeleteStreamAsync(Container container, MyDocument item) + { + return container.DeleteItemStreamAsync(item.id, new PartitionKey(item.Status)); + } + + private static Task ExecuteReadStreamAsync(Container container, MyDocument item) + { + return container.ReadItemStreamAsync(item.id, new PartitionKey(item.Status)); + } + + private static MyDocument CreateItem(string id) => new MyDocument() { id = id, Status = id }; + + private class MyDocument + { + public string id { get; set; } + + public string Status { get; set; } + + public bool Updated { get; set; } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj index 9c947bc5cd..8411b546f4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj @@ -242,9 +242,6 @@ PreserveNewest - - - C:\Users\abpai\.nuget\packages\microsoft.azure.cosmos.direct.myget\3.0.0.33-preview\runtimes\any\lib\netstandard2.0\Microsoft.Azure.Cosmos.Core.dll diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs index 4244c84f61..aea5279738 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs @@ -228,10 +228,10 @@ public async Task ExceptionsFailOperationsAsync() batchAsyncBatcher.TryAdd(operation2); await batchAsyncBatcher.DispatchAsync(); - Assert.AreEqual(TaskStatus.Faulted, context1.Task.Status); - Assert.AreEqual(TaskStatus.Faulted, context2.Task.Status); - Assert.AreEqual(expectedException, context1.Task.Exception.InnerException); - Assert.AreEqual(expectedException, context2.Task.Exception.InnerException); + Assert.AreEqual(TaskStatus.Faulted, context1.OperationTask.Status); + Assert.AreEqual(TaskStatus.Faulted, context2.OperationTask.Status); + Assert.AreEqual(expectedException, context1.OperationTask.Exception.InnerException); + Assert.AreEqual(expectedException, context2.OperationTask.Exception.InnerException); } [TestMethod] @@ -253,8 +253,8 @@ public async Task DispatchProcessInOrderAsync() for (int i = 0; i < 10; i++) { ItemBatchOperationContext context = contexts[i]; - Assert.AreEqual(TaskStatus.RanToCompletion, context.Task.Status); - BatchOperationResult result = await context.Task; + Assert.AreEqual(TaskStatus.RanToCompletion, context.OperationTask.Status); + BatchOperationResult result = await context.OperationTask; Assert.AreEqual(i.ToString(), result.ETag); } } @@ -283,15 +283,15 @@ public async Task DispatchWithLessResponses() // Some tasks should not be resolved if(i == 0 || i == 9) { - Assert.IsTrue(operation.Context.Task.Status == TaskStatus.WaitingForActivation); + Assert.IsTrue(operation.Context.OperationTask.Status == TaskStatus.WaitingForActivation); } else { - Assert.IsTrue(operation.Context.Task.Status == TaskStatus.RanToCompletion); + Assert.IsTrue(operation.Context.OperationTask.Status == TaskStatus.RanToCompletion); } - if (operation.Context.Task.Status == TaskStatus.RanToCompletion) + if (operation.Context.OperationTask.Status == TaskStatus.RanToCompletion) { - BatchOperationResult result = await operation.Context.Task; + BatchOperationResult result = await operation.Context.OperationTask; Assert.AreEqual(i.ToString(), result.ETag); } else @@ -306,8 +306,8 @@ public async Task DispatchWithLessResponses() for (int i = 0; i < 10; i++) { ItemBatchOperation operation = operations[i]; - Assert.AreEqual(TaskStatus.RanToCompletion, operation.Context.Task.Status); - BatchOperationResult result = await operation.Context.Task; + Assert.AreEqual(TaskStatus.RanToCompletion, operation.Context.OperationTask.Status); + BatchOperationResult result = await operation.Context.OperationTask; Assert.AreEqual(i.ToString(), result.ETag); } } @@ -341,10 +341,16 @@ public async Task CannotAddToDispatchedBatch() [TestMethod] public async Task RetrierGetsCalledOnSplit() { + IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + + IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy1)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy2)); Mock retryDelegate = new Mock(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs index ab6b342f88..8bb5ae30c0 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs @@ -27,6 +27,7 @@ public async Task RetryOnSplit() ItemBatchOperation itemBatchOperation = CreateItem("test"); Mock mockedContext = new Mock(); + mockedContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); mockedContext .SetupSequence(c => c.ProcessResourceOperationStreamAsync( It.IsAny(), @@ -74,12 +75,67 @@ public async Task RetryOnSplit() Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); } + [TestMethod] + public async Task RetryOn429() + { + ItemBatchOperation itemBatchOperation = CreateItem("test"); + + Mock mockedContext = new Mock(); + mockedContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + mockedContext + .SetupSequence(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny())) + .Returns(this.Generate429ResponseAsync(itemBatchOperation)) + .Returns(this.GenerateOkResponseAsync(itemBatchOperation)); + + mockedContext.Setup(c => c.CosmosSerializer).Returns(new CosmosJsonDotNetSerializer()); + + Uri link = new Uri($"/dbs/db/colls/colls", UriKind.Relative); + Mock mockContainer = new Mock(); + mockContainer.Setup(x => x.LinkUri).Returns(link); + mockContainer.Setup(x => x.GetPartitionKeyDefinitionAsync(It.IsAny())).Returns(Task.FromResult(new PartitionKeyDefinition() { Paths = new Collection() { "/id" } })); + + CollectionRoutingMap routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap( + new[] + { + Tuple.Create(new PartitionKeyRange{ Id = "0", MinInclusive = "", MaxExclusive = "FF"}, (ServiceIdentity)null) + }, + string.Empty); + mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny())).Returns(Task.FromResult(routingMap)); + BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, Constants.MaxDirectModeBatchRequestBodySizeInBytes, 1); + BatchOperationResult result = await executor.AddAsync(itemBatchOperation); + + Mock.Get(mockContainer.Object) + .Verify(x => x.GetPartitionKeyDefinitionAsync(It.IsAny()), Times.Exactly(2)); + Mock.Get(mockedContext.Object) + .Verify(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny()), Times.Exactly(2)); + Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); + } + [TestMethod] public async Task DoesNotRecalculatePartitionKeyRangeOnNoSplits() { ItemBatchOperation itemBatchOperation = CreateItem("test"); Mock mockedContext = new Mock(); + mockedContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); mockedContext .Setup(c => c.ProcessResourceOperationStreamAsync( It.IsAny(), @@ -154,6 +210,32 @@ private async Task GenerateSplitResponseAsync(ItemBatchOperatio return responseMessage; } + private async Task Generate429ResponseAsync(ItemBatchOperation itemBatchOperation) + { + List results = new List(); + ItemBatchOperation[] arrayOperations = new ItemBatchOperation[1]; + results.Add( + new BatchOperationResult((HttpStatusCode) StatusCodes.TooManyRequests) + { + ETag = itemBatchOperation.Id + }); + + arrayOperations[0] = itemBatchOperation; + + MemoryStream responseContent = await new BatchResponsePayloadWriter(results).GeneratePayloadAsync(); + + SinglePartitionKeyServerBatchRequest batchRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync( + partitionKey: null, + operations: new ArraySegment(arrayOperations), + maxBodyLength: 100, + maxOperationCount: 1, + serializer: new CosmosJsonDotNetSerializer(), + cancellationToken: CancellationToken.None); + + ResponseMessage responseMessage = new ResponseMessage((HttpStatusCode)StatusCodes.TooManyRequests) { Content = responseContent }; + return responseMessage; + } + private async Task GenerateOkResponseAsync(ItemBatchOperation itemBatchOperation) { List results = new List(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs index d060ee7fe8..453cb159fa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; using System.Net; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -21,10 +22,10 @@ public void PartitionKeyRangeIdIsSetOnInitialization() ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(expectedPkRangeId); operation.AttachContext(batchAsyncOperationContext); - Assert.IsNotNull(batchAsyncOperationContext.Task); + Assert.IsNotNull(batchAsyncOperationContext.OperationTask); Assert.AreEqual(batchAsyncOperationContext, operation.Context); Assert.AreEqual(expectedPkRangeId, batchAsyncOperationContext.PartitionKeyRangeId); - Assert.AreEqual(TaskStatus.WaitingForActivation, batchAsyncOperationContext.Task.Status); + Assert.AreEqual(TaskStatus.WaitingForActivation, batchAsyncOperationContext.OperationTask.Status); } [TestMethod] @@ -34,9 +35,9 @@ public void TaskIsCreatedOnInitialization() ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty); operation.AttachContext(batchAsyncOperationContext); - Assert.IsNotNull(batchAsyncOperationContext.Task); + Assert.IsNotNull(batchAsyncOperationContext.OperationTask); Assert.AreEqual(batchAsyncOperationContext, operation.Context); - Assert.AreEqual(TaskStatus.WaitingForActivation, batchAsyncOperationContext.Task.Status); + Assert.AreEqual(TaskStatus.WaitingForActivation, batchAsyncOperationContext.OperationTask.Status); } [TestMethod] @@ -50,8 +51,8 @@ public async Task TaskResultIsSetOnCompleteAsync() batchAsyncOperationContext.Complete(null, expected); - Assert.AreEqual(expected, await batchAsyncOperationContext.Task); - Assert.AreEqual(TaskStatus.RanToCompletion, batchAsyncOperationContext.Task.Status); + Assert.AreEqual(expected, await batchAsyncOperationContext.OperationTask); + Assert.AreEqual(TaskStatus.RanToCompletion, batchAsyncOperationContext.OperationTask.Status); } [TestMethod] @@ -64,9 +65,9 @@ public async Task ExceptionIsSetOnFailAsync() batchAsyncOperationContext.Fail(null, failure); - Exception capturedException = await Assert.ThrowsExceptionAsync(() => batchAsyncOperationContext.Task); + Exception capturedException = await Assert.ThrowsExceptionAsync(() => batchAsyncOperationContext.OperationTask); Assert.AreEqual(failure, capturedException); - Assert.AreEqual(TaskStatus.Faulted, batchAsyncOperationContext.Task.Status); + Assert.AreEqual(TaskStatus.Faulted, batchAsyncOperationContext.OperationTask.Status); } [TestMethod] @@ -76,5 +77,51 @@ public void CannotAttachMoreThanOnce() operation.AttachContext(new ItemBatchOperationContext(string.Empty)); Assert.ThrowsException(() => operation.AttachContext(new ItemBatchOperationContext(string.Empty))); } + + [TestMethod] + public async Task ShouldRetry_NoPolicy() + { + BatchOperationResult result = new BatchOperationResult(HttpStatusCode.OK); + ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0); + operation.AttachContext(new ItemBatchOperationContext(string.Empty)); + ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default(CancellationToken)); + Assert.IsFalse(shouldRetryResult.ShouldRetry); + } + + [TestMethod] + public async Task ShouldRetry_WithPolicy_OnSuccess() + { + IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + BatchOperationResult result = new BatchOperationResult(HttpStatusCode.OK); + ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default(CancellationToken)); + Assert.IsFalse(shouldRetryResult.ShouldRetry); + } + + [TestMethod] + public async Task ShouldRetry_WithPolicy_On429() + { + IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + BatchOperationResult result = new BatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests); + ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default(CancellationToken)); + Assert.IsTrue(shouldRetryResult.ShouldRetry); + } + + [TestMethod] + public async Task ShouldRetry_WithPolicy_OnSplit() + { + IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + BatchOperationResult result = new BatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone }; + ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default(CancellationToken)); + Assert.IsTrue(shouldRetryResult.ShouldRetry); + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs index f0a4b47b29..780d350f27 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs @@ -114,7 +114,7 @@ public async Task ExceptionsOnBatchBubbleUpAsync() BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, new CosmosJsonDotNetSerializer(), this.ExecutorWithFailure, this.Retrier); ItemBatchOperationContext context = AttachContext(this.ItemBatchOperation); batchAsyncStreamer.Add(this.ItemBatchOperation); - Exception capturedException = await Assert.ThrowsExceptionAsync(() => context.Task); + Exception capturedException = await Assert.ThrowsExceptionAsync(() => context.OperationTask); Assert.AreEqual(expectedException, capturedException); } @@ -125,7 +125,7 @@ public async Task TimerDispatchesAsync() BatchAsyncStreamer batchAsyncStreamer = new BatchAsyncStreamer(2, MaxBatchByteSize, DispatchTimerInSeconds, this.TimerPool, new CosmosJsonDotNetSerializer(), this.Executor, this.Retrier); ItemBatchOperationContext context = AttachContext(this.ItemBatchOperation); batchAsyncStreamer.Add(this.ItemBatchOperation); - BatchOperationResult result = await context.Task; + BatchOperationResult result = await context.OperationTask; Assert.AreEqual(this.ItemBatchOperation.Id, result.ETag); } @@ -141,7 +141,7 @@ public async Task DispatchesAsync() ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, i, i.ToString()); ItemBatchOperationContext context = AttachContext(operation); batchAsyncStreamer.Add(operation); - contexts.Add(context.Task); + contexts.Add(context.OperationTask); } await Task.WhenAll(contexts); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchUnitTests.cs index 0af1547224..ef53c8dbe6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchUnitTests.cs @@ -397,6 +397,39 @@ public void BatchIsWriteOperation() Assert.IsTrue(OperationType.Batch.IsWriteOperation()); } + [TestMethod] + public void FromItemRequestOptions_FromNull() + { + Assert.IsNull(BatchItemRequestOptions.FromItemRequestOptions(null)); + } + + [TestMethod] + public void FromItemRequestOptions_WithCustomValues() + { + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + itemRequestOptions.IfMatchEtag = Guid.NewGuid().ToString(); + itemRequestOptions.IfNoneMatchEtag = Guid.NewGuid().ToString(); + itemRequestOptions.IndexingDirective = Cosmos.IndexingDirective.Exclude; + itemRequestOptions.Properties = new Dictionary() { { "test", "test" } }; + + BatchItemRequestOptions batchItemRequestOptions = BatchItemRequestOptions.FromItemRequestOptions(itemRequestOptions); + Assert.AreEqual(itemRequestOptions.IfMatchEtag, batchItemRequestOptions.IfMatchEtag); + Assert.AreEqual(itemRequestOptions.IfNoneMatchEtag, batchItemRequestOptions.IfNoneMatchEtag); + Assert.AreEqual(itemRequestOptions.IndexingDirective, batchItemRequestOptions.IndexingDirective); + Assert.AreEqual(itemRequestOptions.Properties, batchItemRequestOptions.Properties); + } + + [TestMethod] + public void FromItemRequestOptions_WithDefaultValues() + { + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + BatchItemRequestOptions batchItemRequestOptions = BatchItemRequestOptions.FromItemRequestOptions(itemRequestOptions); + Assert.AreEqual(itemRequestOptions.IfMatchEtag, batchItemRequestOptions.IfMatchEtag); + Assert.AreEqual(itemRequestOptions.IfNoneMatchEtag, batchItemRequestOptions.IfNoneMatchEtag); + Assert.AreEqual(itemRequestOptions.IndexingDirective, batchItemRequestOptions.IndexingDirective); + Assert.AreEqual(itemRequestOptions.Properties, batchItemRequestOptions.Properties); + } + private static async Task GetBatchResponseMessageAsync(List operations, int rateLimitedOperationCount = 0) { BatchOperationResult okOperationResult = new BatchOperationResult(HttpStatusCode.OK); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeBatchExecutionResultTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeBatchExecutionResultTests.cs index ca67af38d2..da594dde21 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeBatchExecutionResultTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeBatchExecutionResultTests.cs @@ -67,6 +67,25 @@ public async Task StatusCodesAreSetThroughResponseAsync() Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); } + [TestMethod] + public void ToResponseMessage_MapsProperties() + { + BatchOperationResult result = new BatchOperationResult(HttpStatusCode.OK) + { + ResourceStream = new MemoryStream(new byte[] { 0x41, 0x42 }, index: 0, count: 2, writable: false, publiclyVisible: true), + ETag = "1234", + SubStatusCode = SubStatusCodes.CompletingSplit, + RetryAfter = TimeSpan.FromSeconds(10) + }; + + ResponseMessage response = result.ToResponseMessage(); + + Assert.AreEqual(result.ResourceStream, response.Content); + Assert.AreEqual(result.SubStatusCode, response.Headers.SubStatusCode); + Assert.AreEqual(result.RetryAfter, response.Headers.RetryAfter); + Assert.AreEqual(result.StatusCode, response.StatusCode); + } + private async Task ConstainsSplitIsTrueInternal(HttpStatusCode statusCode, SubStatusCodes subStatusCode) { List results = new List(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs new file mode 100644 index 0000000000..2e220f1ad0 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs @@ -0,0 +1,49 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class BulkPartitionKeyRangeGoneRetryPolicyTests + { + [TestMethod] + public async Task NotRetryOnSuccess() + { + IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + + BatchOperationResult result = new BatchOperationResult(HttpStatusCode.OK); + ShouldRetryResult shouldRetryResult = await retryPolicy.ShouldRetryAsync(result.ToResponseMessage(), default(CancellationToken)); + Assert.IsFalse(shouldRetryResult.ShouldRetry); + } + + [TestMethod] + public async Task RetriesOn429() + { + IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + + BatchOperationResult result = new BatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests); + ShouldRetryResult shouldRetryResult = await retryPolicy.ShouldRetryAsync(result.ToResponseMessage(), default(CancellationToken)); + Assert.IsTrue(shouldRetryResult.ShouldRetry); + } + + [TestMethod] + public async Task RetriesOnSplits() + { + IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + new ResourceThrottleRetryPolicy(1)); + + BatchOperationResult result = new BatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone }; + ShouldRetryResult shouldRetryResult = await retryPolicy.ShouldRetryAsync(result.ToResponseMessage(), default(CancellationToken)); + Assert.IsTrue(shouldRetryResult.ShouldRetry); + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index 309b04b2ba..cde7ba84a3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -63,6 +63,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreNotEqual(requestTimeout, clientOptions.RequestTimeout); Assert.AreNotEqual(userAgentSuffix, clientOptions.ApplicationName); Assert.AreNotEqual(apiType, clientOptions.ApiType); + Assert.IsFalse(clientOptions.AllowBulkExecution); Assert.AreEqual(0, clientOptions.CustomHandlers.Count); Assert.IsNull(clientOptions.SerializerOptions); Assert.IsNull(clientOptions.Serializer); @@ -86,6 +87,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() .AddCustomHandlers(preProcessHandler) .WithApiType(apiType) .WithThrottlingRetryOptions(maxRetryWaitTime, maxRetryAttemptsOnThrottledRequests) + .WithBulkexecution(true) .WithSerializerOptions(cosmosSerializerOptions); cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient()); @@ -105,6 +107,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual(cosmosSerializerOptions.PropertyNamingPolicy, clientOptions.SerializerOptions.PropertyNamingPolicy); Assert.AreEqual(cosmosSerializerOptions.Indented, clientOptions.SerializerOptions.Indented); Assert.IsTrue(object.ReferenceEquals(webProxy, clientOptions.WebProxy)); + Assert.IsTrue(clientOptions.AllowBulkExecution); //Verify GetConnectionPolicy returns the correct values policy = clientOptions.GetConnectionPolicy(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs index 5e88939175..97ba44a1dd 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs @@ -21,7 +21,7 @@ public void ValidateUriGenerationForResources() CosmosClientContext context = new ClientContextCore( client: null, - clientOptions: null, + clientOptions: new CosmosClientOptions(), userJsonSerializer: null, defaultJsonSerializer: null, sqlQuerySpecSerializer: null, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosConflictTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosConflictTests.cs index 1b90c3dd78..0e3071055f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosConflictTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosConflictTests.cs @@ -122,7 +122,7 @@ private static CosmosClientContext GetMockedClientContext( return new ClientContextCore( client: client, - clientOptions: null, + clientOptions: new CosmosClientOptions(), userJsonSerializer: MockCosmosUtil.Serializer, defaultJsonSerializer: MockCosmosUtil.Serializer, sqlQuerySpecSerializer: MockCosmosUtil.Serializer, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs index ce7e5da712..a50063df5c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Client.Core.Tests; + using Microsoft.Azure.Cosmos.Query; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -164,6 +165,340 @@ await Assert.ThrowsExceptionAsync(async () => { Assert.AreEqual(Cosmos.PartitionKey.Null, pkValue); } + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_CreateStream() + { + ClientContextCore clientContextCore = new ClientContextCore( + MockCosmosUtil.CreateMockCosmosClient(), + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + using (Stream itemStream = MockCosmosUtil.Serializer.ToStream(testItem)) + { + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + using (ResponseMessage streamResponse = await container.CreateItemStreamAsync( + partitionKey: partitionKey, + streamPayload: itemStream)) + { + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + } + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_UpsertStream() + { + ClientContextCore clientContextCore = new ClientContextCore( + MockCosmosUtil.CreateMockCosmosClient(), + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + using (Stream itemStream = MockCosmosUtil.Serializer.ToStream(testItem)) + { + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + using (ResponseMessage streamResponse = await container.UpsertItemStreamAsync( + partitionKey: partitionKey, + streamPayload: itemStream)) + { + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + } + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_ReplaceStream() + { + ClientContextCore clientContextCore = new ClientContextCore( + MockCosmosUtil.CreateMockCosmosClient(), + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + using (Stream itemStream = MockCosmosUtil.Serializer.ToStream(testItem)) + { + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + using (ResponseMessage streamResponse = await container.ReplaceItemStreamAsync( + partitionKey: partitionKey, + id: testItem.id, + streamPayload: itemStream)) + { + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + } + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_ReadStream() + { + ClientContextCore clientContextCore = new ClientContextCore( + MockCosmosUtil.CreateMockCosmosClient(), + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + using (Stream itemStream = MockCosmosUtil.Serializer.ToStream(testItem)) + { + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + using (ResponseMessage streamResponse = await container.ReadItemStreamAsync( + partitionKey: partitionKey, + id: testItem.id)) + { + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + } + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_DeleteStream() + { + ClientContextCore clientContextCore = new ClientContextCore( + MockCosmosUtil.CreateMockCosmosClient(), + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + ItemRequestOptions itemRequestOptions = new ItemRequestOptions(); + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + using (ResponseMessage streamResponse = await container.DeleteItemStreamAsync( + partitionKey: partitionKey, + id: testItem.id)) + { + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_Create() + { + CosmosClient cosmosClient = MockCosmosUtil.CreateMockCosmosClient(); + ClientContextCore clientContextCore = new ClientContextCore( + cosmosClient, + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + cosmosClient.ResponseFactory, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + ItemResponse response = await container.CreateItemAsync( + testItem, + partitionKey: partitionKey); + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_Upsert() + { + CosmosClient cosmosClient = MockCosmosUtil.CreateMockCosmosClient(); + ClientContextCore clientContextCore = new ClientContextCore( + cosmosClient, + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + cosmosClient.ResponseFactory, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + ItemResponse response = await container.UpsertItemAsync( + testItem, + partitionKey: partitionKey); + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_Replace() + { + CosmosClient cosmosClient = MockCosmosUtil.CreateMockCosmosClient(); + ClientContextCore clientContextCore = new ClientContextCore( + cosmosClient, + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + cosmosClient.ResponseFactory, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + ItemResponse response = await container.ReplaceItemAsync( + testItem, + testItem.id); + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_Read() + { + CosmosClient cosmosClient = MockCosmosUtil.CreateMockCosmosClient(); + ClientContextCore clientContextCore = new ClientContextCore( + cosmosClient, + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + cosmosClient.ResponseFactory, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + ItemResponse response = await container.ReadItemAsync( + id: testItem.id, + partitionKey: partitionKey); + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + public async Task AllowBatchingRequestsSendsToExecutor_Delete() + { + CosmosClient cosmosClient = MockCosmosUtil.CreateMockCosmosClient(); + ClientContextCore clientContextCore = new ClientContextCore( + cosmosClient, + new CosmosClientOptions() { AllowBulkExecution = true }, + new CosmosJsonDotNetSerializer(), + new CosmosJsonDotNetSerializer(), + null, + cosmosClient.ResponseFactory, + null, + new MockDocumentClient() + ); + + DatabaseCore db = new DatabaseCore(clientContextCore, "test"); + ExecutorContainerCore container = new ExecutorContainerCore(clientContextCore, db, "test"); + + dynamic testItem = new + { + id = Guid.NewGuid().ToString(), + pk = "FF627B77-568E-4541-A47E-041EAC10E46F", + }; + + Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey(testItem.pk); + ItemResponse response = await container.DeleteItemAsync( + partitionKey: partitionKey, + id: testItem.id); + + container.MockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + [TestMethod] public async Task TestNestedPartitionKeyValueFromStreamAsync() { @@ -400,5 +735,38 @@ private async Task VerifyItemOperations( Assert.AreEqual(10, testHandlerHitCount, "A stream operation did not make it to the handler"); } + + private class ExecutorContainerCore : ContainerCore + { + public readonly Mock MockedExecutor = new Mock(); + public ExecutorContainerCore( + CosmosClientContext clientContext, + DatabaseCore database, + string containerId) : base (clientContext, database, containerId) + { + this.MockedExecutor + .Setup(e => e.AddAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new BatchOperationResult(HttpStatusCode.OK)); + } + + internal override BatchAsyncContainerExecutor InitializeBatchExecutorForContainer() => this.MockedExecutor.Object; + } + + private class ExecutorWithThrottlingContainerCore : ContainerCore + { + public readonly Mock MockedExecutor = new Mock(); + public ExecutorWithThrottlingContainerCore( + CosmosClientContext clientContext, + DatabaseCore database, + string containerId) : base(clientContext, database, containerId) + { + this.MockedExecutor + .SetupSequence(e => e.AddAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new BatchOperationResult((HttpStatusCode) StatusCodes.TooManyRequests)) + .ReturnsAsync(new BatchOperationResult(HttpStatusCode.OK)); + } + + internal override BatchAsyncContainerExecutor InitializeBatchExecutorForContainer() => this.MockedExecutor.Object; + } } }