Prevent date_histogram from OOMing (backport of elastic#72081)
This prevents the `date_histogram` aggregation from running out of memory
allocating empty buckets when you set the interval to something tiny like
`second` and aggregate over a very wide date range. Without this change
we'd allocate memory very quickly and throw an out-of-memory error, taking
down the node. With it we instead throw the standard "too many buckets"
error.
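
The heart of the change is a pre-counting pass: before allocating any empty
bucket, count how many we *would* create and feed that count to the circuit
breaker in chunks. Here's a minimal standalone sketch of that pattern,
assuming only the JDK; the committed version is the `Counter` class inside
`InternalDateHistogram` below, and `breaker` stands in for
`ReduceContext#consumeBucketsAndMaybeBreak`:

```java
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

/**
 * Sketch only: pre-count the buckets we would allocate and report them to
 * a breaker in chunks, so pathological requests fail fast instead of
 * allocating until the node runs out of memory.
 */
class EmptyBucketPreCounter implements LongConsumer {
    /**
     * Report every couple thousand buckets: often enough to fail fast,
     * rarely enough to keep breaker overhead low.
     */
    private static final int REPORT_EMPTY_EVERY = 10_000;

    private final IntConsumer breaker; // stand-in for reduceContext::consumeBucketsAndMaybeBreak
    private int pending;

    EmptyBucketPreCounter(IntConsumer breaker) {
        this.breaker = breaker;
    }

    @Override
    public void accept(long bucketKey) {
        pending++;
        if (pending >= REPORT_EMPTY_EVERY) {
            breaker.accept(pending); // may throw the "too many buckets" exception
            pending = 0;
        }
    }

    /** Report whatever is left over after the last full chunk. */
    void flush() {
        breaker.accept(pending);
        pending = 0;
    }
}
```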

Relates to elastic#71758
nik9000 committed Apr 27, 2021
1 parent 6480aa7 commit b841f10
Showing 6 changed files with 159 additions and 51 deletions.
@@ -732,3 +732,35 @@ setup:
            histogram:
              field: number
              interval: 5e-10

---
"Tiny tiny tiny date_range":
- skip:
version: " - 7.13.99"
reason: fixed in 87.14.0

- do:
bulk:
index: test_1
refresh: true
body:
- '{"index": {}}'
- '{"date": "2018-01-01T00:00:00Z"}'
- '{"index": {}}'
- '{"date": "2019-01-01T00:00:00Z"}'
- '{"index": {}}'
- '{"date": "2020-01-01T00:00:00Z"}'
- '{"index": {}}'
- '{"date": "2021-01-01T00:00:00Z"}'

- do:
catch: '/Trying to create too many buckets. Must be less than or equal to: \[65536\]/'
search:
index: test_1
body:
size: 0
aggs:
histo:
date_histogram:
field: date
interval: second
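
For context, this is roughly the request the new YAML test issues, sketched
with the 7.x Java query builders; `calendar_interval` is the non-deprecated
spelling of the test's `interval: second`, and the index and field names come
from the test fixture above, not a real deployment. Over the 2018–2021 range
this asks for on the order of 95 million per-second buckets, which now trips
the breaker at reduce time instead of OOMing:

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Sketch: the pathological request from the YAML test, built with the
// aggregation builders. With the fix, the reduce phase fails with the
// standard "too many buckets" error instead of taking down the node.
public class TinySecondIntervalRequest {
    public static SearchRequest tooManyBucketsRequest() {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .size(0) // hits are irrelevant; we only want the aggregation
            .aggregation(
                AggregationBuilders.dateHistogram("histo")
                    .field("date")
                    .calendarInterval(DateHistogramInterval.SECOND));
        return new SearchRequest("test_1").source(source);
    }
}
```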
@@ -33,6 +33,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.LongConsumer;

/**
* Implementation of {@link Histogram}.
@@ -288,7 +289,7 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
for (InternalAggregation aggregation : aggregations) {
InternalDateHistogram histogram = (InternalDateHistogram) aggregation;
if (histogram.buckets.isEmpty() == false) {
pq.add(new IteratorAndCurrent(histogram.buckets.iterator()));
pq.add(new IteratorAndCurrent<Bucket>(histogram.buckets.iterator()));
}
}

@@ -350,22 +351,61 @@ protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
return createBucket(buckets.get(0).key, docCount, aggs);
}

/**
* When we pre-count the empty buckets we report them periodically
* because you can configure the date_histogram to create an astounding
* number of buckets. It'd take a while to count that high only to abort.
* So we report every couple thousand buckets. It'd be simpler to report
* every single bucket we plan to allocate one at a time but that'd cause
* needless overhead on the circuit breakers. Counting a couple thousand
* buckets is plenty fast to fail this quickly in pathological cases and
* plenty large to keep the overhead minimal.
*/
private static final int REPORT_EMPTY_EVERY = 10_000;

private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
Bucket lastBucket = null;
LongBounds bounds = emptyBucketInfo.bounds;
/*
* Make sure we have space for the empty buckets we're going to add by
* counting all of the empties we plan to add and firing them into
* consumeBucketsAndMaybeBreak.
*/
class Counter implements LongConsumer {
private int size = list.size();

@Override
public void accept(long key) {
size++;
if (size >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(size);
size = 0;
}
}
}
Counter counter = new Counter();
iterateEmptyBuckets(list, list.listIterator(), counter);
reduceContext.consumeBucketsAndMaybeBreak(counter.size);

InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(
org.elasticsearch.common.collect.List.of(emptyBucketInfo.subAggregations),
reduceContext
);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)));
}

private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, LongConsumer onBucket) {
LongBounds bounds = emptyBucketInfo.bounds;

// first adding all the empty buckets *before* the actual data (based on the extended_bounds.min the user requested)
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(emptyBucketInfo.subAggregations),
reduceContext);

if (bounds != null) {
Bucket firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
if (firstBucket == null) {
if (bounds.getMin() != null && bounds.getMax() != null) {
long key = bounds.getMin() + offset;
long max = bounds.getMax() + offset;
while (key <= max) {
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
onBucket.accept(key);
key = nextKey(key).longValue();
}
}
@@ -374,22 +414,23 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
long key = bounds.getMin() + offset;
if (key < firstBucket.key) {
while (key < firstBucket.key) {
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
onBucket.accept(key);
key = nextKey(key).longValue();
}
}
}
}
}

Bucket lastBucket = null;
// now adding the empty buckets within the actual data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
while (iter.hasNext()) {
Bucket nextBucket = list.get(iter.nextIndex());
if (lastBucket != null) {
long key = nextKey(lastBucket.key).longValue();
while (key < nextBucket.key) {
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
onBucket.accept(key);
key = nextKey(key).longValue();
}
assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key;
@@ -402,7 +443,7 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
long key = nextKey(lastBucket.key).longValue();
long max = bounds.getMax() + offset;
while (key <= max) {
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
onBucket.accept(key);
key = nextKey(key).longValue();
}
}
@@ -411,9 +452,11 @@
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
boolean alreadyAccountedForBuckets = false;
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
alreadyAccountedForBuckets = true;
}
if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
@@ -427,7 +470,9 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
if (false == alreadyAccountedForBuckets) {
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
}
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
format, keyed, getMetadata());
}
@@ -347,30 +347,31 @@ private double round(double key) {
return Math.floor((key - emptyBucketInfo.offset) / emptyBucketInfo.interval) * emptyBucketInfo.interval + emptyBucketInfo.offset;
}

/**
* When we pre-count the empty buckets we report them periodically
* because you can configure the histogram to create more buckets than
* there are atoms in the universe. It'd take a while to count that high
* only to abort. So we report every couple thousand buckets. It'd be
* simpler to report every single bucket we plan to allocate one at a time
* but that'd cause needless overhead on the circuit breakers. Counting a
* couple thousand buckets is plenty fast to fail this quickly in
* pathological cases and plenty large to keep the overhead minimal.
*/
private static final int REPORT_EMPTY_EVERY = 10_000;

private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
/*
* Make sure we have space for the empty buckets we're going to add by
* counting all of the empties we plan to add and firing them into
* consumeBucketsAndMaybeBreak.
*
* We don't count all of the buckets we plan to allocate and then report
* them all at once because you can configure the histogram to create
* more buckets than there are hydrogen atoms in the universe. It'd take
* a while to count that high only to abort. So we report every couple
* thousand buckets. It'd be simpler to report every single bucket we plan
* to allocate one at a time but that'd cause needless overhead on the
* circuit breakers. Counting a couple thousand buckets is plenty fast
* to fail this quickly in pathological cases and plenty large to keep
* the overhead minimal.
*/
int reportEmptyEvery = 10000;
class Counter implements DoubleConsumer {
private int size = list.size();

@Override
public void accept(double key) {
size++;
if (size >= reportEmptyEvery) {
if (size >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(size);
size = 0;
}
@@ -9,7 +9,9 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.Rounding.DateTimeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -180,4 +182,28 @@ protected InternalDateHistogram mutateInstance(InternalDateHistogram instance) {
}
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, metadata);
}

public void testLargeReduce() {
expectReduceUsesTooManyBuckets(
new InternalDateHistogram(
"h",
org.elasticsearch.common.collect.List.of(),
BucketOrder.key(true),
0,
0,
new InternalDateHistogram.EmptyBucketInfo(
Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(),
InternalAggregations.EMPTY,
new LongBounds(
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2018-01-01T00:00:00Z"),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z")
)
),
DocValueFormat.RAW,
false,
null
),
100000
);
}
}
@@ -9,12 +9,9 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.util.TestUtil;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;

@@ -24,9 +21,6 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntConsumer;

import static org.hamcrest.Matchers.equalTo;

public class InternalHistogramTests extends InternalMultiBucketAggregationTestCase<InternalHistogram> {

@@ -103,7 +97,7 @@ public void testHandlesNaN() {
}

public void testLargeReduce() {
InternalHistogram histo = new InternalHistogram(
expectReduceUsesTooManyBuckets(new InternalHistogram(
"h",
org.elasticsearch.common.collect.List.of(),
BucketOrder.key(true),
@@ -112,28 +106,7 @@ public void testLargeReduce() {
DocValueFormat.RAW,
false,
null
);
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
new IntConsumer() {
int buckets;

@Override
public void accept(int value) {
buckets += value;
if (buckets > 100000) {
throw new IllegalArgumentException("too big!");
}
}
},
PipelineTree.EMPTY
);
Exception e = expectThrows(
IllegalArgumentException.class,
() -> histo.reduce(org.elasticsearch.common.collect.List.of(histo), reduceContext)
);
assertThat(e.getMessage(), equalTo("too big!"));
), 100000);
}

@Override
@@ -8,6 +8,7 @@

package org.elasticsearch.test;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -17,15 +18,18 @@
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo;

public abstract class InternalMultiBucketAggregationTestCase<T extends InternalAggregation & MultiBucketsAggregation>
extends InternalAggregationTestCase<T> {
@@ -185,4 +189,31 @@ public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsu
* No-op.
*/
}

/**
* Reduce an aggregation, expecting it to collect more than a certain number of buckets.
*/
protected static void expectReduceUsesTooManyBuckets(InternalAggregation agg, int bucketLimit) {
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
new IntConsumer() {
int buckets;

@Override
public void accept(int value) {
buckets += value;
if (buckets > bucketLimit) {
throw new IllegalArgumentException("too big!");
}
}
},
PipelineTree.EMPTY
);
Exception e = expectThrows(
IllegalArgumentException.class,
() -> agg.reduce(org.elasticsearch.common.collect.List.of(agg), reduceContext)
);
assertThat(e.getMessage(), equalTo("too big!"));
}
}
