Skip to content

Commit

Permalink
Diagnostics: Add synchronization context Part 2 for Container (#1643)
Browse files Browse the repository at this point in the history
* Diagnostics: Add synchronization context Part 2 for Container
  • Loading branch information
j82w authored and kirankumarkolli committed Jul 11, 2020
1 parent 2380414 commit c7cb8ac
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 300 deletions.
290 changes: 130 additions & 160 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace Microsoft.Azure.Cosmos
/// 1. The object operations where it serializes and deserializes the item on request/response
/// 2. The stream response which takes a Stream containing a JSON serialized object and returns a response containing a Stream
/// </summary>
internal partial class ContainerCore : ContainerInternal
internal abstract partial class ContainerCore : ContainerInternal
{
/// <summary>
/// Cache the full URI segment without the last resource id.
Expand All @@ -39,27 +39,25 @@ internal partial class ContainerCore : ContainerInternal

private readonly CosmosQueryClient queryClient;

public override async Task<ResponseMessage> CreateItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> CreateItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> CreateItemAsync<T>(
public async Task<ItemResponse<T>> CreateItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
Expand All @@ -70,85 +68,73 @@ public override async Task<ItemResponse<T>> CreateItemAsync<T>(
throw new ArgumentNullException(nameof(item));
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> ReadItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> ReadItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> ReadItemAsync<T>(
public async Task<ItemResponse<T>> ReadItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> UpsertItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> UpsertItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> UpsertItemAsync<T>(
public async Task<ItemResponse<T>> UpsertItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
Expand All @@ -159,44 +145,38 @@ public override async Task<ItemResponse<T>> UpsertItemAsync<T>(
throw new ArgumentNullException(nameof(item));
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> ReplaceItemStreamAsync(
Stream streamPayload,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> ReplaceItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> ReplaceItemAsync<T>(
public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
T item,
string id,
PartitionKey? partitionKey = null,
Expand All @@ -213,68 +193,58 @@ public override async Task<ItemResponse<T>> ReplaceItemAsync<T>(
throw new ArgumentNullException(nameof(item));
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> DeleteItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> DeleteItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> DeleteItemAsync<T>(
public async Task<ItemResponse<T>> DeleteItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override FeedIterator GetItemQueryStreamIterator(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
QueryDefinition queryDefinition = null;
if (queryText != null)
Expand Down
Loading

0 comments on commit c7cb8ac

Please sign in to comment.