Skip to content

Commit

Permalink
Adding continuation token support for LINQ (#544)
Browse files Browse the repository at this point in the history
* closes #537

* adding docs on ToFeedIterator() extention method documentation

* adding change log
  • Loading branch information
simplynaveen20 authored and kirankumarkolli committed Jul 16, 2019
1 parent af6a83a commit a282352
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 6 deletions.
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ internal static string ToSqlQueryText<T>(this IQueryable<T> query)

/// <summary>
/// This extension method gets the FeedIterator from LINQ IQueryable to execute query asynchronously.
/// This will create the fresh new FeedIterator when called.
/// </summary>
/// <typeparam name="T">the type of object to query.</typeparam>
/// <param name="query">the IQueryable{T} to be converted.</param>
Expand Down
18 changes: 12 additions & 6 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,30 @@ internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<
private readonly CosmosSerializer cosmosJsonSerializer;
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution = false;
private readonly string continuationToken;

public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
Expression expression,
bool allowSynchronousQueryExecution)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosJsonSerializer = cosmosJsonSerializer ?? throw new ArgumentNullException(nameof(cosmosJsonSerializer));
this.queryClient = queryClient ?? throw new ArgumentNullException(nameof(queryClient));
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
this.expression = expression ?? Expression.Constant(this);
this.allowSynchronousQueryExecution = allowSynchronousQueryExecution;
this.queryProvider = new CosmosLinqQueryProvider(
container,
cosmosJsonSerializer,
queryClient,
cosmosQueryRequestOptions,
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
this.allowSynchronousQueryExecution,
this.queryClient.OnExecuteScalarQueryCallback);
this.correlatedActivityId = Guid.NewGuid();
Expand All @@ -59,12 +63,14 @@ public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution)
: this(
container,
cosmosJsonSerializer,
queryClient,
continuationToken,
cosmosQueryRequestOptions,
null,
allowSynchronousQueryExecution)
Expand Down Expand Up @@ -144,7 +150,7 @@ public FeedIterator<T> ToFeedIterator()
{
return this.container.GetItemQueryIterator<T>(
queryDefinition: new QueryDefinition(this.ToSqlQueryText()),
continuationToken: null,
continuationToken: this.continuationToken,
requestOptions: this.cosmosQueryRequestOptions);
}

Expand All @@ -171,7 +177,7 @@ private FeedIterator CreateCosmosQueryExecutionContext()
operationType: OperationType.Query,
resourceType: typeof(T),
sqlQuerySpec: DocumentQueryEvaluator.Evaluate(this.expression),
continuationToken: null,
continuationToken: this.continuationToken,
queryRequestOptions: this.cosmosQueryRequestOptions,
resourceLink: this.container.LinkUri,
isContinuationExpected: false,
Expand Down
7 changes: 7 additions & 0 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ internal sealed class CosmosLinqQueryProvider : IQueryProvider
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution;
private readonly Action<IQueryable> onExecuteScalarQueryCallback;
private readonly string continuationToken;

