Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnostics: Add synchronization context Part 1 #1587

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 100 additions & 97 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public virtual Task<AccountProperties> ReadAccountAsync()
/// <param name="id">The Cosmos database id</param>
/// <remarks>
/// <see cref="Database"/> proxy reference doesn't guarantee existence.
/// Please ensure database exists through <see cref="CosmosClient.CreateDatabaseAsync(DatabaseProperties, int?, RequestOptions, CancellationToken)"/>
/// Please ensure database exists through <see cref="CosmosClient.CreateDatabaseAsync(string, int?, RequestOptions, CancellationToken)"/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any mechanisms which we could use to get consistency?
Like referring real code snippet like in the public docs?

Non blocking but will immensely help forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was pointing to a real snippet of code, it's just an internal method.

/// or <see cref="CosmosClient.CreateDatabaseIfNotExistsAsync(string, int?, RequestOptions, CancellationToken)"/>, before
/// operating on it.
/// </remarks>
Expand Down Expand Up @@ -382,12 +382,21 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
throw new ArgumentNullException(nameof(id));
}

DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseAsync(
databaseProperties: databaseProperties,
throughput: throughput,
requestOptions: requestOptions,
cancellationToken: cancellationToken));
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(diagnostics) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);

return this.CreateDatabaseInternalAsync(
databaseProperties: databaseProperties,
throughputProperties: throughputProperties,
requestOptions: requestOptions,
diagnosticsContext: diagnostics,
cancellationToken: cancellationToken);
});
}

/// <summary>
Expand Down Expand Up @@ -418,12 +427,19 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
throw new ArgumentNullException(nameof(id));
}

DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseAsync(
databaseProperties: databaseProperties,
throughputProperties: throughputProperties,
requestOptions: requestOptions,
cancellationToken: cancellationToken));
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(diagnostics) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return this.CreateDatabaseInternalAsync(
diagnosticsContext: diagnostics,
databaseProperties: databaseProperties,
throughputProperties: throughputProperties,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
});
}

/// <summary>
Expand Down Expand Up @@ -468,35 +484,45 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
throw new ArgumentNullException(nameof(id));
}

return TaskHelper.RunInlineIfNeededAsync(async () =>
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
async (diagnostics) =>
{
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
Database database = this.GetDatabase(id);
ResponseMessage readResponse = await database.ReadStreamAsync(
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
j82w marked this conversation as resolved.
Show resolved Hide resolved
using (ResponseMessage readResponse = await database.ReadStreamAsync(
diagnosticsContext: diagnostics,
requestOptions: requestOptions,
cancellationToken: cancellationToken);

if (readResponse.StatusCode != HttpStatusCode.NotFound)
cancellationToken: cancellationToken))
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

ResponseMessage createResponse = await this.CreateDatabaseStreamAsync(databaseProperties, throughputProperties, requestOptions, cancellationToken);

// Merge the diagnostics with the first read request.
createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
if (createResponse.StatusCode != HttpStatusCode.Conflict)
using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
diagnostics,
databaseProperties,
throughputProperties,
requestOptions,
cancellationToken))
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
diagnosticsContext: diagnostics,
requestOptions: requestOptions,
cancellationToken: cancellationToken);
readResponseAfterConflict.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
});
}
Expand Down Expand Up @@ -538,8 +564,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
ThroughputProperties throughputProperties = throughput.HasValue
? ThroughputProperties.CreateManualThroughput(throughput.Value) : null;
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);

return this.CreateDatabaseIfNotExistsAsync(
id,
Expand Down Expand Up @@ -586,10 +611,10 @@ public virtual FeedIterator<T> GetDatabaseQueryIterator<T>(
QueryRequestOptions requestOptions = null)
{
return new FeedIteratorInlineCore<T>(
this.GetDatabaseQueryIteratorHelper<T>(
queryDefinition,
continuationToken,
requestOptions));
this.GetDatabaseQueryIteratorHelper<T>(
queryDefinition,
continuationToken,
requestOptions));
}

/// <summary>
Expand Down Expand Up @@ -771,14 +796,19 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
throw new ArgumentNullException(nameof(databaseProperties));
}

this.ClientContext.ValidateResource(databaseProperties.Id);
Stream streamPayload = this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties);

return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseStreamInternalAsync(
streamPayload,
throughput,
requestOptions,
cancellationToken));
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseStreamAsync),
requestOptions,
(diagnostics) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
diagnostics,
databaseProperties,
ThroughputProperties.CreateManualThroughput(throughput),
requestOptions,
cancellationToken);
});
}

internal virtual async Task<ConsistencyLevel> GetAccountConsistencyLevelAsync()
Expand Down Expand Up @@ -835,21 +865,27 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
throw new ArgumentNullException(nameof(databaseProperties));
}

this.ClientContext.ValidateResource(databaseProperties.Id);
Stream streamPayload = this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties);

return TaskHelper.RunInlineIfNeededAsync(() => this.CreateDatabaseStreamInternalAsync(
streamPayload,
throughputProperties,
return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
cancellationToken));
(diagnostics) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
diagnostics,
databaseProperties,
throughputProperties,
requestOptions,
cancellationToken);
});
}

