Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preview of Bulk Stream #741

Merged
merged 44 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
b5b6dc3
creating executor
ealsur Aug 26, 2019
47b8497
Adding public API
ealsur Aug 26, 2019
0bd197d
Updating contract
ealsur Aug 26, 2019
f3561f3
Adding ToResponseMessage
ealsur Aug 26, 2019
bdc7327
Wiring to executor
ealsur Aug 26, 2019
5ecdd99
Mockable executor
ealsur Aug 26, 2019
a56e84c
Itemrequestoptions to batchitemrequestoptions
ealsur Aug 26, 2019
dbf1b67
cleanup
ealsur Aug 26, 2019
a930e59
Tests
ealsur Aug 26, 2019
ced7a82
Wiring to executor
ealsur Aug 26, 2019
ec1e6d8
Unit tests on executor
ealsur Aug 26, 2019
6cbb3b4
Emulator tests on container
ealsur Aug 26, 2019
feb1f08
executor retry handler
ealsur Aug 26, 2019
8839833
Retry tests
ealsur Aug 26, 2019
4af4183
Handler tests
ealsur Aug 26, 2019
39bc6d2
under preview
ealsur Aug 27, 2019
9d2af01
else
ealsur Aug 27, 2019
bf6f780
Upsert support
ealsur Aug 27, 2019
2ba3ed1
Using ResourceThrottleRetryPolicy
ealsur Aug 27, 2019
771c094
Generating new context on retry
ealsur Aug 27, 2019
9380021
Correctly obtaining retrypolicy
ealsur Aug 27, 2019
c40af5e
Fixing unrelated test
ealsur Aug 27, 2019
19a20ab
Removing unneeded constants
ealsur Aug 27, 2019
5c0fb63
Fixing conflict tests
ealsur Aug 27, 2019
c9ae84f
Replace and Delete support
ealsur Aug 27, 2019
9daee99
Adding Read support
ealsur Aug 28, 2019
af0a340
private instead of internal
ealsur Aug 28, 2019
3e0cbdc
Fix for Read and DeleteItemAsync
ealsur Aug 28, 2019
ad82745
Typed API support
ealsur Aug 29, 2019
0f39e05
Rename property
ealsur Aug 30, 2019
03a3c55
Rename
ealsur Sep 3, 2019
faf6ae8
Merge branch 'master' into users/ealsur/bulkstreamconfig
ealsur Sep 4, 2019
8bfcf4e
Refactor on ClientContext
ealsur Sep 4, 2019
41cae1d
Refactoring through policies and batcher
ealsur Sep 4, 2019
fda4292
ItemId is required
ealsur Sep 4, 2019
8fd1d1f
Undo unnecessary changes
ealsur Sep 4, 2019
f4724c0
merge with master
ealsur Sep 4, 2019
2252f6f
Rename
ealsur Sep 4, 2019
6870378
Undoing comment
ealsur Sep 5, 2019
a3cf72c
Description
ealsur Sep 5, 2019
3ec10c6
Renaming variable
ealsur Sep 5, 2019
e0c3d4d
merge with master
ealsur Sep 5, 2019
afec0ca
Fixing test
ealsur Sep 5, 2019
6282712
Rename
ealsur Sep 5, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,21 @@ public virtual bool TryAdd(ItemBatchOperation operation)
try
{
PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken);
ealsur marked this conversation as resolved.
Show resolved Hide resolved

if (result.IsSplit())
{
foreach (ItemBatchOperation operationToRetry in result.Operations)
{
await this.retrier(operationToRetry, cancellationToken);
}

return;
}

using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.cosmosSerializer))
{
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
BatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
if (shouldRetry.ShouldRetry)
{
await this.retrier(itemBatchOperation, cancellationToken);
continue;
}
}

itemBatchOperation.Context.Complete(this, response);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
29 changes: 23 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimer = 10;
private const int DefaultDispatchTimerInSeconds = 1;
private const int MinimumDispatchTimerInSeconds = 1;

private readonly ContainerCore cosmosContainer;
Expand All @@ -36,13 +36,21 @@ internal class BatchAsyncContainerExecutor : IDisposable
private readonly ConcurrentDictionary<string, BatchAsyncStreamer> streamersByPartitionKeyRange = new ConcurrentDictionary<string, BatchAsyncStreamer>();
private readonly ConcurrentDictionary<string, SemaphoreSlim> limitersByPartitionkeyRange = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly TimerPool timerPool;
private readonly RetryOptions retryOptions;

/// <summary>
/// For unit testing.
/// </summary>
internal BatchAsyncContainerExecutor()
{
}

