Skip to content

Commit

Permalink
GetItemLinqQueryable now works with null query (#561)
Browse files Browse the repository at this point in the history
* Linq query now support read feed

* Fixed isContinuationExcpected value

* Updated changelog

* Updated method name

* updated naming
  • Loading branch information
j82w authored and kirankumarkolli committed Jul 17, 2019
1 parent 7e744fe commit fc14786
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 86 deletions.
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,14 @@ public static bool IsPrimitive(this object obj)
/// </example>
internal static string ToSqlQueryText<T>(this IQueryable<T> query)
{
return ((CosmosLinqQuery<T>)query).ToSqlQueryText();
CosmosLinqQuery<T> linqQuery = query as CosmosLinqQuery<T>;

if (linqQuery == null)
{
throw new ArgumentOutOfRangeException(nameof(linqQuery), "ToSqlQueryText is only supported on cosmos LINQ query operations");
}

return linqQuery.ToSqlQueryText();
}

/// <summary>
Expand Down
74 changes: 31 additions & 43 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ namespace Microsoft.Azure.Cosmos.Linq
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;

/// <summary>
Expand All @@ -21,54 +19,54 @@ namespace Microsoft.Azure.Cosmos.Linq
/// <seealso cref="CosmosLinqQueryProvider"/>
internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<T>
{
private readonly Expression expression;
private readonly CosmosLinqQueryProvider queryProvider;
private readonly Guid correlatedActivityId;

private readonly ContainerCore container;
private readonly CosmosQueryClientCore queryClient;
private readonly CosmosSerializer cosmosJsonSerializer;
private readonly CosmosResponseFactory responseFactory;
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution = false;
private readonly string continuationToken;

public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosResponseFactory responseFactory,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
Expression expression,
bool allowSynchronousQueryExecution)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosJsonSerializer = cosmosJsonSerializer ?? throw new ArgumentNullException(nameof(cosmosJsonSerializer));
this.responseFactory = responseFactory ?? throw new ArgumentNullException(nameof(responseFactory));
this.queryClient = queryClient ?? throw new ArgumentNullException(nameof(queryClient));
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
this.expression = expression ?? Expression.Constant(this);
this.Expression = expression ?? Expression.Constant(this);
this.allowSynchronousQueryExecution = allowSynchronousQueryExecution;
this.correlatedActivityId = Guid.NewGuid();

this.queryProvider = new CosmosLinqQueryProvider(
this.container,
this.cosmosJsonSerializer,
this.queryClient,
container,
responseFactory,
queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
cosmosQueryRequestOptions,
this.allowSynchronousQueryExecution,
this.queryClient.OnExecuteScalarQueryCallback);
this.correlatedActivityId = Guid.NewGuid();
}

public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosResponseFactory responseFactory,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution)
: this(
container,
cosmosJsonSerializer,
responseFactory,
queryClient,
continuationToken,
cosmosQueryRequestOptions,
Expand All @@ -79,7 +77,7 @@ public CosmosLinqQuery(

public Type ElementType => typeof(T);

public Expression Expression => this.expression;
public Expression Expression { get; }

public IQueryProvider Provider => this.queryProvider;

Expand All @@ -100,13 +98,12 @@ public IEnumerator<T> GetEnumerator()
" use GetItemsQueryIterator to execute asynchronously");
}

FeedIterator localQueryExecutionContext = this.CreateCosmosQueryExecutionContext();
while (localQueryExecutionContext.HasMoreResults)
FeedIterator<T> localFeedIterator = this.CreateFeedIterator(false);
while (localFeedIterator.HasMoreResults)
{
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
ResponseMessage responseMessage = TaskHelper.InlineIfPossible(() => localQueryExecutionContext.ReadNextAsync(CancellationToken.None), null).GetAwaiter().GetResult();
FeedResponse<T> items = TaskHelper.InlineIfPossible(() => localFeedIterator.ReadNextAsync(CancellationToken.None), null).GetAwaiter().GetResult();
#pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
FeedResponse<T> items = this.container.ClientContext.ResponseFactory.CreateQueryFeedResponse<T>(responseMessage);

foreach (T item in items)
{
Expand All @@ -126,7 +123,7 @@ IEnumerator IEnumerable.GetEnumerator()

public override string ToString()
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.expression);
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);
if (querySpec != null)
{
return JsonConvert.SerializeObject(querySpec);
Expand All @@ -137,21 +134,13 @@ public override string ToString()

public string ToSqlQueryText()
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.expression);
if (querySpec != null)
{
return (querySpec.QueryText);
}

return this.container.LinkUri.ToString();
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);
return querySpec?.QueryText;
}

public FeedIterator<T> ToFeedIterator()
{
return this.container.GetItemQueryIterator<T>(
queryDefinition: new QueryDefinition(this.ToSqlQueryText()),
continuationToken: this.continuationToken,
requestOptions: this.cosmosQueryRequestOptions);
return this.CreateFeedIterator(true);
}

public void Dispose()
Expand All @@ -169,20 +158,19 @@ Task<DocumentFeedResponse<dynamic>> IDocumentQuery<T>.ExecuteNextAsync(Cancellat
throw new NotImplementedException();
}

