Skip to content

Commit

Permalink
Diagnostics for Bulk Stream (#770)
Browse files Browse the repository at this point in the history
* creating executor

* Adding public API

* Updating contract

* Adding ToResponseMessage

* Wiring to executor

* Mockable executor

* Itemrequestoptions to batchitemrequestoptions

* cleanup

* Tests

* Wiring to executor

* Unit tests on executor

* Emulator tests on container

* executor retry handler

* Retry tests

* Handler tests

* under preview

* else

* Upsert support

* Using ResourceThrottleRetryPolicy

* Generating new context on retry

* Correctly obtaining retrypolicy

* Fixing unrelated test

* Removing unneeded constants

* Fixing conflict tests

* Replace and Delete support

* Adding Read support

* private instead of internal

* Fix for Read and DeleteItemAsync

* Typed API support

* Rename property

* Rename

* Refactor on ClientContext

* Refactoring through policies and batcher

* ItemId is required

* Undo unnecessary changes

* Diagnostics from response

* Introducing itembatchoperationstatistics

* Description

* More generic

* Batcher appends diagnostics

* Test to verify multiple operations

* Emulator tests

* On Context

* Rename

* Undoing comment

* Description

* Renaming variable

* Fixing test

* Rename

* Duplicat test

* Adding created and completed

* set is private

* Avoid creating executor instances

* Set diagnostics independently
  • Loading branch information
ealsur authored and kirankumarkolli committed Sep 12, 2019
1 parent bec2d81 commit 1f40f52
Show file tree
Hide file tree
Showing 14 changed files with 288 additions and 25 deletions.
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public virtual bool TryAdd(ItemBatchOperation operation)
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
BatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
itemBatchOperation.Context.Diagnostics.AppendDiagnostics(batchResponse.Diagnostics);
if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
Expand Down
6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public virtual bool IsSuccessStatusCode
/// </summary>
internal virtual SubStatusCodes SubStatusCode { get; set; }

/// <summary>
/// Gets the cosmos diagnostic information for the current request to Azure Cosmos DB service
/// </summary>
internal virtual CosmosDiagnostics Diagnostics { get; set; }

internal static Result ReadOperationResult(Memory<byte> input, out BatchOperationResult batchOperationResult)
{
RowBuffer row = new RowBuffer(input.Length);
Expand Down Expand Up @@ -184,6 +189,7 @@ internal ResponseMessage ToResponseMessage()
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Content = this.ResourceStream;
responseMessage.Diagnostics = this.Diagnostics;
return responseMessage;
}
}
Expand Down
15 changes: 14 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ internal BatchResponse(
double requestCharge,
TimeSpan? retryAfter,
string activityId,
CosmosDiagnostics cosmosDiagnostics,
ServerBatchRequest serverRequest,
CosmosSerializer serializer)
: this(statusCode, subStatusCode, errorMessage, requestCharge, retryAfter, activityId, serverRequest.Operations, serializer)
: this(statusCode, subStatusCode, errorMessage, requestCharge, retryAfter, activityId, cosmosDiagnostics, serverRequest.Operations, serializer)
{
}

Expand All @@ -61,6 +62,7 @@ internal BatchResponse(
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
cosmosDiagnostics: null,
operations: operations,
serializer: null)
{
Expand All @@ -80,6 +82,7 @@ private BatchResponse(
double requestCharge,
TimeSpan? retryAfter,
string activityId,
CosmosDiagnostics cosmosDiagnostics,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializer serializer)
{
Expand All @@ -91,6 +94,7 @@ private BatchResponse(
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Diagnostics = cosmosDiagnostics;
}

/// <summary>
Expand Down Expand Up @@ -140,6 +144,11 @@ public virtual bool IsSuccessStatusCode
/// </summary>
public virtual int Count => this.results?.Count ?? 0;

/// <summary>
/// Gets the cosmos diagnostic information for the current request to Azure Cosmos DB service
/// </summary>
public virtual CosmosDiagnostics Diagnostics { get; }

internal virtual SubStatusCodes SubStatusCode { get; }

internal virtual CosmosSerializer Serializer { get; }
Expand Down Expand Up @@ -237,6 +246,7 @@ internal static async Task<BatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);
}
Expand All @@ -250,6 +260,7 @@ internal static async Task<BatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);
}
Expand All @@ -267,6 +278,7 @@ internal static async Task<BatchResponse> FromResponseMessageAsync(
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);
}
Expand Down Expand Up @@ -339,6 +351,7 @@ record =>
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Diagnostics,
serverRequest,
serializer);

