Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnostics for Bulk Stream #770

Merged
merged 64 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
b5b6dc3
creating executor
ealsur Aug 26, 2019
47b8497
Adding public API
ealsur Aug 26, 2019
0bd197d
Updating contract
ealsur Aug 26, 2019
f3561f3
Adding ToResponseMessage
ealsur Aug 26, 2019
bdc7327
Wiring to executor
ealsur Aug 26, 2019
5ecdd99
Mockable executor
ealsur Aug 26, 2019
a56e84c
Itemrequestoptions to batchitemrequestoptions
ealsur Aug 26, 2019
dbf1b67
cleanup
ealsur Aug 26, 2019
a930e59
Tests
ealsur Aug 26, 2019
ced7a82
Wiring to executor
ealsur Aug 26, 2019
ec1e6d8
Unit tests on executor
ealsur Aug 26, 2019
6cbb3b4
Emulator tests on container
ealsur Aug 26, 2019
feb1f08
executor retry handler
ealsur Aug 26, 2019
8839833
Retry tests
ealsur Aug 26, 2019
4af4183
Handler tests
ealsur Aug 26, 2019
39bc6d2
under preview
ealsur Aug 27, 2019
9d2af01
else
ealsur Aug 27, 2019
bf6f780
Upsert support
ealsur Aug 27, 2019
2ba3ed1
Using ResourceThrottleRetryPolicy
ealsur Aug 27, 2019
771c094
Generating new context on retry
ealsur Aug 27, 2019
9380021
Correctly obtaining retrypolicy
ealsur Aug 27, 2019
c40af5e
Fixing unrelated test
ealsur Aug 27, 2019
19a20ab
Removing unneeded constants
ealsur Aug 27, 2019
5c0fb63
Fixing conflict tests
ealsur Aug 27, 2019
c9ae84f
Replace and Delete support
ealsur Aug 27, 2019
9daee99
Adding Read support
ealsur Aug 28, 2019
af0a340
private instead of internal
ealsur Aug 28, 2019
3e0cbdc
Fix for Read and DeleteItemAsync
ealsur Aug 28, 2019
ad82745
Typed API support
ealsur Aug 29, 2019
0f39e05
Rename property
ealsur Aug 30, 2019
03a3c55
Rename
ealsur Sep 3, 2019
faf6ae8
Merge branch 'master' into users/ealsur/bulkstreamconfig
ealsur Sep 4, 2019
8bfcf4e
Refactor on ClientContext
ealsur Sep 4, 2019
41cae1d
Refactoring through policies and batcher
ealsur Sep 4, 2019
fda4292
ItemId is required
ealsur Sep 4, 2019
8fd1d1f
Undo unnecessary changes
ealsur Sep 4, 2019
f4724c0
merge with master
ealsur Sep 4, 2019
aebff8f
Diagnostics from response
ealsur Sep 4, 2019
36ea84a
Introducing itembatchoperationstatistics
ealsur Sep 4, 2019
0304ba4
Description
ealsur Sep 4, 2019
7d77faf
More generic
ealsur Sep 4, 2019
94460dc
Batcher appends diagnostics
ealsur Sep 4, 2019
7dbc381
Test to verify multiple operations
ealsur Sep 4, 2019
27085ec
Emulator tests
ealsur Sep 4, 2019
c54d8a2
On Context
ealsur Sep 4, 2019
2252f6f
Rename
ealsur Sep 4, 2019
153f790
Merge branch 'users/ealsur/bulkstreamconfig' into users/ealsur/bulkst…
ealsur Sep 4, 2019
6870378
Undoing comment
ealsur Sep 5, 2019
a3cf72c
Description
ealsur Sep 5, 2019
3ec10c6
Renaming variable
ealsur Sep 5, 2019
22cfaa8
merge
ealsur Sep 5, 2019
e0c3d4d
merge with master
ealsur Sep 5, 2019
afec0ca
Fixing test
ealsur Sep 5, 2019
2b72ef1
Merge branch 'users/ealsur/bulkstreamconfig' into users/ealsur/bulkst…
ealsur Sep 5, 2019
6282712
Rename
ealsur Sep 5, 2019
9b293a8
Merge branch 'users/ealsur/bulkstreamconfig' into users/ealsur/bulkst…
ealsur Sep 5, 2019
c67c2f6
merge with master
ealsur Sep 5, 2019
0420d83
Duplicat test
ealsur Sep 5, 2019
2364c9f
Adding created and completed
ealsur Sep 5, 2019
b6c7c30
set is private
ealsur Sep 6, 2019
a234ffd
Avoid creating executor instances
ealsur Sep 6, 2019
bcba0dd
Set diagnostics independently
ealsur Sep 9, 2019
d298289
Merge branch 'master' into users/ealsur/bulkstreamdiagnostics
ealsur Sep 10, 2019
d8fca9e
merge with master
ealsur Sep 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public virtual bool TryAdd(ItemBatchOperation operation)
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
BatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
itemBatchOperation.Context.Diagnostics.AppendDiagnostics(batchResponse.Diagnostics);
if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
Expand Down
6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public virtual bool IsSuccessStatusCode
/// </summary>
internal virtual SubStatusCodes SubStatusCode { get; set; }

