Save memory when rare_terms is not on top #57948
Changes from all commits: f403c0c, 7c69d58, fbf96ea, 280f727, 34f3e11
LongRareTermsAggregator.java
@@ -20,9 +20,9 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -34,6 +34,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -42,111 +43,145 @@
 /**
  * An aggregator that finds "rare" string values (e.g. terms agg that orders ascending)
  */
-public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesSource.Numeric, IncludeExclude.LongFilter, Long> {
-
-    protected LongHash bucketOrds;
-
-    LongRareTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
-                            SearchContext aggregationContext, Aggregator parent, IncludeExclude.LongFilter longFilter,
-                            int maxDocCount, double precision, Map<String, Object> metadata) throws IOException {
-        super(name, factories, aggregationContext, parent, metadata, maxDocCount, precision, format, valuesSource, longFilter);
-        this.bucketOrds = new LongHash(1, aggregationContext.bigArrays());
+public class LongRareTermsAggregator extends AbstractRareTermsAggregator {
+    private final ValuesSource.Numeric valuesSource;
+    private final IncludeExclude.LongFilter filter;
+    private final LongKeyedBucketOrds bucketOrds;
+
+    LongRareTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        ValuesSource.Numeric valuesSource,
+        DocValueFormat format,
+        SearchContext aggregationContext,
+        Aggregator parent,
+        IncludeExclude.LongFilter filter,
+        int maxDocCount,
+        double precision,
+        boolean collectsFromSingleBucket,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(
+            name,
+            factories,
+            aggregationContext,
+            parent,
+            metadata,
+            maxDocCount,
+            precision,
+            format,
+            collectsFromSingleBucket
+        );
+        this.valuesSource = valuesSource;
+        this.filter = filter;
+        this.bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
     }
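The core of the memory saving is visible in this constructor diff: the old code kept a single value-keyed LongHash and (as the removed buildAggregations assert below shows) only ever built owning bucket 0, so running rare_terms under another bucketing aggregator meant extra aggregator machinery per parent bucket. The new code keys buckets on the (owningBucketOrd, value) pair via LongKeyedBucketOrds. A minimal sketch of that contract, using plain java.util maps instead of the real BigArrays-backed implementation (class and field names here are ours, purely illustrative):

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for LongKeyedBucketOrds; the real class is backed by
// BigArrays and picks a cheaper implementation when collectsFromSingleBucket is true.
class CompositeBucketOrds {
    private final Map<Long, Map<Long, Long>> ords = new HashMap<>();
    private long nextOrd = 0;

    /**
     * Assigns a dense ordinal to the (owningBucketOrd, value) pair. Returns the
     * new ordinal, or -1 - existingOrdinal if the pair was seen before -- the
     * same encoding decoded by the collect() method below.
     */
    long add(long owningBucketOrd, long value) {
        Map<Long, Long> perOwner = ords.computeIfAbsent(owningBucketOrd, k -> new HashMap<>());
        Long existing = perOwner.get(value);
        if (existing != null) {
            return -1 - existing;
        }
        perOwner.put(value, nextOrd);
        return nextOrd++;
    }

    long size() {
        return nextOrd;
    }
}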
 
     protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException {
         return valuesSource.longValues(ctx);
     }
 
     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-                                                final LeafBucketCollector sub) throws IOException {
-        final SortedNumericDocValues values = getValues(valuesSource, ctx);
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        SortedNumericDocValues values = getValues(valuesSource, ctx);
         return new LeafBucketCollectorBase(sub, values) {
-
             @Override
-            public void collect(int docId, long owningBucketOrdinal) throws IOException {
-                if (values.advanceExact(docId)) {
-                    final int valuesCount = values.docValueCount();
-                    long previous = Long.MAX_VALUE;
-                    for (int i = 0; i < valuesCount; ++i) {
-                        final long val = values.nextValue();
-                        if (previous != val || i == 0) {
-                            if ((includeExclude == null) || (includeExclude.accept(val))) {
-                                doCollect(sub, val, docId);
-                            }
-                            previous = val;
-                        }
-                    }
-                }
+            public void collect(int docId, long owningBucketOrd) throws IOException {
+                if (false == values.advanceExact(docId)) {
+                    return;
+                }
+                int valuesCount = values.docValueCount();
+                long previous = Long.MAX_VALUE;
+                for (int i = 0; i < valuesCount; ++i) {
+                    long val = values.nextValue();
+                    if (i > 0 && previous == val) {
+                        continue;
+                    }
+                    previous = val;
+                    if (filter != null && false == filter.accept(val)) {
+                        continue;
+                    }
+                    long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
+                    if (bucketOrdinal < 0) { // already seen
+                        bucketOrdinal = -1 - bucketOrdinal;
+                        collectExistingBucket(sub, docId, bucketOrdinal);
+                    } else {
+                        collectBucket(sub, docId, bucketOrdinal);
+                    }
+                }
             }
         };
     }
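Two details of the rewritten collector are worth calling out. SortedNumericDocValues yields each document's values in ascending order, so tracking a single previous value is enough to count a value only once per document; and bucketOrds.add() reports an already-seen pair by returning -1 - existingOrdinal, which the branch above decodes. A tiny self-contained run of the dedup logic with made-up values:

public class DedupDemo {
    public static void main(String[] args) {
        // One document's values, in the sorted order SortedNumericDocValues uses.
        long[] values = {3, 3, 7, 7, 7, 9};
        long previous = Long.MAX_VALUE;
        for (int i = 0; i < values.length; i++) {
            long val = values[i];
            if (i > 0 && previous == val) {
                continue; // duplicate of the previous value; count it once per doc
            }
            previous = val;
            System.out.println("collect " + val); // prints 3, 7, 9
        }
    }
}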
 
-    @Override
-    long addValueToOrds(Long value) {
-        return bucketOrds.add(value);
-    }
-
-    /**
-     * Merges the ordinals to a minimal set, populates the CuckooFilter and
-     * generates a final set of buckets.
-     *
-     * If a term is below the maxDocCount, it is turned into a Bucket. Otherwise,
-     * the term is added to the filter, and pruned from the ordinal map. If
-     * necessary the ordinal map is merged down to a minimal set to remove deletions
-     */
-    private List<LongRareTerms.Bucket> buildSketch() {
-        long deletionCount = 0;
-        LongHash newBucketOrds = new LongHash(1, context.bigArrays());
-        List<LongRareTerms.Bucket> buckets = new ArrayList<>();
-        try (LongHash oldBucketOrds = bucketOrds) {
-            long[] mergeMap = new long[(int) oldBucketOrds.size()];
-            for (int i = 0; i < oldBucketOrds.size(); i++) {
-                long oldKey = oldBucketOrds.get(i);
-                long newBucketOrd = -1;
-                long docCount = bucketDocCount(i);
-                // if the key is below threshold, reinsert into the new ords
-                if (docCount <= maxDocCount) {
-                    newBucketOrd = newBucketOrds.add(oldKey);
-                    LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
-                    bucket.bucketOrd = newBucketOrd;
-                    buckets.add(bucket);
-                } else {
-                    // Make a note when one of the ords has been deleted
-                    deletionCount += 1;
-                    filter.add(oldKey);
-                }
-                mergeMap[i] = newBucketOrd;
-            }
-
-            // Only merge/delete the ordinals if we have actually deleted one,
-            // to save on some redundant work
-            if (deletionCount > 0) {
-                mergeBuckets(mergeMap, newBucketOrds.size());
-                if (deferringCollector != null) {
-                    deferringCollector.mergeBuckets(mergeMap);
-                }
-            }
-        }
-        bucketOrds = newBucketOrds;
-        return buckets;
-    }
-
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        List<LongRareTerms.Bucket> buckets = buildSketch();
-        buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        CollectionUtil.introSort(buckets, ORDER.comparator());
-        return new InternalAggregation[] {new LongRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};

Review comment on buildAggregations: General comment about this method: we have a lot of "ords" being referenced and it's hard to keep track of which ord is which. E.g. we have the bucket ordinals that our parent is requesting we build, and then we have the bucket ordinals from each of those instances that we are collecting into buckets. Not sure how, but if we could find a way to rename the variables to help identify or disambiguate, I think it would help a bunch.
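To make the reviewer's point concrete, here is a map of the ordinal spaces in play; this annotation is ours, not part of the PR:

// Three ordinal spaces appear in buildAggregations:
//  * owningBucketOrds[owningOrdIdx] -- ordinals handed down by the parent
//    aggregator; one InternalAggregation must be built per entry.
//  * collectedBuckets.ord() -- ordinals assigned by bucketOrds at collect time,
//    numbered across ALL owning buckets; these index mergeMap and bucketDocCount().
//  * bucket.bucketOrd -- the dense post-merge ordinal: this owning bucket's
//    offset plus the bucket's position in bucketsInThisOwningBucketToCollect.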
+        /*
+         * Collect the list of buckets, populate the filter with terms
+         * that are too frequent, and figure out how to merge sub-buckets.
+         */
+        LongRareTerms.Bucket[][] rarestPerOrd = new LongRareTerms.Bucket[owningBucketOrds.length][];
+        SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
+        long keepCount = 0;
+        long[] mergeMap = new long[(int) bucketOrds.size()];
+        Arrays.fill(mergeMap, -1);
+        long offset = 0;
+        for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
+            try (LongHash bucketsInThisOwningBucketToCollect = new LongHash(1, context.bigArrays())) {
+                filters[owningOrdIdx] = newFilter();
+                List<LongRareTerms.Bucket> builtBuckets = new ArrayList<>();
+                LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);
+                while (collectedBuckets.next()) {
+                    long docCount = bucketDocCount(collectedBuckets.ord());
+                    // if the key is below threshold, reinsert into the new ords
+                    if (docCount <= maxDocCount) {
+                        LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format);
+                        bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value());
+                        mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd;
+                        builtBuckets.add(bucket);
+                        keepCount++;

Review thread on keepCount++:
Reviewer: Should we just change this to a boolean flag?
Reply: I think we need to perform the merge if we don't keep all the buckets. We can remove buckets for two reasons now! This counter will catch both ways. I couldn't come up with a cleaner way to do it.

+                    } else {
+                        filters[owningOrdIdx].add(collectedBuckets.value());
+                    }
+                }
+                rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(LongRareTerms.Bucket[]::new);
+                offset += bucketsInThisOwningBucketToCollect.size();
+            }
+        }
 
+        /*
+         * Only merge/delete the ordinals if we have actually deleted one,
+         * to save on some redundant work.
+         */
+        if (keepCount != mergeMap.length) {
+            mergeBuckets(mergeMap, offset);
+            if (deferringCollector != null) {
+                deferringCollector.mergeBuckets(mergeMap);
+            }
+        }
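Concrete numbers make the renumbering easier to follow. Suppose two owning buckets were requested, four buckets were collected in total (collect-time ords 0 through 3), and the bucket at ord 1 turned out to be too frequent. A sketch with invented doc counts; in the aggregator itself this compaction is what mergeBuckets(mergeMap, offset) performs:

import java.util.Arrays;

public class MergeMapDemo {
    public static void main(String[] args) {
        long[] docCounts = {2, 40, 1, 3}; // indexed by collect-time ord; maxDocCount is 5 here
        long[] mergeMap = {0, -1, 1, 2};  // collect-time ord -> dense result ord; -1 means pruned
        long keepCount = 3;               // three of the four buckets survived

        // keepCount != mergeMap.length, so the merge runs and compacts the counts:
        long[] compacted = new long[(int) keepCount];
        for (int ord = 0; ord < docCounts.length; ord++) {
            if (mergeMap[ord] >= 0) {
                compacted[(int) mergeMap[ord]] += docCounts[ord];
            }
        }
        // Ord 1 (40 docs, over the threshold) went into the cuckoo filter instead,
        // and the survivors were renumbered densely: [2, 1, 3]
        System.out.println(Arrays.toString(compacted));
    }
}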
 
+        /*
+         * Now build the results!
+         */
+        buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
+            result[ordIdx] = new LongRareTerms(
+                name,
+                ORDER,
+                metadata(),
+                format,
+                Arrays.asList(rarestPerOrd[ordIdx]),
+                maxDocCount,
+                filters[ordIdx]
+            );
+        }
+        return result;
     }
 
     @Override
     public InternalAggregation buildEmptyAggregation() {
-        return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, filter);
+        return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, newFilter());
     }
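A small consequence worth noting (our reading of the diff, not a comment from the PR): buildEmptyAggregation can no longer hand out a shared filter field, because after this change a SetBackedScalingCuckooFilter exists per owning bucket via the filters array above, so the empty result constructs a fresh one with newFilter().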
 
     @Override
Review comment: I hate javadocs so much :( optimizing rendered readability while sacrificing IDE readability :( Thanks for fixing this :)