From c7cb8acb7166bee05f760704544e707df9421676 Mon Sep 17 00:00:00 2001 From: j82w Date: Fri, 19 Jun 2020 12:46:55 -0700 Subject: [PATCH] Diagnostics: Add synchronization context Part 2 for Container (#1643) * Diagnostics: Add synchronization context Part 2 for Container --- .../Resource/Container/ContainerCore.Items.cs | 290 ++++++++---------- .../src/Resource/Container/ContainerCore.cs | 75 +++-- .../Resource/Container/ContainerInlineCore.cs | 139 +++++++-- .../src/Resource/Database/DatabaseCore.cs | 121 +++----- .../Utils/DiagnosticValidators.cs | 4 +- .../CosmosItemUnitTests.cs | 20 ++ 6 files changed, 349 insertions(+), 300 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index 312a907b35..47e850b44c 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -29,7 +29,7 @@ namespace Microsoft.Azure.Cosmos /// 1. The object operations where it serializes and deserializes the item on request/response /// 2. The stream response which takes a Stream containing a JSON serialized object and returns a response containing a Stream /// - internal partial class ContainerCore : ContainerInternal + internal abstract partial class ContainerCore : ContainerInternal { /// /// Cache the full URI segment without the last resource id. @@ -39,27 +39,25 @@ internal partial class ContainerCore : ContainerInternal private readonly CosmosQueryClient queryClient; - public override async Task CreateItemStreamAsync( - Stream streamPayload, - PartitionKey partitionKey, - ItemRequestOptions requestOptions = null, - CancellationToken cancellationToken = default(CancellationToken)) + public async Task CreateItemStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, + Stream streamPayload, + PartitionKey partitionKey, + ItemRequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - return await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: null, - streamPayload: streamPayload, - operationType: OperationType.Create, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); - } + return await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: null, + streamPayload: streamPayload, + operationType: OperationType.Create, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); } - public override async Task> CreateItemAsync( + public async Task> CreateItemAsync( + CosmosDiagnosticsContext diagnosticsContext, T item, PartitionKey? partitionKey = null, ItemRequestOptions requestOptions = null, @@ -70,85 +68,73 @@ public override async Task> CreateItemAsync( throw new ArgumentNullException(nameof(item)); } - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: null, - item: item, - operationType: OperationType.Create, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); + ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: null, + item: item, + operationType: OperationType.Create, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); - return this.ClientContext.ResponseFactory.CreateItemResponse(response); - } + return this.ClientContext.ResponseFactory.CreateItemResponse(response); } - public override async Task ReadItemStreamAsync( - string id, - PartitionKey partitionKey, - ItemRequestOptions requestOptions = null, - CancellationToken cancellationToken = default(CancellationToken)) + public async Task ReadItemStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, + string id, + PartitionKey partitionKey, + ItemRequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - return await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: id, - streamPayload: null, - operationType: OperationType.Read, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); - } + return await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: id, + streamPayload: null, + operationType: OperationType.Read, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); } - public override async Task> ReadItemAsync( + public async Task> ReadItemAsync( + CosmosDiagnosticsContext diagnosticsContext, string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - ResponseMessage response = await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: id, - streamPayload: null, - operationType: OperationType.Read, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); + ResponseMessage response = await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: id, + streamPayload: null, + operationType: OperationType.Read, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); - return this.ClientContext.ResponseFactory.CreateItemResponse(response); - } + return this.ClientContext.ResponseFactory.CreateItemResponse(response); } - public override async Task UpsertItemStreamAsync( - Stream streamPayload, - PartitionKey partitionKey, - ItemRequestOptions requestOptions = null, - CancellationToken cancellationToken = default(CancellationToken)) + public async Task UpsertItemStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, + Stream streamPayload, + PartitionKey partitionKey, + ItemRequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - return await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: null, - streamPayload: streamPayload, - operationType: OperationType.Upsert, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); - } + return await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: null, + streamPayload: streamPayload, + operationType: OperationType.Upsert, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); } - public override async Task> UpsertItemAsync( + public async Task> UpsertItemAsync( + CosmosDiagnosticsContext diagnosticsContext, T item, PartitionKey? partitionKey = null, ItemRequestOptions requestOptions = null, @@ -159,44 +145,38 @@ public override async Task> UpsertItemAsync( throw new ArgumentNullException(nameof(item)); } - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: null, - item: item, - operationType: OperationType.Upsert, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); + ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: null, + item: item, + operationType: OperationType.Upsert, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); - return this.ClientContext.ResponseFactory.CreateItemResponse(response); - } + return this.ClientContext.ResponseFactory.CreateItemResponse(response); } - public override async Task ReplaceItemStreamAsync( - Stream streamPayload, - string id, - PartitionKey partitionKey, - ItemRequestOptions requestOptions = null, - CancellationToken cancellationToken = default(CancellationToken)) + public async Task ReplaceItemStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, + Stream streamPayload, + string id, + PartitionKey partitionKey, + ItemRequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - return await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: id, - streamPayload: streamPayload, - operationType: OperationType.Replace, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); - } + return await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: id, + streamPayload: streamPayload, + operationType: OperationType.Replace, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); } - public override async Task> ReplaceItemAsync( + public async Task> ReplaceItemAsync( + CosmosDiagnosticsContext diagnosticsContext, T item, string id, PartitionKey? partitionKey = null, @@ -213,68 +193,58 @@ public override async Task> ReplaceItemAsync( throw new ArgumentNullException(nameof(item)); } - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: id, - item: item, - operationType: OperationType.Replace, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); + ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: id, + item: item, + operationType: OperationType.Replace, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); - return this.ClientContext.ResponseFactory.CreateItemResponse(response); - } + return this.ClientContext.ResponseFactory.CreateItemResponse(response); } - public override async Task DeleteItemStreamAsync( - string id, - PartitionKey partitionKey, - ItemRequestOptions requestOptions = null, - CancellationToken cancellationToken = default(CancellationToken)) + public async Task DeleteItemStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, + string id, + PartitionKey partitionKey, + ItemRequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - return await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: id, - streamPayload: null, - operationType: OperationType.Delete, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); - } + return await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: id, + streamPayload: null, + operationType: OperationType.Delete, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); } - public override async Task> DeleteItemAsync( + public async Task> DeleteItemAsync( + CosmosDiagnosticsContext diagnosticsContext, string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { - CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions); - using (diagnosticsContext.GetOverallScope()) - { - ResponseMessage response = await this.ProcessItemStreamAsync( - partitionKey: partitionKey, - itemId: id, - streamPayload: null, - operationType: OperationType.Delete, - requestOptions: requestOptions, - diagnosticsContext: diagnosticsContext, - cancellationToken: cancellationToken); + ResponseMessage response = await this.ProcessItemStreamAsync( + partitionKey: partitionKey, + itemId: id, + streamPayload: null, + operationType: OperationType.Delete, + requestOptions: requestOptions, + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken); - return this.ClientContext.ResponseFactory.CreateItemResponse(response); - } + return this.ClientContext.ResponseFactory.CreateItemResponse(response); } public override FeedIterator GetItemQueryStreamIterator( - string queryText = null, - string continuationToken = null, - QueryRequestOptions requestOptions = null) + string queryText = null, + string continuationToken = null, + QueryRequestOptions requestOptions = null) { QueryDefinition queryDefinition = null; if (queryText != null) diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs index 10482bfea0..e87dccd915 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs @@ -21,7 +21,7 @@ namespace Microsoft.Azure.Cosmos /// /// for creating new containers, and reading/querying all containers; /// - internal partial class ContainerCore : ContainerInternal + internal abstract partial class ContainerCore : ContainerInternal { private readonly Lazy lazyBatchExecutor; @@ -60,18 +60,21 @@ protected ContainerCore( public override Scripts.Scripts Scripts { get; } - public override async Task ReadContainerAsync( + public async Task ReadContainerAsync( + CosmosDiagnosticsContext diagnosticsContext, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { ResponseMessage response = await this.ReadContainerStreamAsync( + diagnosticsContext: diagnosticsContext, requestOptions: requestOptions, cancellationToken: cancellationToken); return this.ClientContext.ResponseFactory.CreateContainerResponse(this, response); } - public override async Task ReplaceContainerAsync( + public async Task ReplaceContainerAsync( + CosmosDiagnosticsContext diagnosticsContext, ContainerProperties containerProperties, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -83,6 +86,7 @@ public override async Task ReplaceContainerAsync( this.ClientContext.ValidateResource(containerProperties.Id); ResponseMessage response = await this.ReplaceStreamInternalAsync( + diagnosticsContext: diagnosticsContext, streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties), requestOptions: requestOptions, cancellationToken: cancellationToken); @@ -90,25 +94,29 @@ public override async Task ReplaceContainerAsync( return this.ClientContext.ResponseFactory.CreateContainerResponse(this, response); } - public override async Task DeleteContainerAsync( + public async Task DeleteContainerAsync( + CosmosDiagnosticsContext diagnosticsContext, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { ResponseMessage response = await this.DeleteContainerStreamAsync( + diagnosticsContext: diagnosticsContext, requestOptions: requestOptions, cancellationToken: cancellationToken); return this.ClientContext.ResponseFactory.CreateContainerResponse(this, response); } - public override async Task ReadThroughputAsync( + public async Task ReadThroughputAsync( + CosmosDiagnosticsContext diagnosticsContext, CancellationToken cancellationToken = default(CancellationToken)) { ThroughputResponse response = await this.ReadThroughputIfExistsAsync(null, cancellationToken); return response.Resource?.Throughput; } - public override async Task ReadThroughputAsync( + public async Task ReadThroughputAsync( + CosmosDiagnosticsContext diagnosticsContext, RequestOptions requestOptions, CancellationToken cancellationToken = default(CancellationToken)) { @@ -117,7 +125,8 @@ public override async Task ReadThroughputAsync( return await cosmosOffers.ReadThroughputAsync(rid, requestOptions, cancellationToken); } - public override async Task ReadThroughputIfExistsAsync( + public async Task ReadThroughputIfExistsAsync( + CosmosDiagnosticsContext diagnosticsContext, RequestOptions requestOptions, CancellationToken cancellationToken = default(CancellationToken)) { @@ -126,7 +135,8 @@ public override async Task ReadThroughputIfExistsAsync( return await cosmosOffers.ReadThroughputIfExistsAsync(rid, requestOptions, cancellationToken); } - public override async Task ReplaceThroughputAsync( + public async Task ReplaceThroughputAsync( + CosmosDiagnosticsContext diagnosticsContext, int throughput, RequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -141,7 +151,8 @@ public override async Task ReplaceThroughputAsync( cancellationToken: cancellationToken); } - public override async Task ReplaceThroughputIfExistsAsync( + public async Task ReplaceThroughputIfExistsAsync( + CosmosDiagnosticsContext diagnosticsContext, ThroughputProperties throughput, RequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -156,7 +167,8 @@ public override async Task ReplaceThroughputIfExistsAsync( cancellationToken: cancellationToken); } - public override async Task ReplaceThroughputAsync( + public async Task ReplaceThroughputAsync( + CosmosDiagnosticsContext diagnosticsContext, ThroughputProperties throughputProperties, RequestOptions requestOptions = null, CancellationToken cancellationToken = default) @@ -170,29 +182,34 @@ public override async Task ReplaceThroughputAsync( cancellationToken); } - public override Task DeleteContainerStreamAsync( + public Task DeleteContainerStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { return this.ProcessStreamAsync( - streamPayload: null, - operationType: OperationType.Delete, - requestOptions: requestOptions, - cancellationToken: cancellationToken); + diagnosticsContext: diagnosticsContext, + streamPayload: null, + operationType: OperationType.Delete, + requestOptions: requestOptions, + cancellationToken: cancellationToken); } - public override Task ReadContainerStreamAsync( + public Task ReadContainerStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { return this.ProcessStreamAsync( + diagnosticsContext: diagnosticsContext, streamPayload: null, operationType: OperationType.Read, requestOptions: requestOptions, cancellationToken: cancellationToken); } - public override Task ReplaceContainerStreamAsync( + public Task ReplaceContainerStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, ContainerProperties containerProperties, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -204,12 +221,15 @@ public override Task ReplaceContainerStreamAsync( this.ClientContext.ValidateResource(containerProperties.Id); return this.ReplaceStreamInternalAsync( + diagnosticsContext: diagnosticsContext, streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties), requestOptions: requestOptions, cancellationToken: cancellationToken); } - public override async Task> GetFeedRangesAsync(CancellationToken cancellationToken = default(CancellationToken)) + public async Task> GetFeedRangesAsync( + CosmosDiagnosticsContext diagnosticsContext, + CancellationToken cancellationToken = default(CancellationToken)) { PartitionKeyRangeCache partitionKeyRangeCache = await this.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(); string containerRId = await this.GetRIDAsync(cancellationToken); @@ -411,11 +431,13 @@ public override Task GetRoutingMapAsync(CancellationToken } private Task ReplaceStreamInternalAsync( + CosmosDiagnosticsContext diagnosticsContext, Stream streamPayload, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { return this.ProcessStreamAsync( + diagnosticsContext: diagnosticsContext, streamPayload: streamPayload, operationType: OperationType.Replace, requestOptions: requestOptions, @@ -423,12 +445,14 @@ private Task ReplaceStreamInternalAsync( } private Task ProcessStreamAsync( + CosmosDiagnosticsContext diagnosticsContext, Stream streamPayload, OperationType operationType, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { return this.ProcessResourceOperationStreamAsync( + diagnosticsContext: diagnosticsContext, streamPayload: streamPayload, operationType: operationType, linkUri: this.LinkUri, @@ -438,12 +462,13 @@ private Task ProcessStreamAsync( } private Task ProcessResourceOperationStreamAsync( - Stream streamPayload, - OperationType operationType, - Uri linkUri, - ResourceType resourceType, - RequestOptions requestOptions = null, - CancellationToken cancellationToken = default(CancellationToken)) + CosmosDiagnosticsContext diagnosticsContext, + Stream streamPayload, + OperationType operationType, + Uri linkUri, + ResourceType resourceType, + RequestOptions requestOptions = null, + CancellationToken cancellationToken = default(CancellationToken)) { return this.ClientContext.ProcessResourceOperationStreamAsync( resourceUri: linkUri, @@ -454,7 +479,7 @@ private Task ProcessResourceOperationStreamAsync( streamPayload: streamPayload, requestOptions: requestOptions, requestEnricher: null, - diagnosticsContext: null, + diagnosticsContext: diagnosticsContext, cancellationToken: cancellationToken); } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs index 281480a795..841c100a0f 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs @@ -13,7 +13,7 @@ namespace Microsoft.Azure.Cosmos using Microsoft.Azure.Cosmos.Query.Core.QueryClient; // This class acts as a wrapper for environments that use SynchronizationContext. - internal sealed partial class ContainerInlineCore : ContainerCore + internal sealed class ContainerInlineCore : ContainerCore { internal ContainerInlineCore( CosmosClientContext clientContext, @@ -31,14 +31,20 @@ public override Task ReadContainerAsync( ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReadContainerAsync(requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadContainerAsync), + requestOptions, + (diagnostics) => base.ReadContainerAsync(diagnostics, requestOptions, cancellationToken)); } public override Task ReadContainerStreamAsync( ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReadContainerStreamAsync(requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadContainerStreamAsync), + requestOptions, + (diagnostics) => base.ReadContainerStreamAsync(diagnostics, requestOptions, cancellationToken)); } public override Task ReplaceContainerAsync( @@ -46,7 +52,10 @@ public override Task ReplaceContainerAsync( ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceContainerAsync(containerProperties, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceContainerAsync), + requestOptions, + (diagnostics) => base.ReplaceContainerAsync(diagnostics, containerProperties, requestOptions, cancellationToken)); } public override Task ReplaceContainerStreamAsync( @@ -54,33 +63,48 @@ public override Task ReplaceContainerStreamAsync( ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceContainerStreamAsync(containerProperties, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceContainerStreamAsync), + requestOptions, + (diagnostics) => base.ReplaceContainerStreamAsync(diagnostics, containerProperties, requestOptions, cancellationToken)); } public override Task DeleteContainerAsync( ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteContainerAsync(requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(DeleteContainerAsync), + requestOptions, + (diagnostics) => base.DeleteContainerAsync(diagnostics, requestOptions, cancellationToken)); } public override Task DeleteContainerStreamAsync( ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteContainerStreamAsync(requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(DeleteContainerStreamAsync), + requestOptions, + (diagnostics) => base.DeleteContainerStreamAsync(diagnostics, requestOptions, cancellationToken)); } public override Task ReadThroughputAsync(CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReadThroughputAsync(cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadThroughputAsync), + null, + (diagnostics) => base.ReadThroughputAsync(diagnostics, cancellationToken)); } public override Task ReadThroughputAsync( RequestOptions requestOptions, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReadThroughputAsync(requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadThroughputAsync), + requestOptions, + (diagnostics) => base.ReadThroughputAsync(diagnostics, requestOptions, cancellationToken)); } public override Task ReplaceThroughputAsync( @@ -88,12 +112,37 @@ public override Task ReplaceThroughputAsync( RequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceThroughputAsync(throughput, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceThroughputAsync), + requestOptions, + (diagnostics) => base.ReplaceThroughputAsync(diagnostics, throughput, requestOptions, cancellationToken)); } - public override Task ReplaceThroughputAsync(ThroughputProperties throughputProperties, RequestOptions requestOptions = null, CancellationToken cancellationToken = default) + public override Task ReplaceThroughputAsync( + ThroughputProperties throughputProperties, + RequestOptions requestOptions = null, + CancellationToken cancellationToken = default) + { + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceThroughputAsync), + requestOptions, + (diagnostics) => base.ReplaceThroughputAsync(diagnostics, throughputProperties, requestOptions, cancellationToken)); + } + + public override Task ReadThroughputIfExistsAsync(RequestOptions requestOptions, CancellationToken cancellationToken) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceThroughputAsync(throughputProperties, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadThroughputIfExistsAsync), + requestOptions, + (diagnostics) => base.ReadThroughputIfExistsAsync(diagnostics, requestOptions, cancellationToken)); + } + + public override Task ReplaceThroughputIfExistsAsync(ThroughputProperties throughput, RequestOptions requestOptions, CancellationToken cancellationToken) + { + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceThroughputIfExistsAsync), + requestOptions, + (diagnostics) => base.ReplaceThroughputIfExistsAsync(diagnostics, throughput, requestOptions, cancellationToken)); } public override Task CreateItemStreamAsync( @@ -102,7 +151,10 @@ public override Task CreateItemStreamAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.CreateItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(CreateItemStreamAsync), + requestOptions, + (diagnostics) => base.CreateItemStreamAsync(diagnostics, streamPayload, partitionKey, requestOptions, cancellationToken)); } public override Task> CreateItemAsync(T item, @@ -110,7 +162,10 @@ public override Task> CreateItemAsync(T item, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.CreateItemAsync(item, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(CreateItemAsync), + requestOptions, + (diagnostics) => base.CreateItemAsync(diagnostics, item, partitionKey, requestOptions, cancellationToken)); } public override Task ReadItemStreamAsync( @@ -119,7 +174,10 @@ public override Task ReadItemStreamAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReadItemStreamAsync(id, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadItemStreamAsync), + requestOptions, + (diagnostics) => base.ReadItemStreamAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken)); } public override Task> ReadItemAsync( @@ -128,7 +186,10 @@ public override Task> ReadItemAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReadItemAsync(id, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReadItemAsync), + requestOptions, + (diagnostics) => base.ReadItemAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken)); } public override Task UpsertItemStreamAsync( @@ -137,7 +198,10 @@ public override Task UpsertItemStreamAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.UpsertItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(UpsertItemStreamAsync), + requestOptions, + (diagnostics) => base.UpsertItemStreamAsync(diagnostics, streamPayload, partitionKey, requestOptions, cancellationToken)); } public override Task> UpsertItemAsync( @@ -146,7 +210,10 @@ public override Task> UpsertItemAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.UpsertItemAsync(item, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(UpsertItemAsync), + requestOptions, + (diagnostics) => base.UpsertItemAsync(diagnostics, item, partitionKey, requestOptions, cancellationToken)); } public override Task ReplaceItemStreamAsync( @@ -156,7 +223,10 @@ public override Task ReplaceItemStreamAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceItemStreamAsync(streamPayload, id, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceItemStreamAsync), + requestOptions, + (diagnostics) => base.ReplaceItemStreamAsync(diagnostics, streamPayload, id, partitionKey, requestOptions, cancellationToken)); } public override Task> ReplaceItemAsync( @@ -166,7 +236,10 @@ public override Task> ReplaceItemAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceItemAsync(item, id, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(ReplaceItemAsync), + requestOptions, + (diagnostics) => base.ReplaceItemAsync(diagnostics, item, id, partitionKey, requestOptions, cancellationToken)); } public override Task DeleteItemStreamAsync( @@ -175,7 +248,10 @@ public override Task DeleteItemStreamAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteItemStreamAsync(id, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(DeleteItemStreamAsync), + requestOptions, + (diagnostics) => base.DeleteItemStreamAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken)); } public override Task> DeleteItemAsync( @@ -184,13 +260,16 @@ public override Task> DeleteItemAsync( ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) { - return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteItemAsync(id, partitionKey, requestOptions, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(DeleteItemAsync), + requestOptions, + (diagnostics) => base.DeleteItemAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken)); } public override FeedIterator GetItemQueryStreamIterator( - QueryDefinition queryDefinition, - string continuationToken = null, - QueryRequestOptions requestOptions = null) + QueryDefinition queryDefinition, + string continuationToken = null, + QueryRequestOptions requestOptions = null) { return new FeedIteratorInlineCore(base.GetItemQueryStreamIterator( queryDefinition, @@ -261,7 +340,10 @@ public override TransactionalBatch CreateTransactionalBatch(PartitionKey partiti public override Task> GetFeedRangesAsync(CancellationToken cancellationToken = default(CancellationToken)) { - return TaskHelper.RunInlineIfNeededAsync(() => base.GetFeedRangesAsync(cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(GetFeedRangesAsync), + null, + (diagnostics) => base.GetFeedRangesAsync(diagnostics, cancellationToken)); } public override FeedIterator GetChangeFeedStreamIterator( @@ -310,7 +392,10 @@ public override Task> GetPartitionKeyRangesAsync( FeedRange feedRange, CancellationToken cancellationToken = default(CancellationToken)) { - return TaskHelper.RunInlineIfNeededAsync(() => base.GetPartitionKeyRangesAsync(feedRange, cancellationToken)); + return this.ClientContext.OperationHelperAsync( + nameof(GetPartitionKeyRangesAsync), + null, + (diagnostics) => base.GetPartitionKeyRangesAsync(feedRange, cancellationToken)); } public override FeedIterator GetItemQueryStreamIterator( diff --git a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs index 6247b46f7e..41650cbd8a 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs @@ -184,53 +184,53 @@ public async Task CreateContainerIfNotExistsAsync( this.ValidateContainerProperties(containerProperties); - ContainerInternal container = (ContainerInternal)this.GetContainer(containerProperties.Id); - ResponseMessage readResponse = await container.ReadContainerStreamAsync( - cancellationToken: cancellationToken); - - if (readResponse.StatusCode != HttpStatusCode.NotFound) + ContainerCore container = (ContainerCore)this.GetContainer(containerProperties.Id); + using (ResponseMessage readResponse = await container.ReadContainerStreamAsync( + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken)) { - ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse( - container, - readResponse); - if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath)) + if (readResponse.StatusCode != HttpStatusCode.NotFound) { - throw new ArgumentException( - string.Format( - ClientResources.PartitionKeyPathConflict, - containerProperties.PartitionKeyPath, - containerProperties.Id, - retrivedContainerResponse.Resource.PartitionKeyPath), - nameof(containerProperties.PartitionKey)); + ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse( + container, + readResponse); + if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath)) + { + throw new ArgumentException( + string.Format( + ClientResources.PartitionKeyPathConflict, + containerProperties.PartitionKeyPath, + containerProperties.Id, + retrivedContainerResponse.Resource.PartitionKeyPath), + nameof(containerProperties.PartitionKey)); + } + + return retrivedContainerResponse; } - - return retrivedContainerResponse; } this.ValidateContainerProperties(containerProperties); - ResponseMessage createResponse = await this.CreateContainerStreamAsync( + using (ResponseMessage createResponse = await this.CreateContainerStreamAsync( diagnosticsContext, containerProperties, throughputProperties, requestOptions, - cancellationToken); - - // Merge the previous message diagnostics - createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext); - - if (readResponse.StatusCode != HttpStatusCode.Conflict) + cancellationToken)) { - return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse); + if (createResponse.StatusCode != HttpStatusCode.Conflict) + { + return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse); + } } // This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create // so for the remaining ones we should do a Read instead of throwing Conflict exception - ResponseMessage readResponseAfterCreate = await container.ReadContainerStreamAsync( - cancellationToken: cancellationToken); - - // Merge the previous message diagnostics - createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext); - return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate); + using (ResponseMessage readResponseAfterCreate = await container.ReadContainerStreamAsync( + diagnosticsContext: diagnosticsContext, + cancellationToken: cancellationToken)) + { + return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate); + } } public async Task ReplaceThroughputAsync( @@ -344,7 +344,7 @@ public Task CreateContainerAsync( cancellationToken); } - public async Task CreateContainerIfNotExistsAsync( + public Task CreateContainerIfNotExistsAsync( CosmosDiagnosticsContext diagnosticsContext, ContainerProperties containerProperties, int? throughput, @@ -356,63 +356,12 @@ public async Task CreateContainerIfNotExistsAsync( throw new ArgumentNullException(nameof(containerProperties)); } - this.ValidateContainerProperties(containerProperties); - - ContainerInternal container = (ContainerInternal)this.GetContainer(containerProperties.Id); - ResponseMessage readResponse = await this.ProcessResourceOperationStreamAsync( + return this.CreateContainerIfNotExistsAsync( diagnosticsContext: diagnosticsContext, - streamPayload: null, - operationType: OperationType.Read, - linkUri: container.LinkUri, - resourceType: ResourceType.Collection, - requestOptions: requestOptions, - cancellationToken: cancellationToken); - - if (readResponse.StatusCode != HttpStatusCode.NotFound) - { - ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse( - container, - readResponse); - if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath)) - { - throw new ArgumentException( - string.Format( - ClientResources.PartitionKeyPathConflict, - containerProperties.PartitionKeyPath, - containerProperties.Id, - retrivedContainerResponse.Resource.PartitionKeyPath), - nameof(containerProperties.PartitionKey)); - } - - return retrivedContainerResponse; - } - - this.ValidateContainerProperties(containerProperties); - ResponseMessage createResponse = await this.CreateContainerStreamAsync( - diagnosticsContext, containerProperties, - throughput, + ThroughputProperties.CreateManualThroughput(throughput), requestOptions, cancellationToken); - - if (readResponse.StatusCode != HttpStatusCode.Conflict) - { - return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse); - } - - // This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create - // so for the remaining ones we should do a Read instead of throwing Conflict exception - ResponseMessage readResponseAfterCreate = await this.ProcessResourceOperationStreamAsync( - diagnosticsContext: diagnosticsContext, - streamPayload: null, - operationType: OperationType.Read, - linkUri: container.LinkUri, - resourceType: ResourceType.Collection, - requestOptions: requestOptions, - cancellationToken: cancellationToken); - - // Merge the previous message diagnostics - return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate); } public Task CreateContainerIfNotExistsAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs index dbb49b5d15..add0d49f4d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs @@ -51,7 +51,7 @@ internal static void ValidateCosmosDiagnosticsContext( CosmosDiagnosticsContext cosmosDiagnosticsContext) { Assert.IsTrue((cosmosDiagnosticsContext.StartUtc - DateTime.UtcNow) < TimeSpan.FromHours(12), $"Start Time is not valid {cosmosDiagnosticsContext.StartUtc}"); - Assert.AreNotEqual(cosmosDiagnosticsContext.UserAgent.ToString(), new UserAgentContainer().UserAgent.ToString(), "User agent not set"); + Assert.IsTrue(cosmosDiagnosticsContext.UserAgent.ToString().Contains("cosmos-netstandard-sdk")); Assert.IsTrue(cosmosDiagnosticsContext.GetTotalRequestCount() > 0, "No request found"); Assert.IsTrue(cosmosDiagnosticsContext.IsComplete(), "OverallClientRequestTime should be stopped"); Assert.IsTrue(cosmosDiagnosticsContext.GetRunningElapsedTime() > TimeSpan.Zero, "OverallClientRequestTime should have time."); @@ -62,7 +62,7 @@ internal static void ValidateCosmosDiagnosticsContext( Assert.IsNotNull(jObject["DiagnosticVersion"].ToString()); JToken summary = jObject["Summary"]; Assert.IsNotNull(summary["UserAgent"].ToString()); - Assert.AreNotEqual(summary["UserAgent"].ToString(), new UserAgentContainer().UserAgent); + Assert.IsTrue(summary["UserAgent"].ToString().Contains("cosmos-netstandard-sdk")); Assert.IsNotNull(summary["StartUtc"].ToString()); Assert.IsNotNull(summary["TotalElapsedTimeInMs"].ToString()); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs index 870e5b995b..16cee3c6aa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs @@ -474,6 +474,26 @@ public async Task TestNestedPartitionKeyValueFromStreamAsync() mockContext.Setup(x => x.ResponseFactory).Returns(context.ResponseFactory); mockContext.Setup(x => x.SerializerCore).Returns(context.SerializerCore); mockContext.Setup(x => x.DocumentClient).Returns(context.DocumentClient); + + mockContext.Setup(x => x.CreateDiagnosticContext( + It.IsAny(), + It.IsAny())) + .Returns((x, y) => new CosmosDiagnosticsContextCore(x, "MockUserAgentString")); + + mockContext.Setup(x => x.OperationHelperAsync( + It.IsAny(), + It.IsAny(), + It.IsAny>>())) + .Returns>>( + (x, y, z) => z(new CosmosDiagnosticsContextCore(x, "MockUserAgentString"))); + + mockContext.Setup(x => x.OperationHelperAsync>( + It.IsAny(), + It.IsAny(), + It.IsAny>>>())) + .Returns>>>( + (x, y, z) => z(new CosmosDiagnosticsContextCore(x, "MockUserAgentString"))); + mockContext.Setup(x => x.ProcessResourceOperationStreamAsync( It.IsAny(), It.IsAny(),