From 5a9a7044c695392ab5ac4c3252236692e67709e1 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Tue, 8 Aug 2023 14:21:54 -0700 Subject: [PATCH 1/4] Use BucketCountThresholds in InternalTerms and InternalAggregations and do not apply shard level thresholds at slice level for Concurrent Segment Search Signed-off-by: Jay Deng --- CHANGELOG.md | 3 +- .../aggregations/TermsReduceBenchmark.java | 7 +- .../StringTermsSerializationBenchmark.java | 7 +- .../AggregationCollectorManager.java | 2 +- .../terms/AbstractStringTermsAggregator.java | 20 +---- .../bucket/terms/DoubleTerms.java | 24 ++---- .../GlobalOrdinalsStringTermsAggregator.java | 28 ++++--- .../terms/InternalMappedSignificantTerms.java | 7 +- .../bucket/terms/InternalMappedTerms.java | 10 +-- .../bucket/terms/InternalMultiTerms.java | 23 ++---- .../terms/InternalSignificantTerms.java | 40 ++++++++-- .../bucket/terms/InternalTerms.java | 51 +++++++++--- .../aggregations/bucket/terms/LongTerms.java | 30 +++---- .../terms/MapStringTermsAggregator.java | 24 +++--- .../bucket/terms/MultiTermsAggregator.java | 25 +++--- .../bucket/terms/NumericTermsAggregator.java | 80 +++++++------------ .../bucket/terms/SignificantLongTerms.java | 17 ++-- .../bucket/terms/SignificantStringTerms.java | 17 ++-- .../SignificantTermsAggregatorFactory.java | 7 +- .../bucket/terms/StringTerms.java | 24 ++---- .../bucket/terms/TermsAggregatorFactory.java | 8 +- .../terms/UnmappedSignificantTerms.java | 12 ++- .../bucket/terms/UnmappedTerms.java | 13 ++- .../bucket/terms/UnsignedLongTerms.java | 30 +++---- .../InternalAggregationsTests.java | 13 ++- .../InternalMultiBucketAggregationTests.java | 14 ++-- .../bucket/terms/DoubleTermsTests.java | 20 ++--- .../bucket/terms/InternalMultiTermsTests.java | 12 ++- .../InternalSignificantTermsTestCase.java | 2 +- .../bucket/terms/InternalTermsTestCase.java | 2 +- .../bucket/terms/LongTermsTests.java | 20 ++--- .../terms/SignificanceHeuristicTests.java | 32 ++++++-- 
.../terms/SignificantLongTermsTests.java | 19 +++-- .../terms/SignificantStringTermsTests.java | 12 ++- .../bucket/terms/StringTermsTests.java | 20 ++--- .../bucket/terms/UnsignedLongTermsTests.java | 20 ++--- 36 files changed, 350 insertions(+), 345 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5ab51e9b162e..90e7df11140b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801)) - [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005)) - Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047)) +- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085)) ### Deprecated @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java index b18ea4327cbc2..c3c1103b5b098 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java @@ -57,6 +57,7 @@ import 
org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.terms.StringTerms; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.query.QuerySearchResult; @@ -170,15 +171,13 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) { "terms", BucketOrder.key(true), BucketOrder.count(false), - topNSize, - 1, Collections.emptyMap(), DocValueFormat.RAW, - numShards, true, 0, buckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, topNSize, numShards) ); } diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java index 8f86a0f3afbc6..c3979d0722035 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java @@ -51,6 +51,7 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import java.util.ArrayList; import java.util.List; @@ -86,15 +87,13 @@ private StringTerms newTerms(boolean withNested) { "test", BucketOrder.key(true), BucketOrder.key(true), - buckets, - 1, null, DocValueFormat.RAW, - buckets, false, 100000, resultBuckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, buckets, buckets) ); } diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 1f60ff6503ca8..11052cd215fb6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -66,7 +66,7 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices // were created to execute this request and it used concurrent segment search path // TODO: Add the check for flag that the request was executed using concurrent search - if (collectors.size() > 1) { + if (collectors.size() >= 1) { // using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all // documents are collected across shards for an aggregation return new AggregationReduceableSearchResult( diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java index 9551be10e52b8..c52ec4c6f7df4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java @@ -72,20 +72,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator { } protected StringTerms buildEmptyTermsAggregation() { - return new StringTerms( - name, - order, - order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata(), - format, - bucketCountThresholds.getShardSize(), - showTermDocCountError, - 0, - emptyList(), - 0 - ); + return new 
StringTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds); } protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) { @@ -95,14 +82,13 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs int supersetSize = topReader.numDocs(); return new SignificantStringTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSize, supersetSize, significanceHeuristic, - emptyList() + emptyList(), + bucketCountThresholds ); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java index 0b76c302801af..f4e3475940ada 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -130,29 +130,25 @@ public DoubleTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, - int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -174,15 +170,13 @@ public DoubleTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -204,15 +198,13 @@ protected DoubleTerms create(String name, List buckets, BucketOrder redu name, 
reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index e0a22435b8f48..5718ec08eafdd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -603,6 +603,15 @@ abstract class ResultStrategy< TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + int requiredSizeLocal; + long minDocCountLocal; + if (context.isConcurrentSegmentSearchEnabled()) { + requiredSizeLocal = Integer.MAX_VALUE; + minDocCountLocal = 0; + } else { + requiredSizeLocal = bucketCountThresholds.getShardSize(); + minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); + } if (valueCount == 0) { // no context in this reader InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { @@ -615,11 +624,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws long[] otherDocCount = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { final int size; - if (bucketCountThresholds.getMinDocCount() == 0) { + if (minDocCountLocal == 0) { // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); + size = (int) Math.min(valueCount, requiredSizeLocal); } else { - 
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); + size = (int) Math.min(maxBucketOrd(), requiredSizeLocal); } PriorityQueue ordered = buildPriorityQueue(size); final int finalOrdIdx = ordIdx; @@ -630,7 +639,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws @Override public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { otherDocCount[finalOrdIdx] += docCount; - if (docCount >= bucketCountThresholds.getShardMinDocCount()) { + if (docCount >= minDocCountLocal) { if (spare == null) { spare = buildEmptyTemporaryBucket(); } @@ -799,15 +808,13 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, - bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -924,14 +931,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) { return new SignificantStringTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSize(owningBucketOrd), supersetSize, significanceHeuristic, - Arrays.asList(topBuckets) + Arrays.asList(topBuckets), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java index 97a95b8df840b..a7c5427fa38cc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java @@ -64,16 +64,15 @@ public abstract class InternalMappedSignificantTerms< protected InternalMappedSignificantTerms( String name, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, long subsetSize, long supersetSize, SignificanceHeuristic significanceHeuristic, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, requiredSize, minDocCount, metadata); + super(name, bucketCountThresholds, metadata); this.format = format; this.buckets = buckets; this.subsetSize = subsetSize; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java index f5e92fec8195d..943d8e37eed15 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java @@ -52,7 +52,6 @@ */ public abstract class InternalMappedTerms, B extends InternalTerms.Bucket> extends InternalTerms { protected final DocValueFormat format; - protected final int shardSize; protected final boolean showTermDocCountError; protected final long otherDocCount; protected final List buckets; @@ -64,19 +63,16 @@ protected InternalMappedTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, - int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, reduceOrder, order, requiredSize, minDocCount, metadata); + super(name, reduceOrder, order, bucketCountThresholds, metadata); this.format = format; - this.shardSize = shardSize; 
this.showTermDocCountError = showTermDocCountError; this.otherDocCount = otherDocCount; this.docCountError = docCountError; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java index fc84f35385d5c..89fd488d21e23 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java @@ -220,7 +220,6 @@ public int compare(List thisObjects, List thatObjects) { } } - private final int shardSize; private final boolean showTermDocCountError; private final long otherDocCount; private final List termFormats; @@ -233,18 +232,16 @@ public InternalMultiTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, - int shardSize, boolean showTermDocCountError, long otherDocCount, long docCountError, List formats, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, reduceOrder, order, requiredSize, minDocCount, metadata); - this.shardSize = shardSize; + super(name, reduceOrder, order, bucketCountThresholds, metadata); + this.shardSize = bucketCountThresholds.getShardSize(); this.showTermDocCountError = showTermDocCountError; this.otherDocCount = otherDocCount; this.termFormats = formats; @@ -278,15 +275,13 @@ public InternalMultiTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, - shardSize, showTermDocCountError, otherDocCount, docCountError, termFormats, - buckets + buckets, + bucketCountThresholds ); } @@ -357,15 +352,13 @@ protected InternalMultiTerms create( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, - shardSize, showTermDocCountError, otherDocCount, docCountError, termFormats, - buckets + buckets, + 
bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index 84d148199a7f9..8eb3919f4b88d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -39,6 +39,7 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; +import org.opensearch.search.aggregations.bucket.BucketUtils; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import java.io.IOException; @@ -195,11 +196,21 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) protected final int requiredSize; protected final long minDocCount; - - protected InternalSignificantTerms(String name, int requiredSize, long minDocCount, Map metadata) { + protected int shardSize; + protected final long shardMinDocCount; + protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; + + protected InternalSignificantTerms( + String name, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + Map metadata + ) { super(name, metadata); - this.requiredSize = requiredSize; - this.minDocCount = minDocCount; + this.requiredSize = bucketCountThresholds.getRequiredSize(); + this.minDocCount = bucketCountThresholds.getMinDocCount(); + this.shardSize = bucketCountThresholds.getShardSize(); + this.shardMinDocCount = bucketCountThresholds.getShardMinDocCount(); + this.bucketCountThresholds = bucketCountThresholds; } /** @@ -209,6 +220,9 @@ protected InternalSignificantTerms(StreamInput in) throws IOException { super(in); requiredSize = readSize(in); minDocCount 
= in.readVLong(); + shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize); + shardMinDocCount = 0; + bucketCountThresholds = new TermsAggregator.BucketCountThresholds(minDocCount, shardMinDocCount, requiredSize, shardSize); } protected final void doWriteTo(StreamOutput out) throws IOException { @@ -224,6 +238,16 @@ protected final void doWriteTo(StreamOutput out) throws IOException { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + int requiredSizeLocal; + long minDocCountLocal; + if (reduceContext.isSliceLevel()) { + requiredSizeLocal = bucketCountThresholds.getShardSize(); + minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); + } else { + requiredSizeLocal = bucketCountThresholds.getRequiredSize(); + minDocCountLocal = bucketCountThresholds.getMinDocCount(); + } + long globalSubsetSize = 0; long globalSupersetSize = 0; // Compute the overall result set size and the corpus size using the @@ -265,13 +289,17 @@ public InternalAggregation reduce(List aggregations, Reduce } } SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); - final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); + final int size = (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) + ? 
Math.min(requiredSizeLocal, buckets.size()) + : buckets.size(); BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); final B b = reduceBucket(sameTermBuckets, reduceContext); b.updateScore(heuristic); - if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) { + if (!(reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) // Don't apply thresholds for partial reduce + || (reduceContext.isSliceLevel() && (b.subsetDf >= minDocCountLocal)) // Score needs to be evaluated only at the coordinator + || ((b.score > 0) && (b.subsetDf >= minDocCountLocal))) { B removed = ordered.insertWithOverflow(b); if (removed == null) { reduceContext.consumeBucketsAndMaybeBreak(1); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java index 9a80155eea51c..7489822371011 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java @@ -45,6 +45,7 @@ import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.KeyComparable; +import org.opensearch.search.aggregations.bucket.BucketUtils; import org.opensearch.search.aggregations.bucket.IteratorAndCurrent; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; @@ -57,6 +58,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import static org.opensearch.search.aggregations.InternalOrder.isKeyAsc; import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; @@ -223,29 +225,33 @@ public int hashCode() 
{ protected final BucketOrder order; protected final int requiredSize; protected final long minDocCount; + protected int shardSize; + protected final long shardMinDocCount; + protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; /** * Creates a new {@link InternalTerms} * @param name The name of the aggregation * @param reduceOrder The {@link BucketOrder} that should be used to merge shard results. * @param order The {@link BucketOrder} that should be used to sort the final reduce. - * @param requiredSize The number of top buckets. - * @param minDocCount The minimum number of documents allowed per bucket. + * @param bucketCountThresholds Object containing values for minDocCount, shardMinDocCount, size, shardSize. * @param metadata The metadata associated with the aggregation. */ protected InternalTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, + TermsAggregator.BucketCountThresholds bucketCountThresholds, Map metadata ) { super(name, metadata); this.reduceOrder = reduceOrder; this.order = order; - this.requiredSize = requiredSize; - this.minDocCount = minDocCount; + this.bucketCountThresholds = bucketCountThresholds; + this.requiredSize = bucketCountThresholds.getRequiredSize(); + this.minDocCount = bucketCountThresholds.getMinDocCount(); + this.shardSize = bucketCountThresholds.getShardSize(); + this.shardMinDocCount = bucketCountThresholds.getShardMinDocCount(); } /** @@ -257,6 +263,9 @@ protected InternalTerms(StreamInput in) throws IOException { order = InternalOrder.Streams.readOrder(in); requiredSize = readSize(in); minDocCount = in.readVLong(); + shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize); + shardMinDocCount = 0; + bucketCountThresholds = new TermsAggregator.BucketCountThresholds(minDocCount, shardMinDocCount, requiredSize, shardSize); } @Override @@ -385,6 +394,16 @@ private List reduceLegacy(List aggregations, ReduceConte } public InternalAggregation 
reduce(List aggregations, ReduceContext reduceContext) { + int requiredSizeLocal; + long minDocCountLocal; + if (reduceContext.isSliceLevel()) { + requiredSizeLocal = bucketCountThresholds.getShardSize(); + minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); + } else { + requiredSizeLocal = bucketCountThresholds.getRequiredSize(); + minDocCountLocal = bucketCountThresholds.getMinDocCount(); + } + long sumDocCountError = 0; long otherDocCount = 0; InternalTerms referenceTerms = null; @@ -444,8 +463,8 @@ public InternalAggregation reduce(List aggregations, Reduce reducedBuckets = reduceLegacy(aggregations, reduceContext); } final B[] list; - if (reduceContext.isFinalReduce()) { - final int size = Math.min(requiredSize, reducedBuckets.size()); + if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) { + final int size = Math.min(requiredSizeLocal, reducedBuckets.size()); // final comparator final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); for (B bucket : reducedBuckets) { @@ -455,7 +474,7 @@ public InternalAggregation reduce(List aggregations, Reduce final long finalSumDocCountError = sumDocCountError; bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError); } - if (bucket.getDocCount() >= minDocCount) { + if (bucket.getDocCount() >= minDocCountLocal) { B removed = ordered.insertWithOverflow(bucket); if (removed != null) { otherDocCount += removed.getDocCount(); @@ -474,7 +493,9 @@ public InternalAggregation reduce(List aggregations, Reduce } else { // we can prune the list on partial reduce if the aggregation is ordered by key // and not filtered (minDocCount == 0) - int size = isKeyOrder(order) && minDocCount == 0 ? Math.min(requiredSize, reducedBuckets.size()) : reducedBuckets.size(); + int size = isKeyOrder(order) && minDocCountLocal == 0 + ? 
Math.min(requiredSizeLocal, reducedBuckets.size()) + : reducedBuckets.size(); list = createBucketsArray(size); for (int i = 0; i < size; i++) { reduceContext.consumeBucketsAndMaybeBreak(1); @@ -493,7 +514,15 @@ public InternalAggregation reduce(List aggregations, Reduce } else { docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; } - return create(name, Arrays.asList(list), reduceContext.isFinalReduce() ? order : thisReduceOrder, docCountError, otherDocCount); + + // Shards must return buckets sorted by key, so we apply the sort here + List resultList; + if (reduceContext.isSliceLevel()) { + resultList = Arrays.stream(list).sorted(thisReduceOrder.comparator()).collect(Collectors.toList()); + } else { + resultList = Arrays.asList(list); + } + return create(name, resultList, reduceContext.isFinalReduce() ? order : thisReduceOrder, docCountError, otherDocCount); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java index 67aa80d0a9879..4ddc07830a319 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java @@ -142,29 +142,25 @@ public LongTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, - int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -186,15 +182,13 @@ public LongTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, 
format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -216,15 +210,13 @@ protected LongTerms create(String name, List buckets, BucketOrder reduce name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -293,15 +285,13 @@ static DoubleTerms convertLongTermsToDouble(LongTerms longTerms, DocValueFormat longTerms.getName(), longTerms.reduceOrder, longTerms.order, - longTerms.requiredSize, - longTerms.minDocCount, longTerms.metadata, longTerms.format, - longTerms.shardSize, longTerms.showTermDocCountError, longTerms.otherDocCount, newBuckets, - longTerms.docCountError + longTerms.docCountError, + longTerms.bucketCountThresholds ); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index bcdf1f4480a31..a4269a216050b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -244,11 +244,20 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; @@ -257,7 +266,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { + if (docCount < minDocCountLocal) { continue; } if (spare == null) { @@ -454,15 +463,13 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), 
metadata(), format, - bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -572,14 +579,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOE SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) { return new SignificantStringTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSizes.get(owningBucketOrd), supersetSize, significanceHeuristic, - Arrays.asList(topBuckets) + Arrays.asList(topBuckets), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index 9d99c0b90a075..499e1743856dd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -118,13 +118,22 @@ public MultiTermsAggregator( @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + int requiredSizeLocal; + long minDocCountLocal; + if (context.isConcurrentSegmentSearchEnabled()) { + requiredSizeLocal = Integer.MAX_VALUE; + minDocCountLocal = 0; + } else { + requiredSizeLocal = bucketCountThresholds.getShardSize(); + minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); + } InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][]; long[] otherDocCounts = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - int size = (int) 
Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + int size = (int) Math.min(bucketsInOrd, requiredSizeLocal); PriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); InternalMultiTerms.Bucket spare = null; BytesRef dest = null; @@ -136,7 +145,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { + if (docCount < minDocCountLocal) { continue; } if (spare == null) { @@ -182,15 +191,13 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), - bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, 0, formats, - List.of(topBuckets) + List.of(topBuckets), + bucketCountThresholds ); } @@ -200,15 +207,13 @@ public InternalAggregation buildEmptyAggregation() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), - bucketCountThresholds.getShardSize(), showTermDocCountError, 0, 0, formats, - Collections.emptyList() + Collections.emptyList(), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index a0265135fe9d3..53e1732e729fd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -173,13 +173,22 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); @@ 
-187,7 +196,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { + if (docCount < minDocCountLocal) { continue; } if (spare == null) { @@ -395,34 +404,19 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, - bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), - 0 + 0, + bucketCountThresholds ); } @Override LongTerms buildEmptyResult() { - return new LongTerms( - name, - order, - order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata(), - format, - bucketCountThresholds.getShardSize(), - showTermDocCountError, - 0, - emptyList(), - 0 - ); + return new LongTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds); } } @@ -477,34 +471,19 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, - bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), - 0 + 0, + bucketCountThresholds ); } @Override DoubleTerms buildEmptyResult() { - return new DoubleTerms( - name, - order, - order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata(), - format, - bucketCountThresholds.getShardSize(), - showTermDocCountError, - 0, - emptyList(), - 0 - ); + return new DoubleTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds); } } @@ -558,15 +537,13 @@ UnsignedLongTerms buildResult(long 
owningBucketOrd, long otherDocCount, Unsigned name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, - bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -576,15 +553,13 @@ UnsignedLongTerms buildEmptyResult() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, - bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), - 0 + 0, + bucketCountThresholds ); } } @@ -670,17 +645,17 @@ void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {} @Override SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCoun, SignificantLongTerms.Bucket[] topBuckets) { - return new SignificantLongTerms( + SignificantLongTerms significantLongTerms = new SignificantLongTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSizes.get(owningBucketOrd), supersetSize, significanceHeuristic, - List.of(topBuckets) + List.of(topBuckets), + bucketCountThresholds ); + return significantLongTerms; } @Override @@ -691,14 +666,13 @@ SignificantLongTerms buildEmptyResult() { int supersetSize = topReader.numDocs(); return new SignificantLongTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, 0, supersetSize, significanceHeuristic, - emptyList() + emptyList(), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java index 46e8cea7abc36..3da5a766fc37b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java @@ -130,16 +130,15 @@ public int hashCode() { public SignificantLongTerms( String name, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, long subsetSize, long supersetSize, SignificanceHeuristic significanceHeuristic, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, requiredSize, minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets); + super(name, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets, bucketCountThresholds); } /** @@ -158,14 +157,13 @@ public String getWriteableName() { public SignificantLongTerms create(List buckets) { return new SignificantLongTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } @@ -187,14 +185,13 @@ public Bucket createBucket(InternalAggregations aggregations, SignificantLongTer protected SignificantLongTerms create(long subsetSize, long supersetSize, List buckets) { return new SignificantLongTerms( getName(), - requiredSize, - minDocCount, getMetadata(), format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java index d8d93ad7ae159..c70db6005d7cd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java @@ -135,16 +135,15 @@ public int hashCode() { public SignificantStringTerms( String name, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, long 
subsetSize, long supersetSize, SignificanceHeuristic significanceHeuristic, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, requiredSize, minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets); + super(name, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets, bucketCountThresholds); } /** @@ -163,14 +162,13 @@ public String getWriteableName() { public SignificantStringTerms create(List buckets) { return new SignificantStringTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } @@ -192,14 +190,13 @@ public Bucket createBucket(InternalAggregations aggregations, SignificantStringT protected SignificantStringTerms create(long subsetSize, long supersetSize, List buckets) { return new SignificantStringTerms( getName(), - requiredSize, - minDocCount, getMetadata(), format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index 1dacd4c7de4e8..9f5136dc1df53 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -246,12 +246,7 @@ public Aggregator build( @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { - final InternalAggregation aggregation = new UnmappedSignificantTerms( - name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata - ); + final InternalAggregation 
aggregation = new UnmappedSignificantTerms(name, bucketCountThresholds, metadata); return new NonCollectingAggregator(name, searchContext, parent, factories, metadata) { @Override public InternalAggregation buildEmptyAggregation() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java index c985bf770d4a7..b4e7ef02e110c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java @@ -134,29 +134,25 @@ public StringTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, - int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -178,15 +174,13 @@ public StringTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -213,15 +207,13 @@ protected StringTerms create(String name, List buckets, BucketOrder redu name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 
11be3da5c8991..62844b4499dba 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -265,13 +265,7 @@ public Aggregator build( @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { - final InternalAggregation aggregation = new UnmappedTerms( - name, - order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata - ); + final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds, metadata); Aggregator agg = new NonCollectingAggregator(name, searchContext, parent, factories, metadata) { @Override public InternalAggregation buildEmptyAggregation() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java index 9384f9e793d81..2c8aa8f0a0c37 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java @@ -77,8 +77,12 @@ private Bucket( } } - public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, Map metadata) { - super(name, requiredSize, minDocCount, metadata); + public UnmappedSignificantTerms( + String name, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + Map metadata + ) { + super(name, bucketCountThresholds, metadata); } /** @@ -105,7 +109,7 @@ public String getType() { @Override public UnmappedSignificantTerms create(List buckets) { - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, bucketCountThresholds, metadata); } @Override @@ -132,7 
+136,7 @@ Bucket createBucket( @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, bucketCountThresholds, metadata); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java index 01902f9449bae..3d2bbb93c889a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -72,8 +72,13 @@ private Bucket( } } - public UnmappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map metadata) { - super(name, order, order, requiredSize, minDocCount, metadata); + public UnmappedTerms( + String name, + BucketOrder order, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + Map metadata + ) { + super(name, order, order, bucketCountThresholds, metadata); } /** @@ -100,7 +105,7 @@ public String getType() { @Override public UnmappedTerms create(List buckets) { - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, bucketCountThresholds, metadata); } @Override @@ -120,7 +125,7 @@ protected UnmappedTerms create(String name, List buckets, BucketOrder re @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, bucketCountThresholds, metadata); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java index db05ac84b4aec..3eb8e6693ba79 100644 
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java @@ -121,29 +121,25 @@ public UnsignedLongTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, - int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -165,15 +161,13 @@ public UnsignedLongTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -195,15 +189,13 @@ protected UnsignedLongTerms create(String name, List buckets, BucketOrde name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -272,15 +264,13 @@ static DoubleTerms convertUnsignedLongTermsToDouble(UnsignedLongTerms unsignedLo unsignedLongTerms.getName(), unsignedLongTerms.reduceOrder, unsignedLongTerms.order, - unsignedLongTerms.requiredSize, - unsignedLongTerms.minDocCount, unsignedLongTerms.metadata, unsignedLongTerms.format, - unsignedLongTerms.shardSize, unsignedLongTerms.showTermDocCountError, unsignedLongTerms.otherDocCount, newBuckets, - unsignedLongTerms.docCountError + unsignedLongTerms.docCountError, + unsignedLongTerms.bucketCountThresholds ); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java 
b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java index 354c635e06fab..0a3e92267efa4 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java @@ -43,6 +43,7 @@ import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.StringTermsTests; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.InternalSimpleValueTests; import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; @@ -76,15 +77,13 @@ public void testNonFinalReduceTopLevelPipelineAggs() { "name", BucketOrder.key(true), BucketOrder.key(true), - 10, - 1, Collections.emptyMap(), DocValueFormat.RAW, - 25, false, 10, Collections.emptyList(), - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, 10, 25) ); List aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms))); InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction()); @@ -96,15 +95,13 @@ public void testFinalReduceTopLevelPipelineAggs() { "name", BucketOrder.key(true), BucketOrder.key(true), - 10, - 1, Collections.emptyMap(), DocValueFormat.RAW, - 25, false, 10, Collections.emptyList(), - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, 10, 25) ); InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms)); diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java b/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java index 
bc48b546e6d28..cd900972ff23b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java @@ -37,6 +37,7 @@ import org.opensearch.search.aggregations.bucket.terms.InternalTerms; import org.opensearch.search.aggregations.bucket.terms.LongTerms; import org.opensearch.search.aggregations.bucket.terms.StringTerms; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.metrics.InternalAvg; import org.opensearch.search.aggregations.support.AggregationPath; import org.opensearch.test.OpenSearchTestCase; @@ -164,20 +165,17 @@ public void testResolveToSpecificBucket() { DocValueFormat.RAW ) ); - InternalTerms termsAgg = new StringTerms( "string_terms", BucketOrder.count(false), BucketOrder.count(false), - 1, - 0, Collections.emptyMap(), DocValueFormat.RAW, - 1, false, 0, stringBuckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(0, 0, 1, 1) ); InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg)); LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW); @@ -208,15 +206,13 @@ public void testResolveToMissingSpecificBucket() { "string_terms", BucketOrder.count(false), BucketOrder.count(false), - 1, - 0, Collections.emptyMap(), DocValueFormat.RAW, - 1, false, 0, stringBuckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(0, 0, 1, 1) ); InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg)); LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java index 
853d56202c360..0ac3214531d5d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java @@ -59,6 +59,12 @@ public class DoubleTermsTests extends InternalTermsTestCase { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = randomNumericDocValueFormat(); long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -75,15 +81,13 @@ public class DoubleTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -158,15 +162,13 @@ protected Class implementationClass() { name, doubleTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -195,7 +197,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java index 2657f2bdd5138..61416326e6176 100644 --- 
a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java @@ -45,6 +45,12 @@ public class InternalMultiTermsTests extends InternalTermsTestCase { int requiredSize = 3; int shardSize = requiredSize + 2; long otherDocCount = 0; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); final int numBuckets = randomNumberOfBuckets(); @@ -70,15 +76,13 @@ public class InternalMultiTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, - shardSize, showTermDocCountError, otherDocCount, docCountError, formats, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java index aab9e91576b18..e640aa92ac782 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java @@ -114,7 +114,7 @@ protected abstract InternalSignificantTerms createTestInstance( @Override protected InternalSignificantTerms createUnmappedInstance(String name, Map metadata) { InternalSignificantTerms testInstance = createTestInstance(name, metadata); - return new UnmappedSignificantTerms(name, testInstance.requiredSize, testInstance.minDocCount, metadata); + return new UnmappedSignificantTerms(name, testInstance.bucketCountThresholds, metadata); } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java 
b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java index d3f7c62021243..2e00248a70771 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java @@ -73,7 +73,7 @@ public void init() { @Override protected InternalTerms createUnmappedInstance(String name, Map metadata) { InternalTerms testInstance = createTestInstance(name, metadata); - return new UnmappedTerms(name, testInstance.order, testInstance.requiredSize, testInstance.minDocCount, metadata); + return new UnmappedTerms(name, testInstance.order, testInstance.bucketCountThresholds, metadata); } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java index becf89c4603f3..aade7e6d4a388 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java @@ -59,6 +59,12 @@ public class LongTermsTests extends InternalTermsTestCase { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = randomNumericDocValueFormat(); long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -75,15 +81,13 @@ public class LongTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -158,15 +162,13 @@ protected Class implementationClass() { name, longTerms.reduceOrder, order, - 
requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -195,7 +197,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java index b400f5bccca01..24aeed1aeb635 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java @@ -125,7 +125,16 @@ public void testStreamResponse() throws Exception { DocValueFormat.RAW, randomDoubleBetween(0, 100, true) ); - return new SignificantLongTerms("some_name", 1, 1, null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); + return new SignificantLongTerms( + "some_name", + null, + DocValueFormat.RAW, + 10, + 20, + heuristic, + singletonList(bucket), + new TermsAggregator.BucketCountThresholds(1, 0, 1, 0) + ); } else { SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket( new BytesRef("someterm"), @@ -137,7 +146,16 @@ public void testStreamResponse() throws Exception { DocValueFormat.RAW, randomDoubleBetween(0, 100, true) ); - return new SignificantStringTerms("some_name", 1, 1, null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); + return new SignificantStringTerms( + "some_name", + null, + DocValueFormat.RAW, + 10, + 20, + heuristic, + 
singletonList(bucket), + new TermsAggregator.BucketCountThresholds(1, 0, 1, 0) + ); } } @@ -204,14 +222,13 @@ SignificantStringTerms createAggregation( ) { return new SignificantStringTerms( "sig_terms", - 2, - -1, emptyMap(), DocValueFormat.RAW, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(-1, 0, 2, 0) ); } @@ -240,14 +257,13 @@ SignificantLongTerms createAggregation( ) { return new SignificantLongTerms( "sig_terms", - 2, - -1, emptyMap(), DocValueFormat.RAW, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(-1, 0, 2, 0) ); } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java index 886e4d8267578..38b478efd004b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java @@ -85,7 +85,17 @@ protected InternalSignificantTerms createTestInstance( bucket.updateScore(significanceHeuristic); buckets.add(bucket); } - return new SignificantLongTerms(name, requiredSize, 1L, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets); + + return new SignificantLongTerms( + name, + metadata, + format, + subsetSize, + supersetSize, + significanceHeuristic, + buckets, + new TermsAggregator.BucketCountThresholds(1L, 0, requiredSize, 0) + ); } @Override @@ -150,14 +160,13 @@ protected Class implementationClass() { } return new SignificantLongTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0) ); } else { String name = instance.getName(); @@ -185,7 +194,7 @@ 
protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java index 63a08a7aa1683..3ac30248ef353 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java @@ -80,14 +80,13 @@ protected InternalSignificantTerms createTestInstance( } return new SignificantStringTerms( name, - requiredSize, - 1L, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(1L, 0, requiredSize, 0) ); } @@ -153,14 +152,13 @@ protected Class implementationClass() { } return new SignificantStringTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0) ); } else { String name = instance.getName(); @@ -188,7 +186,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java 
index 6757c8e00f83d..0431123405822 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java @@ -140,15 +140,13 @@ protected Class implementationClass() { name, stringTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -177,7 +175,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } @@ -206,6 +204,12 @@ private BytesRef[] generateRandomDict() { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = DocValueFormat.RAW; long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -226,15 +230,13 @@ private BytesRef[] generateRandomDict() { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java index b961039e50501..1e1f01ce35a40 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java +++ 
b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java @@ -36,6 +36,12 @@ public class UnsignedLongTermsTests extends InternalTermsTestCase { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = randomNumericDocValueFormat(); long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -52,15 +58,13 @@ public class UnsignedLongTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -135,15 +139,13 @@ protected Class implementationClass() { name, longTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, - shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -172,7 +174,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } From f239aeca437f4b06a9da6e557ef97eab6c588891 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Tue, 8 Aug 2023 17:54:34 -0700 Subject: [PATCH 2/4] Addressing comments Signed-off-by: Jay Deng --- .../AggregationCollectorManager.java | 19 +++------ .../GlobalAggCollectorManager.java | 10 +++++ .../aggregations/InternalAggregation.java | 11 ++++++ .../NonGlobalAggCollectorManager.java | 10 +++++ .../GlobalOrdinalsStringTermsAggregator.java | 17 ++------ 
.../bucket/terms/InternalMappedTerms.java | 2 + .../bucket/terms/InternalMultiTerms.java | 1 + .../terms/InternalSignificantTerms.java | 34 ++++++---------- .../bucket/terms/InternalTerms.java | 39 +++++-------------- .../terms/MapStringTermsAggregator.java | 13 +------ .../bucket/terms/MultiTermsAggregator.java | 13 +------ .../bucket/terms/NumericTermsAggregator.java | 13 +------ .../bucket/terms/TermsAggregator.java | 28 ++++++++++++- .../search/internal/SearchContext.java | 16 ++++++++ .../aggregations/AggregatorTestCase.java | 2 + 15 files changed, 116 insertions(+), 112 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 11052cd215fb6..5e67193081f03 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -26,7 +26,7 @@ * aggregation operators */ class AggregationCollectorManager implements CollectorManager { - private final SearchContext context; + protected final SearchContext context; private final CheckedFunction, IOException> aggProvider; private final String collectorReason; @@ -63,18 +63,11 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO } final InternalAggregations internalAggregations = InternalAggregations.from(internals); - // Reduce the aggregations across slices before sending to the coordinator. 
We will perform shard level reduce iff multiple slices - // were created to execute this request and it used concurrent segment search path - // TODO: Add the check for flag that the request was executed using concurrent search - if (collectors.size() >= 1) { - // using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all - // documents are collected across shards for an aggregation - return new AggregationReduceableSearchResult( - InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) - ); - } else { - return new AggregationReduceableSearchResult(internalAggregations); - } + return buildAggregationResult(internalAggregations); + } + + public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + return new AggregationReduceableSearchResult(internalAggregations); } static Collector createCollector(SearchContext context, List collectors, String reason) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java index 56f53a57a8573..1dcaee7e2ea6b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java @@ -14,6 +14,7 @@ import org.opensearch.search.profile.query.CollectorResult; import java.io.IOException; +import java.util.Collections; import java.util.Objects; /** @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException { return super.newCollector(); } } + + @Override + public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + // Reduce the aggregations across slices before sending to the coordinator. 
We will perform shard level reduce as long as any slices + // were created so that we can apply shard level bucket count thresholds in the reduce phase. + return new AggregationReduceableSearchResult( + InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) + ); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java index 999ed458f2388..7ac8ac9579a58 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java @@ -40,6 +40,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.aggregations.support.AggregationPath; @@ -160,6 +161,16 @@ public boolean isSliceLevel() { return this.isSliceLevel; } + // For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits whereas for coordinator + // level partial reduce it will use top level `size` and `min_doc_count` + public int getRequiredSizeLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + return isSliceLevel() ? bucketCountThresholds.getShardSize() : bucketCountThresholds.getRequiredSize(); + } + + public long getMinDocCountLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + return isSliceLevel() ? 
bucketCountThresholds.getShardMinDocCount() : bucketCountThresholds.getMinDocCount(); + } + public BigArrays bigArrays() { return bigArrays; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java index 3729734c48ed7..adc7cfa775a97 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java @@ -14,6 +14,7 @@ import org.opensearch.search.profile.query.CollectorResult; import java.io.IOException; +import java.util.Collections; import java.util.Objects; /** @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException { return super.newCollector(); } } + + @Override + public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices + // were created so that we can apply shard level bucket count thresholds in the reduce phase. 
+ return new AggregationReduceableSearchResult( + InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) + ); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 5718ec08eafdd..eed3f518b089e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -603,15 +603,6 @@ abstract class ResultStrategy< TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - int requiredSizeLocal; - long minDocCountLocal; - if (context.isConcurrentSegmentSearchEnabled()) { - requiredSizeLocal = Integer.MAX_VALUE; - minDocCountLocal = 0; - } else { - requiredSizeLocal = bucketCountThresholds.getShardSize(); - minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); - } if (valueCount == 0) { // no context in this reader InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { @@ -624,11 +615,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws long[] otherDocCount = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { final int size; - if (minDocCountLocal == 0) { + if (context.getMinDocCountLocal(bucketCountThresholds) == 0) { // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, requiredSizeLocal); + size = (int) Math.min(valueCount, context.getRequiredSizeLocal(bucketCountThresholds)); } else { - size 
= (int) Math.min(maxBucketOrd(), requiredSizeLocal); + size = (int) Math.min(maxBucketOrd(), context.getRequiredSizeLocal(bucketCountThresholds)); } PriorityQueue ordered = buildPriorityQueue(size); final int finalOrdIdx = ordIdx; @@ -639,7 +630,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws @Override public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { otherDocCount[finalOrdIdx] += docCount; - if (docCount >= minDocCountLocal) { + if (docCount >= context.getMinDocCountLocal(bucketCountThresholds)) { if (spare == null) { spare = buildEmptyTemporaryBucket(); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java index 943d8e37eed15..8d779263490bb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java @@ -52,6 +52,7 @@ */ public abstract class InternalMappedTerms, B extends InternalTerms.Bucket> extends InternalTerms { protected final DocValueFormat format; + protected final int shardSize; protected final boolean showTermDocCountError; protected final long otherDocCount; protected final List buckets; @@ -73,6 +74,7 @@ protected InternalMappedTerms( ) { super(name, reduceOrder, order, bucketCountThresholds, metadata); this.format = format; + this.shardSize = bucketCountThresholds.getShardSize(); this.showTermDocCountError = showTermDocCountError; this.otherDocCount = otherDocCount; this.docCountError = docCountError; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java index 89fd488d21e23..915ed2d3870a2 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java @@ -220,6 +220,7 @@ public int compare(List thisObjects, List thatObjects) { } } + private final int shardSize; private final boolean showTermDocCountError; private final long otherDocCount; private final List termFormats; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index 8eb3919f4b88d..a3140be9f5bff 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -39,7 +39,6 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; -import org.opensearch.search.aggregations.bucket.BucketUtils; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import java.io.IOException; @@ -196,8 +195,6 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) protected final int requiredSize; protected final long minDocCount; - protected int shardSize; - protected final long shardMinDocCount; protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; protected InternalSignificantTerms( @@ -208,8 +205,6 @@ protected InternalSignificantTerms( super(name, metadata); this.requiredSize = bucketCountThresholds.getRequiredSize(); this.minDocCount = bucketCountThresholds.getMinDocCount(); - this.shardSize = bucketCountThresholds.getShardSize(); - this.shardMinDocCount = bucketCountThresholds.getShardMinDocCount(); this.bucketCountThresholds = 
bucketCountThresholds; } @@ -220,9 +215,9 @@ protected InternalSignificantTerms(StreamInput in) throws IOException { super(in); requiredSize = readSize(in); minDocCount = in.readVLong(); - shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize); - shardMinDocCount = 0; - bucketCountThresholds = new TermsAggregator.BucketCountThresholds(minDocCount, shardMinDocCount, requiredSize, shardSize); + // shardMinDocCount and shardSize are not used on the coordinator, so they are not deserialized. We use + // CoordinatorBucketCountThresholds which will throw an exception if they are accessed. + bucketCountThresholds = new TermsAggregator.CoordinatorBucketCountThresholds(minDocCount, -1, requiredSize, -1); } protected final void doWriteTo(StreamOutput out) throws IOException { @@ -238,15 +233,6 @@ protected final void doWriteTo(StreamOutput out) throws IOException { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - int requiredSizeLocal; - long minDocCountLocal; - if (reduceContext.isSliceLevel()) { - requiredSizeLocal = bucketCountThresholds.getShardSize(); - minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); - } else { - requiredSizeLocal = bucketCountThresholds.getRequiredSize(); - minDocCountLocal = bucketCountThresholds.getMinDocCount(); - } long globalSubsetSize = 0; long globalSupersetSize = 0; @@ -289,17 +275,21 @@ public InternalAggregation reduce(List aggregations, Reduce } } SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); - final int size = (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) - ? Math.min(requiredSizeLocal, buckets.size()) + boolean isCoordinatorPartialReduce = reduceContext.isFinalReduce() == false && reduceContext.isSliceLevel() == false; + // Do not apply size threshold on coordinator partial reduce + final int size = !isCoordinatorPartialReduce + ? 
Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), buckets.size()) : buckets.size(); BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); final B b = reduceBucket(sameTermBuckets, reduceContext); b.updateScore(heuristic); - if (!(reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) // Don't apply thresholds for partial reduce - || (reduceContext.isSliceLevel() && (b.subsetDf >= minDocCountLocal)) // Score needs to be evaluated only at the coordinator - || ((b.score > 0) && (b.subsetDf >= minDocCountLocal))) { + // For concurrent search case we do not apply bucket count thresholds in buildAggregation and instead is done here during + // reduce. However, the bucket score is only evaluated at the final coordinator reduce. + boolean meetsThresholds = (b.subsetDf >= reduceContext.getMinDocCountLocal(bucketCountThresholds)) + && (((b.score > 0) || reduceContext.isSliceLevel())); + if (isCoordinatorPartialReduce || meetsThresholds) { B removed = ordered.insertWithOverflow(b); if (removed == null) { reduceContext.consumeBucketsAndMaybeBreak(1); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java index 7489822371011..844f68f283eaf 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java @@ -45,7 +45,6 @@ import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.KeyComparable; -import org.opensearch.search.aggregations.bucket.BucketUtils; import org.opensearch.search.aggregations.bucket.IteratorAndCurrent; import 
org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; @@ -58,7 +57,6 @@ import java.util.Map; import java.util.Objects; import java.util.function.Function; -import java.util.stream.Collectors; import static org.opensearch.search.aggregations.InternalOrder.isKeyAsc; import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; @@ -225,8 +223,6 @@ public int hashCode() { protected final BucketOrder order; protected final int requiredSize; protected final long minDocCount; - protected int shardSize; - protected final long shardMinDocCount; protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; /** @@ -250,8 +246,6 @@ protected InternalTerms( this.bucketCountThresholds = bucketCountThresholds; this.requiredSize = bucketCountThresholds.getRequiredSize(); this.minDocCount = bucketCountThresholds.getMinDocCount(); - this.shardSize = bucketCountThresholds.getShardSize(); - this.shardMinDocCount = bucketCountThresholds.getShardMinDocCount(); } /** @@ -263,9 +257,9 @@ protected InternalTerms(StreamInput in) throws IOException { order = InternalOrder.Streams.readOrder(in); requiredSize = readSize(in); minDocCount = in.readVLong(); - shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize); - shardMinDocCount = 0; - bucketCountThresholds = new TermsAggregator.BucketCountThresholds(minDocCount, shardMinDocCount, requiredSize, shardSize); + // shardMinDocCount and shardSize are not used on the coordinator, so they are not deserialized. We use + // CoordinatorBucketCountThresholds which will throw an exception if they are accessed. 
+ bucketCountThresholds = new TermsAggregator.CoordinatorBucketCountThresholds(minDocCount, -1, requiredSize, getShardSize()); } @Override @@ -394,16 +388,6 @@ private List reduceLegacy(List aggregations, ReduceConte } public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - int requiredSizeLocal; - long minDocCountLocal; - if (reduceContext.isSliceLevel()) { - requiredSizeLocal = bucketCountThresholds.getShardSize(); - minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); - } else { - requiredSizeLocal = bucketCountThresholds.getRequiredSize(); - minDocCountLocal = bucketCountThresholds.getMinDocCount(); - } - long sumDocCountError = 0; long otherDocCount = 0; InternalTerms referenceTerms = null; @@ -464,7 +448,7 @@ public InternalAggregation reduce(List aggregations, Reduce } final B[] list; if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) { - final int size = Math.min(requiredSizeLocal, reducedBuckets.size()); + final int size = Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), reducedBuckets.size()); // final comparator final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); for (B bucket : reducedBuckets) { @@ -474,7 +458,7 @@ public InternalAggregation reduce(List aggregations, Reduce final long finalSumDocCountError = sumDocCountError; bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError); } - if (bucket.getDocCount() >= minDocCountLocal) { + if (bucket.getDocCount() >= reduceContext.getMinDocCountLocal(bucketCountThresholds)) { B removed = ordered.insertWithOverflow(bucket); if (removed != null) { otherDocCount += removed.getDocCount(); @@ -493,8 +477,8 @@ public InternalAggregation reduce(List aggregations, Reduce } else { // we can prune the list on partial reduce if the aggregation is ordered by key // and not filtered (minDocCount == 0) - int size = isKeyOrder(order) && minDocCountLocal == 0 - ? 
Math.min(requiredSizeLocal, reducedBuckets.size()) + int size = isKeyOrder(order) && reduceContext.getMinDocCountLocal(bucketCountThresholds) == 0 + ? Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), reducedBuckets.size()) : reducedBuckets.size(); list = createBucketsArray(size); for (int i = 0; i < size; i++) { @@ -515,14 +499,11 @@ public InternalAggregation reduce(List aggregations, Reduce docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; } - // Shards must return buckets sorted by key, so we apply the sort here - List resultList; + // Shards must return buckets sorted by key, so we apply the sort here in shard level reduce if (reduceContext.isSliceLevel()) { - resultList = Arrays.stream(list).sorted(thisReduceOrder.comparator()).collect(Collectors.toList()); - } else { - resultList = Arrays.asList(list); + Arrays.sort(list, thisReduceOrder.comparator()); } - return create(name, resultList, reduceContext.isFinalReduce() ? order : thisReduceOrder, docCountError, otherDocCount); + return create(name, Arrays.asList(list), reduceContext.isFinalReduce() ? 
order : thisReduceOrder, docCountError, otherDocCount); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index a4269a216050b..2537f9a9fd097 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -244,20 +244,11 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; @@ -266,7 +257,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < minDocCountLocal) { + if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) { continue; } if (spare == null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index 499e1743856dd..b3ff7f11a7460 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -118,22 +118,13 @@ public MultiTermsAggregator( @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - int requiredSizeLocal; - long minDocCountLocal; - if (context.isConcurrentSegmentSearchEnabled()) { - requiredSizeLocal = Integer.MAX_VALUE; - minDocCountLocal = 0; - } else { - requiredSizeLocal = bucketCountThresholds.getShardSize(); - minDocCountLocal = bucketCountThresholds.getShardMinDocCount(); - } InternalMultiTerms.Bucket[][] topBucketsPerOrd = new 
InternalMultiTerms.Bucket[owningBucketOrds.length][]; long[] otherDocCounts = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - int size = (int) Math.min(bucketsInOrd, requiredSizeLocal); + int size = (int) Math.min(bucketsInOrd, context.getRequiredSizeLocal(bucketCountThresholds)); PriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); InternalMultiTerms.Bucket spare = null; BytesRef dest = null; @@ -145,7 +136,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < minDocCountLocal) { + if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) { continue; } if (spare == null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 53e1732e729fd..6f7e0e4bf5afd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -173,22 +173,13 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); @@ -196,7 +187,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < minDocCountLocal) { + if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) { continue; } if (spare == null) { diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java index 9e2aa85bb1dd8..d9fe1eeefceea 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -39,6 +39,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.search.DocValueFormat; +import org.opensearch.search.aggregations.AggregationExecutionException; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.BucketOrder; @@ -70,9 +71,9 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { */ public static class BucketCountThresholds implements Writeable, ToXContentFragment { private long minDocCount; - private long shardMinDocCount; + protected long shardMinDocCount; private int requiredSize; - private int shardSize; + protected int shardSize; public BucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) { this.minDocCount = minDocCount; @@ -195,6 +196,29 @@ public boolean equals(Object obj) { } } + // BucketCountThresholds type that throws an exception when shardMinDocCount and shardSize are accessed. This is used for + // deserialization on the coordinator during reduce as shardMinDocCount and shardSize should not be accessed this way on the + // coordinator. 
+ public static class CoordinatorBucketCountThresholds extends BucketCountThresholds { + + public CoordinatorBucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) { + super(minDocCount, shardMinDocCount, requiredSize, shardSize); + } + + @Override + public long getShardMinDocCount() { + throw new AggregationExecutionException("shard_min_doc_count should not be accessed"); + } + + @Override + public int getShardSize() { + if (shardSize < 0) { + throw new AggregationExecutionException("Invalid shard_size accessed"); + } + return shardSize; + } + } + protected final DocValueFormat format; protected final BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index bc2a0658e5a6d..600f91b0dc42e 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -35,6 +35,7 @@ import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; +import org.apache.lucene.util.ArrayUtil; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; import org.opensearch.common.Nullable; @@ -57,6 +58,7 @@ import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -400,6 +402,20 @@ public boolean isConcurrentSegmentSearchEnabled() { return false; } + /** + * Returns the local size threshold based on 
search context + */ + public int getRequiredSizeLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + return isConcurrentSegmentSearchEnabled() ? ArrayUtil.MAX_ARRAY_LENGTH - 1 : bucketCountThresholds.getShardSize(); + } + + /** + * Returns the local minDocCount threshold based on search context + */ + public long getMinDocCountLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + return isConcurrentSegmentSearchEnabled() ? 0 : bucketCountThresholds.getShardMinDocCount(); + } + /** * Adds a releasable that will be freed when this context is closed. */ diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 874d60a4097f2..2c18d70e6838c 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -350,6 +350,8 @@ public boolean shouldCache(Query query) { when(searchContext.aggregations()).thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); when(searchContext.query()).thenReturn(query); when(searchContext.bucketCollectorProcessor()).thenReturn(new BucketCollectorProcessor()); + when(searchContext.getMinDocCountLocal(any())).thenCallRealMethod(); + when(searchContext.getRequiredSizeLocal(any())).thenCallRealMethod(); /* * Always use the circuit breaking big arrays instance so that the CircuitBreakerService * we're passed gets a chance to break. 
From aac255907fa178534962804a87b0ddcad5ab20c5 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Tue, 8 Aug 2023 19:28:05 -0700 Subject: [PATCH 3/4] Re-introduce shardSize member to InternalMultiTerms and InternalMappedTerms Signed-off-by: Jay Deng --- .../aggregations/TermsReduceBenchmark.java | 1 + .../StringTermsSerializationBenchmark.java | 1 + .../terms/AbstractStringTermsAggregator.java | 14 +++++++- .../bucket/terms/DoubleTerms.java | 4 +++ .../GlobalOrdinalsStringTermsAggregator.java | 1 + .../bucket/terms/InternalMappedTerms.java | 3 +- .../bucket/terms/InternalMultiTerms.java | 5 ++- .../aggregations/bucket/terms/LongTerms.java | 5 +++ .../terms/MapStringTermsAggregator.java | 1 + .../bucket/terms/MultiTermsAggregator.java | 2 ++ .../bucket/terms/NumericTermsAggregator.java | 32 +++++++++++++++++-- .../bucket/terms/StringTerms.java | 4 +++ .../bucket/terms/TermsAggregator.java | 17 +++++----- .../bucket/terms/UnsignedLongTerms.java | 5 +++ .../InternalAggregationsTests.java | 2 ++ .../InternalMultiBucketAggregationTests.java | 2 ++ .../bucket/terms/DoubleTermsTests.java | 2 ++ .../bucket/terms/InternalMultiTermsTests.java | 1 + .../bucket/terms/LongTermsTests.java | 2 ++ .../bucket/terms/StringTermsTests.java | 2 ++ .../bucket/terms/UnsignedLongTermsTests.java | 2 ++ 21 files changed, 95 insertions(+), 13 deletions(-) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java index c3c1103b5b098..180676e7b2ed2 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java @@ -173,6 +173,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) { BucketOrder.count(false), Collections.emptyMap(), DocValueFormat.RAW, + numShards, true, 0, buckets, 
diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java index c3979d0722035..8d528d94e463e 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java @@ -89,6 +89,7 @@ private StringTerms newTerms(boolean withNested) { BucketOrder.key(true), null, DocValueFormat.RAW, + buckets, false, 100000, resultBuckets, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java index c52ec4c6f7df4..d06a0ed9976fc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java @@ -72,7 +72,19 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator { } protected StringTerms buildEmptyTermsAggregation() { - return new StringTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds); + return new StringTerms( + name, + order, + order, + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + 0, + emptyList(), + 0, + bucketCountThresholds + ); } protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java 
index f4e3475940ada..de02d5a938644 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -132,6 +132,7 @@ public DoubleTerms( BucketOrder order, Map metadata, DocValueFormat format, + int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, @@ -144,6 +145,7 @@ public DoubleTerms( order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -172,6 +174,7 @@ public DoubleTerms create(List buckets) { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -200,6 +203,7 @@ protected DoubleTerms create(String name, List buckets, BucketOrder redu order, getMetadata(), format, + shardSize, showTermDocCountError, otherDocCount, buckets, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index eed3f518b089e..a3be215f7e1c4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -801,6 +801,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu order, metadata(), format, + bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java index 8d779263490bb..d542064df24d7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java @@ -66,6 +66,7 @@ protected InternalMappedTerms( BucketOrder order, Map metadata, DocValueFormat format, + int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, @@ -74,7 +75,7 @@ protected InternalMappedTerms( ) { super(name, reduceOrder, order, bucketCountThresholds, metadata); this.format = format; - this.shardSize = bucketCountThresholds.getShardSize(); + this.shardSize = shardSize; this.showTermDocCountError = showTermDocCountError; this.otherDocCount = otherDocCount; this.docCountError = docCountError; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java index 915ed2d3870a2..4d1cbd4ce72f1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java @@ -234,6 +234,7 @@ public InternalMultiTerms( BucketOrder reduceOrder, BucketOrder order, Map metadata, + int shardSize, boolean showTermDocCountError, long otherDocCount, long docCountError, @@ -242,7 +243,7 @@ public InternalMultiTerms( TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super(name, reduceOrder, order, bucketCountThresholds, metadata); - this.shardSize = bucketCountThresholds.getShardSize(); + this.shardSize = shardSize; this.showTermDocCountError = showTermDocCountError; this.otherDocCount = otherDocCount; this.termFormats = formats; @@ -277,6 +278,7 @@ public InternalMultiTerms create(List buckets) { reduceOrder, order, metadata, + shardSize, showTermDocCountError, otherDocCount, docCountError, @@ -354,6 +356,7 @@ protected InternalMultiTerms create( reduceOrder, order, metadata, + shardSize, showTermDocCountError, otherDocCount, docCountError, diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java index 4ddc07830a319..fe78145dce3e7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java @@ -144,6 +144,7 @@ public LongTerms( BucketOrder order, Map metadata, DocValueFormat format, + int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, @@ -156,6 +157,7 @@ public LongTerms( order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -184,6 +186,7 @@ public LongTerms create(List buckets) { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -212,6 +215,7 @@ protected LongTerms create(String name, List buckets, BucketOrder reduce order, getMetadata(), format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -287,6 +291,7 @@ static DoubleTerms convertLongTermsToDouble(LongTerms longTerms, DocValueFormat longTerms.order, longTerms.metadata, longTerms.format, + longTerms.shardSize, longTerms.showTermDocCountError, longTerms.otherDocCount, newBuckets, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index 2537f9a9fd097..e375988be12e6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -456,6 +456,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu order, metadata(), format, + bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index b3ff7f11a7460..6107b590e2a7d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -183,6 +183,7 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna reduceOrder, order, metadata(), + bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, 0, @@ -199,6 +200,7 @@ public InternalAggregation buildEmptyAggregation() { order, order, metadata(), + bucketCountThresholds.getShardSize(), showTermDocCountError, 0, 0, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 6f7e0e4bf5afd..8318f90a63ecd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -397,6 +397,7 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket order, metadata(), format, + bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), @@ -407,7 +408,19 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket @Override LongTerms buildEmptyResult() { - return new LongTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds); + return new LongTerms( + name, + order, + order, + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + 0, + emptyList(), + 0, + bucketCountThresholds + ); } } @@ -464,6 +477,7 @@ 
DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu order, metadata(), format, + bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), @@ -474,7 +488,19 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu @Override DoubleTerms buildEmptyResult() { - return new DoubleTerms(name, order, order, metadata(), format, showTermDocCountError, 0, emptyList(), 0, bucketCountThresholds); + return new DoubleTerms( + name, + order, + order, + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + 0, + emptyList(), + 0, + bucketCountThresholds + ); } } @@ -530,6 +556,7 @@ UnsignedLongTerms buildResult(long owningBucketOrd, long otherDocCount, Unsigned order, metadata(), format, + bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), @@ -546,6 +573,7 @@ UnsignedLongTerms buildEmptyResult() { order, metadata(), format, + bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java index b4e7ef02e110c..6dedc65ff14e3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java @@ -136,6 +136,7 @@ public StringTerms( BucketOrder order, Map metadata, DocValueFormat format, + int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, @@ -148,6 +149,7 @@ public StringTerms( order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -176,6 +178,7 @@ public StringTerms create(List buckets) { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -209,6 +212,7 @@ protected StringTerms create(String name, 
List buckets, BucketOrder redu order, getMetadata(), format, + shardSize, showTermDocCountError, otherDocCount, buckets, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java index d9fe1eeefceea..debb139ac5104 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -196,9 +196,13 @@ public boolean equals(Object obj) { } } - // BucketCountThresholds type that throws an exception when shardMinDocCount and shardSize are accessed. This is used for - // deserialization on the coordinator during reduce as shardMinDocCount and shardSize should not be accessed this way on the - // coordinator. + /** + * BucketCountThresholds type that throws an exception when shardMinDocCount or shardSize are accessed. This is used for + * deserialization on the coordinator during reduce as shardMinDocCount and shardSize should not be accessed this way on the + * coordinator. 
+ * + * @opensearch.internal + */ public static class CoordinatorBucketCountThresholds extends BucketCountThresholds { public CoordinatorBucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) { @@ -207,15 +211,12 @@ public CoordinatorBucketCountThresholds(long minDocCount, long shardMinDocCount, @Override public long getShardMinDocCount() { - throw new AggregationExecutionException("shard_min_doc_count should not be accessed"); + throw new AggregationExecutionException("shard_min_doc_count should not be accessed via CoordinatorBucketCountThresholds"); } @Override public int getShardSize() { - if (shardSize < 0) { - throw new AggregationExecutionException("Invalid shard_size accessed"); - } - return shardSize; + throw new AggregationExecutionException("shard_size should not be accessed via CoordinatorBucketCountThresholds"); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java index 3eb8e6693ba79..edeec00d366fd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java @@ -123,6 +123,7 @@ public UnsignedLongTerms( BucketOrder order, Map metadata, DocValueFormat format, + int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, @@ -135,6 +136,7 @@ public UnsignedLongTerms( order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -163,6 +165,7 @@ public UnsignedLongTerms create(List buckets) { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -191,6 +194,7 @@ protected UnsignedLongTerms create(String name, List buckets, BucketOrde order, getMetadata(), format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -266,6 +270,7 @@ static 
DoubleTerms convertUnsignedLongTermsToDouble(UnsignedLongTerms unsignedLo unsignedLongTerms.order, unsignedLongTerms.metadata, unsignedLongTerms.format, + unsignedLongTerms.shardSize, unsignedLongTerms.showTermDocCountError, unsignedLongTerms.otherDocCount, newBuckets, diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java index 0a3e92267efa4..59bc90788a1e7 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java @@ -79,6 +79,7 @@ public void testNonFinalReduceTopLevelPipelineAggs() { BucketOrder.key(true), Collections.emptyMap(), DocValueFormat.RAW, + 25, false, 10, Collections.emptyList(), @@ -97,6 +98,7 @@ public void testFinalReduceTopLevelPipelineAggs() { BucketOrder.key(true), Collections.emptyMap(), DocValueFormat.RAW, + 25, false, 10, Collections.emptyList(), diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java b/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java index cd900972ff23b..b7f4094da9990 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java @@ -171,6 +171,7 @@ public void testResolveToSpecificBucket() { BucketOrder.count(false), Collections.emptyMap(), DocValueFormat.RAW, + 1, false, 0, stringBuckets, @@ -208,6 +209,7 @@ public void testResolveToMissingSpecificBucket() { BucketOrder.count(false), Collections.emptyMap(), DocValueFormat.RAW, + 1, false, 0, stringBuckets, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java 
b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java index 0ac3214531d5d..5fe9c1dee358d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java @@ -83,6 +83,7 @@ public class DoubleTermsTests extends InternalTermsTestCase { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -164,6 +165,7 @@ protected Class implementationClass() { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java index 61416326e6176..9f8bab1179ad6 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java @@ -77,6 +77,7 @@ public class InternalMultiTermsTests extends InternalTermsTestCase { reduceOrder, order, metadata, + shardSize, showTermDocCountError, otherDocCount, docCountError, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java index aade7e6d4a388..44fa9f5e79593 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java @@ -83,6 +83,7 @@ public class LongTermsTests extends InternalTermsTestCase { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -164,6 +165,7 @@ protected Class implementationClass() { order, metadata, format, + shardSize, showTermDocCountError, 
otherDocCount, buckets, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java index 0431123405822..deba96fd3ae19 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java @@ -142,6 +142,7 @@ protected Class implementationClass() { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -232,6 +233,7 @@ private BytesRef[] generateRandomDict() { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java index 1e1f01ce35a40..478961c2a404c 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java @@ -60,6 +60,7 @@ public class UnsignedLongTermsTests extends InternalTermsTestCase { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, @@ -141,6 +142,7 @@ protected Class implementationClass() { order, metadata, format, + shardSize, showTermDocCountError, otherDocCount, buckets, From 3b55340459948f70078b65aa9a43371cb05b3819 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Wed, 9 Aug 2023 10:42:36 -0700 Subject: [PATCH 4/4] Introduce LocalBucketCountThresholds for local size and min_doc_count values Signed-off-by: Jay Deng --- .../AggregationCollectorManager.java | 2 +- .../GlobalAggCollectorManager.java | 2 +- .../aggregations/InternalAggregation.java | 19 +++++----- .../NonGlobalAggCollectorManager.java | 2 +- 
.../bucket/LocalBucketCountThresholds.java | 36 +++++++++++++++++++ .../GlobalOrdinalsStringTermsAggregator.java | 10 +++--- .../terms/InternalSignificantTerms.java | 7 ++-- .../bucket/terms/InternalTerms.java | 10 +++--- .../terms/MapStringTermsAggregator.java | 6 ++-- .../bucket/terms/MultiTermsAggregator.java | 6 ++-- .../bucket/terms/NumericTermsAggregator.java | 6 ++-- .../bucket/terms/TermsAggregator.java | 4 +-- .../search/internal/SearchContext.java | 18 +++++----- .../aggregations/AggregatorTestCase.java | 3 +- 14 files changed, 89 insertions(+), 42 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 5e67193081f03..ae06f80516e3e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -66,7 +66,7 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO return buildAggregationResult(internalAggregations); } - public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { return new AggregationReduceableSearchResult(internalAggregations); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java index 1dcaee7e2ea6b..41e8aba895480 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java @@ -41,7 +41,7 @@ public 
Collector newCollector() throws IOException { } @Override - public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices // were created so that we can apply shard level bucket count thresholds in the reduce phase. return new AggregationReduceableSearchResult( diff --git a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java index 7ac8ac9579a58..d1de09cffd674 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java @@ -40,6 +40,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; @@ -161,14 +162,16 @@ public boolean isSliceLevel() { return this.isSliceLevel; } - // For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits whereas for coordinator - // level partial reduce it will use top level `size` and `min_doc_count` - public int getRequiredSizeLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - return isSliceLevel() ? 
bucketCountThresholds.getShardSize() : bucketCountThresholds.getRequiredSize(); - } - - public long getMinDocCountLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - return isSliceLevel() ? bucketCountThresholds.getShardMinDocCount() : bucketCountThresholds.getMinDocCount(); + /** + * For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits + * whereas for coordinator level partial reduce it will use top level `size` and `min_doc_count` + */ + public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + if (isSliceLevel()) { + return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); + } else { + return new LocalBucketCountThresholds(bucketCountThresholds.getMinDocCount(), bucketCountThresholds.getRequiredSize()); + } } public BigArrays bigArrays() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java index adc7cfa775a97..984eefb9b52a4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java @@ -41,7 +41,7 @@ public Collector newCollector() throws IOException { } @Override - public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices // were created so that we can apply shard level bucket count thresholds in the reduce phase. 
return new AggregationReduceableSearchResult( diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java new file mode 100644 index 0000000000000..98dc07cec49ab --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket; + +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; + +/** + * BucketCountThresholds type that holds the local (either shard level or request level) bucket count thresholds in minDocCount and requiredSize fields. + * Similar to {@link TermsAggregator.BucketCountThresholds} however only provides getters for the local members and no setters. 
+ * + * @opensearch.internal + */ +public class LocalBucketCountThresholds { + + private final long minDocCount; + private final int requiredSize; + + public LocalBucketCountThresholds(long localminDocCount, int localRequiredSize) { + this.minDocCount = localminDocCount; + this.requiredSize = localRequiredSize; + } + + public int getRequiredSize() { + return requiredSize; + } + + public long getMinDocCount() { + return minDocCount; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index a3be215f7e1c4..53e5bce91ead2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -57,6 +57,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; @@ -603,6 +604,7 @@ abstract class ResultStrategy< TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds); if (valueCount == 0) { // no context in this reader InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < 
owningBucketOrds.length; ordIdx++) { @@ -615,11 +617,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws long[] otherDocCount = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { final int size; - if (context.getMinDocCountLocal(bucketCountThresholds) == 0) { + if (localBucketCountThresholds.getMinDocCount() == 0) { // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, context.getRequiredSizeLocal(bucketCountThresholds)); + size = (int) Math.min(valueCount, localBucketCountThresholds.getRequiredSize()); } else { - size = (int) Math.min(maxBucketOrd(), context.getRequiredSizeLocal(bucketCountThresholds)); + size = (int) Math.min(maxBucketOrd(), localBucketCountThresholds.getRequiredSize()); } PriorityQueue ordered = buildPriorityQueue(size); final int finalOrdIdx = ordIdx; @@ -630,7 +632,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws @Override public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { otherDocCount[finalOrdIdx] += docCount; - if (docCount >= context.getMinDocCountLocal(bucketCountThresholds)) { + if (docCount >= localBucketCountThresholds.getMinDocCount()) { if (spare == null) { spare = buildEmptyTemporaryBucket(); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index a3140be9f5bff..03bb519ed9961 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -39,6 +39,7 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import 
org.opensearch.search.aggregations.InternalMultiBucketAggregation; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import java.io.IOException; @@ -233,7 +234,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - + LocalBucketCountThresholds localBucketCountThresholds = reduceContext.asLocalBucketCountThresholds(bucketCountThresholds); long globalSubsetSize = 0; long globalSupersetSize = 0; // Compute the overall result set size and the corpus size using the @@ -278,7 +279,7 @@ public InternalAggregation reduce(List aggregations, Reduce boolean isCoordinatorPartialReduce = reduceContext.isFinalReduce() == false && reduceContext.isSliceLevel() == false; // Do not apply size threshold on coordinator partial reduce final int size = !isCoordinatorPartialReduce - ? Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), buckets.size()) + ? Math.min(localBucketCountThresholds.getRequiredSize(), buckets.size()) : buckets.size(); BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { @@ -287,7 +288,7 @@ public InternalAggregation reduce(List aggregations, Reduce b.updateScore(heuristic); // For concurrent search case we do not apply bucket count thresholds in buildAggregation and instead is done here during // reduce. However, the bucket score is only evaluated at the final coordinator reduce. 
- boolean meetsThresholds = (b.subsetDf >= reduceContext.getMinDocCountLocal(bucketCountThresholds)) + boolean meetsThresholds = (b.subsetDf >= localBucketCountThresholds.getMinDocCount()) && (((b.score > 0) || reduceContext.isSliceLevel())); if (isCoordinatorPartialReduce || meetsThresholds) { B removed = ordered.insertWithOverflow(b); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java index 844f68f283eaf..0be7de5ded32f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java @@ -46,6 +46,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.KeyComparable; import org.opensearch.search.aggregations.bucket.IteratorAndCurrent; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; import java.io.IOException; @@ -388,6 +389,7 @@ private List reduceLegacy(List aggregations, ReduceConte } public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + LocalBucketCountThresholds localBucketCountThresholds = reduceContext.asLocalBucketCountThresholds(bucketCountThresholds); long sumDocCountError = 0; long otherDocCount = 0; InternalTerms referenceTerms = null; @@ -448,7 +450,7 @@ public InternalAggregation reduce(List aggregations, Reduce } final B[] list; if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) { - final int size = Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), reducedBuckets.size()); + final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size()); // final comparator final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, 
order.comparator()); for (B bucket : reducedBuckets) { @@ -458,7 +460,7 @@ public InternalAggregation reduce(List aggregations, Reduce final long finalSumDocCountError = sumDocCountError; bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError); } - if (bucket.getDocCount() >= reduceContext.getMinDocCountLocal(bucketCountThresholds)) { + if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) { B removed = ordered.insertWithOverflow(bucket); if (removed != null) { otherDocCount += removed.getDocCount(); @@ -477,8 +479,8 @@ public InternalAggregation reduce(List aggregations, Reduce } else { // we can prune the list on partial reduce if the aggregation is ordered by key // and not filtered (minDocCount == 0) - int size = isKeyOrder(order) && reduceContext.getMinDocCountLocal(bucketCountThresholds) == 0 - ? Math.min(reduceContext.getRequiredSizeLocal(bucketCountThresholds), reducedBuckets.size()) + int size = isKeyOrder(order) && localBucketCountThresholds.getMinDocCount() == 0 + ? 
Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size()) : reducedBuckets.size(); list = createBucketsArray(size); for (int i = 0; i < size; i++) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index e375988be12e6..b0d2194cccc84 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -50,6 +50,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; @@ -244,11 +245,12 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; @@ -257,7 +259,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) { + if (docCount < localBucketCountThresholds.getMinDocCount()) { continue; } if (spare == null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index 6107b590e2a7d..c86302844b730 100644 --- 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -33,6 +33,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.support.AggregationPath; import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.internal.SearchContext; @@ -118,13 +119,14 @@ public MultiTermsAggregator( @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds); InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][]; long[] otherDocCounts = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - int size = (int) Math.min(bucketsInOrd, context.getRequiredSizeLocal(bucketCountThresholds)); + int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize()); PriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); InternalMultiTerms.Bucket spare = null; BytesRef dest = null; @@ -136,7 +138,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) { + if (docCount < localBucketCountThresholds.getMinDocCount()) { continue; 
} if (spare == null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 8318f90a63ecd..8bab7ffbbb90f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -52,6 +52,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForLong; @@ -173,13 +174,14 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); @@ -187,7 +189,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < context.getMinDocCountLocal(bucketCountThresholds)) { + if (docCount < localBucketCountThresholds.getMinDocCount()) { continue; } if (spare == null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java index debb139ac5104..7cacf1e918380 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java +++ 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -71,9 +71,9 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { */ public static class BucketCountThresholds implements Writeable, ToXContentFragment { private long minDocCount; - protected long shardMinDocCount; + private long shardMinDocCount; private int requiredSize; - protected int shardSize; + private int shardSize; public BucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) { this.minDocCount = minDocCount; diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 600f91b0dc42e..704683a59c473 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsSearchResult; @@ -403,17 +404,14 @@ public boolean isConcurrentSegmentSearchEnabled() { } /** - * Returns the local size threshold based on search context + * Returns local bucket count thresholds based on concurrent segment search status */ - public int getRequiredSizeLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - return isConcurrentSegmentSearchEnabled() ? 
ArrayUtil.MAX_ARRAY_LENGTH - 1 : bucketCountThresholds.getShardSize(); - } - - /** - * Returns the local minDocCount threshold based on search context - */ - public long getMinDocCountLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - return isConcurrentSegmentSearchEnabled() ? 0 : bucketCountThresholds.getShardMinDocCount(); + public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + if (isConcurrentSegmentSearchEnabled()) { + return new LocalBucketCountThresholds(0, ArrayUtil.MAX_ARRAY_LENGTH - 1); + } else { + return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); + } } /** diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 2c18d70e6838c..27c406b019c77 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -350,8 +350,7 @@ public boolean shouldCache(Query query) { when(searchContext.aggregations()).thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); when(searchContext.query()).thenReturn(query); when(searchContext.bucketCollectorProcessor()).thenReturn(new BucketCollectorProcessor()); - when(searchContext.getMinDocCountLocal(any())).thenCallRealMethod(); - when(searchContext.getRequiredSizeLocal(any())).thenCallRealMethod(); + when(searchContext.asLocalBucketCountThresholds(any())).thenCallRealMethod(); /* * Always use the circuit breaking big arrays instance so that the CircuitBreakerService * we're passed gets a chance to break.