Save memory when date_histogram is not on top (#56921) (#56960)
When `date_histogram` is a sub-aggregator it used to allocate a bunch of
objects for every one of its parent's buckets. This reworks the
`date_histogram` aggregator to use the data structures that we built in
#55873 instead of doing all of that allocation.

Part of #56487
nik9000 authored May 19, 2020
1 parent 0eb8187 commit 8b9c4eb
Showing 8 changed files with 265 additions and 63 deletions.
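
The core trick, as a minimal self-contained sketch in plain Java: rather than one hash (and one wrapped aggregator) per parent bucket, a single structure keyed by the pair (owning bucket ordinal, long key) hands out dense bucket ordinals. The class and field names below are illustrative stand-ins, not the real `LongKeyedBucketOrds`, which is backed by `BigArrays` hashes instead of boxed maps.

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of the LongKeyedBucketOrds idea, not the real
 * implementation: one shared structure keyed by (owningBucketOrd, key)
 * instead of a separate LongHash per parent bucket.
 */
class BucketOrdsSketch {
    // owningBucketOrd -> (key -> dense ordinal); the real class packs this
    // into BigArrays-backed hashes instead of boxed maps.
    private final Map<Long, Map<Long, Long>> ords = new HashMap<>();
    private long nextOrd = 0;

    /**
     * Returns a fresh dense ordinal for (owningBucketOrd, key), or
     * {@code -1 - existingOrd} when the pair was already seen: the same
     * convention the collect() methods in this commit rely on.
     */
    long add(long owningBucketOrd, long key) {
        Map<Long, Long> perOwner = ords.computeIfAbsent(owningBucketOrd, o -> new HashMap<>());
        Long existing = perOwner.get(key);
        if (existing != null) {
            return -1 - existing;
        }
        long ord = nextOrd++;
        perOwner.put(key, ord);
        return ord;
    }

    /** Number of buckets collected under one owning bucket. */
    long bucketsInOrd(long owningBucketOrd) {
        Map<Long, Long> perOwner = ords.get(owningBucketOrd);
        return perOwner == null ? 0 : perOwner.size();
    }

    /** Total number of buckets across all owning buckets. */
    long size() {
        return nextOrd;
    }
}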
@@ -490,3 +490,55 @@ setup:
  - length: { aggregations.histo.buckets: 1 }
  - match: { aggregations.histo.buckets.0.key_as_string: "2015-12-31T17:00:00.000-07:00" }
  - match: { aggregations.histo.buckets.0.doc_count: 1 }

---
"profiler":
  - skip:
      version: " - 7.8.99"
      reason: debug info added in 7.9.0

  - do:
      indices.create:
        index: test_2
        body:
          settings:
            number_of_replicas: 0
            number_of_shards: 1
          mappings:
            properties:
              date:
                type: date

  - do:
      bulk:
        index: test_2
        refresh: true
        body:
          - '{"index": {}}'
          - '{"date": "2016-01-01"}'
          - '{"index": {}}'
          - '{"date": "2016-01-02"}'
          - '{"index": {}}'
          - '{"date": "2016-02-01"}'
          - '{"index": {}}'
          - '{"date": "2016-03-01"}'

  - do:
      search:
        index: test_2
        body:
          size: 0
          profile: true
          aggs:
            histo:
              date_histogram:
                field: date
                calendar_interval: month
  - match: { hits.total.value: 4 }
  - length: { aggregations.histo.buckets: 3 }
  - match: { aggregations.histo.buckets.0.key_as_string: "2016-01-01T00:00:00.000Z" }
  - match: { aggregations.histo.buckets.0.doc_count: 2 }
  - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
  - match: { profile.shards.0.aggregations.0.description: histo }
  - match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
  - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }
@@ -22,13 +22,15 @@
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorBase;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.elasticsearch.search.aggregations.support.AggregationPath;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.SortOrder;
@@ -340,6 +342,51 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(lo

        return new InternalAggregation[] { resultBuilder.apply(buckets) };
    }

    /**
     * Build aggregation results for an aggregator with a varying number of
     * {@code long} keyed buckets that collects from many owning buckets, as
     * opposed to one that is at the top level or wrapped in
     * {@link AggregatorFactory#asMultiBucketAggregator}.
     * @param owningBucketOrds owning bucket ordinals for which to build the results
     * @param bucketOrds hash of values to the bucket ordinal
     */
    protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongKeyedBucketOrds bucketOrds,
            BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
        // Count the buckets under every owning ordinal so we can size one flat
        // array of bucket ordinals and check the circuit breaker up front.
        long totalOrdsToCollect = 0;
        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
            totalOrdsToCollect += bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
        }
        if (totalOrdsToCollect > Integer.MAX_VALUE) {
            throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                + "] buckets but attempted [" + totalOrdsToCollect + "]");
        }
        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
        long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
        int b = 0;
        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
            while (ordsEnum.next()) {
                bucketOrdsToCollect[b++] = ordsEnum.ord();
            }
        }
        // Build sub-aggregation results for all buckets in a single pass.
        InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);

        InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
        b = 0;
        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
            List<B> buckets = new ArrayList<>((int) bucketOrds.size());
            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
            while (ordsEnum.next()) {
                if (bucketOrdsToCollect[b] != ordsEnum.ord()) {
                    throw new AggregationExecutionException("Iteration order of [" + bucketOrds + "] changed without mutating. ["
                        + ordsEnum.ord() + "] should have been [" + bucketOrdsToCollect[b] + "]");
                }
                buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++]));
            }
            results[ordIdx] = resultBuilder.apply(buckets);
        }
        return results;
    }

    @FunctionalInterface
    protected interface BucketBuilderForVariable<B> {
        B build(long bucketValue, int docCount, InternalAggregations subAggregationResults);
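
A hedged sketch of how a subclass might drive the new helper from `buildAggregations`; the helper and `BucketBuilderForVariable` come from the diff above, while the surrounding aggregator and the `Bucket` and `InternalHisto` types are hypothetical stand-ins:

// Hypothetical subclass code, for illustration only: Bucket and
// InternalHisto stand in for real bucket/result types.
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
    return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
        // Called once per collected (owning bucket, key) pair.
        (bucketValue, docCount, subAggregationResults) ->
            new Bucket(bucketValue, docCount, subAggregationResults),
        // Called once per owning bucket, with that bucket's list.
        buckets -> new InternalHisto(name, buckets, metadata()));
}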
@@ -46,5 +46,6 @@ Aggregator build(String name,
                     DocValueFormat formatter,
                     SearchContext aggregationContext,
                     Aggregator parent,
+                    boolean collectsFromSingleBucket,
                     Map<String, Object> metadata) throws IOException;
}
@@ -25,7 +25,6 @@
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -34,12 +33,14 @@
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;

 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.BiConsumer;

/**
* An aggregator for date values. Every date is rounded down using a configured
@@ -62,13 +63,13 @@ class DateHistogramAggregator extends BucketsAggregator {
    private final long minDocCount;
    private final ExtendedBounds extendedBounds;

-   private final LongHash bucketOrds;
+   private final LongKeyedBucketOrds bucketOrds;

    DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding.Prepared preparedRounding,
            BucketOrder order, boolean keyed,
            long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource valuesSource,
            DocValueFormat formatter, SearchContext aggregationContext,
-           Aggregator parent, Map<String, Object> metadata) throws IOException {
+           Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {

        super(name, factories, aggregationContext, parent, metadata);
        this.rounding = rounding;
@@ -81,7 +82,7 @@ class DateHistogramAggregator extends BucketsAggregator {
        this.valuesSource = (ValuesSource.Numeric) valuesSource;
        this.formatter = formatter;

-       bucketOrds = new LongHash(1, aggregationContext.bigArrays());
+       bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
    }

    @Override
@@ -93,30 +94,26 @@ public ScoreMode scoreMode() {
    }

    @Override
-   public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-           final LeafBucketCollector sub) throws IOException {
+   public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
-       final SortedNumericDocValues values = valuesSource.longValues(ctx);
+       SortedNumericDocValues values = valuesSource.longValues(ctx);
        return new LeafBucketCollectorBase(sub, values) {
            @Override
-           public void collect(int doc, long bucket) throws IOException {
-               assert bucket == 0;
+           public void collect(int doc, long owningBucketOrd) throws IOException {
                if (values.advanceExact(doc)) {
-                   final int valuesCount = values.docValueCount();
+                   int valuesCount = values.docValueCount();

                    long previousRounded = Long.MIN_VALUE;
                    for (int i = 0; i < valuesCount; ++i) {
                        long value = values.nextValue();
                        // We can use shardRounding here, which is sometimes more efficient
                        // if daylight saving times are involved.
                        long rounded = preparedRounding.round(value);
                        assert rounded >= previousRounded;
                        if (rounded == previousRounded) {
                            continue;
                        }
-                       long bucketOrd = bucketOrds.add(rounded);
+                       long bucketOrd = bucketOrds.add(owningBucketOrd, rounded);
                        if (bucketOrd < 0) { // already seen
                            bucketOrd = -1 - bucketOrd;
                            collectExistingBucket(sub, doc, bucketOrd);
@@ -162,4 +159,9 @@ public InternalAggregation buildEmptyAggregation() {
    public void doClose() {
        Releasables.close(bucketOrds);
    }

    @Override
    public void collectDebugInfo(BiConsumer<String, Object> add) {
        add.accept("total_buckets", bucketOrds.size());
    }
}
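
The single `LongKeyedBucketOrds.build(bigArrays, collectsFromSingleBucket)` call above is where the saving kicks in. A sketch of the dispatch as presumably introduced in #55873; the `FromSingle` and `FromMany` names are assumptions here, not confirmed by this diff:

// Sketch, assuming the two-implementation design from #55873: a compact
// single-owner hash when the aggregator is on top, and a hash keyed by
// (owningBucketOrd, key) when it collects from many owning buckets.
public static LongKeyedBucketOrds build(BigArrays bigArrays, boolean collectsFromSingleBucket) {
    return collectsFromSingleBucket ? new FromSingle(bigArrays) : new FromMany(bigArrays);
}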
@@ -81,26 +81,36 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
                                          Aggregator parent,
                                          boolean collectsFromSingleBucket,
                                          Map<String, Object> metadata) throws IOException {
-       if (collectsFromSingleBucket == false) {
-           return asMultiBucketAggregator(this, searchContext, parent);
-       }
        AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
            DateHistogramAggregationBuilder.NAME);
        if (aggregatorSupplier instanceof DateHistogramAggregationSupplier == false) {
            throw new AggregationExecutionException("Registry miss-match - expected DateHistogramAggregationSupplier, found [" +
                aggregatorSupplier.getClass().toString() + "]");
        }
        Rounding.Prepared preparedRounding = valuesSource.roundingPreparer(queryShardContext.getIndexReader()).apply(shardRounding);
-       return ((DateHistogramAggregationSupplier) aggregatorSupplier).build(name, factories, rounding, preparedRounding, order, keyed,
-           minDocCount, extendedBounds, valuesSource, config.format(), searchContext,
-           parent, metadata);
+       return ((DateHistogramAggregationSupplier) aggregatorSupplier).build(
+           name,
+           factories,
+           rounding,
+           preparedRounding,
+           order,
+           keyed,
+           minDocCount,
+           extendedBounds,
+           valuesSource,
+           config.format(),
+           searchContext,
+           parent,
+           collectsFromSingleBucket,
+           metadata
+       );
    }

    @Override
    protected Aggregator createUnmapped(SearchContext searchContext,
                                        Aggregator parent,
                                        Map<String, Object> metadata) throws IOException {
        return new DateHistogramAggregator(name, factories, rounding, null, order, keyed, minDocCount, extendedBounds,
-           null, config.format(), searchContext, parent, metadata);
+           null, config.format(), searchContext, parent, false, metadata);
    }
}
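
For context, a request shaped like the following is the case this factory change targets; assuming the standard 7.x Java client builders, a `date_histogram` under a `terms` aggregation is exactly the "not on top" layout that used to be wrapped by `asMultiBucketAggregator`:

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class NotOnTopExample {
    // date_histogram as a sub-aggregation: date buckets are collected per
    // "category" bucket, now by a single DateHistogramAggregator instance.
    static SearchSourceBuilder source() {
        return new SearchSourceBuilder()
            .size(0)
            .aggregation(
                AggregationBuilders.terms("categories").field("category")
                    .subAggregation(
                        AggregationBuilders.dateHistogram("histo")
                            .field("date")
                            .calendarInterval(DateHistogramInterval.MONTH)));
    }
}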
@@ -25,7 +25,6 @@
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
 import org.elasticsearch.index.mapper.RangeFieldMapper;
 import org.elasticsearch.index.mapper.RangeType;
@@ -37,13 +36,15 @@
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;

 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;

/**
* An aggregator for date values. Every date is rounded down using a configured
@@ -66,13 +67,13 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
    private final long minDocCount;
    private final ExtendedBounds extendedBounds;

-   private final LongHash bucketOrds;
+   private final LongKeyedBucketOrds bucketOrds;

    DateRangeHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding.Prepared preparedRounding,
            BucketOrder order, boolean keyed,
            long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource valuesSource,
            DocValueFormat formatter, SearchContext aggregationContext,
-           Aggregator parent, Map<String, Object> metadata) throws IOException {
+           Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {

        super(name, factories, aggregationContext, parent, metadata);
        this.rounding = rounding;
@@ -89,7 +90,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
                + "]");
        }

-       bucketOrds = new LongHash(1, aggregationContext.bigArrays());
+       bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
    }

    @Override
@@ -101,21 +102,19 @@ public ScoreMode scoreMode() {
    }

    @Override
-   public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-           final LeafBucketCollector sub) throws IOException {
+   public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
-       final SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
-       final RangeType rangeType = valuesSource.rangeType();
+       SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
+       RangeType rangeType = valuesSource.rangeType();
        return new LeafBucketCollectorBase(sub, values) {
            @Override
-           public void collect(int doc, long bucket) throws IOException {
-               assert bucket == 0;
+           public void collect(int doc, long owningBucketOrd) throws IOException {
                if (values.advanceExact(doc)) {
                    // Is it possible for valuesCount to be > 1 here? Multiple ranges are encoded into the same BytesRef in the binary doc
                    // values, so it isn't clear what we'd be iterating over.
-                   final int valuesCount = values.docValueCount();
+                   int valuesCount = values.docValueCount();
                    assert valuesCount == 1 : "Value count for ranges should always be 1";
                    long previousKey = Long.MIN_VALUE;

@@ -124,7 +123,7 @@ public void collect(int doc, long bucket) throws IOException {
                    List<RangeFieldMapper.Range> ranges = rangeType.decodeRanges(encodedRanges);
                    long previousFrom = Long.MIN_VALUE;
                    for (RangeFieldMapper.Range range : ranges) {
-                       final Long from = (Long) range.getFrom();
+                       Long from = (Long) range.getFrom();
                        // The encoding should ensure that this assert is always true.
                        assert from >= previousFrom : "Start of range not >= previous start";
                        final Long to = (Long) range.getTo();
@@ -136,7 +135,7 @@ public void collect(int doc, long bucket) throws IOException {
                            continue;
                        }
                        // Bucket collection identical to NumericHistogramAggregator, could be refactored
-                       long bucketOrd = bucketOrds.add(key);
+                       long bucketOrd = bucketOrds.add(owningBucketOrd, key);
                        if (bucketOrd < 0) { // already seen
                            bucketOrd = -1 - bucketOrd;
                            collectExistingBucket(sub, doc, bucketOrd);
@@ -187,4 +186,9 @@ public InternalAggregation buildEmptyAggregation() {
    public void doClose() {
        Releasables.close(bucketOrds);
    }

    @Override
    public void collectDebugInfo(BiConsumer<String, Object> add) {
        add.accept("total_buckets", bucketOrds.size());
    }
}
@@ -57,7 +57,8 @@ private LongKeyedBucketOrds() {}
    public abstract long size();

    /**
-    * Build an iterator for buckets inside {@code owningBucketOrd}.
+    * Build an iterator for buckets inside {@code owningBucketOrd} in order
+    * of increasing ord.
     * <p>
     * When this is first returned it is "unpositioned" and you must call
     * {@link BucketOrdsEnum#next()} to move it to the first value.
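
Since this ordering contract is what the order-sensitive check in `buildAggregationsForVariableBuckets` relies on, here is the iteration pattern as used throughout this commit, shown as a fragment rather than a full program:

// Fragment: the "unpositioned" enum pattern used in the diffs above.
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {        // must call next() before reading
    long ord = ordsEnum.ord();   // dense bucket ordinal, in increasing order
    long key = ordsEnum.value(); // the long key, e.g. a rounded timestamp
}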