Skip to content

Commit

Permalink
added MakeList and MakeSet aggregators
Browse files Browse the repository at this point in the history
  • Loading branch information
ezrahaleva-msft committed May 14, 2024
1 parent f304e97 commit e443aa3
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ internal enum AggregateOperator
Average,
Count,
Max,
MakeList,
MakeSet,
Min,
Sum,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.CosmosElements.Numbers;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;

internal sealed class MakeListAggregator : IAggregator
{
private readonly List<CosmosElement> globalList;

private MakeListAggregator(CosmosArray initialList)
{
this.globalList = new List<CosmosElement>();
foreach (CosmosElement setItem in initialList)
{
this.globalList.Add(setItem);
}
}

public void Aggregate(CosmosElement localList)
{
if (!(localList is CosmosArray cosmosArray))
{
throw new ArgumentException($"{nameof(localList)} must be an array.");
}

foreach (CosmosElement listItem in cosmosArray)
{
this.globalList.Add(listItem);
}
}

public CosmosElement GetResult()
{
CosmosElement[] cosmosElementArray = new CosmosElement[this.globalList.Count];
this.globalList.CopyTo(cosmosElementArray);
return CosmosArray.Create(cosmosElementArray);
}

public string GetContinuationToken()
{
return this.globalList.ToString();
}

public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)
{
CosmosArray partialList;
if (continuationToken != null)
{
if (!(continuationToken is CosmosArray cosmosPartialList))
{
return TryCatch<IAggregator>.FromException(
new MalformedContinuationTokenException($@"Invalid MakeList continuation token: ""{continuationToken}""."));
}

partialList = cosmosPartialList;
}
else
{
partialList = CosmosArray.Empty;
}

return TryCatch<IAggregator>.FromResult(
new MakeListAggregator(initialList: partialList));
}

public CosmosElement GetCosmosElementContinuationToken()
{
CosmosElement[] cosmosElementArray = new CosmosElement[this.globalList.Count];
this.globalList.CopyTo(cosmosElementArray);
return CosmosArray.Create(cosmosElementArray);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.CosmosElements.Numbers;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;

internal sealed class MakeSetAggregator : IAggregator
{
private readonly HashSet<CosmosElement> globalSet;

private MakeSetAggregator(CosmosArray initialSet)
{
this.globalSet = new HashSet<CosmosElement>();
foreach (CosmosElement setItem in initialSet)
{
this.globalSet.Add(setItem);
}
}

public void Aggregate(CosmosElement localSet)
{
if (!(localSet is CosmosArray cosmosArray))
{
throw new ArgumentException($"{nameof(localSet)} must be an array.");
}

foreach (CosmosElement setItem in cosmosArray)
{
this.globalSet.Add(setItem);
}
}

public CosmosElement GetResult()
{
CosmosElement[] cosmosElementArray = new CosmosElement[this.globalSet.Count];
this.globalSet.CopyTo(cosmosElementArray);
return CosmosArray.Create(cosmosElementArray);
}

public string GetContinuationToken()
{
return this.globalSet.ToString();
}

public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)
{
CosmosArray partialSet;
if (continuationToken != null)
{
if (!(continuationToken is CosmosArray cosmosPartialSet))
{
return TryCatch<IAggregator>.FromException(
new MalformedContinuationTokenException($@"Invalid MakeSet continuation token: ""{continuationToken}""."));
}

partialSet = cosmosPartialSet;
}
else
{
partialSet = CosmosArray.Empty;
}

return TryCatch<IAggregator>.FromResult(
new MakeSetAggregator(initialSet: partialSet));
}

public CosmosElement GetCosmosElementContinuationToken()
{
CosmosElement[] cosmosElementArray = new CosmosElement[this.globalSet.Count];
this.globalSet.CopyTo(cosmosElementArray);
return CosmosArray.Create(cosmosElementArray);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ public static TryCatch<AggregateValue> TryCreate(
tryCreateAggregator = CountAggregator.TryCreate(continuationToken);
break;

case AggregateOperator.MakeList:
tryCreateAggregator = MakeListAggregator.TryCreate(continuationToken);
break;

case AggregateOperator.MakeSet:
tryCreateAggregator = MakeSetAggregator.TryCreate(continuationToken);
break;

case AggregateOperator.Max:
tryCreateAggregator = MinMaxAggregator.TryCreateMaxAggregator(continuationToken);
break;
Expand All @@ -381,7 +389,6 @@ public static TryCatch<AggregateValue> TryCreate(
case AggregateOperator.Sum:
tryCreateAggregator = SumAggregator.TryCreate(continuationToken);
break;

default:
throw new ArgumentException($"Unknown {nameof(AggregateOperator)}: {aggregateOperator}.");
}
Expand Down

0 comments on commit e443aa3

Please sign in to comment.