Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnostics: Add synchronization context Part 2 for Container #1643

Merged
merged 6 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 130 additions & 160 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace Microsoft.Azure.Cosmos
/// 1. The object operations where it serializes and deserializes the item on request/response
/// 2. The stream response which takes a Stream containing a JSON serialized object and returns a response containing a Stream
/// </summary>
internal partial class ContainerCore : ContainerInternal
internal abstract partial class ContainerCore : ContainerInternal
{
/// <summary>
/// Cache the full URI segment without the last resource id.
Expand All @@ -39,27 +39,25 @@ internal partial class ContainerCore : ContainerInternal

private readonly CosmosQueryClient queryClient;

public override async Task<ResponseMessage> CreateItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> CreateItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> CreateItemAsync<T>(
public async Task<ItemResponse<T>> CreateItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
Expand All @@ -70,85 +68,73 @@ public override async Task<ItemResponse<T>> CreateItemAsync<T>(
throw new ArgumentNullException(nameof(item));
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> ReadItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> ReadItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> ReadItemAsync<T>(
public async Task<ItemResponse<T>> ReadItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> UpsertItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> UpsertItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> UpsertItemAsync<T>(
public async Task<ItemResponse<T>> UpsertItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
Expand All @@ -159,44 +145,38 @@ public override async Task<ItemResponse<T>> UpsertItemAsync<T>(
throw new ArgumentNullException(nameof(item));
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> ReplaceItemStreamAsync(
Stream streamPayload,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> ReplaceItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
Stream streamPayload,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> ReplaceItemAsync<T>(
public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
T item,
string id,
PartitionKey? partitionKey = null,
Expand All @@ -213,68 +193,58 @@ public override async Task<ItemResponse<T>> ReplaceItemAsync<T>(
throw new ArgumentNullException(nameof(item));
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ExtractPartitionKeyAndProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override async Task<ResponseMessage> DeleteItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
public async Task<ResponseMessage> DeleteItemStreamAsync(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

public override async Task<ItemResponse<T>> DeleteItemAsync<T>(
public async Task<ItemResponse<T>> DeleteItemAsync<T>(
CosmosDiagnosticsContext diagnosticsContext,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(requestOptions);
using (diagnosticsContext.GetOverallScope())
{
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
ResponseMessage response = await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
}

public override FeedIterator GetItemQueryStreamIterator(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
string queryText = null,
j82w marked this conversation as resolved.
Show resolved Hide resolved
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
QueryDefinition queryDefinition = null;
if (queryText != null)
Expand Down
Loading