Expand Down
4 changes: 4 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal class ItemBatchOperationContext : IDisposable

public Task<BatchOperationResult> OperationTask => this.taskCompletionSource.Task;

public ItemBatchOperationStatistics Diagnostics { get; } = new ItemBatchOperationStatistics();

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<BatchOperationResult> taskCompletionSource = new TaskCompletionSource<BatchOperationResult>();
Expand Down Expand Up @@ -56,6 +58,8 @@ public void Complete(
{
if (this.AssertBatcher(completer))
{
this.Diagnostics.Complete();
result.Diagnostics = this.Diagnostics;
this.taskCompletionSource.SetResult(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal class PartitionKeyRangeBatchResponse : BatchResponse
{
// Results sorted in the order operations had been added.
private readonly BatchOperationResult[] resultsByOperationIndex;
private readonly BatchResponse serverResponse;
private bool isDisposed;

/// <summary>
Expand Down Expand Up @@ -49,11 +50,10 @@ internal PartitionKeyRangeBatchResponse(
{
this.StatusCode = serverResponse.StatusCode;

this.ServerResponse = serverResponse;
this.serverResponse = serverResponse;
this.resultsByOperationIndex = new BatchOperationResult[originalOperationsCount];

StringBuilder errorMessageBuilder = new StringBuilder();
List<string> activityIds = new List<string>();
List<ItemBatchOperation> itemBatchOperations = new List<ItemBatchOperation>();
// We expect number of results == number of operations here
for (int index = 0; index < serverResponse.Operations.Count; index++)
Expand All @@ -74,7 +74,6 @@ internal PartitionKeyRangeBatchResponse(
errorMessageBuilder.AppendFormat("{0}; ", serverResponse.ErrorMessage);
}

this.ActivityId = serverResponse.ActivityId;
this.ErrorMessage = errorMessageBuilder.Length > 2 ? errorMessageBuilder.ToString(0, errorMessageBuilder.Length - 2) : null;
this.Operations = itemBatchOperations;
this.Serializer = serializer;
Expand All @@ -83,12 +82,12 @@ internal PartitionKeyRangeBatchResponse(
/// <summary>
/// Gets the ActivityId that identifies the server request made to execute the batch request.
/// </summary>
public override string ActivityId { get; }
public override string ActivityId => this.serverResponse.ActivityId;

internal override CosmosSerializer Serializer { get; }
/// <inheritdoc />
public override CosmosDiagnostics Diagnostics => this.serverResponse.Diagnostics;

// for unit testing only
internal BatchResponse ServerResponse { get; private set; }
internal override CosmosSerializer Serializer { get; }

/// <summary>
/// Gets the number of operation results.
Expand Down Expand Up @@ -153,7 +152,7 @@ protected override void Dispose(bool disposing)
if (disposing && !this.isDisposed)
{
this.isDisposed = true;
this.ServerResponse?.Dispose();
this.serverResponse?.Dispose();
}

base.Dispose(disposing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Cosmos
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -290,6 +289,11 @@ internal virtual Task<CollectionRoutingMap> GetRoutingMapAsync(CancellationToken

internal virtual BatchAsyncContainerExecutor InitializeBatchExecutorForContainer()
{
if (!this.ClientContext.ClientOptions.AllowBulkExecution)
{
return null;
}

return new BatchAsyncContainerExecutor(
this,
this.ClientContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Text;

/// <summary>
/// A batch operation might extend multiple requests due to retries.
/// </summary>
internal class ItemBatchOperationStatistics : CosmosDiagnostics
{
private readonly DateTime created = DateTime.UtcNow;
private readonly List<CosmosDiagnostics> cosmosDiagnostics = new List<CosmosDiagnostics>();
private DateTime completed;

public void AppendDiagnostics(CosmosDiagnostics diagnostics)
{
this.cosmosDiagnostics.Add(diagnostics);
}

public void Complete()
{
this.completed = DateTime.UtcNow;
}

public override string ToString()
{
if (this.cosmosDiagnostics.Count == 0)
{
return string.Empty;
}

StringBuilder statistics = new StringBuilder($"Bulk operation started at {this.created}. ");
if (this.completed != null)
{
statistics.Append($"Completed at {this.completed}. ");
}

foreach (CosmosDiagnostics pointOperationStatistic in this.cosmosDiagnostics)
{
statistics.AppendLine(pointOperationStatistic.ToString());
}

return statistics.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task CreateItemStream_WithBulk()
Task<ResponseMessage> task = tasks[i];
ResponseMessage result = await task;
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);

Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
MyDocument document = cosmosDefaultJsonSerializer.FromStream<MyDocument>(result.Content);
Assert.AreEqual(i.ToString(), document.id);
}
Expand All @@ -75,6 +75,7 @@ public async Task CreateItemAsync_WithBulk()
{
Task<ItemResponse<MyDocument>> task = tasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);
}
}
Expand All @@ -95,7 +96,7 @@ public async Task UpsertItemStream_WithBulk()
Task<ResponseMessage> task = tasks[i];
ResponseMessage result = await task;
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);

Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
MyDocument document = cosmosDefaultJsonSerializer.FromStream<MyDocument>(result.Content);
Assert.AreEqual(i.ToString(), document.id);
}
Expand All @@ -116,6 +117,7 @@ public async Task UpsertItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = tasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.Created, result.StatusCode);
}
}
Expand Down Expand Up @@ -147,6 +149,7 @@ public async Task DeleteItemStream_WithBulk()
{
Task<ResponseMessage> task = deleteTasks[i];
ResponseMessage result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.NoContent, result.StatusCode);
}
}
Expand Down Expand Up @@ -178,6 +181,7 @@ public async Task DeleteItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = deleteTasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.NoContent, result.StatusCode);
}
}
Expand All @@ -198,7 +202,7 @@ public async Task ReadItemStream_WithBulk()
await Task.WhenAll(tasks);

