Skip to content

Commit

Permalink
Implement Cosmos pagination (#34103)
Browse files Browse the repository at this point in the history
Closes #24513
  • Loading branch information
roji committed Jul 1, 2024
1 parent bedc332 commit fee7c1a
Show file tree
Hide file tree
Showing 16 changed files with 896 additions and 173 deletions.
1 change: 1 addition & 0 deletions src/EFCore.Analyzers/EFDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ public static class EFDiagnostics
public const string ProviderExperimentalApi = "EF9002";
public const string PrecompiledQueryExperimental = "EF9100";
public const string MetricsExperimental = "EF9101";
public const string PagingExperimental = "EF9102";
}
4 changes: 2 additions & 2 deletions src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public CosmosQueryEventData(
EventDefinitionBase eventDefinition,
Func<EventDefinitionBase, EventData, string> messageGenerator,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
IReadOnlyList<(string Name, object? Value)> parameters,
string querySql,
bool logSensitiveData)
Expand All @@ -46,7 +46,7 @@ public CosmosQueryEventData(
/// <summary>
/// The key of the Cosmos partition that the query is using.
/// </summary>
public virtual PartitionKey PartitionKeyValue { get; }
public virtual PartitionKey? PartitionKeyValue { get; }

/// <summary>
/// Name/values for each parameter in the Cosmos Query.
Expand Down
4 changes: 2 additions & 2 deletions src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public CosmosQueryExecutedEventData(
double requestCharge,
string activityId,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
IReadOnlyList<(string Name, object? Value)> parameters,
string querySql,
bool logSensitiveData)
Expand Down Expand Up @@ -70,7 +70,7 @@ public CosmosQueryExecutedEventData(
/// <summary>
/// The key of the Cosmos partition that the query is using.
/// </summary>
public virtual PartitionKey PartitionKeyValue { get; }
public virtual PartitionKey? PartitionKeyValue { get; }

/// <summary>
/// Name/values for each parameter in the Cosmos Query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void SyncNotSupported(
public static void ExecutingSqlQuery(
this IDiagnosticsLogger<DbLoggerCategory.Database.Command> diagnostics,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
CosmosSqlQuery cosmosSqlQuery)
{
var definition = CosmosResources.LogExecutingSqlQuery(diagnostics);
Expand All @@ -66,7 +66,7 @@ public static void ExecutingSqlQuery(
definition.Log(
diagnostics,
containerId,
logSensitiveData ? partitionKeyValue.ToString() : "?",
logSensitiveData ? partitionKeyValue?.ToString() : "?",
FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0),
Environment.NewLine,
cosmosSqlQuery.Query);
Expand Down Expand Up @@ -158,7 +158,7 @@ public static void ExecutedReadNext(
double requestCharge,
string activityId,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
CosmosSqlQuery cosmosSqlQuery)
{
var definition = CosmosResources.LogExecutedReadNext(diagnostics);
Expand All @@ -177,7 +177,7 @@ public static void ExecutedReadNext(
requestCharge,
activityId,
containerId,
logSensitiveData ? partitionKeyValue.ToString() : "?",
logSensitiveData ? partitionKeyValue?.ToString() : "?",
FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0),
Environment.NewLine,
cosmosSqlQuery.Query));
Expand Down
1 change: 1 addition & 0 deletions src/EFCore.Cosmos/EFCore.Cosmos.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PackageTags>$(PackageTags);CosmosDb;SQL API</PackageTags>
<ImplicitUsings>true</ImplicitUsings>
<NoWarn>$(NoWarn);EF9101</NoWarn> <!-- Metrics is experimental -->
<NoWarn>$(NoWarn);EF9102</NoWarn> <!-- Paging is experimental -->
</PropertyGroup>

<ItemGroup>
Expand Down
49 changes: 49 additions & 0 deletions src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal;
using Microsoft.EntityFrameworkCore.Query.Internal;
Expand Down Expand Up @@ -177,4 +178,52 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot(
sql,
Expression.Constant(arguments));
}

internal static readonly MethodInfo ToPageAsyncMethodInfo
= typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPageAsync))!;