/// <summary>
/// Gets the cosmos diagnostic information for the current request to Azure Cosmos DB service
/// </summary>
internal virtual CosmosDiagnostics Diagnostics { get; set; }
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved

internal static Result ReadOperationResult(Memory<byte> input, out BatchOperationResult batchOperationResult)
{
RowBuffer row = new RowBuffer(input.Length);
Expand Down Expand Up @@ -184,6 +189,7 @@ internal ResponseMessage ToResponseMessage()
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Content = this.ResourceStream;
responseMessage.Diagnostics = this.Diagnostics;
return responseMessage;
}
}
Expand Down
15 changes: 14 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ internal BatchResponse(
double requestCharge,
TimeSpan? retryAfter,
string activityId,
CosmosDiagnostics cosmosDiagnostics,
ServerBatchRequest serverRequest,
CosmosSerializer serializer)
: this(statusCode, subStatusCode, errorMessage, requestCharge, retryAfter, activityId, serverRequest.Operations, serializer)
: this(statusCode, subStatusCode, errorMessage, requestCharge, retryAfter, activityId, cosmosDiagnostics, serverRequest.Operations, serializer)
{
}

Expand All @@ -61,6 +62,7 @@ internal BatchResponse(
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
cosmosDiagnostics: null,
operations: operations,
serializer: null)
{
Expand All @@ -80,6 +82,7 @@ private BatchResponse(
double requestCharge,
TimeSpan? retryAfter,
string activityId,
CosmosDiagnostics cosmosDiagnostics,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializer serializer)
{
Expand All @@ -91,6 +94,7 @@ private BatchResponse(
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Diagnostics = cosmosDiagnostics;
}

/// <summary>
Expand Down Expand Up @@ -140,6 +144,11 @@ public virtual bool IsSuccessStatusCode
/// </summary>
public virtual int Count => this.results?.Count ?? 0;

/// <summary>
/// Gets the cosmos diagnostic information for the current request to Azure Cosmos DB service
/// </summary>
public virtual CosmosDiagnostics Diagnostics { get; }

internal virtual SubStatusCodes SubStatusCode { get; }

internal virtual CosmosSerializer Serializer { get; }
Expand Down Expand Up @@ -236,6 +245,7 @@ internal static async Task<BatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);
}
Expand All @@ -249,6 +259,7 @@ internal static async Task<BatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);
}
Expand All @@ -266,6 +277,7 @@ internal static async Task<BatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);
}
Expand Down Expand Up @@ -338,6 +350,7 @@ record =>
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal class ItemBatchOperationContext : IDisposable

public Task<BatchOperationResult> OperationTask => this.taskCompletionSource.Task;

