From fc14786f1b0b0f16d51a20bdac2bb75faf130623 Mon Sep 17 00:00:00 2001 From: j82w Date: Wed, 17 Jul 2019 16:28:35 -0700 Subject: [PATCH] GetItemLinqQueryable now works with null query (#561) * Linq query now support read feed * Fixed isContinuationExcpected value * Updated changelog * Updated method name * updated naming --- .../src/Linq/CosmosLinqExtensions.cs | 9 ++- .../src/Linq/CosmosLinqQuery.cs | 74 ++++++++--------- .../src/Linq/CosmosLinqQueryProvider.cs | 14 ++-- .../Resource/Container/ContainerCore.Items.cs | 78 +++++++++++------- .../CosmosItemTests.cs | 79 +++++++++++++++++-- changelog.md | 2 + 6 files changed, 170 insertions(+), 86 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs index 5b65e7830a..9115b32a99 100644 --- a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs +++ b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs @@ -91,7 +91,14 @@ public static bool IsPrimitive(this object obj) /// internal static string ToSqlQueryText(this IQueryable query) { - return ((CosmosLinqQuery)query).ToSqlQueryText(); + CosmosLinqQuery linqQuery = query as CosmosLinqQuery; + + if (linqQuery == null) + { + throw new ArgumentOutOfRangeException(nameof(linqQuery), "ToSqlQueryText is only supported on cosmos LINQ query operations"); + } + + return linqQuery.ToSqlQueryText(); } /// diff --git a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs index d7ba618a02..1ab81d471c 100644 --- a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs +++ b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs @@ -11,8 +11,6 @@ namespace Microsoft.Azure.Cosmos.Linq using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.Query; - using Microsoft.Azure.Documents; using Newtonsoft.Json; /// @@ -21,20 +19,19 @@ namespace Microsoft.Azure.Cosmos.Linq /// internal sealed class CosmosLinqQuery : IDocumentQuery, IOrderedQueryable { - private readonly Expression expression; private readonly CosmosLinqQueryProvider queryProvider; private readonly Guid correlatedActivityId; private readonly ContainerCore container; private readonly CosmosQueryClientCore queryClient; - private readonly CosmosSerializer cosmosJsonSerializer; + private readonly CosmosResponseFactory responseFactory; private readonly QueryRequestOptions cosmosQueryRequestOptions; private readonly bool allowSynchronousQueryExecution = false; private readonly string continuationToken; public CosmosLinqQuery( ContainerCore container, - CosmosSerializer cosmosJsonSerializer, + CosmosResponseFactory responseFactory, CosmosQueryClientCore queryClient, string continuationToken, QueryRequestOptions cosmosQueryRequestOptions, @@ -42,33 +39,34 @@ public CosmosLinqQuery( bool allowSynchronousQueryExecution) { this.container = container ?? throw new ArgumentNullException(nameof(container)); - this.cosmosJsonSerializer = cosmosJsonSerializer ?? throw new ArgumentNullException(nameof(cosmosJsonSerializer)); + this.responseFactory = responseFactory ?? throw new ArgumentNullException(nameof(responseFactory)); this.queryClient = queryClient ?? throw new ArgumentNullException(nameof(queryClient)); this.continuationToken = continuationToken; this.cosmosQueryRequestOptions = cosmosQueryRequestOptions; - this.expression = expression ?? Expression.Constant(this); + this.Expression = expression ?? Expression.Constant(this); this.allowSynchronousQueryExecution = allowSynchronousQueryExecution; + this.correlatedActivityId = Guid.NewGuid(); + this.queryProvider = new CosmosLinqQueryProvider( - this.container, - this.cosmosJsonSerializer, - this.queryClient, + container, + responseFactory, + queryClient, this.continuationToken, - this.cosmosQueryRequestOptions, + cosmosQueryRequestOptions, this.allowSynchronousQueryExecution, this.queryClient.OnExecuteScalarQueryCallback); - this.correlatedActivityId = Guid.NewGuid(); } public CosmosLinqQuery( ContainerCore container, - CosmosSerializer cosmosJsonSerializer, + CosmosResponseFactory responseFactory, CosmosQueryClientCore queryClient, string continuationToken, QueryRequestOptions cosmosQueryRequestOptions, bool allowSynchronousQueryExecution) : this( container, - cosmosJsonSerializer, + responseFactory, queryClient, continuationToken, cosmosQueryRequestOptions, @@ -79,7 +77,7 @@ public CosmosLinqQuery( public Type ElementType => typeof(T); - public Expression Expression => this.expression; + public Expression Expression { get; } public IQueryProvider Provider => this.queryProvider; @@ -100,13 +98,12 @@ public IEnumerator GetEnumerator() " use GetItemsQueryIterator to execute asynchronously"); } - FeedIterator localQueryExecutionContext = this.CreateCosmosQueryExecutionContext(); - while (localQueryExecutionContext.HasMoreResults) + FeedIterator localFeedIterator = this.CreateFeedIterator(false); + while (localFeedIterator.HasMoreResults) { #pragma warning disable VSTHRD002 // Avoid problematic synchronous waits - ResponseMessage responseMessage = TaskHelper.InlineIfPossible(() => localQueryExecutionContext.ReadNextAsync(CancellationToken.None), null).GetAwaiter().GetResult(); + FeedResponse items = TaskHelper.InlineIfPossible(() => localFeedIterator.ReadNextAsync(CancellationToken.None), null).GetAwaiter().GetResult(); #pragma warning disable VSTHRD002 // Avoid problematic synchronous waits - FeedResponse items = this.container.ClientContext.ResponseFactory.CreateQueryFeedResponse(responseMessage); foreach (T item in items) { @@ -126,7 +123,7 @@ IEnumerator IEnumerable.GetEnumerator() public override string ToString() { - SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.expression); + SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression); if (querySpec != null) { return JsonConvert.SerializeObject(querySpec); @@ -137,21 +134,13 @@ public override string ToString() public string ToSqlQueryText() { - SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.expression); - if (querySpec != null) - { - return (querySpec.QueryText); - } - - return this.container.LinkUri.ToString(); + SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression); + return querySpec?.QueryText; } public FeedIterator ToFeedIterator() { - return this.container.GetItemQueryIterator( - queryDefinition: new QueryDefinition(this.ToSqlQueryText()), - continuationToken: this.continuationToken, - requestOptions: this.cosmosQueryRequestOptions); + return this.CreateFeedIterator(true); } public void Dispose() @@ -169,20 +158,19 @@ Task> IDocumentQuery.ExecuteNextAsync(Cancellat throw new NotImplementedException(); } - private FeedIterator CreateCosmosQueryExecutionContext() + private FeedIterator CreateFeedIterator(bool isContinuationExcpected) { - return new CosmosQueryExecutionContextFactory( - client: this.queryClient, - resourceTypeEnum: ResourceType.Document, - operationType: OperationType.Query, - resourceType: typeof(T), - sqlQuerySpec: DocumentQueryEvaluator.Evaluate(this.expression), + SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression); + + FeedIterator streamIterator = this.container.GetItemQueryStreamIteratorInternal( + sqlQuerySpec: querySpec, + isContinuationExcpected: isContinuationExcpected, continuationToken: this.continuationToken, - queryRequestOptions: this.cosmosQueryRequestOptions, - resourceLink: this.container.LinkUri, - isContinuationExpected: false, - allowNonValueAggregateQuery: true, - correlatedActivityId: Guid.NewGuid()); + requestOptions: this.cosmosQueryRequestOptions); + + return new FeedIteratorCore( + streamIterator, + this.responseFactory.CreateQueryFeedResponse); } } } diff --git a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs index d54ea3a06b..6ff37ebc1e 100644 --- a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs +++ b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs @@ -16,7 +16,7 @@ internal sealed class CosmosLinqQueryProvider : IQueryProvider { private readonly ContainerCore container; private readonly CosmosQueryClientCore queryClient; - private readonly CosmosSerializer cosmosJsonSerializer; + private readonly CosmosResponseFactory responseFactory; private readonly QueryRequestOptions cosmosQueryRequestOptions; private readonly bool allowSynchronousQueryExecution; private readonly Action onExecuteScalarQueryCallback; @@ -24,7 +24,7 @@ internal sealed class CosmosLinqQueryProvider : IQueryProvider public CosmosLinqQueryProvider( ContainerCore container, - CosmosSerializer cosmosJsonSerializer, + CosmosResponseFactory responseFactory, CosmosQueryClientCore queryClient, string continuationToken, QueryRequestOptions cosmosQueryRequestOptions, @@ -32,7 +32,7 @@ public CosmosLinqQueryProvider( Action onExecuteScalarQueryCallback = null) { this.container = container; - this.cosmosJsonSerializer = cosmosJsonSerializer; + this.responseFactory = responseFactory; this.queryClient = queryClient; this.continuationToken = continuationToken; this.cosmosQueryRequestOptions = cosmosQueryRequestOptions; @@ -44,7 +44,7 @@ public IQueryable CreateQuery(Expression expression) { return new CosmosLinqQuery( this.container, - this.cosmosJsonSerializer, + this.responseFactory, this.queryClient, this.continuationToken, this.cosmosQueryRequestOptions, @@ -59,7 +59,7 @@ public IQueryable CreateQuery(Expression expression) return (IQueryable)Activator.CreateInstance( documentQueryType, this.container, - this.cosmosJsonSerializer, + this.responseFactory, this.queryClient, this.continuationToken, this.cosmosQueryRequestOptions, @@ -73,7 +73,7 @@ public TResult Execute(Expression expression) CosmosLinqQuery cosmosLINQQuery = (CosmosLinqQuery)Activator.CreateInstance( cosmosQueryType, this.container, - this.cosmosJsonSerializer, + this.responseFactory, this.queryClient, this.continuationToken, this.cosmosQueryRequestOptions, @@ -90,7 +90,7 @@ public object Execute(Expression expression) CosmosLinqQuery cosmosLINQQuery = (CosmosLinqQuery)Activator.CreateInstance( cosmosQueryType, this.container, - this.cosmosJsonSerializer, + this.responseFactory, this.queryClient, this.continuationToken, this.cosmosQueryRequestOptions, diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index 5d465a3730..f481ccc763 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -241,36 +241,11 @@ public override FeedIterator GetItemQueryStreamIterator( string continuationToken = null, QueryRequestOptions requestOptions = null) { - requestOptions = requestOptions ?? new QueryRequestOptions(); - - if (requestOptions.IsEffectivePartitionKeyRouting) - { - requestOptions.PartitionKey = null; - } - - if (queryDefinition == null) - { - return new FeedIteratorCore( - this.ClientContext, - this.LinkUri, - resourceType: ResourceType.Document, - queryDefinition: null, - continuationToken: continuationToken, - options: requestOptions); - } - - return new CosmosQueryExecutionContextFactory( - client: this.queryClient, - resourceTypeEnum: ResourceType.Document, - operationType: OperationType.Query, - resourceType: typeof(QueryResponse), - sqlQuerySpec: queryDefinition.ToSqlQuerySpec(), + return this.GetItemQueryStreamIteratorInternal( + sqlQuerySpec: queryDefinition?.ToSqlQuerySpec(), + isContinuationExcpected: true, continuationToken: continuationToken, - queryRequestOptions: requestOptions, - resourceLink: this.LinkUri, - isContinuationExpected: true, - allowNonValueAggregateQuery: true, - correlatedActivityId: Guid.NewGuid()); + requestOptions: requestOptions); } public override FeedIterator GetItemQueryIterator( @@ -321,7 +296,7 @@ public override IOrderedQueryable GetItemLinqQueryable( return new CosmosLinqQuery( this, - this.ClientContext.CosmosSerializer, + this.ClientContext.ResponseFactory, (CosmosQueryClientCore)this.queryClient, continuationToken, requestOptions, @@ -397,6 +372,49 @@ internal FeedIterator GetStandByFeedIterator( options: cosmosQueryRequestOptions); } + /// + /// Helper method to create a stream feed iterator. + /// It decides if it is a query or read feed and create + /// the correct instance. + /// + internal FeedIterator GetItemQueryStreamIteratorInternal( + SqlQuerySpec sqlQuerySpec, + bool isContinuationExcpected, + string continuationToken, + QueryRequestOptions requestOptions) + { + requestOptions = requestOptions ?? new QueryRequestOptions(); + + if (requestOptions.IsEffectivePartitionKeyRouting) + { + requestOptions.PartitionKey = null; + } + + if (sqlQuerySpec == null) + { + return new FeedIteratorCore( + this.ClientContext, + this.LinkUri, + resourceType: ResourceType.Document, + queryDefinition: null, + continuationToken: continuationToken, + options: requestOptions); + } + + return new CosmosQueryExecutionContextFactory( + client: this.queryClient, + resourceTypeEnum: ResourceType.Document, + operationType: OperationType.Query, + resourceType: typeof(QueryResponse), + sqlQuerySpec: sqlQuerySpec, + continuationToken: continuationToken, + queryRequestOptions: requestOptions, + resourceLink: this.LinkUri, + isContinuationExpected: isContinuationExcpected, + allowNonValueAggregateQuery: true, + correlatedActivityId: Guid.NewGuid()); + } + // Extracted partition key might be invalid as CollectionCache might be stale. // Stale collection cache is refreshed through PartitionKeyMismatchRetryPolicy // and partition-key is extracted again. diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemTests.cs index 7fce0a8042..32d313e8bb 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemTests.cs @@ -425,6 +425,55 @@ await feedIterator.ReadNextAsync(this.cancellationToken)) Assert.AreEqual(itemIds.Count, 0); } + [DataRow(false)] + [DataRow(true)] + [DataTestMethod] + public async Task ItemLinqReadFeedTest(bool useStatelessIterator) + { + IList deleteList = await this.CreateRandomItems(3, randomPartitionKey: true); + HashSet itemIds = deleteList.Select(x => x.id).ToHashSet(); + + QueryRequestOptions requestOptions = new QueryRequestOptions() + { + MaxItemCount = 1 + }; + + List itemsViaReadFeed = this.Container.GetItemLinqQueryable( + allowSynchronousQueryExecution: true, + requestOptions: requestOptions).ToList(); + + Assert.IsTrue(itemsViaReadFeed.Count >= 3); + CollectionAssert.AreEqual(deleteList.ToList(), itemsViaReadFeed); + + string lastContinuationToken = null; + FeedIterator feedIterator = this.Container.GetItemLinqQueryable( + requestOptions: requestOptions).ToFeedIterator(); + + while (feedIterator.HasMoreResults) + { + if (useStatelessIterator) + { + feedIterator = this.Container.GetItemLinqQueryable( + continuationToken: lastContinuationToken, + requestOptions: requestOptions).ToFeedIterator(); + } + + var responseMessage = await feedIterator.ReadNextAsync(this.cancellationToken); + lastContinuationToken = responseMessage.ContinuationToken; + + foreach (ToDoActivity toDoActivity in responseMessage) + { + if (itemIds.Contains(toDoActivity.id)) + { + itemIds.Remove(toDoActivity.id); + } + } + } + + Assert.IsNull(lastContinuationToken); + Assert.AreEqual(itemIds.Count, 0); + } + [TestMethod] public async Task ItemDistinctStreamIterator() { @@ -1322,7 +1371,7 @@ public async Task ItemLINQQueryWithContinuationTokenTest() FeedResponse feedResponse = await feedIterator.ReadNextAsync(); firstItemSet = feedResponse.Count(); continuationToken = feedResponse.ContinuationToken; - if(firstItemSet > 0) + if (firstItemSet > 0) { break; } @@ -1343,7 +1392,7 @@ public async Task ItemLINQQueryWithContinuationTokenTest() Assert.AreEqual(10 - firstItemSet, secondItemSet); //Test continuationToken with blocking LINQ execution - linqQueryable = this.Container.GetItemLinqQueryable(allowSynchronousQueryExecution:true, continuationToken: continuationToken, requestOptions: queryRequestOptions); + linqQueryable = this.Container.GetItemLinqQueryable(allowSynchronousQueryExecution: true, continuationToken: continuationToken, requestOptions: queryRequestOptions); int linqExecutionItemCount = linqQueryable.Where(item => (item.taskNum < 100)).Count(); Assert.AreEqual(10 - firstItemSet, linqExecutionItemCount); } @@ -1463,7 +1512,7 @@ public async Task VerifyToManyRequestTest(bool isQuery) for (int i = 0; i < 500 && failedToManyRequests.Count == 0; i++) { createQuery.Add(VerifyQueryToManyExceptionAsync( - container, + container, isQuery, failedToManyRequests)); } @@ -1569,7 +1618,7 @@ public async Task ContainterReCreateStatelessTest(bool operationBetweenRecreate, private static async Task VerifyQueryToManyExceptionAsync( Container container, - bool isQuery, + bool isQuery, List failedToManyMessages) { string queryText = null; @@ -1607,7 +1656,7 @@ private static async Task ExecuteReadFeedAsync(Container container, HttpStatusCo { ResponseMessage response = await iterator.ReadNextAsync(); Assert.AreEqual(expected, response.StatusCode, $"ExecuteReadFeedAsync substatuscode: {response.Headers.SubStatusCode} "); - } + } } private async Task> CreateRandomItems(int pkCount, int perPKItemCount = 1, bool randomPartitionKey = true) @@ -1783,6 +1832,26 @@ public class ToDoActivity public double cost { get; set; } public string description { get; set; } public string status { get; set; } + + public override bool Equals(Object obj) + { + ToDoActivity input = obj as ToDoActivity; + if (input == null) + { + return false; + } + + return string.Equals(this.id, input.id) + && this.taskNum == input.taskNum + && this.cost == input.cost + && string.Equals(this.description, input.description) + && string.Equals(this.status, input.status); + } + + public override int GetHashCode() + { + return base.GetHashCode(); + } } public class ToDoActivityAfterMigration diff --git a/changelog.md b/changelog.md index af7bb3cb85..d21b5d2d2e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,4 @@ +# Changelog The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). @@ -14,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#548](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/548) Fixed mis-typed message in CosmosException.ToString(); - [#558](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/558) LocationCache ConcurrentDict lock contention fix +- [#561](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/561) GetItemLinqQueryable now works with null query ## [3.0.0](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.0.0) - 2019-07-15