/// <summary>
/// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve
/// successive pages of the result set, and specifying the maximum number of results per page.
/// </summary>
/// <param name="source">The source query.</param>
/// <param name="continuationToken">
/// An optional continuation token returned from a previous execution of this query via
/// <see cref="CosmosPage{T}.ContinuationToken" />. If <see langword="null" />, retrieves query results from the start.
/// </param>
/// <param name="pageSize">
/// The maximum number of results in the returned <see cref="CosmosPage{T}" />. The page may contain fewer results if the database
/// did not contain enough matching results.
/// </param>
/// <param name="responseContinuationTokenLimitInKb">Limits the length of continuation token in the query response.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
/// <returns>A <see cref="CosmosPage{T}" /> containing at most <paramref name="pageSize" /> results.</returns>
[Experimental(EFDiagnostics.PagingExperimental)]
public static Task<CosmosPage<TSource>> ToPageAsync<TSource>(
this IQueryable<TSource> source,
int pageSize,
string? continuationToken,
int? responseContinuationTokenLimitInKb = null,
CancellationToken cancellationToken = default)
{
if (source.Provider is not IAsyncQueryProvider provider)
{
throw new InvalidOperationException(CoreStrings.IQueryableProviderNotAsync);
}

return provider.ExecuteAsync<Task<CosmosPage<TSource>>>(
Expression.Call(
instance: null,
method: ToPageAsyncMethodInfo.MakeGenericMethod(typeof(TSource)),
arguments:
[
source.Expression,
Expression.Constant(pageSize, typeof(int)),
Expression.Constant(continuationToken, typeof(string)),
Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)),
Expression.Constant(default(CancellationToken), typeof(CancellationToken))
]),
cancellationToken);
}
}
32 changes: 32 additions & 0 deletions src/EFCore.Cosmos/Query/CosmosPage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;

// ReSharper disable once CheckNamespace
namespace Microsoft.EntityFrameworkCore;

/// <summary>
/// A single page of results returned from a user query; can be used to paginate through long result-sets.
/// Returned by <see cref="CosmosQueryableExtensions.ToPageAsync{T}" />.
/// </summary>
/// <param name="values">The values contained in this page.</param>
/// <param name="continuationToken">
/// The continuation token for fetching further results from the query. Is <see langword="null" /> or empty when there are no more
/// results.
/// </param>
/// <typeparam name="T">The type of values contained in the page.</typeparam>
[Experimental(EFDiagnostics.PagingExperimental)]
public readonly struct CosmosPage<T>(IReadOnlyList<T> values, string? continuationToken)
{
/// <summary>
/// The values contained in this page.
/// </summary>
public IReadOnlyList<T> Values { get; } = values;

/// <summary>
/// The continuation token for fetching further results from the query. Is <see langword="null" /> or empty when there are no more
/// results.
/// </summary>
public string? ContinuationToken { get; } = continuationToken;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore.Cosmos.Internal;
using Microsoft.EntityFrameworkCore.Cosmos.Metadata.Internal;
using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions;
using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal;
using Microsoft.EntityFrameworkCore.Internal;

Expand Down Expand Up @@ -86,6 +87,64 @@ protected CosmosQueryableMethodTranslatingExpressionVisitor(
_subquery = true;
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override Expression Translate(Expression expression)
{
// Handle ToPageAsync(), which can only ever be the top-level node in the query tree.
if (expression is MethodCallExpression { Method: var method, Arguments: var arguments }
&& method.DeclaringType == typeof(CosmosQueryableExtensions)
&& method.Name is nameof(CosmosQueryableExtensions.ToPageAsync))
{
var source = base.Translate(arguments[0]);

if (source == QueryCompilationContext.NotTranslatedExpression)
{
return source;
}

if (source is not ShapedQueryExpression shapedQuery)
{
throw new UnreachableException($"Expected a ShapedQueryExpression but found {source.GetType().Name}");
}

// The arguments to ToPage/ToPageAsync must have been parameterized by the funcletizer, since they're non-lambda arguments to
// a top-level function (like Skip/Take). Translate to get these as SqlParameterExpressions.
if (arguments is not
[
_, // source
ParameterExpression maxItemCount,
ParameterExpression continuationToken,
ParameterExpression responseContinuationTokenLimitInKb,
_ // cancellation token
]
|| _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount
|| _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken
|| _sqlTranslator.Translate(responseContinuationTokenLimitInKb) is not SqlParameterExpression
translatedResponseContinuationTokenLimitInKb)
{
throw new UnreachableException("ToPageAsync without the appropriate parameterized arguments");
}

// Wrap the shaper for the entire query in a PagingExpression which also contains the paging arguments, and update
// the final cardinality to Single (since we'll be returning a single Page).
return shapedQuery
.UpdateShaperExpression(new PagingExpression(
shapedQuery.ShaperExpression,
translatedMaxItemCount,
translatedContinuationToken,
translatedResponseContinuationTokenLimitInKb,
typeof(CosmosPage<>).MakeGenericType(shapedQuery.ShaperExpression.Type)))
.UpdateResultCardinality(ResultCardinality.Single);
}

return base.Translate(expression);
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
Expand Down
Loading

0 comments on commit fee7c1a

Please sign in to comment.