diff --git a/src/EFCore.Analyzers/EFDiagnostics.cs b/src/EFCore.Analyzers/EFDiagnostics.cs index b8eb034043a..acedc129899 100644 --- a/src/EFCore.Analyzers/EFDiagnostics.cs +++ b/src/EFCore.Analyzers/EFDiagnostics.cs @@ -17,4 +17,5 @@ public static class EFDiagnostics public const string ProviderExperimentalApi = "EF9002"; public const string PrecompiledQueryExperimental = "EF9100"; public const string MetricsExperimental = "EF9101"; + public const string PagingExperimental = "EF9102"; } diff --git a/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs b/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs index fca1bea3489..65e0e729503 100644 --- a/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs +++ b/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs @@ -25,7 +25,7 @@ public CosmosQueryEventData( EventDefinitionBase eventDefinition, Func messageGenerator, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, IReadOnlyList<(string Name, object? Value)> parameters, string querySql, bool logSensitiveData) @@ -46,7 +46,7 @@ public CosmosQueryEventData( /// /// The key of the Cosmos partition that the query is using. /// - public virtual PartitionKey PartitionKeyValue { get; } + public virtual PartitionKey? PartitionKeyValue { get; } /// /// Name/values for each parameter in the Cosmos Query. diff --git a/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs b/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs index 2934ac9e71f..351a250b310 100644 --- a/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs +++ b/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs @@ -31,7 +31,7 @@ public CosmosQueryExecutedEventData( double requestCharge, string activityId, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, IReadOnlyList<(string Name, object? Value)> parameters, string querySql, bool logSensitiveData) @@ -70,7 +70,7 @@ public CosmosQueryExecutedEventData( /// /// The key of the Cosmos partition that the query is using. /// - public virtual PartitionKey PartitionKeyValue { get; } + public virtual PartitionKey? PartitionKeyValue { get; } /// /// Name/values for each parameter in the Cosmos Query. diff --git a/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs b/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs index 32e4f91c1f5..1689ca7b98a 100644 --- a/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs +++ b/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs @@ -54,7 +54,7 @@ public static void SyncNotSupported( public static void ExecutingSqlQuery( this IDiagnosticsLogger diagnostics, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, CosmosSqlQuery cosmosSqlQuery) { var definition = CosmosResources.LogExecutingSqlQuery(diagnostics); @@ -66,7 +66,7 @@ public static void ExecutingSqlQuery( definition.Log( diagnostics, containerId, - logSensitiveData ? partitionKeyValue.ToString() : "?", + logSensitiveData ? partitionKeyValue?.ToString() : "?", FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0), Environment.NewLine, cosmosSqlQuery.Query); @@ -158,7 +158,7 @@ public static void ExecutedReadNext( double requestCharge, string activityId, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, CosmosSqlQuery cosmosSqlQuery) { var definition = CosmosResources.LogExecutedReadNext(diagnostics); @@ -177,7 +177,7 @@ public static void ExecutedReadNext( requestCharge, activityId, containerId, - logSensitiveData ? partitionKeyValue.ToString() : "?", + logSensitiveData ? partitionKeyValue?.ToString() : "?", FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0), Environment.NewLine, cosmosSqlQuery.Query)); diff --git a/src/EFCore.Cosmos/EFCore.Cosmos.csproj b/src/EFCore.Cosmos/EFCore.Cosmos.csproj index 935e7f25195..d14b89967ce 100644 --- a/src/EFCore.Cosmos/EFCore.Cosmos.csproj +++ b/src/EFCore.Cosmos/EFCore.Cosmos.csproj @@ -10,6 +10,7 @@ $(PackageTags);CosmosDb;SQL API true $(NoWarn);EF9101 + $(NoWarn);EF9102 diff --git a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs index b974d714a92..7def403562a 100644 --- a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs +++ b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Diagnostics.CodeAnalysis; using JetBrains.Annotations; using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; using Microsoft.EntityFrameworkCore.Query.Internal; @@ -177,4 +178,52 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot( sql, Expression.Constant(arguments)); } + + internal static readonly MethodInfo ToPageAsyncMethodInfo + = typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPageAsync))!; + + + /// + /// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve + /// successive pages of the result set, and specifying the maximum number of results per page. + /// + /// The source query. + /// + /// An optional continuation token returned from a previous execution of this query via + /// . If , retrieves query results from the start. + /// + /// + /// The maximum number of results in the returned . The page may contain fewer results if the database + /// did not contain enough matching results. + /// + /// Limits the length of continuation token in the query response. + /// A to observe while waiting for the task to complete. + /// A containing at most results. + [Experimental(EFDiagnostics.PagingExperimental)] + public static Task> ToPageAsync( + this IQueryable source, + int pageSize, + string? continuationToken, + int? responseContinuationTokenLimitInKb = null, + CancellationToken cancellationToken = default) + { + if (source.Provider is not IAsyncQueryProvider provider) + { + throw new InvalidOperationException(CoreStrings.IQueryableProviderNotAsync); + } + + return provider.ExecuteAsync>>( + Expression.Call( + instance: null, + method: ToPageAsyncMethodInfo.MakeGenericMethod(typeof(TSource)), + arguments: + [ + source.Expression, + Expression.Constant(pageSize, typeof(int)), + Expression.Constant(continuationToken, typeof(string)), + Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)), + Expression.Constant(default(CancellationToken), typeof(CancellationToken)) + ]), + cancellationToken); + } } diff --git a/src/EFCore.Cosmos/Query/CosmosPage.cs b/src/EFCore.Cosmos/Query/CosmosPage.cs new file mode 100644 index 00000000000..3ab66791975 --- /dev/null +++ b/src/EFCore.Cosmos/Query/CosmosPage.cs @@ -0,0 +1,32 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; + +// ReSharper disable once CheckNamespace +namespace Microsoft.EntityFrameworkCore; + +/// +/// A single page of results returned from a user query; can be used to paginate through long result-sets. +/// Returned by . +/// +/// The values contained in this page. +/// +/// The continuation token for fetching further results from the query. Is or empty when there are no more +/// results. +/// +/// The type of values contained in the page. +[Experimental(EFDiagnostics.PagingExperimental)] +public readonly struct CosmosPage(IReadOnlyList values, string? continuationToken) +{ + /// + /// The values contained in this page. + /// + public IReadOnlyList Values { get; } = values; + + /// + /// The continuation token for fetching further results from the query. Is or empty when there are no more + /// results. + /// + public string? ContinuationToken { get; } = continuationToken; +} diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs index 3ba1654c57e..1bc4e87cd2a 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs @@ -4,6 +4,7 @@ using System.Diagnostics.CodeAnalysis; using Microsoft.EntityFrameworkCore.Cosmos.Internal; using Microsoft.EntityFrameworkCore.Cosmos.Metadata.Internal; +using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; using Microsoft.EntityFrameworkCore.Internal; @@ -86,6 +87,64 @@ protected CosmosQueryableMethodTranslatingExpressionVisitor( _subquery = true; } + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override Expression Translate(Expression expression) + { + // Handle ToPageAsync(), which can only ever be the top-level node in the query tree. + if (expression is MethodCallExpression { Method: var method, Arguments: var arguments } + && method.DeclaringType == typeof(CosmosQueryableExtensions) + && method.Name is nameof(CosmosQueryableExtensions.ToPageAsync)) + { + var source = base.Translate(arguments[0]); + + if (source == QueryCompilationContext.NotTranslatedExpression) + { + return source; + } + + if (source is not ShapedQueryExpression shapedQuery) + { + throw new UnreachableException($"Expected a ShapedQueryExpression but found {source.GetType().Name}"); + } + + // The arguments to ToPage/ToPageAsync must have been parameterized by the funcletizer, since they're non-lambda arguments to + // a top-level function (like Skip/Take). Translate to get these as SqlParameterExpressions. + if (arguments is not + [ + _, // source + ParameterExpression maxItemCount, + ParameterExpression continuationToken, + ParameterExpression responseContinuationTokenLimitInKb, + _ // cancellation token + ] + || _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount + || _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken + || _sqlTranslator.Translate(responseContinuationTokenLimitInKb) is not SqlParameterExpression + translatedResponseContinuationTokenLimitInKb) + { + throw new UnreachableException("ToPageAsync without the appropriate parameterized arguments"); + } + + // Wrap the shaper for the entire query in a PagingExpression which also contains the paging arguments, and update + // the final cardinality to Single (since we'll be returning a single Page). + return shapedQuery + .UpdateShaperExpression(new PagingExpression( + shapedQuery.ShaperExpression, + translatedMaxItemCount, + translatedContinuationToken, + translatedResponseContinuationTokenLimitInKb, + typeof(CosmosPage<>).MakeGenericType(shapedQuery.ShaperExpression.Type))) + .UpdateResultCardinality(ResultCardinality.Single); + } + + return base.Translate(expression); + } + /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to /// the same compatibility standards as public APIs. It may be changed or removed without notice in diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs new file mode 100644 index 00000000000..6685d6fc8a5 --- /dev/null +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs @@ -0,0 +1,240 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +#nullable disable + +using System.Collections; +using Microsoft.EntityFrameworkCore.Cosmos.Diagnostics.Internal; +using Microsoft.EntityFrameworkCore.Cosmos.Internal; +using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; +using Newtonsoft.Json.Linq; + +namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; + +/// +/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to +/// the same compatibility standards as public APIs. It may be changed or removed without notice in +/// any release. You should only use it directly in your code with extreme caution and knowing that +/// doing so can result in application failures when updating to a new Entity Framework Core release. +/// +public partial class CosmosShapedQueryCompilingExpressionVisitor +{ + private sealed class PagingQueryingEnumerable : IAsyncEnumerable> + { + private readonly CosmosQueryContext _cosmosQueryContext; + private readonly ISqlExpressionFactory _sqlExpressionFactory; + private readonly SelectExpression _selectExpression; + private readonly Func _shaper; + private readonly IQuerySqlGeneratorFactory _querySqlGeneratorFactory; + private readonly Type _contextType; + private readonly string _cosmosContainer; + private readonly PartitionKey _cosmosPartitionKeyValue; + private readonly IDiagnosticsLogger _queryLogger; + private readonly IDiagnosticsLogger _commandLogger; + private readonly bool _standAloneStateManager; + private readonly bool _threadSafetyChecksEnabled; + private readonly string _maxItemCountParameterName; + private readonly string _continuationTokenParameterName; + private readonly string _responseContinuationTokenLimitInKbParameterName; + + public PagingQueryingEnumerable( + CosmosQueryContext cosmosQueryContext, + ISqlExpressionFactory sqlExpressionFactory, + IQuerySqlGeneratorFactory querySqlGeneratorFactory, + SelectExpression selectExpression, + Func shaper, + Type contextType, + string cosmosContainer, + PartitionKey partitionKeyValueFromExtension, + bool standAloneStateManager, + bool threadSafetyChecksEnabled, + string maxItemCountParameterName, + string continuationTokenParameterName, + string responseContinuationTokenLimitInKbParameterName) + { + _cosmosQueryContext = cosmosQueryContext; + _sqlExpressionFactory = sqlExpressionFactory; + _querySqlGeneratorFactory = querySqlGeneratorFactory; + _selectExpression = selectExpression; + _shaper = shaper; + _contextType = contextType; + _queryLogger = cosmosQueryContext.QueryLogger; + _commandLogger = cosmosQueryContext.CommandLogger; + _standAloneStateManager = standAloneStateManager; + _threadSafetyChecksEnabled = threadSafetyChecksEnabled; + _maxItemCountParameterName = maxItemCountParameterName; + _continuationTokenParameterName = continuationTokenParameterName; + _responseContinuationTokenLimitInKbParameterName = responseContinuationTokenLimitInKbParameterName; + + var partitionKey = selectExpression.GetPartitionKeyValue(cosmosQueryContext.ParameterValues); + if (partitionKey != PartitionKey.None + && partitionKeyValueFromExtension != PartitionKey.None + && !partitionKeyValueFromExtension.Equals(partitionKey)) + { + throw new InvalidOperationException(CosmosStrings.PartitionKeyMismatch(partitionKeyValueFromExtension, partitionKey)); + } + + _cosmosPartitionKeyValue = partitionKey != PartitionKey.None ? partitionKey : partitionKeyValueFromExtension; + _cosmosContainer = cosmosContainer; + } + + public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) + => new AsyncEnumerator(this, cancellationToken); + + private CosmosSqlQuery GenerateQuery() + => _querySqlGeneratorFactory.Create().GetSqlQuery( + (SelectExpression)new InExpressionValuesExpandingExpressionVisitor( + _sqlExpressionFactory, + _cosmosQueryContext.ParameterValues) + .Visit(_selectExpression), + _cosmosQueryContext.ParameterValues); + + private sealed class AsyncEnumerator : IAsyncEnumerator> + { + private readonly PagingQueryingEnumerable _queryingEnumerable; + private readonly CosmosQueryContext _cosmosQueryContext; + private readonly Func _shaper; + private readonly Type _contextType; + private readonly string _cosmosContainer; + private readonly PartitionKey _cosmosPartitionKeyValue; + private readonly IDiagnosticsLogger _queryLogger; + private readonly IDiagnosticsLogger _commandLogger; + private readonly bool _standAloneStateManager; + private readonly CancellationToken _cancellationToken; + private readonly IConcurrencyDetector _concurrencyDetector; + private readonly IExceptionDetector _exceptionDetector; + + private bool _hasExecuted; + private bool _isDisposed; + + public AsyncEnumerator(PagingQueryingEnumerable queryingEnumerable, CancellationToken cancellationToken = default) + { + _queryingEnumerable = queryingEnumerable; + _cosmosQueryContext = queryingEnumerable._cosmosQueryContext; + _shaper = queryingEnumerable._shaper; + _contextType = queryingEnumerable._contextType; + _cosmosContainer = queryingEnumerable._cosmosContainer; + _cosmosPartitionKeyValue = queryingEnumerable._cosmosPartitionKeyValue; + _queryLogger = queryingEnumerable._queryLogger; + _commandLogger = queryingEnumerable._commandLogger; + _standAloneStateManager = queryingEnumerable._standAloneStateManager; + _exceptionDetector = _cosmosQueryContext.ExceptionDetector; + _cancellationToken = cancellationToken; + + _concurrencyDetector = queryingEnumerable._threadSafetyChecksEnabled + ? _cosmosQueryContext.ConcurrencyDetector + : null; + } + + public CosmosPage Current { get; private set; } + + public async ValueTask MoveNextAsync() + { + ObjectDisposedException.ThrowIf(_isDisposed, typeof(AsyncEnumerator)); + + try + { + _concurrencyDetector?.EnterCriticalSection(); + + try + { + if (_hasExecuted) + { + return false; + } + + _hasExecuted = true; + + var maxItemCount = (int)_cosmosQueryContext.ParameterValues[_queryingEnumerable._maxItemCountParameterName]; + var continuationToken = + (string)_cosmosQueryContext.ParameterValues[_queryingEnumerable._continuationTokenParameterName]; + var responseContinuationTokenLimitInKb = (int?) + _cosmosQueryContext.ParameterValues[_queryingEnumerable._responseContinuationTokenLimitInKbParameterName]; + + var sqlQuery = _queryingEnumerable.GenerateQuery(); + + EntityFrameworkMetricsData.ReportQueryExecuting(); + + var queryRequestOptions = new QueryRequestOptions + { + ResponseContinuationTokenLimitInKb = responseContinuationTokenLimitInKb + }; + + if (_cosmosPartitionKeyValue != PartitionKey.None) + { + queryRequestOptions.PartitionKey = _cosmosPartitionKeyValue; + } + + var cosmosClient = _cosmosQueryContext.CosmosClient; + _commandLogger.ExecutingSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery); + _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); + + var results = new List(maxItemCount); + + while (maxItemCount > 0) + { + queryRequestOptions.MaxItemCount = maxItemCount; + using var feedIterator = cosmosClient.CreateQuery( + _cosmosContainer, sqlQuery, continuationToken, queryRequestOptions); + + using var responseMessage = await feedIterator.ReadNextAsync(_cancellationToken).ConfigureAwait(false); + + _commandLogger.ExecutedReadNext( + responseMessage.Diagnostics.GetClientElapsedTime(), + responseMessage.Headers.RequestCharge, + responseMessage.Headers.ActivityId, + _cosmosContainer, + _cosmosPartitionKeyValue, + sqlQuery); + + responseMessage.EnsureSuccessStatusCode(); + + var responseMessageEnumerable = cosmosClient.GetResponseMessageEnumerable(responseMessage); + foreach (var resultObject in responseMessageEnumerable) + { + results.Add(_shaper(_cosmosQueryContext, resultObject)); + maxItemCount--; + } + + continuationToken = responseMessage.ContinuationToken; + + if (responseMessage.ContinuationToken is null) + { + break; + } + } + + Current = new CosmosPage(results, continuationToken); + + _hasExecuted = true; + return true; + } + finally + { + _concurrencyDetector?.ExitCriticalSection(); + } + } + catch (Exception exception) + { + if (_exceptionDetector.IsCancellation(exception, _cancellationToken)) + { + _queryLogger.QueryCanceled(_contextType); + } + else + { + _queryLogger.QueryIterationFailed(_contextType, exception); + } + + throw; + } + } + + public ValueTask DisposeAsync() + { + _isDisposed = true; + + return default; + } + } + } +} diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs index edf79fdec44..cf0876f47fc 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs @@ -158,10 +158,7 @@ public bool MoveNext() EntityFrameworkMetricsData.ReportQueryExecuting(); _enumerator = _cosmosQueryContext.CosmosClient - .ExecuteSqlQuery( - _cosmosContainer, - _cosmosPartitionKeyValue, - sqlQuery) + .ExecuteSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery) .GetEnumerator(); _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); } @@ -256,10 +253,7 @@ public async ValueTask MoveNextAsync() EntityFrameworkMetricsData.ReportQueryExecuting(); _enumerator = _cosmosQueryContext.CosmosClient - .ExecuteSqlQueryAsync( - _cosmosContainer, - _cosmosPartitionKeyValue, - sqlQuery) + .ExecuteSqlQueryAsync(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery) .GetAsyncEnumerator(_cancellationToken); _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); } diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs index 23a17d2d437..54dc03462c9 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs @@ -3,6 +3,7 @@ #nullable disable +using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; using Newtonsoft.Json.Linq; using static System.Linq.Expressions.Expression; @@ -43,55 +44,90 @@ protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQuery var jObjectParameter = Parameter(typeof(JObject), "jObject"); var shaperBody = shapedQueryExpression.ShaperExpression; + + var (paging, maxItemCount, continuationToken, responseContinuationTokenLimitInKb) = + (false, (SqlParameterExpression)null, (SqlParameterExpression)null, (SqlParameterExpression)null); + + // If the query is terminated ToPageAsync(), CosmosQueryableMethodTranslatingExpressionVisitor composed a PagingExpression on top + // of the shaper. We remove that to get the shaper for each actual document being read (as opposed to the page of those documents), + // and extract the pagination arguments. + if (shaperBody is PagingExpression pagingExpression) + { + paging = true; + maxItemCount = pagingExpression.MaxItemCount; + continuationToken = pagingExpression.ContinuationToken; + responseContinuationTokenLimitInKb = pagingExpression.ResponseContinuationTokenLimitInKb; + + shaperBody = pagingExpression.Expression; + } + shaperBody = new JObjectInjectingExpressionVisitor().Visit(shaperBody); shaperBody = InjectEntityMaterializers(shaperBody); - switch (shapedQueryExpression.QueryExpression) + if (shapedQueryExpression.QueryExpression is not SelectExpression selectExpression) { - case SelectExpression selectExpression: - shaperBody = new CosmosProjectionBindingRemovingExpressionVisitor( - selectExpression, jObjectParameter, - QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.TrackAll) - .Visit(shaperBody); - - var shaperLambda = Lambda( - shaperBody, - QueryCompilationContext.QueryContextParameter, - jObjectParameter); - - var cosmosQueryContextConstant = Convert(QueryCompilationContext.QueryContextParameter, typeof(CosmosQueryContext)); - var shaperConstant = Constant(shaperLambda.Compile()); - var contextTypeConstant = Constant(_contextType); - var containerConstant = Constant(cosmosQueryCompilationContext.CosmosContainer); - var threadSafetyConstant = Constant(_threadSafetyChecksEnabled); - var standAloneStateManagerConstant = Constant( - QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.NoTrackingWithIdentityResolution); - - return selectExpression.ReadItemInfo != null - ? New( - typeof(ReadItemQueryingEnumerable<>).MakeGenericType(selectExpression.ReadItemInfo.Type).GetConstructors()[0], - cosmosQueryContextConstant, - containerConstant, - Constant(selectExpression.ReadItemInfo), - shaperConstant, - contextTypeConstant, - standAloneStateManagerConstant, - threadSafetyConstant) - : New( - typeof(QueryingEnumerable<>).MakeGenericType(shaperLambda.ReturnType).GetConstructors()[0], - cosmosQueryContextConstant, - Constant(sqlExpressionFactory), - Constant(querySqlGeneratorFactory), - Constant(selectExpression), - shaperConstant, - contextTypeConstant, - containerConstant, - Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), - standAloneStateManagerConstant, - threadSafetyConstant); - - default: - throw new NotSupportedException(CoreStrings.UnhandledExpressionNode(shapedQueryExpression.QueryExpression)); + throw new NotSupportedException(CoreStrings.UnhandledExpressionNode(shapedQueryExpression.QueryExpression)); } + + shaperBody = new CosmosProjectionBindingRemovingExpressionVisitor( + selectExpression, jObjectParameter, + QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.TrackAll) + .Visit(shaperBody); + + var shaperLambda = Lambda( + shaperBody, + QueryCompilationContext.QueryContextParameter, + jObjectParameter); + + var cosmosQueryContextConstant = Convert(QueryCompilationContext.QueryContextParameter, typeof(CosmosQueryContext)); + var shaperConstant = Constant(shaperLambda.Compile()); + var contextTypeConstant = Constant(_contextType); + var containerConstant = Constant(cosmosQueryCompilationContext.CosmosContainer); + var threadSafetyConstant = Constant(_threadSafetyChecksEnabled); + var standAloneStateManagerConstant = Constant( + QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.NoTrackingWithIdentityResolution); + + Check.DebugAssert(!paging || selectExpression.ReadItemInfo is null, "ReadItem is being with paging, impossible."); + + return selectExpression switch + { + { ReadItemInfo: ReadItemInfo readItemInfo } => New( + typeof(ReadItemQueryingEnumerable<>).MakeGenericType(readItemInfo.Type).GetConstructors()[0], + cosmosQueryContextConstant, + containerConstant, + Constant(readItemInfo), + shaperConstant, + contextTypeConstant, + standAloneStateManagerConstant, + threadSafetyConstant), + + _ when paging => New( + typeof(PagingQueryingEnumerable<>).MakeGenericType(shaperLambda.ReturnType).GetConstructors()[0], + cosmosQueryContextConstant, + Constant(sqlExpressionFactory), + Constant(querySqlGeneratorFactory), + Constant(selectExpression), + shaperConstant, + contextTypeConstant, + containerConstant, + Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), + standAloneStateManagerConstant, + threadSafetyConstant, + Constant(maxItemCount.Name), + Constant(continuationToken.Name), + Constant(responseContinuationTokenLimitInKb.Name)), + + _ => New( + typeof(QueryingEnumerable<>).MakeGenericType(shaperLambda.ReturnType).GetConstructors()[0], cosmosQueryContextConstant, + Constant(sqlExpressionFactory), + Constant(querySqlGeneratorFactory), + Constant(selectExpression), + shaperConstant, + contextTypeConstant, + containerConstant, + Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), + standAloneStateManagerConstant, + threadSafetyConstant) + }; } } diff --git a/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs new file mode 100644 index 00000000000..0399f985067 --- /dev/null +++ b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs @@ -0,0 +1,129 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; + +/// +/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to +/// the same compatibility standards as public APIs. It may be changed or removed without notice in +/// any release. You should only use it directly in your code with extreme caution and knowing that +/// doing so can result in application failures when updating to a new Entity Framework Core release. +/// +public class PagingExpression( + Expression expression, + SqlParameterExpression maxItemCount, + SqlParameterExpression continuationToken, + SqlParameterExpression responseContinuationTokenLimitInKb, + Type type) + : Expression, IPrintableExpression +{ + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public sealed override ExpressionType NodeType + => ExpressionType.Extension; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override Type Type { get; } = type; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual Expression Expression { get; } = expression; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual SqlParameterExpression MaxItemCount { get; } = maxItemCount; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual SqlParameterExpression ContinuationToken { get; } = continuationToken; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual SqlParameterExpression ResponseContinuationTokenLimitInKb { get; } = responseContinuationTokenLimitInKb; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + protected override Expression VisitChildren(ExpressionVisitor visitor) + => Update(visitor.Visit(Expression)); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual PagingExpression Update(Expression expression) + => expression == Expression + ? this + : expression.Type == Expression.Type + ? new PagingExpression(expression, ContinuationToken, MaxItemCount, ResponseContinuationTokenLimitInKb, Type) + : throw new UnreachableException("Can't change the Type of a PagingExpression"); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + void IPrintableExpression.Print(ExpressionPrinter expressionPrinter) + { + expressionPrinter.Append("ToPage: "); + expressionPrinter.Visit(Expression); + } + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override bool Equals(object? obj) + => obj != null + && (ReferenceEquals(this, obj) + || obj is PagingExpression other + && Equals(other)); + + private bool Equals(PagingExpression other) + => Expression.Equals(other.Expression) + && ContinuationToken.Equals(other.ContinuationToken) + && MaxItemCount.Equals(other.MaxItemCount) + && ResponseContinuationTokenLimitInKb.Equals(other.ResponseContinuationTokenLimitInKb); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override int GetHashCode() + => Expression.GetHashCode(); +} diff --git a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs index 62aa38750f8..c2c64c84460 100644 --- a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs @@ -658,8 +658,9 @@ private static Task CreateSingleItemQueryAsync( /// public virtual FeedIterator CreateQuery( string containerId, - PartitionKey partitionKeyValue, - CosmosSqlQuery query) + CosmosSqlQuery query, + string? continuationToken = null, + QueryRequestOptions? queryRequestOptions = null) { var container = Client.GetDatabase(_databaseId).GetContainer(containerId); var queryDefinition = new QueryDefinition(query.Query); @@ -669,14 +670,7 @@ public virtual FeedIterator CreateQuery( queryDefinition, (current, parameter) => current.WithParameter(parameter.Name, parameter.Value)); - if (partitionKeyValue == PartitionKey.None) - { - return container.GetItemQueryStreamIterator(queryDefinition); - } - - var queryRequestOptions = new QueryRequestOptions { PartitionKey = partitionKeyValue }; - - return container.GetItemQueryStreamIterator(queryDefinition, requestOptions: queryRequestOptions); + return container.GetItemQueryStreamIterator(queryDefinition, continuationToken, queryRequestOptions); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -718,24 +712,17 @@ private static bool TryReadJObject(JsonTextReader jsonReader, [NotNullWhen(true) return false; } - private sealed class DocumentEnumerable : IEnumerable + private sealed class DocumentEnumerable( + CosmosClientWrapper cosmosClient, + string containerId, + PartitionKey partitionKeyValue, + CosmosSqlQuery cosmosSqlQuery) + : IEnumerable { - private readonly CosmosClientWrapper _cosmosClient; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; - - public DocumentEnumerable( - CosmosClientWrapper cosmosClient, - string containerId, - PartitionKey partitionKeyValue, - CosmosSqlQuery cosmosSqlQuery) - { - _cosmosClient = cosmosClient; - _containerId = containerId; - _partitionKeyValue = partitionKeyValue; - _cosmosSqlQuery = cosmosSqlQuery; - } + private readonly CosmosClientWrapper _cosmosClient = cosmosClient; + private readonly string _containerId = containerId; + private readonly PartitionKey _partitionKeyValue = partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = cosmosSqlQuery; public IEnumerator GetEnumerator() => new Enumerator(this); @@ -743,29 +730,19 @@ public IEnumerator GetEnumerator() IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - private sealed class Enumerator : IEnumerator + private sealed class Enumerator(DocumentEnumerable documentEnumerable) : IEnumerator { - private readonly CosmosClientWrapper _cosmosClientWrapper; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; + private readonly CosmosClientWrapper _cosmosClientWrapper = documentEnumerable._cosmosClient; + private readonly string _containerId = documentEnumerable._containerId; + private readonly PartitionKey _partitionKeyValue = documentEnumerable._partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; private JObject? _current; private ResponseMessage? _responseMessage; - private Stream? _responseStream; - private StreamReader? _reader; - private JsonTextReader? _jsonReader; + private IEnumerator? _responseMessageEnumerator; private FeedIterator? _query; - public Enumerator(DocumentEnumerable documentEnumerable) - { - _cosmosClientWrapper = documentEnumerable._cosmosClient; - _containerId = documentEnumerable._containerId; - _partitionKeyValue = documentEnumerable._partitionKeyValue; - _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; - } - public JObject Current => _current ?? throw new InvalidOperationException(); @@ -775,9 +752,19 @@ object IEnumerator.Current [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool MoveNext() { - if (_jsonReader == null) + if (_responseMessageEnumerator == null) { - _query ??= _cosmosClientWrapper.CreateQuery(_containerId, _partitionKeyValue, _cosmosSqlQuery); + if (_query is null) + { + var queryRequestOptions = new QueryRequestOptions(); + if (_partitionKeyValue != PartitionKey.None) + { + queryRequestOptions.PartitionKey = _partitionKeyValue; + } + + _query = _cosmosClientWrapper.CreateQuery( + _containerId, _cosmosSqlQuery, continuationToken: null, queryRequestOptions); + } if (!_query.HasMoreResults) { @@ -797,14 +784,12 @@ public bool MoveNext() _responseMessage.EnsureSuccessStatusCode(); - _responseStream = _responseMessage.Content; - _reader = new StreamReader(_responseStream); - _jsonReader = CreateJsonReader(_reader); + _responseMessageEnumerator = new ResponseMessageEnumerable(_responseMessage).GetEnumerator(); } - if (TryReadJObject(_jsonReader, out var jObject)) + if (_responseMessageEnumerator.MoveNext()) { - _current = jObject; + _current = _responseMessageEnumerator.Current; return true; } @@ -815,20 +800,15 @@ public bool MoveNext() private void ResetRead() { - _jsonReader?.Close(); - _jsonReader = null; - _reader?.Dispose(); - _reader = null; - _responseStream?.Dispose(); - _responseStream = null; + _responseMessageEnumerator?.Dispose(); + _responseMessageEnumerator = null; + _responseMessage?.Dispose(); } public void Dispose() { ResetRead(); - - _responseMessage?.Dispose(); - _responseMessage = null; + _query?.Dispose(); } public void Reset() @@ -836,64 +816,56 @@ public void Reset() } } - private sealed class DocumentAsyncEnumerable : IAsyncEnumerable + private sealed class DocumentAsyncEnumerable( + CosmosClientWrapper cosmosClient, + string containerId, + PartitionKey partitionKeyValue, + CosmosSqlQuery cosmosSqlQuery) + : IAsyncEnumerable { - private readonly CosmosClientWrapper _cosmosClient; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; - - public DocumentAsyncEnumerable( - CosmosClientWrapper cosmosClient, - string containerId, - PartitionKey partitionKeyValue, - CosmosSqlQuery cosmosSqlQuery) - { - _cosmosClient = cosmosClient; - _containerId = containerId; - _partitionKeyValue = partitionKeyValue; - _cosmosSqlQuery = cosmosSqlQuery; - } + private readonly CosmosClientWrapper _cosmosClient = cosmosClient; + private readonly string _containerId = containerId; + private readonly PartitionKey _partitionKeyValue = partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = cosmosSqlQuery; public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) => new AsyncEnumerator(this, cancellationToken); - private sealed class AsyncEnumerator : IAsyncEnumerator + private sealed class AsyncEnumerator(DocumentAsyncEnumerable documentEnumerable, CancellationToken cancellationToken) + : IAsyncEnumerator { - private readonly CosmosClientWrapper _cosmosClientWrapper; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; - private readonly CancellationToken _cancellationToken; + private readonly CosmosClientWrapper _cosmosClientWrapper = documentEnumerable._cosmosClient; + private readonly string _containerId = documentEnumerable._containerId; + private readonly PartitionKey _partitionKeyValue = documentEnumerable._partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; private JObject? _current; private ResponseMessage? _responseMessage; - private Stream? _responseStream; - private StreamReader? _reader; - private JsonTextReader? _jsonReader; + private IAsyncEnumerator? _responseMessageEnumerator; private FeedIterator? _query; public JObject Current => _current ?? throw new InvalidOperationException(); - public AsyncEnumerator(DocumentAsyncEnumerable documentEnumerable, CancellationToken cancellationToken) - { - _cosmosClientWrapper = documentEnumerable._cosmosClient; - _containerId = documentEnumerable._containerId; - _partitionKeyValue = documentEnumerable._partitionKeyValue; - _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; - _cancellationToken = cancellationToken; - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] public async ValueTask MoveNextAsync() { - _cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.ThrowIfCancellationRequested(); - if (_jsonReader == null) + if (_responseMessageEnumerator == null) { - _query ??= _cosmosClientWrapper.CreateQuery(_containerId, _partitionKeyValue, _cosmosSqlQuery); + if (_query is null) + { + var queryRequestOptions = new QueryRequestOptions(); + if (_partitionKeyValue != PartitionKey.None) + { + queryRequestOptions.PartitionKey = _partitionKeyValue; + } + + _query = _cosmosClientWrapper.CreateQuery( + _containerId, _cosmosSqlQuery, continuationToken: null, queryRequestOptions); + } if (!_query.HasMoreResults) { @@ -901,7 +873,7 @@ public async ValueTask MoveNextAsync() return false; } - _responseMessage = await _query.ReadNextAsync(_cancellationToken).ConfigureAwait(false); + _responseMessage = await _query.ReadNextAsync(cancellationToken).ConfigureAwait(false); _cosmosClientWrapper._commandLogger.ExecutedReadNext( _responseMessage.Diagnostics.GetClientElapsedTime(), @@ -913,14 +885,12 @@ public async ValueTask MoveNextAsync() _responseMessage.EnsureSuccessStatusCode(); - _responseStream = _responseMessage.Content; - _reader = new StreamReader(_responseStream); - _jsonReader = CreateJsonReader(_reader); + _responseMessageEnumerator = new ResponseMessageEnumerable(_responseMessage).GetAsyncEnumerator(cancellationToken); } - if (TryReadJObject(_jsonReader, out var jObject)) + if (await _responseMessageEnumerator.MoveNextAsync().ConfigureAwait(false)) { - _current = jObject; + _current = _responseMessageEnumerator.Current; return true; } @@ -931,21 +901,131 @@ public async ValueTask MoveNextAsync() private async Task ResetReadAsync() { - _jsonReader?.Close(); - _jsonReader = null; - await _reader.DisposeAsyncIfAvailable().ConfigureAwait(false); - _reader = null; - await _responseStream.DisposeAsyncIfAvailable().ConfigureAwait(false); - _responseStream = null; + if (_responseMessageEnumerator is not null) + { + await _responseMessageEnumerator.DisposeAsync().ConfigureAwait(false); + _responseMessageEnumerator = null; + } + + _responseMessage?.Dispose(); } public async ValueTask DisposeAsync() { await ResetReadAsync().ConfigureAwait(false); + _query?.Dispose(); + } + } + } + + #region ResponseMessageEnumerable + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual IEnumerable GetResponseMessageEnumerable(ResponseMessage responseMessage) + => new ResponseMessageEnumerable(responseMessage); + + private sealed class ResponseMessageEnumerable(ResponseMessage responseMessage) : IEnumerable, IAsyncEnumerable + { + public IEnumerator GetEnumerator() + => new ResponseMessageEnumerator(responseMessage); + + IEnumerator IEnumerable.GetEnumerator() + => GetEnumerator(); + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + => new ResponseMessageAsyncEnumerator(responseMessage); + } + + private sealed class ResponseMessageEnumerator : IEnumerator + { + private readonly Stream _responseStream; + private readonly StreamReader _reader; + private readonly JsonTextReader _jsonReader; + + private JObject? _current; + + public ResponseMessageEnumerator(ResponseMessage responseMessage) + { + _responseStream = responseMessage.Content; + _reader = new StreamReader(_responseStream); + _jsonReader = CreateJsonReader(_reader); + } + + public bool MoveNext() + { + while (_jsonReader.Read()) + { + if (_jsonReader.TokenType == JsonToken.StartObject) + { + _current = Serializer.Deserialize(_jsonReader); + return true; + } + } + + return false; + } + + public JObject Current + => _current ?? throw new InvalidOperationException(); + + object IEnumerator.Current + => Current; - await _responseMessage.DisposeAsyncIfAvailable().ConfigureAwait(false); - _responseMessage = null; + public void Dispose() + { + _jsonReader.Close(); + _reader.Dispose(); + _responseStream.Dispose(); + } + + public void Reset() + => throw new NotSupportedException(); + } + + private sealed class ResponseMessageAsyncEnumerator : IAsyncEnumerator + { + private readonly Stream _responseStream; + private readonly StreamReader _reader; + private readonly JsonTextReader _jsonReader; + + private JObject? _current; + + public ResponseMessageAsyncEnumerator(ResponseMessage responseMessage) + { + _responseStream = responseMessage.Content; + _reader = new StreamReader(_responseStream); + _jsonReader = CreateJsonReader(_reader); + } + + public async ValueTask MoveNextAsync() + { + while (await _jsonReader.ReadAsync().ConfigureAwait(false)) + { + if (_jsonReader.TokenType == JsonToken.StartObject) + { + _current = Serializer.Deserialize(_jsonReader); + return true; + } } + + return false; + } + + public JObject Current + => _current ?? throw new InvalidOperationException(); + + public async ValueTask DisposeAsync() + { + _jsonReader.Close(); + _reader.Dispose(); + await _responseStream.DisposeAsync().ConfigureAwait(false); } } + + #endregion ResponseMessageEnumerable } diff --git a/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs index fa8ba51f089..20bb42fa670 100644 --- a/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs @@ -135,7 +135,11 @@ Task DeleteItemAsync( /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - FeedIterator CreateQuery(string containerId, PartitionKey partitionKeyValue, CosmosSqlQuery query); + FeedIterator CreateQuery( + string containerId, + CosmosSqlQuery query, + string? continuationToken = null, + QueryRequestOptions? queryRequestOptions = null); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -181,4 +185,12 @@ IAsyncEnumerable ExecuteSqlQueryAsync( string containerId, PartitionKey partitionKeyValue, CosmosSqlQuery query); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + IEnumerable GetResponseMessageEnumerable(ResponseMessage responseMessage); } diff --git a/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj b/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj index 01df755416e..a8ecf58320d 100644 --- a/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj +++ b/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj @@ -8,6 +8,7 @@ true true + $(NoWarn);EF9102 diff --git a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs index 41e1d99ceb1..f9dccc60c94 100644 --- a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs +++ b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs @@ -5324,6 +5324,95 @@ FROM root c """); }); + [ConditionalFact] + public virtual async Task ToPageAsync() + { + await using var context = CreateContext(); + + var totalCustomers = await context.Set().CountAsync(); + + var page1 = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(pageSize: 1, continuationToken: null); + + var customer1 = Assert.Single(page1.Values); + Assert.Equal("ALFKI", customer1.CustomerID); + + var page2 = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(pageSize: 2, page1.ContinuationToken); + + Assert.Collection( + page2.Values, + c => Assert.Equal("ANATR", c.CustomerID), + c => Assert.Equal("ANTON", c.CustomerID)); + + var page3 = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(pageSize: totalCustomers, page2.ContinuationToken); + + Assert.Equal(totalCustomers - 3, page3.Values.Count); + Assert.Null(page3.ContinuationToken); + + AssertSql( + """ +SELECT COUNT(1) AS c +FROM root c +WHERE (c["Discriminator"] = "Customer") +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +"""); + } + + [ConditionalFact] + public virtual async Task ToPageAsync_with_exact_maxItemCount() + { + await using var context = CreateContext(); + + var totalCustomers = await context.Set().CountAsync(); + + var onlyPage = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(pageSize: totalCustomers, continuationToken: null); + + Assert.Equal("ALFKI", onlyPage.Values[0].CustomerID); + Assert.Equal("WOLZA", onlyPage.Values[^1].CustomerID); + Assert.Null(onlyPage.ContinuationToken); + + AssertSql( + """ +SELECT COUNT(1) AS c +FROM root c +WHERE (c["Discriminator"] = "Customer") +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +"""); + } + private void AssertSql(params string[] expected) => Fixture.TestSqlLoggerFactory.AssertBaseline(expected);