From b841f10fbaecf048e981aa24ca8bedd672dc9e0e Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 27 Apr 2021 14:41:52 -0400 Subject: [PATCH] Prevent `date_histogram` from OOMing (backport of #72081) This prevents the `date_histogram` from running out of memory allocating empty buckets when you set the interval to something tiny like `seconds` and aggregate over a very wide date range. Without this change we'd allocate memory very quickly and throw and out of memory error, taking down the node. With it we instead throw the standard "too many buckets" error. Relates to #71758 --- .../test/search.aggregation/10_histogram.yml | 32 +++++++++ .../histogram/InternalDateHistogram.java | 65 ++++++++++++++++--- .../bucket/histogram/InternalHistogram.java | 25 +++---- .../histogram/InternalDateHistogramTests.java | 26 ++++++++ .../histogram/InternalHistogramTests.java | 31 +-------- ...nternalMultiBucketAggregationTestCase.java | 31 +++++++++ 6 files changed, 159 insertions(+), 51 deletions(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml index ee153f1612751..b1b64d86dac6e 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml @@ -732,3 +732,35 @@ setup: histogram: field: number interval: 5e-10 + +--- +"Tiny tiny tiny date_range": + - skip: + version: " - 7.13.99" + reason: fixed in 87.14.0 + + - do: + bulk: + index: test_1 + refresh: true + body: + - '{"index": {}}' + - '{"date": "2018-01-01T00:00:00Z"}' + - '{"index": {}}' + - '{"date": "2019-01-01T00:00:00Z"}' + - '{"index": {}}' + - '{"date": "2020-01-01T00:00:00Z"}' + - '{"index": {}}' + - '{"date": "2021-01-01T00:00:00Z"}' + + - do: + catch: '/Trying to create too many buckets. Must be less than or equal to: \[65536\]/' + search: + index: test_1 + body: + size: 0 + aggs: + histo: + date_histogram: + field: date + interval: second diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 212dda9f3dffc..59e34b1d8288c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -33,6 +33,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Objects; +import java.util.function.LongConsumer; /** * Implementation of {@link Histogram}. @@ -288,7 +289,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent(histogram.buckets.iterator())); } } @@ -350,14 +351,53 @@ protected Bucket reduceBucket(List buckets, ReduceContext context) { return createBucket(buckets.get(0).key, docCount, aggs); } + /** + * When we pre-count the empty buckets we report them periodically + * because you can configure the date_histogram to create an astounding + * number of buckets. It'd take a while to count that high only to abort. + * So we report every couple thousand buckets. It's be simpler to report + * every single bucket we plan to allocate one at a time but that'd cause + * needless overhead on the circuit breakers. Counting a couple thousand + * buckets is plenty fast to fail this quickly in pathological cases and + * plenty large to keep the overhead minimal. + */ + private static final int REPORT_EMPTY_EVERY = 10_000; + private void addEmptyBuckets(List list, ReduceContext reduceContext) { - Bucket lastBucket = null; - LongBounds bounds = emptyBucketInfo.bounds; + /* + * Make sure we have space for the empty buckets we're going to add by + * counting all of the empties we plan to add and firing them into + * consumeBucketsAndMaybeBreak. + */ + class Counter implements LongConsumer { + private int size = list.size(); + + @Override + public void accept(long key) { + size++; + if (size >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(size); + size = 0; + } + } + } + Counter counter = new Counter(); + iterateEmptyBuckets(list, list.listIterator(), counter); + reduceContext.consumeBucketsAndMaybeBreak(counter.size); + + InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce( + org.elasticsearch.common.collect.List.of(emptyBucketInfo.subAggregations), + reduceContext + ); ListIterator iter = list.listIterator(); + iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs))); + } + + private void iterateEmptyBuckets(List list, ListIterator iter, LongConsumer onBucket) { + LongBounds bounds = emptyBucketInfo.bounds; // first adding all the empty buckets *before* the actual data (based on the extended_bounds.min the user requested) - InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(emptyBucketInfo.subAggregations), - reduceContext); + if (bounds != null) { Bucket firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null; if (firstBucket == null) { @@ -365,7 +405,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = bounds.getMin() + offset; long max = bounds.getMax() + offset; while (key <= max) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } } @@ -374,7 +414,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = bounds.getMin() + offset; if (key < firstBucket.key) { while (key < firstBucket.key) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } } @@ -382,6 +422,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { } } + Bucket lastBucket = null; // now adding the empty buckets within the actual data, // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 while (iter.hasNext()) { @@ -389,7 +430,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { if (lastBucket != null) { long key = nextKey(lastBucket.key).longValue(); while (key < nextBucket.key) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key; @@ -402,7 +443,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = nextKey(lastBucket.key).longValue(); long max = bounds.getMax() + offset; while (key <= max) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } } @@ -411,9 +452,11 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); + boolean alreadyAccountedForBuckets = false; if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { addEmptyBuckets(reducedBuckets, reduceContext); + alreadyAccountedForBuckets = true; } if (InternalOrder.isKeyDesc(order)) { // we just need to reverse here... @@ -427,7 +470,9 @@ public InternalAggregation reduce(List aggregations, Reduce CollectionUtil.introSort(reducedBuckets, order.comparator()); } } - reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + if (false == alreadyAccountedForBuckets) { + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + } return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, getMetadata()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 797ab3f313df1..17313fa05c583 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -347,30 +347,31 @@ private double round(double key) { return Math.floor((key - emptyBucketInfo.offset) / emptyBucketInfo.interval) * emptyBucketInfo.interval + emptyBucketInfo.offset; } + /** + * When we pre-count the empty buckets we report them periodically + * because you can configure the histogram to create more buckets than + * there are atoms in the universe. It'd take a while to count that high + * only to abort. So we report every couple thousand buckets. It's be + * simpler to report every single bucket we plan to allocate one at a time + * but that'd cause needless overhead on the circuit breakers. Counting a + * couple thousand buckets is plenty fast to fail this quickly in + * pathological cases and plenty large to keep the overhead minimal. + */ + private static final int REPORT_EMPTY_EVERY = 10_000; + private void addEmptyBuckets(List list, ReduceContext reduceContext) { /* * Make sure we have space for the empty buckets we're going to add by * counting all of the empties we plan to add and firing them into * consumeBucketsAndMaybeBreak. - * - * We don't count all of the buckets we plan to allocate and then report - * them all at once because we you can configure the histogram to create - * more buckets than there are hydrogen atoms in the universe. It'd take - * a while to count that high only to abort. So we report every couple - * thousand buckets. It's be simpler to report every single bucket we plan - * to allocate one at a time but that'd cause needless overhead on the - * circuit breakers. Counting a couple thousand buckets is plenty fast - * to fail this quickly in pathological cases and plenty large to keep - * the overhead minimal. */ - int reportEmptyEvery = 10000; class Counter implements DoubleConsumer { private int size = list.size(); @Override public void accept(double key) { size++; - if (size >= reportEmptyEvery) { + if (size >= REPORT_EMPTY_EVERY) { reduceContext.consumeBucketsAndMaybeBreak(size); size = 0; } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java index 8928fd2511237..e8ffd07a03deb 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java @@ -9,7 +9,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.Rounding.DateTimeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -180,4 +182,28 @@ protected InternalDateHistogram mutateInstance(InternalDateHistogram instance) { } return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, metadata); } + + public void testLargeReduce() { + expectReduceUsesTooManyBuckets( + new InternalDateHistogram( + "h", + org.elasticsearch.common.collect.List.of(), + BucketOrder.key(true), + 0, + 0, + new InternalDateHistogram.EmptyBucketInfo( + Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), + InternalAggregations.EMPTY, + new LongBounds( + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2018-01-01T00:00:00Z"), + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z") + ) + ), + DocValueFormat.RAW, + false, + null + ), + 100000 + ); + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java index fe7d2800367c6..1115719c308ca 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java @@ -9,12 +9,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import org.apache.lucene.util.TestUtil; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; @@ -24,9 +21,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.function.IntConsumer; - -import static org.hamcrest.Matchers.equalTo; public class InternalHistogramTests extends InternalMultiBucketAggregationTestCase { @@ -103,7 +97,7 @@ public void testHandlesNaN() { } public void testLargeReduce() { - InternalHistogram histo = new InternalHistogram( + expectReduceUsesTooManyBuckets(new InternalHistogram( "h", org.elasticsearch.common.collect.List.of(), BucketOrder.key(true), @@ -112,28 +106,7 @@ public void testLargeReduce() { DocValueFormat.RAW, false, null - ); - InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction( - BigArrays.NON_RECYCLING_INSTANCE, - null, - new IntConsumer() { - int buckets; - - @Override - public void accept(int value) { - buckets += value; - if (buckets > 100000) { - throw new IllegalArgumentException("too big!"); - } - } - }, - PipelineTree.EMPTY - ); - Exception e = expectThrows( - IllegalArgumentException.class, - () -> histo.reduce(org.elasticsearch.common.collect.List.of(histo), reduceContext) - ); - assertThat(e.getMessage(), equalTo("too big!")); + ), 100000); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java index 7d6b384e1d31e..cd0055a766533 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java @@ -8,6 +8,7 @@ package org.elasticsearch.test; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -17,15 +18,18 @@ import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.IntConsumer; import java.util.function.Supplier; import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.equalTo; public abstract class InternalMultiBucketAggregationTestCase extends InternalAggregationTestCase { @@ -185,4 +189,31 @@ public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsu * No-op. */ } + + /** + * Reduce an aggreation, expecting it to collect more than a certain number of buckets. + */ + protected static void expectReduceUsesTooManyBuckets(InternalAggregation agg, int bucketLimit) { + InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction( + BigArrays.NON_RECYCLING_INSTANCE, + null, + new IntConsumer() { + int buckets; + + @Override + public void accept(int value) { + buckets += value; + if (buckets > bucketLimit) { + throw new IllegalArgumentException("too big!"); + } + } + }, + PipelineTree.EMPTY + ); + Exception e = expectThrows( + IllegalArgumentException.class, + () -> agg.reduce(org.elasticsearch.common.collect.List.of(agg), reduceContext) + ); + assertThat(e.getMessage(), equalTo("too big!")); + } }