diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
index 312a907b35..47e850b44c 100644
--- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
+++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
@@ -29,7 +29,7 @@ namespace Microsoft.Azure.Cosmos
/// 1. The object operations where it serializes and deserializes the item on request/response
/// 2. The stream response which takes a Stream containing a JSON serialized object and returns a response containing a Stream
///
- internal partial class ContainerCore : ContainerInternal
+ internal abstract partial class ContainerCore : ContainerInternal
{
///
/// Cache the full URI segment without the last resource id.
@@ -39,27 +39,25 @@ internal partial class ContainerCore : ContainerInternal
private readonly CosmosQueryClient queryClient;
- public override async Task CreateItemStreamAsync(
- Stream streamPayload,
- PartitionKey partitionKey,
- ItemRequestOptions requestOptions = null,
- CancellationToken cancellationToken = default(CancellationToken))
+ public async Task CreateItemStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
+ Stream streamPayload,
+ PartitionKey partitionKey,
+ ItemRequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- return await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: null,
- streamPayload: streamPayload,
- operationType: OperationType.Create,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
- }
+ return await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: null,
+ streamPayload: streamPayload,
+ operationType: OperationType.Create,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
}
- public override async Task> CreateItemAsync(
+ public async Task> CreateItemAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
@@ -70,85 +68,73 @@ public override async Task> CreateItemAsync(
throw new ArgumentNullException(nameof(item));
}
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: null,
- item: item,
- operationType: OperationType.Create,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
+ ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: null,
+ item: item,
+ operationType: OperationType.Create,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
- return this.ClientContext.ResponseFactory.CreateItemResponse(response);
- }
+ return this.ClientContext.ResponseFactory.CreateItemResponse(response);
}
- public override async Task ReadItemStreamAsync(
- string id,
- PartitionKey partitionKey,
- ItemRequestOptions requestOptions = null,
- CancellationToken cancellationToken = default(CancellationToken))
+ public async Task ReadItemStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
+ string id,
+ PartitionKey partitionKey,
+ ItemRequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- return await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: id,
- streamPayload: null,
- operationType: OperationType.Read,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
- }
+ return await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: id,
+ streamPayload: null,
+ operationType: OperationType.Read,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
}
- public override async Task> ReadItemAsync(
+ public async Task> ReadItemAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- ResponseMessage response = await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: id,
- streamPayload: null,
- operationType: OperationType.Read,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
+ ResponseMessage response = await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: id,
+ streamPayload: null,
+ operationType: OperationType.Read,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
- return this.ClientContext.ResponseFactory.CreateItemResponse(response);
- }
+ return this.ClientContext.ResponseFactory.CreateItemResponse(response);
}
- public override async Task UpsertItemStreamAsync(
- Stream streamPayload,
- PartitionKey partitionKey,
- ItemRequestOptions requestOptions = null,
- CancellationToken cancellationToken = default(CancellationToken))
+ public async Task UpsertItemStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
+ Stream streamPayload,
+ PartitionKey partitionKey,
+ ItemRequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- return await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: null,
- streamPayload: streamPayload,
- operationType: OperationType.Upsert,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
- }
+ return await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: null,
+ streamPayload: streamPayload,
+ operationType: OperationType.Upsert,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
}
- public override async Task> UpsertItemAsync(
+ public async Task> UpsertItemAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
@@ -159,44 +145,38 @@ public override async Task> UpsertItemAsync(
throw new ArgumentNullException(nameof(item));
}
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: null,
- item: item,
- operationType: OperationType.Upsert,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
+ ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: null,
+ item: item,
+ operationType: OperationType.Upsert,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
- return this.ClientContext.ResponseFactory.CreateItemResponse(response);
- }
+ return this.ClientContext.ResponseFactory.CreateItemResponse(response);
}
- public override async Task ReplaceItemStreamAsync(
- Stream streamPayload,
- string id,
- PartitionKey partitionKey,
- ItemRequestOptions requestOptions = null,
- CancellationToken cancellationToken = default(CancellationToken))
+ public async Task ReplaceItemStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
+ Stream streamPayload,
+ string id,
+ PartitionKey partitionKey,
+ ItemRequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- return await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: id,
- streamPayload: streamPayload,
- operationType: OperationType.Replace,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
- }
+ return await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: id,
+ streamPayload: streamPayload,
+ operationType: OperationType.Replace,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
}
- public override async Task> ReplaceItemAsync(
+ public async Task> ReplaceItemAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
T item,
string id,
PartitionKey? partitionKey = null,
@@ -213,68 +193,58 @@ public override async Task> ReplaceItemAsync(
throw new ArgumentNullException(nameof(item));
}
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: id,
- item: item,
- operationType: OperationType.Replace,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
+ ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: id,
+ item: item,
+ operationType: OperationType.Replace,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
- return this.ClientContext.ResponseFactory.CreateItemResponse(response);
- }
+ return this.ClientContext.ResponseFactory.CreateItemResponse(response);
}
- public override async Task DeleteItemStreamAsync(
- string id,
- PartitionKey partitionKey,
- ItemRequestOptions requestOptions = null,
- CancellationToken cancellationToken = default(CancellationToken))
+ public async Task DeleteItemStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
+ string id,
+ PartitionKey partitionKey,
+ ItemRequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- return await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: id,
- streamPayload: null,
- operationType: OperationType.Delete,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
- }
+ return await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: id,
+ streamPayload: null,
+ operationType: OperationType.Delete,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
}
- public override async Task> DeleteItemAsync(
+ public async Task> DeleteItemAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
- CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
- using (diagnosticsContext.GetOverallScope())
- {
- ResponseMessage response = await this.ProcessItemStreamAsync(
- partitionKey: partitionKey,
- itemId: id,
- streamPayload: null,
- operationType: OperationType.Delete,
- requestOptions: requestOptions,
- diagnosticsContext: diagnosticsContext,
- cancellationToken: cancellationToken);
+ ResponseMessage response = await this.ProcessItemStreamAsync(
+ partitionKey: partitionKey,
+ itemId: id,
+ streamPayload: null,
+ operationType: OperationType.Delete,
+ requestOptions: requestOptions,
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken);
- return this.ClientContext.ResponseFactory.CreateItemResponse(response);
- }
+ return this.ClientContext.ResponseFactory.CreateItemResponse(response);
}
public override FeedIterator GetItemQueryStreamIterator(
- string queryText = null,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
+ string queryText = null,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
{
QueryDefinition queryDefinition = null;
if (queryText != null)
diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs
index 10482bfea0..e87dccd915 100644
--- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs
+++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs
@@ -21,7 +21,7 @@ namespace Microsoft.Azure.Cosmos
///
/// for creating new containers, and reading/querying all containers;
///
- internal partial class ContainerCore : ContainerInternal
+ internal abstract partial class ContainerCore : ContainerInternal
{
private readonly Lazy lazyBatchExecutor;
@@ -60,18 +60,21 @@ protected ContainerCore(
public override Scripts.Scripts Scripts { get; }
- public override async Task ReadContainerAsync(
+ public async Task ReadContainerAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
ResponseMessage response = await this.ReadContainerStreamAsync(
+ diagnosticsContext: diagnosticsContext,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
return this.ClientContext.ResponseFactory.CreateContainerResponse(this, response);
}
- public override async Task ReplaceContainerAsync(
+ public async Task ReplaceContainerAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ContainerProperties containerProperties,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
@@ -83,6 +86,7 @@ public override async Task ReplaceContainerAsync(
this.ClientContext.ValidateResource(containerProperties.Id);
ResponseMessage response = await this.ReplaceStreamInternalAsync(
+ diagnosticsContext: diagnosticsContext,
streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
requestOptions: requestOptions,
cancellationToken: cancellationToken);
@@ -90,25 +94,29 @@ public override async Task ReplaceContainerAsync(
return this.ClientContext.ResponseFactory.CreateContainerResponse(this, response);
}
- public override async Task DeleteContainerAsync(
+ public async Task DeleteContainerAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
ResponseMessage response = await this.DeleteContainerStreamAsync(
+ diagnosticsContext: diagnosticsContext,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
return this.ClientContext.ResponseFactory.CreateContainerResponse(this, response);
}
- public override async Task ReadThroughputAsync(
+ public async Task ReadThroughputAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken = default(CancellationToken))
{
ThroughputResponse response = await this.ReadThroughputIfExistsAsync(null, cancellationToken);
return response.Resource?.Throughput;
}
- public override async Task ReadThroughputAsync(
+ public async Task ReadThroughputAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
@@ -117,7 +125,8 @@ public override async Task ReadThroughputAsync(
return await cosmosOffers.ReadThroughputAsync(rid, requestOptions, cancellationToken);
}
- public override async Task ReadThroughputIfExistsAsync(
+ public async Task ReadThroughputIfExistsAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
RequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
@@ -126,7 +135,8 @@ public override async Task ReadThroughputIfExistsAsync(
return await cosmosOffers.ReadThroughputIfExistsAsync(rid, requestOptions, cancellationToken);
}
- public override async Task ReplaceThroughputAsync(
+ public async Task ReplaceThroughputAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
int throughput,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
@@ -141,7 +151,8 @@ public override async Task ReplaceThroughputAsync(
cancellationToken: cancellationToken);
}
- public override async Task ReplaceThroughputIfExistsAsync(
+ public async Task ReplaceThroughputIfExistsAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ThroughputProperties throughput,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
@@ -156,7 +167,8 @@ public override async Task ReplaceThroughputIfExistsAsync(
cancellationToken: cancellationToken);
}
- public override async Task ReplaceThroughputAsync(
+ public async Task ReplaceThroughputAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ThroughputProperties throughputProperties,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
@@ -170,29 +182,34 @@ public override async Task ReplaceThroughputAsync(
cancellationToken);
}
- public override Task DeleteContainerStreamAsync(
+ public Task DeleteContainerStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ProcessStreamAsync(
- streamPayload: null,
- operationType: OperationType.Delete,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
+ diagnosticsContext: diagnosticsContext,
+ streamPayload: null,
+ operationType: OperationType.Delete,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
}
- public override Task ReadContainerStreamAsync(
+ public Task ReadContainerStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ProcessStreamAsync(
+ diagnosticsContext: diagnosticsContext,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
}
- public override Task ReplaceContainerStreamAsync(
+ public Task ReplaceContainerStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
ContainerProperties containerProperties,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
@@ -204,12 +221,15 @@ public override Task ReplaceContainerStreamAsync(
this.ClientContext.ValidateResource(containerProperties.Id);
return this.ReplaceStreamInternalAsync(
+ diagnosticsContext: diagnosticsContext,
streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
requestOptions: requestOptions,
cancellationToken: cancellationToken);
}
- public override async Task> GetFeedRangesAsync(CancellationToken cancellationToken = default(CancellationToken))
+ public async Task> GetFeedRangesAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
+ CancellationToken cancellationToken = default(CancellationToken))
{
PartitionKeyRangeCache partitionKeyRangeCache = await this.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
string containerRId = await this.GetRIDAsync(cancellationToken);
@@ -411,11 +431,13 @@ public override Task GetRoutingMapAsync(CancellationToken
}
private Task ReplaceStreamInternalAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ProcessStreamAsync(
+ diagnosticsContext: diagnosticsContext,
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
@@ -423,12 +445,14 @@ private Task ReplaceStreamInternalAsync(
}
private Task ProcessStreamAsync(
+ CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
OperationType operationType,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ProcessResourceOperationStreamAsync(
+ diagnosticsContext: diagnosticsContext,
streamPayload: streamPayload,
operationType: operationType,
linkUri: this.LinkUri,
@@ -438,12 +462,13 @@ private Task ProcessStreamAsync(
}
private Task ProcessResourceOperationStreamAsync(
- Stream streamPayload,
- OperationType operationType,
- Uri linkUri,
- ResourceType resourceType,
- RequestOptions requestOptions = null,
- CancellationToken cancellationToken = default(CancellationToken))
+ CosmosDiagnosticsContext diagnosticsContext,
+ Stream streamPayload,
+ OperationType operationType,
+ Uri linkUri,
+ ResourceType resourceType,
+ RequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default(CancellationToken))
{
return this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: linkUri,
@@ -454,7 +479,7 @@ private Task ProcessResourceOperationStreamAsync(
streamPayload: streamPayload,
requestOptions: requestOptions,
requestEnricher: null,
- diagnosticsContext: null,
+ diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
}
diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs
index 281480a795..841c100a0f 100644
--- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs
+++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs
@@ -13,7 +13,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
// This class acts as a wrapper for environments that use SynchronizationContext.
- internal sealed partial class ContainerInlineCore : ContainerCore
+ internal sealed class ContainerInlineCore : ContainerCore
{
internal ContainerInlineCore(
CosmosClientContext clientContext,
@@ -31,14 +31,20 @@ public override Task ReadContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReadContainerAsync(requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadContainerAsync),
+ requestOptions,
+ (diagnostics) => base.ReadContainerAsync(diagnostics, requestOptions, cancellationToken));
}
public override Task ReadContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReadContainerStreamAsync(requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadContainerStreamAsync),
+ requestOptions,
+ (diagnostics) => base.ReadContainerStreamAsync(diagnostics, requestOptions, cancellationToken));
}
public override Task ReplaceContainerAsync(
@@ -46,7 +52,10 @@ public override Task ReplaceContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceContainerAsync(containerProperties, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceContainerAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceContainerAsync(diagnostics, containerProperties, requestOptions, cancellationToken));
}
public override Task ReplaceContainerStreamAsync(
@@ -54,33 +63,48 @@ public override Task ReplaceContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceContainerStreamAsync(containerProperties, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceContainerStreamAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceContainerStreamAsync(diagnostics, containerProperties, requestOptions, cancellationToken));
}
public override Task DeleteContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteContainerAsync(requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(DeleteContainerAsync),
+ requestOptions,
+ (diagnostics) => base.DeleteContainerAsync(diagnostics, requestOptions, cancellationToken));
}
public override Task DeleteContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteContainerStreamAsync(requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(DeleteContainerStreamAsync),
+ requestOptions,
+ (diagnostics) => base.DeleteContainerStreamAsync(diagnostics, requestOptions, cancellationToken));
}
public override Task ReadThroughputAsync(CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReadThroughputAsync(cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadThroughputAsync),
+ null,
+ (diagnostics) => base.ReadThroughputAsync(diagnostics, cancellationToken));
}
public override Task ReadThroughputAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReadThroughputAsync(requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadThroughputAsync),
+ requestOptions,
+ (diagnostics) => base.ReadThroughputAsync(diagnostics, requestOptions, cancellationToken));
}
public override Task ReplaceThroughputAsync(
@@ -88,12 +112,37 @@ public override Task ReplaceThroughputAsync(
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceThroughputAsync(throughput, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceThroughputAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceThroughputAsync(diagnostics, throughput, requestOptions, cancellationToken));
}
- public override Task ReplaceThroughputAsync(ThroughputProperties throughputProperties, RequestOptions requestOptions = null, CancellationToken cancellationToken = default)
+ public override Task ReplaceThroughputAsync(
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default)
+ {
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceThroughputAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceThroughputAsync(diagnostics, throughputProperties, requestOptions, cancellationToken));
+ }
+
+ public override Task ReadThroughputIfExistsAsync(RequestOptions requestOptions, CancellationToken cancellationToken)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceThroughputAsync(throughputProperties, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadThroughputIfExistsAsync),
+ requestOptions,
+ (diagnostics) => base.ReadThroughputIfExistsAsync(diagnostics, requestOptions, cancellationToken));
+ }
+
+ public override Task ReplaceThroughputIfExistsAsync(ThroughputProperties throughput, RequestOptions requestOptions, CancellationToken cancellationToken)
+ {
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceThroughputIfExistsAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceThroughputIfExistsAsync(diagnostics, throughput, requestOptions, cancellationToken));
}
public override Task CreateItemStreamAsync(
@@ -102,7 +151,10 @@ public override Task CreateItemStreamAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.CreateItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(CreateItemStreamAsync),
+ requestOptions,
+ (diagnostics) => base.CreateItemStreamAsync(diagnostics, streamPayload, partitionKey, requestOptions, cancellationToken));
}
public override Task> CreateItemAsync(T item,
@@ -110,7 +162,10 @@ public override Task> CreateItemAsync(T item,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.CreateItemAsync(item, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(CreateItemAsync),
+ requestOptions,
+ (diagnostics) => base.CreateItemAsync(diagnostics, item, partitionKey, requestOptions, cancellationToken));
}
public override Task ReadItemStreamAsync(
@@ -119,7 +174,10 @@ public override Task ReadItemStreamAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReadItemStreamAsync(id, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadItemStreamAsync),
+ requestOptions,
+ (diagnostics) => base.ReadItemStreamAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken));
}
public override Task> ReadItemAsync(
@@ -128,7 +186,10 @@ public override Task> ReadItemAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReadItemAsync(id, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReadItemAsync),
+ requestOptions,
+ (diagnostics) => base.ReadItemAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken));
}
public override Task UpsertItemStreamAsync(
@@ -137,7 +198,10 @@ public override Task UpsertItemStreamAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.UpsertItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(UpsertItemStreamAsync),
+ requestOptions,
+ (diagnostics) => base.UpsertItemStreamAsync(diagnostics, streamPayload, partitionKey, requestOptions, cancellationToken));
}
public override Task> UpsertItemAsync(
@@ -146,7 +210,10 @@ public override Task> UpsertItemAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.UpsertItemAsync(item, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(UpsertItemAsync),
+ requestOptions,
+ (diagnostics) => base.UpsertItemAsync(diagnostics, item, partitionKey, requestOptions, cancellationToken));
}
public override Task ReplaceItemStreamAsync(
@@ -156,7 +223,10 @@ public override Task ReplaceItemStreamAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceItemStreamAsync(streamPayload, id, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceItemStreamAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceItemStreamAsync(diagnostics, streamPayload, id, partitionKey, requestOptions, cancellationToken));
}
public override Task> ReplaceItemAsync(
@@ -166,7 +236,10 @@ public override Task> ReplaceItemAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.ReplaceItemAsync(item, id, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(ReplaceItemAsync),
+ requestOptions,
+ (diagnostics) => base.ReplaceItemAsync(diagnostics, item, id, partitionKey, requestOptions, cancellationToken));
}
public override Task DeleteItemStreamAsync(
@@ -175,7 +248,10 @@ public override Task DeleteItemStreamAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteItemStreamAsync(id, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(DeleteItemStreamAsync),
+ requestOptions,
+ (diagnostics) => base.DeleteItemStreamAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken));
}
public override Task> DeleteItemAsync(
@@ -184,13 +260,16 @@ public override Task> DeleteItemAsync(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.DeleteItemAsync(id, partitionKey, requestOptions, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(DeleteItemAsync),
+ requestOptions,
+ (diagnostics) => base.DeleteItemAsync(diagnostics, id, partitionKey, requestOptions, cancellationToken));
}
public override FeedIterator GetItemQueryStreamIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
{
return new FeedIteratorInlineCore(base.GetItemQueryStreamIterator(
queryDefinition,
@@ -261,7 +340,10 @@ public override TransactionalBatch CreateTransactionalBatch(PartitionKey partiti
public override Task> GetFeedRangesAsync(CancellationToken cancellationToken = default(CancellationToken))
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.GetFeedRangesAsync(cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(GetFeedRangesAsync),
+ null,
+ (diagnostics) => base.GetFeedRangesAsync(diagnostics, cancellationToken));
}
public override FeedIterator GetChangeFeedStreamIterator(
@@ -310,7 +392,10 @@ public override Task> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default(CancellationToken))
{
- return TaskHelper.RunInlineIfNeededAsync(() => base.GetPartitionKeyRangesAsync(feedRange, cancellationToken));
+ return this.ClientContext.OperationHelperAsync(
+ nameof(GetPartitionKeyRangesAsync),
+ null,
+ (diagnostics) => base.GetPartitionKeyRangesAsync(feedRange, cancellationToken));
}
public override FeedIterator GetItemQueryStreamIterator(
diff --git a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
index 6247b46f7e..41650cbd8a 100644
--- a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
+++ b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
@@ -184,53 +184,53 @@ public async Task CreateContainerIfNotExistsAsync(
this.ValidateContainerProperties(containerProperties);
- ContainerInternal container = (ContainerInternal)this.GetContainer(containerProperties.Id);
- ResponseMessage readResponse = await container.ReadContainerStreamAsync(
- cancellationToken: cancellationToken);
-
- if (readResponse.StatusCode != HttpStatusCode.NotFound)
+ ContainerCore container = (ContainerCore)this.GetContainer(containerProperties.Id);
+ using (ResponseMessage readResponse = await container.ReadContainerStreamAsync(
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken))
{
- ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse(
- container,
- readResponse);
- if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath))
+ if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
- throw new ArgumentException(
- string.Format(
- ClientResources.PartitionKeyPathConflict,
- containerProperties.PartitionKeyPath,
- containerProperties.Id,
- retrivedContainerResponse.Resource.PartitionKeyPath),
- nameof(containerProperties.PartitionKey));
+ ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse(
+ container,
+ readResponse);
+ if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath))
+ {
+ throw new ArgumentException(
+ string.Format(
+ ClientResources.PartitionKeyPathConflict,
+ containerProperties.PartitionKeyPath,
+ containerProperties.Id,
+ retrivedContainerResponse.Resource.PartitionKeyPath),
+ nameof(containerProperties.PartitionKey));
+ }
+
+ return retrivedContainerResponse;
}
-
- return retrivedContainerResponse;
}
this.ValidateContainerProperties(containerProperties);
- ResponseMessage createResponse = await this.CreateContainerStreamAsync(
+ using (ResponseMessage createResponse = await this.CreateContainerStreamAsync(
diagnosticsContext,
containerProperties,
throughputProperties,
requestOptions,
- cancellationToken);
-
- // Merge the previous message diagnostics
- createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
-
- if (readResponse.StatusCode != HttpStatusCode.Conflict)
+ cancellationToken))
{
- return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse);
+ if (createResponse.StatusCode != HttpStatusCode.Conflict)
+ {
+ return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse);
+ }
}
// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
- ResponseMessage readResponseAfterCreate = await container.ReadContainerStreamAsync(
- cancellationToken: cancellationToken);
-
- // Merge the previous message diagnostics
- createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
- return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate);
+ using (ResponseMessage readResponseAfterCreate = await container.ReadContainerStreamAsync(
+ diagnosticsContext: diagnosticsContext,
+ cancellationToken: cancellationToken))
+ {
+ return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate);
+ }
}
public async Task ReplaceThroughputAsync(
@@ -344,7 +344,7 @@ public Task CreateContainerAsync(
cancellationToken);
}
- public async Task CreateContainerIfNotExistsAsync(
+ public Task CreateContainerIfNotExistsAsync(
CosmosDiagnosticsContext diagnosticsContext,
ContainerProperties containerProperties,
int? throughput,
@@ -356,63 +356,12 @@ public async Task CreateContainerIfNotExistsAsync(
throw new ArgumentNullException(nameof(containerProperties));
}
- this.ValidateContainerProperties(containerProperties);
-
- ContainerInternal container = (ContainerInternal)this.GetContainer(containerProperties.Id);
- ResponseMessage readResponse = await this.ProcessResourceOperationStreamAsync(
+ return this.CreateContainerIfNotExistsAsync(
diagnosticsContext: diagnosticsContext,
- streamPayload: null,
- operationType: OperationType.Read,
- linkUri: container.LinkUri,
- resourceType: ResourceType.Collection,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
-
- if (readResponse.StatusCode != HttpStatusCode.NotFound)
- {
- ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse(
- container,
- readResponse);
- if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath))
- {
- throw new ArgumentException(
- string.Format(
- ClientResources.PartitionKeyPathConflict,
- containerProperties.PartitionKeyPath,
- containerProperties.Id,
- retrivedContainerResponse.Resource.PartitionKeyPath),
- nameof(containerProperties.PartitionKey));
- }
-
- return retrivedContainerResponse;
- }
-
- this.ValidateContainerProperties(containerProperties);
- ResponseMessage createResponse = await this.CreateContainerStreamAsync(
- diagnosticsContext,
containerProperties,
- throughput,
+ ThroughputProperties.CreateManualThroughput(throughput),
requestOptions,
cancellationToken);
-
- if (readResponse.StatusCode != HttpStatusCode.Conflict)
- {
- return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse);
- }
-
- // This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
- // so for the remaining ones we should do a Read instead of throwing Conflict exception
- ResponseMessage readResponseAfterCreate = await this.ProcessResourceOperationStreamAsync(
- diagnosticsContext: diagnosticsContext,
- streamPayload: null,
- operationType: OperationType.Read,
- linkUri: container.LinkUri,
- resourceType: ResourceType.Collection,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
-
- // Merge the previous message diagnostics
- return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate);
}
public Task CreateContainerIfNotExistsAsync(
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs
index dbb49b5d15..add0d49f4d 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/DiagnosticValidators.cs
@@ -51,7 +51,7 @@ internal static void ValidateCosmosDiagnosticsContext(
CosmosDiagnosticsContext cosmosDiagnosticsContext)
{
Assert.IsTrue((cosmosDiagnosticsContext.StartUtc - DateTime.UtcNow) < TimeSpan.FromHours(12), $"Start Time is not valid {cosmosDiagnosticsContext.StartUtc}");
- Assert.AreNotEqual(cosmosDiagnosticsContext.UserAgent.ToString(), new UserAgentContainer().UserAgent.ToString(), "User agent not set");
+ Assert.IsTrue(cosmosDiagnosticsContext.UserAgent.ToString().Contains("cosmos-netstandard-sdk"));
Assert.IsTrue(cosmosDiagnosticsContext.GetTotalRequestCount() > 0, "No request found");
Assert.IsTrue(cosmosDiagnosticsContext.IsComplete(), "OverallClientRequestTime should be stopped");
Assert.IsTrue(cosmosDiagnosticsContext.GetRunningElapsedTime() > TimeSpan.Zero, "OverallClientRequestTime should have time.");
@@ -62,7 +62,7 @@ internal static void ValidateCosmosDiagnosticsContext(
Assert.IsNotNull(jObject["DiagnosticVersion"].ToString());
JToken summary = jObject["Summary"];
Assert.IsNotNull(summary["UserAgent"].ToString());
- Assert.AreNotEqual(summary["UserAgent"].ToString(), new UserAgentContainer().UserAgent);
+ Assert.IsTrue(summary["UserAgent"].ToString().Contains("cosmos-netstandard-sdk"));
Assert.IsNotNull(summary["StartUtc"].ToString());
Assert.IsNotNull(summary["TotalElapsedTimeInMs"].ToString());
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs
index 870e5b995b..16cee3c6aa 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs
@@ -474,6 +474,26 @@ public async Task TestNestedPartitionKeyValueFromStreamAsync()
mockContext.Setup(x => x.ResponseFactory).Returns(context.ResponseFactory);
mockContext.Setup(x => x.SerializerCore).Returns(context.SerializerCore);
mockContext.Setup(x => x.DocumentClient).Returns(context.DocumentClient);
+
+ mockContext.Setup(x => x.CreateDiagnosticContext(
+ It.IsAny(),
+ It.IsAny()))
+ .Returns((x, y) => new CosmosDiagnosticsContextCore(x, "MockUserAgentString"));
+
+ mockContext.Setup(x => x.OperationHelperAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>>()))
+ .Returns>>(
+ (x, y, z) => z(new CosmosDiagnosticsContextCore(x, "MockUserAgentString")));
+
+ mockContext.Setup(x => x.OperationHelperAsync>(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny>>>()))
+ .Returns>>>(
+ (x, y, z) => z(new CosmosDiagnosticsContextCore(x, "MockUserAgentString")));
+
mockContext.Setup(x => x.ProcessResourceOperationStreamAsync(
It.IsAny(),
It.IsAny(),