List<Task<ResponseMessage>> readTasks = new List<Task<ResponseMessage>>();
// Delete the items
// Read the items
foreach (MyDocument createdDocument in createdDocuments)
{
readTasks.Add(ExecuteReadStreamAsync(this.container, createdDocument));
Expand All @@ -209,6 +213,7 @@ public async Task ReadItemStream_WithBulk()
{
Task<ResponseMessage> task = readTasks[i];
ResponseMessage result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand All @@ -229,7 +234,7 @@ public async Task ReadItem_WithBulk()
await Task.WhenAll(tasks);

List<Task<ItemResponse<MyDocument>>> readTasks = new List<Task<ItemResponse<MyDocument>>>();
// Delete the items
// Read the items
foreach (MyDocument createdDocument in createdDocuments)
{
readTasks.Add(ExecuteReadAsync(this.container, createdDocument));
Expand All @@ -240,6 +245,7 @@ public async Task ReadItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = readTasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand Down Expand Up @@ -271,6 +277,7 @@ public async Task ReplaceItemStream_WithBulk()
{
Task<ResponseMessage> task = replaceTasks[i];
ResponseMessage result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand Down Expand Up @@ -302,6 +309,7 @@ public async Task ReplaceItem_WithBulk()
{
Task<ItemResponse<MyDocument>> task = replaceTasks[i];
ItemResponse<MyDocument> result = await task;
Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString()));
Assert.AreEqual(HttpStatusCode.OK, result.StatusCode);
}
}
Expand Down
Loading

0 comments on commit 1f40f52

Please sign in to comment.