Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadMany: Adds ReadManyItems Api using Query under the covers #2352

Merged
merged 21 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Microsoft.Azure.Cosmos/src/ReadManyHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;

/// <summary>
/// Base contract for executing a "read many" request over a list of items.
/// Each item is a (string, PartitionKey) pair; the query-based implementation in this
/// change treats the string as the item id.
/// </summary>
internal abstract class ReadManyHelper
{
    /// <summary>
    /// Executes the read-many request and returns the result as a single stream response.
    /// </summary>
    /// <param name="items">The (id, partition key) pairs to read.</param>
    /// <param name="trace">Trace passed along to the underlying operations.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A <see cref="ResponseMessage"/> for the whole request.</returns>
    public abstract Task<ResponseMessage> ExecuteReadManyRequestAsync(IReadOnlyList<(string, PartitionKey)> items,
        ITrace trace,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Executes the read-many request and returns the result as a typed feed response.
    /// </summary>
    /// <typeparam name="T">Type of the items in the returned <see cref="FeedResponse{T}"/>.</typeparam>
    /// <param name="items">The (id, partition key) pairs to read.</param>
    /// <param name="trace">Trace passed along to the underlying operations.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A <see cref="FeedResponse{T}"/> for the whole request.</returns>
    public abstract Task<FeedResponse<T>> ExecuteReadManyRequestAsync<T>(IReadOnlyList<(string, PartitionKey)> items,
        ITrace trace,
        CancellationToken cancellationToken = default);
}
}
208 changes: 208 additions & 0 deletions Microsoft.Azure.Cosmos/src/ReadManyQueryHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal sealed class ReadManyQueryHelper : ReadManyHelper
{
// Selector text produced by CreatePkSelector from the partition key definition;
// it is appended right after "c" when the per-range query text is built.
private readonly string partitionKeySelector;

// Upper bound on how many per-range queries are allowed to run at the same time.
private readonly int maxConcurrency = 10 * Environment.ProcessorCount;

// Container used to resolve routing information and to create the query iterators.
private readonly ContainerCore container;

/// <summary>
/// Creates a helper that serves read-many requests by issuing queries against <paramref name="container"/>.
/// </summary>
/// <param name="partitionKeyDefinition">Partition key definition used to build the partition key selector.</param>
/// <param name="container">Container the queries are executed against.</param>
public ReadManyQueryHelper(PartitionKeyDefinition partitionKeyDefinition,
    ContainerCore container)
{
    this.container = container;
    this.partitionKeySelector = this.CreatePkSelector(partitionKeyDefinition);
}

/// <summary>
/// Groups the requested items by partition key range, runs one query per range
/// (at most <c>maxConcurrency</c> at a time) and merges all pages into one response.
/// </summary>
/// <param name="items">The (id, partition key) pairs to read.</param>
/// <param name="trace">Trace handed to every page read.</param>
/// <param name="cancellationToken">Token used to cancel waiting for a query slot and the page reads.</param>
/// <returns>A single <see cref="ResponseMessage"/> built by <c>CombineStreamsFromQueryResponses</c>.</returns>
public override async Task<ResponseMessage> ExecuteReadManyRequestAsync(IReadOnlyList<(string, PartitionKey)> items,
    ITrace trace,
    CancellationToken cancellationToken = default)
{
    IDictionary<string, List<(string, PartitionKey)>> partitionKeyRangeItemMap =
        await this.CreatePartitionKeyRangeItemListMapAsync(items, cancellationToken);

    // The semaphore starts fully available, so no separate "open the gate" release is needed.
    // Disposing it here is safe because Task.WhenAll only completes once every task has finished.
    using (SemaphoreSlim semaphore = new SemaphoreSlim(this.maxConcurrency, this.maxConcurrency))
    {
        List<Task<List<ResponseMessage>>> tasks = new List<Task<List<ResponseMessage>>>();

        foreach (KeyValuePair<string, List<(string, PartitionKey)>> entry in partitionKeyRangeItemMap)
        {
            tasks.Add(Task.Run(async () =>
            {
                // Only allow 'maxConcurrency' number of queries at a time.
                // The wait honors cancellation; it stays outside the try block so a cancelled
                // wait never releases a slot that was not acquired.
                await semaphore.WaitAsync(cancellationToken);

                try
                {
                    QueryDefinition queryDefinition = (this.partitionKeySelector == "[\"id\"]") ?
                        this.CreateReadManyQueryDefifnitionForId(entry.Value) :
                        this.CreateReadManyQueryDefifnitionForOther(entry.Value);

                    List<ResponseMessage> pages = new List<ResponseMessage>();
                    FeedIteratorInternal feedIterator = (FeedIteratorInternal)this.container.GetItemQueryStreamIterator(
                        new FeedRangePartitionKeyRange(entry.Key),
                        queryDefinition);

                    while (feedIterator.HasMoreResults)
                    {
                        // NOTE(review): each page is disposed as soon as it has been added to the list,
                        // yet it is read again when the pages are combined - confirm that
                        // QueryResponse.CosmosElements stays readable after Dispose.
                        using (ResponseMessage responseMessage = await feedIterator.ReadNextAsync(trace, cancellationToken))
                        {
                            pages.Add(responseMessage);
                        }
                    }

                    return pages;
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }

        List<ResponseMessage>[] queryResponses = await Task.WhenAll(tasks);
        return this.CombineStreamsFromQueryResponses(queryResponses);
    }
}

/// <summary>
/// Typed variant of the read-many request. Not implemented: it always throws.
/// </summary>
/// <typeparam name="T">Type of the items in the feed response.</typeparam>
/// <param name="items">The (id, partition key) pairs to read.</param>
/// <param name="trace">Trace for the operation.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override Task<FeedResponse<T>> ExecuteReadManyRequestAsync<T>(IReadOnlyList<(string, PartitionKey)> items,
    ITrace trace,
    CancellationToken cancellationToken = default)
    => throw new NotImplementedException();

/// <summary>
/// Buckets the requested items by the id of the partition key range that owns
/// each item's effective partition key.
/// </summary>
/// <param name="items">The (id, partition key) pairs to group.</param>
/// <param name="cancellationToken">Token passed to the routing map and partition key definition lookups.</param>
/// <returns>A map from partition key range id to the items that fall into that range.</returns>
private async Task<IDictionary<string, List<(string, PartitionKey)>>> CreatePartitionKeyRangeItemListMapAsync(
    IReadOnlyList<(string, PartitionKey)> items,
    CancellationToken cancellationToken = default)
{
    CollectionRoutingMap routingMap = await this.container.GetRoutingMapAsync(cancellationToken);
    PartitionKeyDefinition pkDefinition = await this.container.GetPartitionKeyDefinitionAsync(cancellationToken);

    IDictionary<string, List<(string, PartitionKey)>> itemsByRangeId =
        new Dictionary<string, List<(string, PartitionKey)>>();

    foreach ((string id, PartitionKey pk) in items)
    {
        // Resolve the range that owns this item's effective partition key.
        string effectivePartitionKey = pk.InternalKey.GetEffectivePartitionKeyString(pkDefinition);
        string rangeId = routingMap.GetRangeByEffectivePartitionKey(effectivePartitionKey).Id;

        // Create the bucket the first time a range is seen, then append to it.
        if (!itemsByRangeId.TryGetValue(rangeId, out List<(string, PartitionKey)> bucket))
        {
            bucket = new List<(string, PartitionKey)>();
            itemsByRangeId[rangeId] = bucket;
        }

        bucket.Add((id, pk));
    }

    return itemsByRangeId;
}

/// <summary>
/// Merges the elements of every query page, across all partition key ranges,
/// into a single OK response whose content is the serialized element list.
/// </summary>
/// <param name="queryResponses">One list of pages per partition key range query.</param>
/// <returns>A <see cref="ResponseMessage"/> with status OK carrying all collected elements.</returns>
/// <exception cref="InvalidOperationException">A page is not a <see cref="QueryResponse"/>.</exception>
private ResponseMessage CombineStreamsFromQueryResponses(List<ResponseMessage>[] queryResponses)
{
    List<CosmosElement> cosmosElements = new List<CosmosElement>();
    foreach (List<ResponseMessage> responseMessagesForSinglePartition in queryResponses)
    {
        foreach (ResponseMessage responseMessage in responseMessagesForSinglePartition)
        {
            // Only query responses expose the parsed elements that are merged here.
            if (!(responseMessage is QueryResponse queryResponse))
            {
                throw new InvalidOperationException("Read Many is being used with Query");
            }

            cosmosElements.AddRange(queryResponse.CosmosElements);
        }
    }

    return new ResponseMessage(System.Net.HttpStatusCode.OK)
    {
        Content = CosmosElementSerializer.ToStream(string.Empty, cosmosElements, ResourceType.Document)
    };
}

/// <summary>
/// Builds the query used when the partition key selector is <c>["id"]</c>:
/// <c>SELECT * FROM c WHERE c.id IN ( @param_id0,@param_id1,... )</c>.
/// </summary>
/// <param name="items">Items of a single partition key range; only the id (Item1) is used.</param>
/// <returns>A parameterized <see cref="QueryDefinition"/> matching the listed ids.</returns>
private QueryDefinition CreateReadManyQueryDefifnitionForId(List<(string, PartitionKey)> items)
{
    StringBuilder queryStringBuilder = new StringBuilder();
    SqlParameterCollection sqlParameters = new SqlParameterCollection();

    queryStringBuilder.Append("SELECT * FROM c WHERE c.id IN ( ");
    for (int i = 0; i < items.Count; i++)
    {
        // Bind every id as a parameter instead of splicing it into the query text,
        // so ids containing quotes can neither break nor alter the query.
        string idParamName = "@param_id" + i;
        sqlParameters.Add(new SqlParameter(idParamName, items[i].Item1));

        queryStringBuilder.Append(idParamName);
        if (i < items.Count - 1)
        {
            queryStringBuilder.Append(",");
        }
    }
    queryStringBuilder.Append(" )");

    return QueryDefinition.CreateFromQuerySpec(new SqlQuerySpec(queryStringBuilder.ToString(),
        sqlParameters));
}

/// <summary>
/// Builds the query used when the partition key is not the id:
/// <c>SELECT * FROM c WHERE ( ( c.id = @param_id0 AND c{selector} = @param_pk0 ) OR ... )</c>.
/// </summary>
/// <param name="items">Items of a single partition key range as (id, partition key) pairs.</param>
/// <returns>A parameterized <see cref="QueryDefinition"/> matching each id / partition key pair.</returns>
private QueryDefinition CreateReadManyQueryDefifnitionForOther(List<(string, PartitionKey)> items)
{
    StringBuilder queryStringBuilder = new StringBuilder();
    SqlParameterCollection sqlParameters = new SqlParameterCollection();

    queryStringBuilder.Append("SELECT * FROM c WHERE ( ");
    for (int i = 0; i < items.Count; i++)
    {
        // NOTE(review): the PartitionKey object itself is bound as the parameter value;
        // confirm it serializes to the raw partition key value the query must compare against.
        string pkParamName = "@param_pk" + i;
        sqlParameters.Add(new SqlParameter(pkParamName, items[i].Item2));

        // The id parameter must carry the item id (Item1), not the partition key.
        string idParamName = "@param_id" + i;
        sqlParameters.Add(new SqlParameter(idParamName, items[i].Item1));

        queryStringBuilder.Append("( ");
        queryStringBuilder.Append("c.id = ");
        queryStringBuilder.Append(idParamName);
        queryStringBuilder.Append(" AND ");
        queryStringBuilder.Append("c");
        queryStringBuilder.Append(this.partitionKeySelector);
        queryStringBuilder.Append(" = ");
        queryStringBuilder.Append(pkParamName);
        queryStringBuilder.Append(" )");

        if (i < items.Count - 1)
        {
            queryStringBuilder.Append(" OR ");
        }
    }
    queryStringBuilder.Append(" )");

    return QueryDefinition.CreateFromQuerySpec(new SqlQuerySpec(queryStringBuilder.ToString(),
        sqlParameters));
}

/// <summary>
/// Converts the partition key paths into a property accessor that can be appended
/// directly after the <c>c</c> alias in a query, e.g. <c>/id</c> becomes <c>["id"]</c>
/// and <c>/a/b</c> becomes <c>["a"]["b"]</c>.
/// </summary>
/// <param name="partitionKeyDefinition">Definition whose <c>Paths</c> are converted.</param>
/// <returns>The accessors of all path segments, concatenated in order.</returns>
private string CreatePkSelector(PartitionKeyDefinition partitionKeyDefinition)
{
    StringBuilder selector = new StringBuilder();
    foreach (string path in partitionKeyDefinition.Paths)
    {
        // Splitting on '/' also drops the leading slash of the path.
        // NOTE(review): a '/' inside a quoted segment is not handled by this simple split.
        foreach (string rawSegment in path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries))
        {
            string segment = rawSegment;

            // A segment written in quoted form (/"my key") names the property without the quotes.
            if (segment.Length >= 2 && segment[0] == '"' && segment[segment.Length - 1] == '"')
            {
                segment = segment.Substring(1, segment.Length - 2);
            }

            // Emit ["segment"], escaping embedded double quotes as \" so the accessor stays well formed.
            selector.Append("[\"");
            selector.Append(segment.Replace("\"", "\\\""));
            selector.Append("\"]");
        }
    }

    return selector.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Linq;
Expand All @@ -27,9 +29,11 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.ReadFeed;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;

/// <summary>
/// Used to perform operations on items. There are two different types of operations.
Expand Down Expand Up @@ -278,6 +282,17 @@ public override FeedIterator GetItemQueryStreamIterator(
requestOptions: requestOptions);
}

/// <summary>
/// Reads the given items by delegating to a query-based <see cref="ReadManyHelper"/>
/// and returns the result as a single stream response.
/// </summary>
/// <typeparam name="T">Not used by this stream overload.</typeparam>
/// <param name="items">The (id, partition key) pairs to read.</param>
/// <param name="trace">Trace passed to the helper.</param>
/// <param name="cancellationToken">Token used to cancel the partition key lookup and the read itself.</param>
/// <returns>The <see cref="ResponseMessage"/> produced by the helper.</returns>
public async Task<ResponseMessage> ReadManyAsync<T>(
    IReadOnlyList<(string, PartitionKey)> items,
    ITrace trace,
    CancellationToken cancellationToken = default)
{
    // Flow the caller's token into the partition key definition lookup as well,
    // so cancellation is honored before any query is started.
    PartitionKeyDefinition partitionKeyDefinition = await this.GetPartitionKeyDefinitionAsync(cancellationToken);
    ReadManyHelper readManyHelper = new ReadManyQueryHelper(partitionKeyDefinition, this);

    return await readManyHelper.ExecuteReadManyRequestAsync(items, trace, cancellationToken);
}

/// <summary>
/// Used in the compute gateway to support legacy gateway interface.
/// </summary>
Expand Down