private FeedIterator CreateCosmosQueryExecutionContext()
private FeedIterator<T> CreateFeedIterator(bool isContinuationExcpected)
{
return new CosmosQueryExecutionContextFactory(
client: this.queryClient,
resourceTypeEnum: ResourceType.Document,
operationType: OperationType.Query,
resourceType: typeof(T),
sqlQuerySpec: DocumentQueryEvaluator.Evaluate(this.expression),
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);

FeedIterator streamIterator = this.container.GetItemQueryStreamIteratorInternal(
sqlQuerySpec: querySpec,
isContinuationExcpected: isContinuationExcpected,
continuationToken: this.continuationToken,
queryRequestOptions: this.cosmosQueryRequestOptions,
resourceLink: this.container.LinkUri,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
correlatedActivityId: Guid.NewGuid());
requestOptions: this.cosmosQueryRequestOptions);

return new FeedIteratorCore<T>(
streamIterator,
this.responseFactory.CreateQueryFeedResponse<T>);
}
}
}
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@ internal sealed class CosmosLinqQueryProvider : IQueryProvider
{
private readonly ContainerCore container;
private readonly CosmosQueryClientCore queryClient;
private readonly CosmosSerializer cosmosJsonSerializer;
private readonly CosmosResponseFactory responseFactory;
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution;
private readonly Action<IQueryable> onExecuteScalarQueryCallback;
private readonly string continuationToken;

public CosmosLinqQueryProvider(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosResponseFactory responseFactory,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution,
Action<IQueryable> onExecuteScalarQueryCallback = null)
{
this.container = container;
this.cosmosJsonSerializer = cosmosJsonSerializer;
this.responseFactory = responseFactory;
this.queryClient = queryClient;
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
Expand All @@ -44,7 +44,7 @@ public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
{
return new CosmosLinqQuery<TElement>(
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand All @@ -59,7 +59,7 @@ public IQueryable CreateQuery(Expression expression)
return (IQueryable)Activator.CreateInstance(
documentQueryType,
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand All @@ -73,7 +73,7 @@ public TResult Execute<TResult>(Expression expression)
CosmosLinqQuery<TResult> cosmosLINQQuery = (CosmosLinqQuery<TResult>)Activator.CreateInstance(
cosmosQueryType,
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand All @@ -90,7 +90,7 @@ public object Execute(Expression expression)
CosmosLinqQuery<object> cosmosLINQQuery = (CosmosLinqQuery<object>)Activator.CreateInstance(
cosmosQueryType,
this.container,
this.cosmosJsonSerializer,
this.responseFactory,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,36 +241,11 @@ public override FeedIterator GetItemQueryStreamIterator(
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
requestOptions = requestOptions ?? new QueryRequestOptions();

if (requestOptions.IsEffectivePartitionKeyRouting)
{
requestOptions.PartitionKey = null;
}

if (queryDefinition == null)
{
return new FeedIteratorCore(
this.ClientContext,
this.LinkUri,
resourceType: ResourceType.Document,
queryDefinition: null,
continuationToken: continuationToken,
options: requestOptions);
}

return new CosmosQueryExecutionContextFactory(
client: this.queryClient,
resourceTypeEnum: ResourceType.Document,
operationType: OperationType.Query,
resourceType: typeof(QueryResponse),
sqlQuerySpec: queryDefinition.ToSqlQuerySpec(),
return this.GetItemQueryStreamIteratorInternal(
sqlQuerySpec: queryDefinition?.ToSqlQuerySpec(),
isContinuationExcpected: true,
continuationToken: continuationToken,
queryRequestOptions: requestOptions,
resourceLink: this.LinkUri,
isContinuationExpected: true,
allowNonValueAggregateQuery: true,
correlatedActivityId: Guid.NewGuid());
requestOptions: requestOptions);
}

public override FeedIterator<T> GetItemQueryIterator<T>(
Expand Down Expand Up @@ -321,7 +296,7 @@ public override IOrderedQueryable<T> GetItemLinqQueryable<T>(

return new CosmosLinqQuery<T>(
this,
this.ClientContext.CosmosSerializer,
this.ClientContext.ResponseFactory,
(CosmosQueryClientCore)this.queryClient,
continuationToken,
requestOptions,
Expand Down Expand Up @@ -397,6 +372,49 @@ internal FeedIterator GetStandByFeedIterator(
options: cosmosQueryRequestOptions);
}

/// <summary>
/// Helper method to create a stream feed iterator.
/// It decides if it is a query or read feed and create
/// the correct instance.
/// </summary>
internal FeedIterator GetItemQueryStreamIteratorInternal(
SqlQuerySpec sqlQuerySpec,
bool isContinuationExcpected,
string continuationToken,
QueryRequestOptions requestOptions)
{
requestOptions = requestOptions ?? new QueryRequestOptions();

if (requestOptions.IsEffectivePartitionKeyRouting)
{
requestOptions.PartitionKey = null;
}

if (sqlQuerySpec == null)
{
return new FeedIteratorCore(
this.ClientContext,
this.LinkUri,
resourceType: ResourceType.Document,
queryDefinition: null,
continuationToken: continuationToken,
options: requestOptions);
}

return new CosmosQueryExecutionContextFactory(
client: this.queryClient,
resourceTypeEnum: ResourceType.Document,
operationType: OperationType.Query,
resourceType: typeof(QueryResponse),
sqlQuerySpec: sqlQuerySpec,
continuationToken: continuationToken,
queryRequestOptions: requestOptions,
resourceLink: this.LinkUri,
isContinuationExpected: isContinuationExcpected,
allowNonValueAggregateQuery: true,
correlatedActivityId: Guid.NewGuid());
}

// Extracted partition key might be invalid as CollectionCache might be stale.
// Stale collection cache is refreshed through PartitionKeyMismatchRetryPolicy
// and partition-key is extracted again.
Expand Down
Loading

0 comments on commit fc14786

Please sign in to comment.