Save memory when rare_terms is not on top #57948

Merged · 5 commits · Jun 12, 2020

Changes from 1 commit

SetBackedScalingCuckooFilter.java
@@ -39,13 +39,13 @@
* An approximate set membership datastructure that scales as more unique values are inserted.
* Can definitively say if a member does not exist (no false negatives), but may say an item exists
* when it does not (has false positives). Similar in usage to a Bloom Filter.
*
* <p>
Contributor:
I hate javadocs so much :( optimizing rendered readability while sacrificing IDE readability :(

Thanks for fixing this :)

* Internally, the datastructure maintains a Set of hashes up to a specified threshold. This provides
* 100% accurate membership queries.
*
* <p>
* When the threshold is breached, a list of CuckooFilters are created and used to track membership.
* These filters are approximate similar to Bloom Filters.
*
* <p>
* This datastructure scales as more values are inserted by growing the list of CuckooFilters.
* Final size is dependent on the cardinality of data inserted, and the precision specified.
*/
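
The pattern this javadoc describes — an exact set of hashes up to a threshold, then approximate filters — can be illustrated with a small standalone sketch. The class below is hypothetical and uses a Guava BloomFilter as a stand-in for the list of CuckooFilters; the name, sizing, and conversion logic are assumptions, not the SetBackedScalingCuckooFilter implementation.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of "exact set until a threshold, then approximate filter".
// Not the Elasticsearch class; BloomFilter stands in for the CuckooFilter list.
class ScalingMembershipSketch {
    private final int threshold;
    private final double fpp;
    private Set<Long> exact = new HashSet<>();
    private BloomFilter<Long> approximate; // null until the threshold is breached

    ScalingMembershipSketch(int threshold, double fpp) {
        this.threshold = threshold;
        this.fpp = fpp;
    }

    void add(long value) {
        if (approximate != null) {
            approximate.put(value);
            return;
        }
        exact.add(value);
        if (exact.size() > threshold) {
            // Threshold breached: switch to approximate tracking. From here on,
            // membership answers may have false positives but no false negatives.
            approximate = BloomFilter.create(Funnels.longFunnel(), threshold * 10L, fpp);
            exact.forEach(approximate::put);
            exact = null;
        }
    }

    boolean mightContain(long value) {
        // Exact phase: precise answers. Approximate phase: "maybe" or "definitely not".
        return approximate == null ? exact.contains(value) : approximate.mightContain(value);
    }
}
// --- end of sketch ---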
BucketsAggregator.java
@@ -236,23 +236,6 @@ protected final <B> void buildSubAggsForAllBuckets(B[][] buckets,
}
}

/**
* Build the sub aggregation results for a list of buckets and set them on
* the buckets. This is usually used by aggregations that are selective
* in which bucket they build. They use some mechanism of selecting a list
* of buckets to build use this method to "finish" building the results.
* @param buckets the buckets to finish building
* @param bucketToOrd how to convert a bucket into an ordinal
* @param setAggs how to set the sub-aggregation results on a bucket
*/
protected final <B> void buildSubAggsForBuckets(List<B> buckets,
ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs) throws IOException {
InternalAggregations[] results = buildSubAggsForBuckets(buckets.stream().mapToLong(bucketToOrd).toArray());
for (int i = 0; i < buckets.size(); i++) {
setAggs.accept(buckets.get(i), results[i]);
}
}
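
The helper removed here was the single-owning-bucket counterpart of buildSubAggsForAllBuckets: per its javadoc, a selective aggregator collected the buckets it wanted to keep and then called it once to attach sub-aggregation results. The old LongRareTermsAggregator later in this diff used exactly that pattern; a pared-down, hypothetical fragment of such a caller (the bucket type and the selection step are assumptions) would be:

// Hypothetical fragment inside a selective aggregator, not a complete class.
List<LongRareTerms.Bucket> selected = selectRareBuckets();  // assumed selection step
buildSubAggsForBuckets(
    selected,
    b -> b.bucketOrd,                    // bucket -> the ordinal it was collected under
    (b, aggs) -> b.aggregations = aggs   // attach the built sub-aggregation results
);
// --- end of sketch ---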

/**
* Build aggregation results for an aggregator that has a fixed number of buckets per owning ordinal.
* @param <B> the type of the bucket
AbstractRareTermsAggregator.java
@@ -24,46 +24,47 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Map;
import java.util.Random;

public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
U extends IncludeExclude.Filter, V> extends DeferableBucketAggregator {
public abstract class AbstractRareTermsAggregator extends DeferableBucketAggregator {

static final BucketOrder ORDER = BucketOrder.compound(BucketOrder.count(true), BucketOrder.key(true)); // sort by count ascending

protected final long maxDocCount;
protected final double precision;
private final double precision;
protected final DocValueFormat format;
protected final T valuesSource;
protected final U includeExclude;

MergingBucketsDeferringCollector deferringCollector;
final SetBackedScalingCuckooFilter filter;

AbstractRareTermsAggregator(String name, AggregatorFactories factories, SearchContext context,
Aggregator parent, Map<String, Object> metadata, long maxDocCount, double precision,
DocValueFormat format, T valuesSource, U includeExclude) throws IOException {
protected final boolean collectsFromSingleBucket;
private final int filterSeed;

protected MergingBucketsDeferringCollector deferringCollector;

AbstractRareTermsAggregator(
String name,
AggregatorFactories factories,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata,
long maxDocCount,
double precision,
DocValueFormat format,
boolean collectsFromSingleBucket
) throws IOException {
super(name, factories, context, parent, metadata);

// We seed the rng with the ShardID so results are deterministic and don't change randomly
this.filter = new SetBackedScalingCuckooFilter(10000, new Random(context.indexShard().shardId().hashCode()), precision);
this.filter.registerBreaker(this::addRequestCircuitBreakerBytes);

this.maxDocCount = maxDocCount;
this.precision = precision;
this.format = format;
this.valuesSource = valuesSource;
this.includeExclude = includeExclude;
this.collectsFromSingleBucket = collectsFromSingleBucket;
// We seed the rng with the ShardID so results are deterministic and don't change randomly
this.filterSeed = context.indexShard().shardId().hashCode();
String scoringAgg = subAggsNeedScore();
String nestedAgg = descendsFromNestedAggregator(parent);
if (scoringAgg != null && nestedAgg != null) {
@@ -81,6 +82,12 @@ public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
}
}

protected SetBackedScalingCuckooFilter newFilter() {
SetBackedScalingCuckooFilter filter = new SetBackedScalingCuckooFilter(10000, new Random(filterSeed), precision);
filter.registerBreaker(this::addRequestCircuitBreakerBytes);
return filter;
}

@Override
protected boolean shouldDefer(Aggregator aggregator) {
return true;
@@ -110,21 +117,4 @@ private String descendsFromNestedAggregator(Aggregator parent) {
}
return null;
}

protected void doCollect(LeafBucketCollector subCollector, V val, int docId) throws IOException {
long bucketOrdinal = addValueToOrds(val);

if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(subCollector, docId, bucketOrdinal);
} else {
collectBucket(subCollector, docId, bucketOrdinal);
}
}

/**
* Add's the value to the ordinal map. Return the newly allocated id if it wasn't in the ordinal map yet,
* or <code>-1-id</code> if it was already present
*/
abstract long addValueToOrds(V value);
}
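
Two aspects of this refactor are worth spelling out: the seed is still derived from the shard id, so every filter built for a given shard makes the same random choices across requests, and the filter is no longer created eagerly in the constructor but on demand via newFilter(), once per filter actually needed (see buildAggregations in the next file). A hypothetical standalone sketch of that lazy, deterministic pattern — ApproxFilter and LazyFilterFactory are stand-ins, not Elasticsearch classes:

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch: build deterministic filters lazily, one per owning bucket,
// instead of eagerly building a single filter in the constructor.
class LazyFilterFactory {
    // Stand-in for an approximate membership filter driven by a seeded Random.
    static class ApproxFilter {
        ApproxFilter(Random rng, double precision) {
            // a real implementation would use rng to pick hash permutations, etc.
        }
    }

    private final int filterSeed;     // e.g. a stable shardId.hashCode()
    private final double precision;
    private final Map<Long, ApproxFilter> filters = new HashMap<>();

    LazyFilterFactory(int filterSeed, double precision) {
        this.filterSeed = filterSeed;
        this.precision = precision;
    }

    ApproxFilter filterFor(long owningBucketOrd) {
        // Created only when a bucket actually needs one; every filter for this
        // shard starts from the same seed, so results stay deterministic.
        return filters.computeIfAbsent(owningBucketOrd,
            ord -> new ApproxFilter(new Random(filterSeed), precision));
    }
}
// --- end of sketch ---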
LongRareTermsAggregator.java
@@ -20,9 +20,9 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -34,6 +34,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@@ -42,111 +43,144 @@
/**
* An aggregator that finds "rare" string values (e.g. terms agg that orders ascending)
*/
public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesSource.Numeric, IncludeExclude.LongFilter, Long> {

protected LongHash bucketOrds;

LongRareTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
SearchContext aggregationContext, Aggregator parent, IncludeExclude.LongFilter longFilter,
int maxDocCount, double precision, Map<String, Object> metadata) throws IOException {
super(name, factories, aggregationContext, parent, metadata, maxDocCount, precision, format, valuesSource, longFilter);
this.bucketOrds = new LongHash(1, aggregationContext.bigArrays());
public class LongRareTermsAggregator extends AbstractRareTermsAggregator {
private final ValuesSource.Numeric valuesSource;
private final IncludeExclude.LongFilter filter;
private final LongKeyedBucketOrds bucketOrds;

LongRareTermsAggregator(
String name,
AggregatorFactories factories,
ValuesSource.Numeric valuesSource,
DocValueFormat format,
SearchContext aggregationContext,
Aggregator parent,
IncludeExclude.LongFilter filter,
int maxDocCount,
double precision,
boolean collectsFromSingleBucket,
Map<String, Object> metadata
) throws IOException {
super(
name,
factories,
aggregationContext,
parent,
metadata,
maxDocCount,
precision,
format,
collectsFromSingleBucket
);
this.valuesSource = valuesSource;
this.filter = filter;
this.bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}

protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException {
return valuesSource.longValues(ctx);
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedNumericDocValues values = getValues(valuesSource, ctx);
return new LeafBucketCollectorBase(sub, values) {

@Override
public void collect(int docId, long owningBucketOrdinal) throws IOException {
if (values.advanceExact(docId)) {
final int valuesCount = values.docValueCount();
long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
final long val = values.nextValue();
if (previous != val || i == 0) {
if ((includeExclude == null) || (includeExclude.accept(val))) {
doCollect(sub, val, docId);
}
previous = val;
}
public void collect(int docId, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(docId)) {
return;
}
int valuesCount = values.docValueCount();
long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long val = values.nextValue();
if (i == 0 && previous == val) {
continue;
}
previous = val;
if (filter != null && false == filter.accept(val)) {
continue;
}
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, docId, bucketOrdinal);
} else {
collectBucket(sub, docId, bucketOrdinal);
}
}
}
};
}
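
The collect method above relies on a small convention shared by LongKeyedBucketOrds.add and the LongHash it replaces (spelled out in the removed addValueToOrds javadoc below): a non-negative return value is a freshly allocated ordinal, while an already-seen key returns -1 - ord, which the caller decodes before calling collectExistingBucket. A tiny standalone illustration of that encoding, using a hypothetical map rather than the Elasticsearch class:

import java.util.HashMap;
import java.util.Map;

// Tiny illustration of the "-1 - ord" convention: non-negative means a new
// ordinal was allocated, negative encodes the existing ordinal as -1 - ord.
class OrdinalMapSketch {
    private final Map<Long, Long> ords = new HashMap<>();

    long add(long key) {
        Long existing = ords.get(key);
        if (existing != null) {
            return -1 - existing;          // already seen: encode existing ordinal
        }
        long ord = ords.size();
        ords.put(key, ord);
        return ord;                        // first time: new ordinal
    }

    public static void main(String[] args) {
        OrdinalMapSketch map = new OrdinalMapSketch();
        long first = map.add(42);          // 0  -> collectBucket(...)
        long again = map.add(42);          // -1 -> already seen
        long decoded = -1 - again;         // 0  -> collectExistingBucket(...)
        System.out.println(first + " " + again + " " + decoded);
    }
}
// --- end of sketch ---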

@Override
long addValueToOrds(Long value) {
return bucketOrds.add(value);
}

/**
* Merges the ordinals to a minimal set, populates the CuckooFilter and
* generates a final set of buckets.
*
* If a term is below the maxDocCount, it is turned into a Bucket. Otherwise,
* the term is added to the filter, and pruned from the ordinal map. If
* necessary the ordinal map is merged down to a minimal set to remove deletions
*/
private List<LongRareTerms.Bucket> buildSketch() {
long deletionCount = 0;
LongHash newBucketOrds = new LongHash(1, context.bigArrays());
List<LongRareTerms.Bucket> buckets = new ArrayList<>();
try (LongHash oldBucketOrds = bucketOrds) {

long[] mergeMap = new long[(int) oldBucketOrds.size()];
for (int i = 0; i < oldBucketOrds.size(); i++) {
long oldKey = oldBucketOrds.get(i);
long newBucketOrd = -1;

long docCount = bucketDocCount(i);
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
newBucketOrd = newBucketOrds.add(oldKey);
LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
bucket.bucketOrd = newBucketOrd;
buckets.add(bucket);
} else {
// Make a note when one of the ords has been deleted
deletionCount += 1;
filter.add(oldKey);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
Contributor:
General comment about this method: we have a lot of "ords" being referenced and it's hard to keep track of which ord is which. E.g. we have the bucket ordinals that our parent is requesting we build, and then we have the bucket ordinals from each of those instances that we are collecting into buckets

Not sure how, but if we could find a way to rename the variables to help identify or disambiguate I think it would help a bunch.

/*
* Collect the list of buckets, populate the filter with terms
* that are too frequent, and figure out how to merge sub-buckets.
*/
LongRareTerms.Bucket[][] rarestPerOrd = new LongRareTerms.Bucket[owningBucketOrds.length][];
SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
long keepCount = 0;
long[] mergeMap = new long[(int) bucketOrds.size()];
Arrays.fill(mergeMap, -1);
long size = 0;
Contributor:
Hmm, this is a bit confusingly named I think? Maybe currentOffset or something? Not sure, but size feels a bit confusing.

for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
try (LongHash ordsToCollect = new LongHash(1, context.bigArrays())) {
filters[ordIdx] = newFilter();
List<LongRareTerms.Bucket> buckets = new ArrayList<>();
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(ordsEnum.value(), docCount, null, format);
bucket.bucketOrd = mergeMap[(int) ordsEnum.ord()] = size + ordsToCollect.add(ordsEnum.value());
buckets.add(bucket);
keepCount++;
Contributor:
Should we just change this to a boolean flag? hasDeletions or whatever?

Member Author:
I think we need to perform the merge if we don't keep all the buckets. We can remove buckets for two reasons now!

  1. The key is above the threshold.
  2. The owningBucketOrd isn't selected.

This counter will catch both ways. I couldn't come up with a cleaner way to do it.

} else {
filters[ordIdx].add(ordsEnum.value());
}
}
mergeMap[i] = newBucketOrd;
rarestPerOrd[ordIdx] = buckets.toArray(LongRareTerms.Bucket[]::new);
size += ordsToCollect.size();
}
}

// Only merge/delete the ordinals if we have actually deleted one,
// to save on some redundant work
if (deletionCount > 0) {
mergeBuckets(mergeMap, newBucketOrds.size());
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.length) {
mergeBuckets(mergeMap, size);
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
}
bucketOrds = newBucketOrds;
return buckets;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
List<LongRareTerms.Bucket> buckets = buildSketch();
buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);

CollectionUtil.introSort(buckets, ORDER.comparator());
return new InternalAggregation[] {new LongRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
result[ordIdx] = new LongRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(rarestPerOrd[ordIdx]),
maxDocCount,
filters[ordIdx]
);
}
return result;
}
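
To make the merge bookkeeping in buildAggregations concrete: every collected ordinal either keeps its term (docCount <= maxDocCount) and is assigned the next compacted ordinal, or is pruned into the cuckoo filter and left as -1 in mergeMap; the remap only runs when something was actually dropped, which is what the keepCount check discussed in the review thread decides. A tiny standalone illustration with made-up doc counts (the second prune reason, unselected owning buckets, is omitted here):

import java.util.Arrays;

// Rough illustration of the mergeMap built above: kept ordinals get a new,
// compacted ordinal; pruned ordinals map to -1. Doc counts and the keep rule
// are stand-ins for bucketDocCount()/maxDocCount.
class MergeMapSketch {
    public static void main(String[] args) {
        long[] docCountPerOrd = {1, 7, 1, 3, 1};   // pretend collected doc counts
        long maxDocCount = 1;                      // "rare" threshold

        long[] mergeMap = new long[docCountPerOrd.length];
        Arrays.fill(mergeMap, -1);
        long keepCount = 0;
        long next = 0;                             // next compacted ordinal ("size" in the PR)
        for (int ord = 0; ord < docCountPerOrd.length; ord++) {
            if (docCountPerOrd[ord] <= maxDocCount) {
                mergeMap[ord] = next++;            // kept: gets a compacted ordinal
                keepCount++;
            }                                      // else: too frequent, goes into the filter
        }

        // Only worth remapping doc counts and deferred collections when
        // something was actually dropped, mirroring keepCount != mergeMap.length.
        System.out.println("mergeMap = " + Arrays.toString(mergeMap)
            + ", remap needed = " + (keepCount != mergeMap.length));
        // -> mergeMap = [0, -1, 1, -1, 2], remap needed = true
    }
}
// --- end of sketch ---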

@Override
public InternalAggregation buildEmptyAggregation() {
return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, filter);
return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, newFilter());
}

@Override