internal async Task<DatabaseResponse> CreateDatabaseAsync(
DatabaseProperties databaseProperties,
ThroughputProperties throughputProperties,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
private async Task<DatabaseResponse> CreateDatabaseInternalAsync(
CosmosDiagnosticsContext diagnosticsContext,
DatabaseProperties databaseProperties,
ThroughputProperties throughputProperties,
RequestOptions requestOptions,
CancellationToken cancellationToken)
{
ResponseMessage response = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.DatabaseRootUri,
Expand All @@ -860,63 +896,30 @@ internal async Task<DatabaseResponse> CreateDatabaseAsync(
partitionKey: null,
streamPayload: this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties),
requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputPropertiesHeader(throughputProperties),
diagnosticsContext: null,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), response);
}

internal async Task<DatabaseResponse> CreateDatabaseAsync(
DatabaseProperties databaseProperties,
int? throughput = null,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
ResponseMessage response = await this.CreateDatabaseStreamInternalAsync(
streamPayload: this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties),
throughput: throughput,
requestOptions: requestOptions,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), response);
}

private Task<ResponseMessage> CreateDatabaseStreamInternalAsync(
Stream streamPayload,
int? throughput,
RequestOptions requestOptions,
CancellationToken cancellationToken)
{
ThroughputProperties throughputProperties = null;
if (throughput.HasValue)
{
throughputProperties = ThroughputProperties.CreateManualThroughput(throughput.Value);
}

return this.CreateDatabaseStreamInternalAsync(
streamPayload,
throughputProperties,
requestOptions,
cancellationToken);

}

private Task<ResponseMessage> CreateDatabaseStreamInternalAsync(
Stream streamPayload,
ThroughputProperties throughputProperties,
RequestOptions requestOptions,
CancellationToken cancellationToken)
CosmosDiagnosticsContext diagnosticsContext,
DatabaseProperties databaseProperties,
ThroughputProperties throughputProperties,
RequestOptions requestOptions,
CancellationToken cancellationToken)
{
return this.ClientContext.ProcessResourceOperationStreamAsync(
return this.ClientContext.ProcessResourceOperationAsync(
resourceUri: this.DatabaseRootUri,
resourceType: ResourceType.Database,
operationType: OperationType.Create,
requestOptions: requestOptions,
cosmosContainerCore: null,
containerInternal: null,
partitionKey: null,
streamPayload: streamPayload,
streamPayload: this.ClientContext.SerializerCore.ToStream<DatabaseProperties>(databaseProperties),
requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputPropertiesHeader(throughputProperties),
diagnosticsContext: null,
responseCreator: (response) => response,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public CosmosClientSideRequestStatistics(CosmosDiagnosticsContext diagnosticsCon
this.ContactedReplicas = new List<Uri>();
this.FailedReplicas = new HashSet<Uri>();
this.RegionsContacted = new HashSet<Uri>();
this.DiagnosticsContext = diagnosticsContext ?? new CosmosDiagnosticsContextCore();
this.DiagnosticsContext = diagnosticsContext ?? CosmosDiagnosticsContextCore.Create(requestOptions: null);
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
this.DiagnosticsContext.AddDiagnosticsInternal(this);
this.clientSideRequestStatisticsCreateTime = Stopwatch.GetTimestamp();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@ internal abstract class CosmosDiagnosticsContext : CosmosDiagnosticsInternal, IE
{
public abstract DateTime StartUtc { get; }

public abstract int TotalRequestCount { get; protected set; }
public abstract string UserAgent { get; }

public abstract int FailedRequestCount { get; protected set; }

public abstract string UserAgent { get; protected set; }
public abstract string OperationName { get; }

internal abstract CosmosDiagnostics Diagnostics { get; }

public abstract int GetTotalRequestCount();

public abstract int GetFailedRequestCount();

internal abstract IDisposable GetOverallScope();

internal abstract IDisposable CreateScope(string name);

internal abstract IDisposable CreateRequestHandlerScopeScope(RequestHandler requestHandler);

internal abstract TimeSpan GetClientElapsedTime();
internal abstract TimeSpan GetRunningElapsedTime();

internal abstract bool TryGetTotalElapsedTime(out TimeSpan timeSpan);

internal abstract bool IsComplete();

Expand All @@ -50,18 +54,29 @@ internal abstract class CosmosDiagnosticsContext : CosmosDiagnosticsInternal, IE

internal abstract void AddDiagnosticsInternal(CosmosDiagnosticsContext newContext);

internal abstract void SetSdkUserAgent(string userAgent);

public abstract IEnumerator<CosmosDiagnosticsInternal> GetEnumerator();

IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}

internal static CosmosDiagnosticsContext Create(RequestOptions requestOptions)
internal static CosmosDiagnosticsContext Create(
RequestOptions requestOptions)
{
return requestOptions?.DiagnosticContextFactory?.Invoke() ??
new CosmosDiagnosticsContextCore();
}

internal static CosmosDiagnosticsContext Create(
string operationName,
RequestOptions requestOptions,
string userAgentString)
{
return requestOptions?.DiagnosticContextFactory?.Invoke() ?? new CosmosDiagnosticsContextCore();
return requestOptions?.DiagnosticContextFactory?.Invoke() ??
new CosmosDiagnosticsContextCore(
operationName,
userAgentString);
}
}
}
Loading