public BatchAsyncContainerExecutor(
ContainerCore cosmosContainer,
CosmosClientContext cosmosClientContext,
int maxServerRequestOperationCount,
int maxServerRequestBodyLength,
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimer)
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimerInSeconds)
{
if (cosmosContainer == null)
{
Expand Down Expand Up @@ -70,9 +78,10 @@ public BatchAsyncContainerExecutor(
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = new TimerPool(BatchAsyncContainerExecutor.MinimumDispatchTimerInSeconds);
this.retryOptions = cosmosClientContext.ClientOptions.GetConnectionPolicy().RetryOptions;
}

public async Task<BatchOperationResult> AddAsync(
public virtual async Task<BatchOperationResult> AddAsync(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
Expand All @@ -86,10 +95,10 @@ public async Task<BatchOperationResult> AddAsync(

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.Task;
return await context.OperationTask;
}

public void Dispose()
Expand All @@ -107,7 +116,7 @@ public void Dispose()
this.timerPool.Dispose();
}

internal async Task ValidateOperationAsync(
internal virtual async Task ValidateOperationAsync(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
Expand Down Expand Up @@ -135,6 +144,14 @@ internal async Task ValidateOperationAsync(
}
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}

private static bool ValidateOperationEPK(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions)
Expand Down
17 changes: 17 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,22 @@ class BatchItemRequestOptions : RequestOptions
/// <seealso cref="Microsoft.Azure.Cosmos.IndexingPolicy"/>
/// <seealso cref="IndexingDirective"/>
public IndexingDirective? IndexingDirective { get; set; }

internal static BatchItemRequestOptions FromItemRequestOptions(ItemRequestOptions itemRequestOptions)
{
if (itemRequestOptions == null)
{
return null;
}

RequestOptions requestOptions = itemRequestOptions as RequestOptions;
BatchItemRequestOptions batchItemRequestOptions = new BatchItemRequestOptions();
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
batchItemRequestOptions.IndexingDirective = itemRequestOptions.IndexingDirective;
batchItemRequestOptions.IfMatchEtag = requestOptions.IfMatchEtag;
batchItemRequestOptions.IfNoneMatchEtag = requestOptions.IfNoneMatchEtag;
batchItemRequestOptions.Properties = requestOptions.Properties;
batchItemRequestOptions.IsEffectivePartitionKeyRouting = requestOptions.IsEffectivePartitionKeyRouting;
return batchItemRequestOptions;
}
}
}
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ private static Result ReadOperationResult(ref RowReader reader, out BatchOperati

return Result.Success;
}

internal ResponseMessage ToResponseMessage()
{
ResponseMessage responseMessage = new ResponseMessage(this.StatusCode);
responseMessage.Headers.SubStatusCode = this.SubStatusCode;
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Content = this.ResourceStream;
return responseMessage;
}
}

/// <summary>
Expand Down
28 changes: 26 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;

/// <summary>
/// Context for a particular Batch operation.
Expand All @@ -17,13 +19,35 @@ internal class ItemBatchOperationContext : IDisposable

public BatchAsyncBatcher CurrentBatcher { get; set; }

public Task<BatchOperationResult> Task => this.taskCompletionSource.Task;
public Task<BatchOperationResult> OperationTask => this.taskCompletionSource.Task;

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<BatchOperationResult> taskCompletionSource = new TaskCompletionSource<BatchOperationResult>();

public ItemBatchOperationContext(string partitionKeyRangeId)
public ItemBatchOperationContext(
string partitionKeyRangeId,
IDocumentClientRetryPolicy retryPolicy = null)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
this.retryPolicy = retryPolicy;
}

/// <summary>
/// Based on the Retry Policy, if a failed response should retry.
/// </summary>
public Task<ShouldRetryResult> ShouldRetryAsync(
BatchOperationResult batchOperationResult,
CancellationToken cancellationToken)
{
if (this.retryPolicy == null
|| batchOperationResult.IsSuccessStatusCode)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

ResponseMessage responseMessage = batchOperationResult.ToResponseMessage();
return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
}

public void Complete(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

/// <summary>
/// Used only in the context of Bulk Stream operations.
/// </summary>
/// <see cref="BatchAsyncBatcher"/>
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
private const int MaxRetries = 1;

private readonly IDocumentClientRetryPolicy nextRetryPolicy;

private int retriesAttempted;

public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
{
this.nextRetryPolicy = nextRetryPolicy;
}

public Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
DocumentClientException clientException = exception as DocumentClientException;

ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(
clientException?.StatusCode,
clientException?.GetSubStatus(),
clientException?.ResourceAddress);

if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

public Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.GetResourceAddress());
if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.nextRetryPolicy.OnBeforeSendRequest(request);
}

private ShouldRetryResult ShouldRetryInternal(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode,
string resourceIdOrFullName)
{
if (statusCode == HttpStatusCode.Gone
&& subStatusCode == SubStatusCodes.PartitionKeyRangeGone
&& this.retriesAttempted < MaxRetries)
{
this.retriesAttempted++;
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}

return null;
}
}
}
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ public CosmosSerializer Serializer
}
}

/// <summary>
/// Allows optimistic batching of requests to service. Setting this option might impact the latency of the operations. Hence this option is recommended for non-latency sensitive scenarios only.
/// </summary>
#if PREVIEW
public
#else
internal
#endif
bool AllowBatchRequests { get; set; }
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// A JSON serializer used by the CosmosClient to serialize or de-serialize cosmos request/responses.
/// The default serializer is always used for all system owned types like DatabaseProperties.
Expand Down
17 changes: 17 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,23 @@ public CosmosClientBuilder WithCustomSerializer(CosmosSerializer cosmosJsonSeria
return this;
}

/// <summary>
/// Allows optimistic batching of requests to service. Setting this option might impact the latency of the operations. Hence this option is recommended for non-latency sensitive scenarios only.
/// </summary>
/// <param name="enabled">Whether <see cref="CosmosClientOptions.AllowBatchRequests"/> is enabled.</param>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
/// <seealso cref="CosmosClientOptions.AllowBatchRequests"/>
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder WithBatchRequests(bool enabled)
{
this.clientOptions.AllowBatchRequests = enabled;
return this;
}

/// <summary>
/// The event handler to be invoked before the request is sent.
/// </summary>
Expand Down
Loading