Skip to content

Commit

Permalink
Query: Adds distributed query gateway client pipeline stage (#4556)
Browse files Browse the repository at this point in the history
* Add a DistributedQuery gateway mode to the v3 sdk

Fix up bug in the continuation token and plumb the page size. Add an integration test

Make distributed query pipeline kick in only for document queries. Make it backwards compatible with older continuation tokens. Add integration tests.

Revert change in TransactionBatchOperationResult

Fix up missing include in OptimisticDirectExecutionQueryBaselineTests

* Incorporate code review feedback

Hack together a parity test for queries that require distribution

Add more partitioned parity tests

A tiny bit of code clean up

rename PaginationOptions to ExecutionOptions

Rename PaginationOptions to ExecutionOptions

rename pagination options to execution options

remove warning disable pragmas

Commit missed changes

* Fix up build issues due to bad merge

* Add another overload for NetworkAttachedDocumentContainer that does not take a distributed query client

* Incorporate code review feedback

* Fix up baseline to account for the class rename
  • Loading branch information
neildsh authored Jul 2, 2024
1 parent 0ae1823 commit 5287ffb
Show file tree
Hide file tree
Showing 66 changed files with 1,454 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ internal ResponseMessage ToResponseMessage(ContainerInternal cosmosContainerCore
RequestMessage requestMessage = new ()
{
ContainerId = cosmosContainerCore?.Id,
DatabaseId = cosmosContainerCore?.Database?.Id,
DatabaseId = cosmosContainerCore?.Database?.Id,
Trace = null
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerable : IAsyncEnumerable<TryCatch<ChangeFeedPage>>
{
private readonly IDocumentContainer documentContainer;
private readonly ChangeFeedPaginationOptions changeFeedPaginationOptions;
private readonly ChangeFeedExecutionOptions changeFeedPaginationOptions;
private readonly ChangeFeedCrossFeedRangeState state;
private readonly JsonSerializationFormatOptions jsonSerializationFormatOptions;

public ChangeFeedCrossFeedRangeAsyncEnumerable(
IDocumentContainer documentContainer,
ChangeFeedCrossFeedRangeState state,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
JsonSerializationFormatOptions jsonSerializationFormatOptions = null)
{
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.changeFeedPaginationOptions = changeFeedPaginationOptions ?? ChangeFeedPaginationOptions.Default;
this.changeFeedPaginationOptions = changeFeedPaginationOptions ?? ChangeFeedExecutionOptions.Default;
this.state = state;
this.jsonSerializationFormatOptions = jsonSerializationFormatOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public ChangeFeedIteratorCore(
CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
documentContainer,
new CrossFeedRangeState<ChangeFeedState>(monadicChangeFeedCrossFeedRangeState.Result.FeedRangeStates),
new ChangeFeedPaginationOptions(
new ChangeFeedExecutionOptions(
changeFeedMode,
changeFeedRequestOptions?.PageSizeHint,
changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
using System.Linq;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedPaginationOptions : PaginationOptions
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedExecutionOptions : ExecutionOptions
{
public static readonly ChangeFeedPaginationOptions Default = new ChangeFeedPaginationOptions(
public static readonly ChangeFeedExecutionOptions Default = new ChangeFeedExecutionOptions(
mode: ChangeFeedMode.Incremental);

public static readonly ImmutableHashSet<string> BannedHeaders = new HashSet<string>()
Expand All @@ -24,10 +23,10 @@ internal sealed class ChangeFeedPaginationOptions : PaginationOptions
HttpConstants.HttpHeaders.IfModifiedSince,
HttpConstants.HttpHeaders.IfNoneMatch,
}
.Concat(PaginationOptions.bannedAdditionalHeaders)
.Concat(ExecutionOptions.bannedAdditionalHeaders)
.ToImmutableHashSet();

public ChangeFeedPaginationOptions(
public ChangeFeedExecutionOptions(
ChangeFeedMode mode,
int? pageSizeHint = null,
JsonSerializationFormat? jsonSerializationFormat = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
internal sealed class ChangeFeedPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState>
{
private readonly IChangeFeedDataSource changeFeedDataSource;
private readonly ChangeFeedPaginationOptions changeFeedPaginationOptions;
private readonly ChangeFeedExecutionOptions changeFeedPaginationOptions;

public ChangeFeedPartitionRangePageAsyncEnumerator(
IChangeFeedDataSource changeFeedDataSource,
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions)
ChangeFeedExecutionOptions changeFeedPaginationOptions)
: base(feedRangeState)
{
this.changeFeedDataSource = changeFeedDataSource ?? throw new ArgumentNullException(nameof(changeFeedDataSource));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
public static CrossPartitionChangeFeedAsyncEnumerator Create(
IDocumentContainer documentContainer,
CrossFeedRangeState<ChangeFeedState> state,
ChangeFeedPaginationOptions changeFeedPaginationOptions)
ChangeFeedExecutionOptions changeFeedPaginationOptions)
{
changeFeedPaginationOptions ??= ChangeFeedPaginationOptions.Default;
changeFeedPaginationOptions ??= ChangeFeedExecutionOptions.Default;

if (documentContainer == null)
{
Expand Down Expand Up @@ -164,7 +164,7 @@ private static bool IsNextRangeEqualToOriginal(

private static CreatePartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> MakeCreateFunction(
IChangeFeedDataSource changeFeedDataSource,
ChangeFeedPaginationOptions changeFeedPaginationOptions) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
ChangeFeedExecutionOptions changeFeedPaginationOptions) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
changeFeedDataSource,
feedRangeState,
changeFeedPaginationOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal interface IChangeFeedDataSource : IMonadicChangeFeedDataSource
{
Task<ChangeFeedPage> ChangeFeedAsync(
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal interface IMonadicChangeFeedDataSource
{
Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken);
}
Expand Down
12 changes: 6 additions & 6 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Task<Record> ReadItemAsync(

public Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
FeedRangeState<ReadFeedState> feedRangeState,
ReadFeedPaginationOptions readFeedPaginationOptions,
ReadFeedExecutionOptions readFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken) => this.monadicDocumentContainer.MonadicReadFeedAsync(
feedRangeState,
Expand All @@ -112,7 +112,7 @@ public Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(

public Task<ReadFeedPage> ReadFeedAsync(
FeedRangeState<ReadFeedState> feedRangeState,
ReadFeedPaginationOptions readFeedPaginationOptions,
ReadFeedExecutionOptions readFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken) => TryCatch<ReadFeedPage>.UnsafeGetResultAsync(
this.MonadicReadFeedAsync(
Expand All @@ -125,7 +125,7 @@ public Task<ReadFeedPage> ReadFeedAsync(
public Task<TryCatch<QueryPage>> MonadicQueryAsync(
SqlQuerySpec sqlQuerySpec,
FeedRangeState<QueryState> feedRangeState,
QueryPaginationOptions queryPaginationOptions,
QueryExecutionOptions queryPaginationOptions,
ITrace trace,
CancellationToken cancellationToken) => this.monadicDocumentContainer.MonadicQueryAsync(
sqlQuerySpec,
Expand All @@ -137,7 +137,7 @@ public Task<TryCatch<QueryPage>> MonadicQueryAsync(
public Task<QueryPage> QueryAsync(
SqlQuerySpec sqlQuerySpec,
FeedRangeState<QueryState> feedRangeState,
QueryPaginationOptions queryPaginationOptions,
QueryExecutionOptions queryPaginationOptions,
ITrace trace,
CancellationToken cancellationToken) => TryCatch<QueryPage>.UnsafeGetResultAsync(
this.MonadicQueryAsync(
Expand Down Expand Up @@ -182,7 +182,7 @@ public Task MergeAsync(

public Task<ChangeFeedPage> ChangeFeedAsync(
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken) => TryCatch<ChangeFeedPage>.UnsafeGetResultAsync(
this.MonadicChangeFeedAsync(
Expand All @@ -194,7 +194,7 @@ public Task<ChangeFeedPage> ChangeFeedAsync(

public Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
FeedRangeState<ChangeFeedState> state,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken) => this.monadicDocumentContainer.MonadicChangeFeedAsync(
state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace Microsoft.Azure.Cosmos.Pagination
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using Microsoft.Azure.Documents;

internal abstract class PaginationOptions
using Microsoft.Azure.Documents;

internal abstract class ExecutionOptions
{
protected static readonly ImmutableHashSet<string> bannedAdditionalHeaders = new HashSet<string>()
{
Expand All @@ -20,7 +20,7 @@ internal abstract class PaginationOptions
private static readonly ImmutableDictionary<string, string> EmptyDictionary = new Dictionary<string, string>()
.ToImmutableDictionary<string, string>();

protected PaginationOptions(
protected ExecutionOptions(
int? pageSizeLimit = null,
IReadOnlyDictionary<string, string> additionalHeaders = null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,47 @@ namespace Microsoft.Azure.Cosmos.Pagination
internal sealed class NetworkAttachedDocumentContainer : IMonadicDocumentContainer
{
private readonly ContainerInternal container;
private readonly CosmosQueryClient cosmosQueryClient;
private readonly CosmosQueryClient cosmosQueryClient;
private readonly ICosmosDistributedQueryClient distributedQueryClient;
private readonly QueryRequestOptions queryRequestOptions;
private readonly ChangeFeedRequestOptions changeFeedRequestOptions;
private readonly string resourceLink;
private readonly ResourceType resourceType;
private readonly Guid correlatedActivityId;
private readonly Guid correlatedActivityId;

public NetworkAttachedDocumentContainer(
ContainerInternal container,
CosmosQueryClient cosmosQueryClient,
Guid correlatedActivityId,
QueryRequestOptions queryRequestOptions = null,
ChangeFeedRequestOptions changeFeedRequestOptions = null,
string resourceLink = null,
ResourceType resourceType = ResourceType.Document)
: this(
container,
cosmosQueryClient,
distributedQueryClient: null,
correlatedActivityId,
queryRequestOptions,
changeFeedRequestOptions,
resourceLink,
resourceType)
{
}

public NetworkAttachedDocumentContainer(
ContainerInternal container,
CosmosQueryClient cosmosQueryClient,
CosmosQueryClient cosmosQueryClient,
ICosmosDistributedQueryClient distributedQueryClient,
Guid correlatedActivityId,
QueryRequestOptions queryRequestOptions = null,
ChangeFeedRequestOptions changeFeedRequestOptions = null,
string resourceLink = null,
ResourceType resourceType = ResourceType.Document)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosQueryClient = cosmosQueryClient ?? throw new ArgumentNullException(nameof(cosmosQueryClient));
this.cosmosQueryClient = cosmosQueryClient ?? throw new ArgumentNullException(nameof(cosmosQueryClient));
this.distributedQueryClient = distributedQueryClient; // optional
this.queryRequestOptions = queryRequestOptions;
this.changeFeedRequestOptions = changeFeedRequestOptions;
this.resourceLink = resourceLink ?? this.container.LinkUri;
Expand Down Expand Up @@ -172,11 +195,11 @@ public async Task<TryCatch> MonadicRefreshProviderAsync(

public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
FeedRangeState<ReadFeedState> feedRangeState,
ReadFeedPaginationOptions readFeedPaginationOptions,
ReadFeedExecutionOptions readFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken)
{
readFeedPaginationOptions ??= ReadFeedPaginationOptions.Default;
readFeedPaginationOptions ??= ReadFeedExecutionOptions.Default;

ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.resourceLink,
Expand Down Expand Up @@ -232,10 +255,10 @@ public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
return monadicReadFeedPage;
}

public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
public Task<TryCatch<QueryPage>> MonadicQueryAsync(
SqlQuerySpec sqlQuerySpec,
FeedRangeState<QueryState> feedRangeState,
QueryPaginationOptions queryPaginationOptions,
QueryExecutionOptions queryPaginationOptions,
ITrace trace,
CancellationToken cancellationToken)
{
Expand All @@ -254,28 +277,45 @@ public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
throw new ArgumentNullException(nameof(trace));
}

QueryRequestOptions queryRequestOptions = this.queryRequestOptions == null ? new QueryRequestOptions() : this.queryRequestOptions;
QueryRequestOptions queryRequestOptions = this.queryRequestOptions ?? new QueryRequestOptions();
AdditionalRequestHeaders additionalRequestHeaders = new AdditionalRequestHeaders(this.correlatedActivityId, isContinuationExpected: false, optimisticDirectExecute: queryPaginationOptions.OptimisticDirectExecute);

TryCatch<QueryPage> monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync(
this.resourceLink,
this.resourceType,
Documents.OperationType.Query,
feedRangeState.FeedRange,
queryRequestOptions,
additionalRequestHeaders,
sqlQuerySpec,
feedRangeState.State == null ? null : ((CosmosString)feedRangeState.State.Value).Value,
queryPaginationOptions.PageSizeLimit ?? int.MaxValue,
trace,
cancellationToken);

Task<TryCatch<QueryPage>> monadicQueryPage;
if (this.distributedQueryClient != null &&
queryPaginationOptions.EnableDistributedQueryGatewayMode &&
this.resourceType == Documents.ResourceType.Document)
{
monadicQueryPage = this.distributedQueryClient.MonadicQueryAsync(
queryRequestOptions.PartitionKey,
feedRangeState.FeedRange,
sqlQuerySpec,
feedRangeState.State == null ? null : ((CosmosString)feedRangeState.State.Value).Value,
queryPaginationOptions,
trace,
cancellationToken);
}
else
{
monadicQueryPage = this.cosmosQueryClient.ExecuteItemQueryAsync(
this.resourceLink,
this.resourceType,
Documents.OperationType.Query,
feedRangeState.FeedRange,
queryRequestOptions,
additionalRequestHeaders,
sqlQuerySpec,
feedRangeState.State == null ? null : ((CosmosString)feedRangeState.State.Value).Value,
queryPaginationOptions.PageSizeLimit ?? int.MaxValue,
trace,
cancellationToken);
}

return monadicQueryPage;
}

public async Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
ChangeFeedExecutionOptions changeFeedPaginationOptions,
ITrace trace,
CancellationToken cancellationToken)
{
Expand Down
Loading

0 comments on commit 5287ffb

Please sign in to comment.