public ItemBatchOperationStatistics Diagnostics { get; } = new ItemBatchOperationStatistics();

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<BatchOperationResult> taskCompletionSource = new TaskCompletionSource<BatchOperationResult>();
Expand Down Expand Up @@ -56,6 +58,8 @@ public void Complete(
{
if (this.AssertBatcher(completer))
{
this.Diagnostics.Complete();
result.Diagnostics = this.Diagnostics;
this.taskCompletionSource.SetResult(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal class PartitionKeyRangeBatchResponse : BatchResponse
{
// Results sorted in the order operations had been added.
private readonly BatchOperationResult[] resultsByOperationIndex;
private readonly BatchResponse serverResponse;
private bool isDisposed;

/// <summary>
Expand Down Expand Up @@ -49,11 +50,10 @@ internal PartitionKeyRangeBatchResponse(
{
this.StatusCode = serverResponse.StatusCode;

this.ServerResponse = serverResponse;
this.serverResponse = serverResponse;
this.resultsByOperationIndex = new BatchOperationResult[originalOperationsCount];

StringBuilder errorMessageBuilder = new StringBuilder();
List<string> activityIds = new List<string>();
List<ItemBatchOperation> itemBatchOperations = new List<ItemBatchOperation>();
// We expect number of results == number of operations here
for (int index = 0; index < serverResponse.Operations.Count; index++)
Expand All @@ -74,7 +74,6 @@ internal PartitionKeyRangeBatchResponse(
errorMessageBuilder.AppendFormat("{0}; ", serverResponse.ErrorMessage);
}

this.ActivityId = serverResponse.ActivityId;
this.ErrorMessage = errorMessageBuilder.Length > 2 ? errorMessageBuilder.ToString(0, errorMessageBuilder.Length - 2) : null;
this.Operations = itemBatchOperations;
this.Serializer = serializer;
Expand All @@ -83,12 +82,12 @@ internal PartitionKeyRangeBatchResponse(
/// <summary>
/// Gets the ActivityId that identifies the server request made to execute the batch request.
/// </summary>
public override string ActivityId { get; }
public override string ActivityId => this.serverResponse.ActivityId;

internal override CosmosSerializer Serializer { get; }
/// <inheritdoc />
public override CosmosDiagnostics Diagnostics => this.serverResponse.Diagnostics;

// for unit testing only
internal BatchResponse ServerResponse { get; private set; }
internal override CosmosSerializer Serializer { get; }

/// <summary>
/// Gets the number of operation results.
Expand Down Expand Up @@ -148,7 +147,7 @@ protected override void Dispose(bool disposing)
if (disposing && !this.isDisposed)
{
this.isDisposed = true;
this.ServerResponse?.Dispose();
this.serverResponse?.Dispose();
}

base.Dispose(disposing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Cosmos
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -290,6 +289,11 @@ internal virtual Task<CollectionRoutingMap> GetRoutingMapAsync(CancellationToken

internal virtual BatchAsyncContainerExecutor InitializeBatchExecutorForContainer()
{
if (!this.ClientContext.ClientOptions.AllowBulkExecution)
{
return null;
}

return new BatchAsyncContainerExecutor(
this,
this.ClientContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Text;

/// <summary>
/// A batch operation might extend multiple requests due to retries.
/// </summary>
internal class ItemBatchOperationStatistics : CosmosDiagnostics
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly DateTime created = DateTime.UtcNow;
private readonly List<CosmosDiagnostics> cosmosDiagnostics = new List<CosmosDiagnostics>();
private DateTime completed;

public void AppendDiagnostics(CosmosDiagnostics diagnostics)
{
this.cosmosDiagnostics.Add(diagnostics);
}

public void Complete()
{
this.completed = DateTime.UtcNow;
}

public override string ToString()
{
if (this.cosmosDiagnostics.Count == 0)
{
return string.Empty;
}

StringBuilder statistics = new StringBuilder($"Bulk operation started at {this.created}. ");
if (this.completed != null)
{
statistics.Append($"Completed at {this.completed}. ");
}

foreach (CosmosDiagnostics pointOperationStatistic in this.cosmosDiagnostics)
{
statistics.AppendLine(pointOperationStatistic.ToString());
}

return statistics.ToString();
j82w marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task CreateItemStream_WithBulk()
Task<ResponseMessage> task = tasks[i];
ResponseMessage result = await task;
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);

Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
MyDocument document = cosmosDefaultJsonSerializer.FromStream<MyDocument>(result.Content);
Assert.AreEqual(i.ToString(), document.id);
}
Expand All @@ -75,6 +75,7 @@ public async Task CreateItemAsync_WithBulk()
{
Task<ItemResponse<MyDocument>> task = tasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);
}
}
Expand All @@ -95,7 +96,7 @@ public async Task UpsertItemStream_WithBulk()
Task<ResponseMessage> task = tasks[i];
ResponseMessage result = await task;
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);

Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
MyDocument document = cosmosDefaultJsonSerializer.FromStream<MyDocument>(result.Content);
Assert.AreEqual(i.ToString(), document.id);
}
Expand All @@ -116,6 +117,7 @@ public async Task UpsertItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = tasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);
}
}
Expand Down Expand Up @@ -147,6 +149,7 @@ public async Task DeleteItemStream_WithBulk()
{
Task<ResponseMessage> task = deleteTasks[i];
ResponseMessage result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.NoContent, result.StatusCode);
}
}
Expand Down Expand Up @@ -178,6 +181,7 @@ public async Task DeleteItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = deleteTasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.NoContent, result.StatusCode);
}
}
Expand All @@ -198,7 +202,7 @@ public async Task ReadItemStream_WithBulk()
await Task.WhenAll(tasks);

List<Task<ResponseMessage>> readTasks = new List<Task<ResponseMessage>>();
// Delete the items
// Read the items
foreach (MyDocument createdDocument in createdDocuments)
{
readTasks.Add(ExecuteReadStreamAsync(this.container, createdDocument));
Expand All @@ -209,6 +213,7 @@ public async Task ReadItemStream_WithBulk()
{
Task<ResponseMessage> task = readTasks[i];
ResponseMessage result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand All @@ -229,7 +234,7 @@ public async Task ReadItem_WithBulk()
await Task.WhenAll(tasks);

List<Task<ItemResponse<MyDocument>>> readTasks = new List<Task<ItemResponse<MyDocument>>>();
// Delete the items
// Read the items
foreach (MyDocument createdDocument in createdDocuments)
{
readTasks.Add(ExecuteReadAsync(this.container, createdDocument));
Expand All @@ -240,6 +245,7 @@ public async Task ReadItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = readTasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand Down Expand Up @@ -271,6 +277,7 @@ public async Task ReplaceItemStream_WithBulk()
{
Task<ResponseMessage> task = replaceTasks[i];
ResponseMessage result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand Down Expand Up @@ -302,6 +309,7 @@ public async Task ReplaceItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = replaceTasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand Down
Loading