public CosmosLinqQueryProvider(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution,
Action<IQueryable> onExecuteScalarQueryCallback = null)
{
this.container = container;
this.cosmosJsonSerializer = cosmosJsonSerializer;
this.queryClient = queryClient;
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
this.allowSynchronousQueryExecution = allowSynchronousQueryExecution;
this.onExecuteScalarQueryCallback = onExecuteScalarQueryCallback;
Expand All @@ -43,6 +46,7 @@ public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
expression,
this.allowSynchronousQueryExecution);
Expand All @@ -57,6 +61,7 @@ public IQueryable CreateQuery(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
expression,
this.allowSynchronousQueryExecution);
Expand All @@ -70,6 +75,7 @@ public TResult Execute<TResult>(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
expression,
this.allowSynchronousQueryExecution);
Expand All @@ -86,6 +92,7 @@ public object Execute(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
this.allowSynchronousQueryExecution);
this.onExecuteScalarQueryCallback?.Invoke(cosmosLINQQuery);
Expand Down
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ public abstract FeedIterator<T> GetItemQueryIterator<T>(
/// </remarks>
/// <typeparam name="T">The type of object to query.</typeparam>
/// <param name="allowSynchronousQueryExecution">(Optional)the option which allows the query to be executed synchronously via IOrderedQueryable.</param>
/// <param name="continuationToken">(Optional) The continuation token in the Azure Cosmos DB service.</param>
/// <param name="requestOptions">(Optional)The options for the item query request.<see cref="QueryRequestOptions"/></param>
/// <returns>(Optional) An IOrderedQueryable{T} that can evaluate the query.</returns>
/// <example>
Expand Down Expand Up @@ -1053,6 +1054,7 @@ public abstract FeedIterator<T> GetItemQueryIterator<T>(
/// </remarks>
public abstract IOrderedQueryable<T> GetItemLinqQueryable<T>(
bool allowSynchronousQueryExecution = false,
string continuationToken = null,
QueryRequestOptions requestOptions = null);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ public override FeedIterator<T> GetItemQueryIterator<T>(

public override IOrderedQueryable<T> GetItemLinqQueryable<T>(
bool allowSynchronousQueryExecution = false,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
requestOptions = requestOptions != null ? requestOptions : new QueryRequestOptions();
Expand All @@ -322,6 +323,7 @@ public override IOrderedQueryable<T> GetItemLinqQueryable<T>(
this,
this.ClientContext.CosmosSerializer,
(CosmosQueryClientCore)this.queryClient,
continuationToken,
requestOptions,
allowSynchronousQueryExecution);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,53 @@ public async Task ItemLINQQueryTest()
Assert.IsTrue(exception.Message.Contains("To execute LINQ query please set allowSynchronousQueryExecution true"));
}
}

[TestMethod]
public async Task ItemLINQQueryWithContinuationTokenTest()
{
//Creating items for query.
IList<ToDoActivity> itemList = await CreateRandomItems(pkCount: 10, perPKItemCount: 1, randomPartitionKey: true);

QueryRequestOptions queryRequestOptions = new QueryRequestOptions();
queryRequestOptions.MaxConcurrency = 1;
queryRequestOptions.MaxItemCount = 5;
IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(requestOptions: queryRequestOptions);
IQueryable<ToDoActivity> queriable = linqQueryable.Where(item => (item.taskNum < 100));
FeedIterator<ToDoActivity> feedIterator = queriable.ToFeedIterator();

int firstItemSet = 0;
string continuationToken = null;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
firstItemSet = feedResponse.Count();
continuationToken = feedResponse.ContinuationToken;
if(firstItemSet > 0)
{
break;
}
}

linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(continuationToken: continuationToken, requestOptions: queryRequestOptions);
queriable = linqQueryable.Where(item => (item.taskNum < 100));
feedIterator = queriable.ToFeedIterator();

//Test continuationToken with LINQ query generation and asynchronous feedIterator execution.
int secondItemSet = 0;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
secondItemSet += feedResponse.Count();
}

Assert.AreEqual(10 - firstItemSet, secondItemSet);

//Test continuationToken with blocking LINQ execution
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(allowSynchronousQueryExecution:true, continuationToken: continuationToken, requestOptions: queryRequestOptions);
int linqExecutionItemCount = linqQueryable.Where(item => (item.taskNum < 100)).Count();
Assert.AreEqual(10 - firstItemSet, linqExecutionItemCount);
}

// Move the data from None Partition to other logical partitions
[TestMethod]
public async Task MigrateDataInNonPartitionContainer()
Expand Down
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## Changes in [3.1.0](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.0.0) : ##

* Fixes [#544](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/544) Adding continuation token support for LINQ


## Changes in [3.0.0](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.0.0) : ##

* General availability of [Version 3.0.0](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/) of the .NET SDK
Expand Down

0 comments on commit a282352

Please sign in to comment.