Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save memory when numeric terms agg is not top #55873

Merged
merged 21 commits into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public ChildrenToParentAggregator(String name, AggregatorFactories factories,
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalParent(name, bucketDocCount(owningBucketOrdinal),
bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalParent(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public ParentToChildrenAggregator(String name, AggregatorFactories factories,
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalChildren(name, bucketDocCount(owningBucketOrdinal),
bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalChildren(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void execute(SearchContext context) {
for (Aggregator aggregator : context.aggregations().aggregators()) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildAggregation(0));
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
Expand All @@ -136,5 +136,4 @@ public void execute(SearchContext context) {
context.aggregations(null);
context.queryCollectors().remove(AggregationPhase.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,24 @@ public interface BucketComparator {
}

/**
* Build an aggregation for data that has been collected into {@code bucket}.
* Build the results of this aggregation.
* @param owningBucketOrds the ordinals of the buckets that we want to
* collect from this aggregation
* @return the results for each ordinal, in the same order as the array
* of ordinals
*/
public abstract InternalAggregation buildAggregation(long bucket) throws IOException;
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;

/**
* Build the result of this aggregation if it is at the "top level"
* of the aggregation tree. If, instead, it is a sub-aggregation of
* another aggregation then the aggregation that contains it will call
* {@link #buildAggregations(long[])}.
*/
public final InternalAggregation buildTopLevel() throws IOException {
assert parent() == null;
return buildAggregations(new long[] {0})[0];
}

/**
* Build an empty aggregation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,27 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (bucket < aggregators.size()) {
Aggregator aggregator = aggregators.get(bucket);
if (aggregator != null) {
return aggregator.buildAggregation(0);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
if (owningBucketOrds[ordIdx] < aggregators.size()) {
Aggregator aggregator = aggregators.get(owningBucketOrds[ordIdx]);
if (aggregator != null) {
/*
* This is the same call as buildTopLevel but since
* this aggregator may not be the top level we don't
* call that method here. It'd be weird sounding. And
* it'd trip assertions. Both bad.
*/
results[ordIdx] = aggregator.buildAggregations(new long [] {0})[0];
} else {
results[ordIdx] = buildEmptyAggregation();
}
} else {
results[ordIdx] = buildEmptyAggregation();
}
}
return buildEmptyAggregation();
return results;
}

@Override
Expand Down Expand Up @@ -232,7 +245,9 @@ public AggregatorFactory getParent() {
* Utility method. Given an {@link AggregatorFactory} that creates
* {@link Aggregator}s that only know how to collect bucket {@code 0}, this
* returns an aggregator that can collect any bucket.
* @deprecated implement the aggregator to handle many owning buckets
*/
@Deprecated
protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory, final SearchContext searchContext,
final Aggregator parent) throws IOException {
final Aggregator first = factory.create(searchContext, parent, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,5 +332,4 @@ public double sortValue(AggregationPath.PathElement head, Iterator<AggregationPa
// subclasses will override this with a real implementation if you can sort on a descendant
throw new IllegalArgumentException("Can't sort by a descendant of a [" + getType() + "] aggregation [" + head + "]");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;
import java.util.stream.Stream;
Expand Down Expand Up @@ -73,9 +74,33 @@ public void collect(int doc, long bucket) throws IOException {
}

/**
* Collect the given doc in the given bucket.
* Collect the given {@code doc} in the bucket owned by
* {@code owningBucketOrd}.
* <p>
 * The implementation of this method for metric aggregations is generally
 * something along the lines of
* <pre>{@code
* array[owningBucketOrd] += loadValueFromDoc(doc)
* }</pre>
* <p>Bucket aggregations have more trouble because their job is to
* <strong>make</strong> new ordinals. So their implementation generally
* looks kind of like
* <pre>{@code
* long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
* collectBucket(doc, myBucketOrd);
* }</pre>
* <p>
* Some bucket aggregations "know" how many ordinals each owning ordinal
* needs so they can map "densely". The {@code range} aggregation, for
* example, can perform this mapping with something like:
* <pre>{@code
* return rangeCount * owningBucketOrd + matchingRange(value);
* }</pre>
* Other aggregations don't know how many buckets will fall into any
* particular owning bucket. The {@code terms} aggregation, for example,
* uses {@link LongKeyedBucketOrds} which amounts to a hash lookup.
*/
public abstract void collect(int doc, long bucket) throws IOException;
public abstract void collect(int doc, long owningBucketOrd) throws IOException;

@Override
public final void collect(int doc) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/**
* An aggregation service that creates instances of {@link MultiBucketConsumer}.
* The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
* in {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
* in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
*/
public class MultiBucketConsumerService {
Expand Down Expand Up @@ -94,7 +94,7 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
* An {@link IntConsumer} that throws a {@link TooManyBucketsException}
* when the sum of the provided values is above the limit (`search.max_buckets`).
* It is used by aggregators to limit the number of bucket creation during
* {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
* {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
*/
public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext reader, Leaf
}

@Override
public final InternalAggregation buildAggregation(long owningBucketOrdinal) {
return buildEmptyAggregation();
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildEmptyAggregation();
}
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,10 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
throw new IllegalStateException("Already been replayed");
}

final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long bucket : selectedBuckets) {
hash.add(bucket);
this.selectedBuckets = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long ord : selectedBuckets) {
this.selectedBuckets.add(ord);
}
this.selectedBuckets = hash;

boolean needsScores = scoreMode().needsScores();
Weight weight = null;
Expand All @@ -185,7 +184,7 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
final long rebasedBucket = this.selectedBuckets.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (scoreIt.docID() < doc) {
Expand Down Expand Up @@ -213,19 +212,20 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
public Aggregator wrap(final Aggregator in) {

return new WrappedAggregator(in) {

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
final long rebasedBucket = selectedBuckets.find(bucket);
if (rebasedBucket == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
long[] rebasedOrds = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
rebasedOrds[ordIdx] = selectedBuckets.find(owningBucketOrds[ordIdx]);
if (rebasedOrds[ordIdx] == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
}
}
return in.buildAggregation(rebasedBucket);
return in.buildAggregations(rebasedOrds);
}

};
}

Expand Down
Loading