diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml index ee153f1612751..b1b64d86dac6e 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/10_histogram.yml @@ -732,3 +732,35 @@ setup: histogram: field: number interval: 5e-10 + +--- +"Tiny tiny tiny date_range": + - skip: + version: " - 7.13.99" + reason: fixed in 7.14.0 + + - do: + bulk: + index: test_1 + refresh: true + body: + - '{"index": {}}' + - '{"date": "2018-01-01T00:00:00Z"}' + - '{"index": {}}' + - '{"date": "2019-01-01T00:00:00Z"}' + - '{"index": {}}' + - '{"date": "2020-01-01T00:00:00Z"}' + - '{"index": {}}' + - '{"date": "2021-01-01T00:00:00Z"}' + + - do: + catch: '/Trying to create too many buckets. Must be less than or equal to: \[65536\]/' + search: + index: test_1 + body: + size: 0 + aggs: + histo: + date_histogram: + field: date + interval: second diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 212dda9f3dffc..59e34b1d8288c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -33,6 +33,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Objects; +import java.util.function.LongConsumer; /** * Implementation of {@link Histogram}. 
@@ -288,7 +289,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent(histogram.buckets.iterator())); } } @@ -350,14 +351,53 @@ protected Bucket reduceBucket(List buckets, ReduceContext context) { return createBucket(buckets.get(0).key, docCount, aggs); } + /** + * When we pre-count the empty buckets we report them periodically + * because you can configure the date_histogram to create an astounding + * number of buckets. It'd take a while to count that high only to abort. + * So we report every couple thousand buckets. It's be simpler to report + * every single bucket we plan to allocate one at a time but that'd cause + * needless overhead on the circuit breakers. Counting a couple thousand + * buckets is plenty fast to fail this quickly in pathological cases and + * plenty large to keep the overhead minimal. + */ + private static final int REPORT_EMPTY_EVERY = 10_000; + private void addEmptyBuckets(List list, ReduceContext reduceContext) { - Bucket lastBucket = null; - LongBounds bounds = emptyBucketInfo.bounds; + /* + * Make sure we have space for the empty buckets we're going to add by + * counting all of the empties we plan to add and firing them into + * consumeBucketsAndMaybeBreak. 
+ */ + class Counter implements LongConsumer { + private int size = list.size(); + + @Override + public void accept(long key) { + size++; + if (size >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(size); + size = 0; + } + } + } + Counter counter = new Counter(); + iterateEmptyBuckets(list, list.listIterator(), counter); + reduceContext.consumeBucketsAndMaybeBreak(counter.size); + + InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce( + org.elasticsearch.common.collect.List.of(emptyBucketInfo.subAggregations), + reduceContext + ); ListIterator iter = list.listIterator(); + iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs))); + } + + private void iterateEmptyBuckets(List list, ListIterator iter, LongConsumer onBucket) { + LongBounds bounds = emptyBucketInfo.bounds; // first adding all the empty buckets *before* the actual data (based on the extended_bounds.min the user requested) - InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(emptyBucketInfo.subAggregations), - reduceContext); + if (bounds != null) { Bucket firstBucket = iter.hasNext() ? 
list.get(iter.nextIndex()) : null; if (firstBucket == null) { @@ -365,7 +405,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = bounds.getMin() + offset; long max = bounds.getMax() + offset; while (key <= max) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } } @@ -374,7 +414,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = bounds.getMin() + offset; if (key < firstBucket.key) { while (key < firstBucket.key) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } } @@ -382,6 +422,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { } } + Bucket lastBucket = null; // now adding the empty buckets within the actual data, // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 while (iter.hasNext()) { @@ -389,7 +430,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { if (lastBucket != null) { long key = nextKey(lastBucket.key).longValue(); while (key < nextBucket.key) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key; @@ -402,7 +443,7 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = nextKey(lastBucket.key).longValue(); long max = bounds.getMax() + offset; while (key <= max) { - iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + onBucket.accept(key); key = nextKey(key).longValue(); } } @@ -411,9 +452,11 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { 
List reducedBuckets = reduceBuckets(aggregations, reduceContext); + boolean alreadyAccountedForBuckets = false; if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { addEmptyBuckets(reducedBuckets, reduceContext); + alreadyAccountedForBuckets = true; } if (InternalOrder.isKeyDesc(order)) { // we just need to reverse here... @@ -427,7 +470,9 @@ public InternalAggregation reduce(List aggregations, Reduce CollectionUtil.introSort(reducedBuckets, order.comparator()); } } - reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + if (false == alreadyAccountedForBuckets) { + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + } return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, getMetadata()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 797ab3f313df1..17313fa05c583 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -347,30 +347,31 @@ private double round(double key) { return Math.floor((key - emptyBucketInfo.offset) / emptyBucketInfo.interval) * emptyBucketInfo.interval + emptyBucketInfo.offset; } + /** + * When we pre-count the empty buckets we report them periodically + * because you can configure the histogram to create more buckets than + * there are atoms in the universe. It'd take a while to count that high + * only to abort. So we report every couple thousand buckets. It's be + * simpler to report every single bucket we plan to allocate one at a time + * but that'd cause needless overhead on the circuit breakers. 
Counting a + * couple thousand buckets is plenty fast to fail this quickly in + * pathological cases and plenty large to keep the overhead minimal. + */ + private static final int REPORT_EMPTY_EVERY = 10_000; + private void addEmptyBuckets(List list, ReduceContext reduceContext) { /* * Make sure we have space for the empty buckets we're going to add by * counting all of the empties we plan to add and firing them into * consumeBucketsAndMaybeBreak. - * - * We don't count all of the buckets we plan to allocate and then report - * them all at once because we you can configure the histogram to create - * more buckets than there are hydrogen atoms in the universe. It'd take - * a while to count that high only to abort. So we report every couple - * thousand buckets. It's be simpler to report every single bucket we plan - * to allocate one at a time but that'd cause needless overhead on the - * circuit breakers. Counting a couple thousand buckets is plenty fast - * to fail this quickly in pathological cases and plenty large to keep - * the overhead minimal. 
*/ - int reportEmptyEvery = 10000; class Counter implements DoubleConsumer { private int size = list.size(); @Override public void accept(double key) { size++; - if (size >= reportEmptyEvery) { + if (size >= REPORT_EMPTY_EVERY) { reduceContext.consumeBucketsAndMaybeBreak(size); size = 0; } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java index 8928fd2511237..e8ffd07a03deb 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java @@ -9,7 +9,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.Rounding.DateTimeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -180,4 +182,28 @@ protected InternalDateHistogram mutateInstance(InternalDateHistogram instance) { } return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, metadata); } + + public void testLargeReduce() { + expectReduceUsesTooManyBuckets( + new InternalDateHistogram( + "h", + org.elasticsearch.common.collect.List.of(), + BucketOrder.key(true), + 0, + 0, + new InternalDateHistogram.EmptyBucketInfo( + Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), + InternalAggregations.EMPTY, + new LongBounds( + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2018-01-01T00:00:00Z"), + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z") + ) + ), + 
DocValueFormat.RAW, + false, + null + ), + 100000 + ); + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java index fe7d2800367c6..1115719c308ca 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java @@ -9,12 +9,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import org.apache.lucene.util.TestUtil; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; @@ -24,9 +21,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.function.IntConsumer; - -import static org.hamcrest.Matchers.equalTo; public class InternalHistogramTests extends InternalMultiBucketAggregationTestCase { @@ -103,7 +97,7 @@ public void testHandlesNaN() { } public void testLargeReduce() { - InternalHistogram histo = new InternalHistogram( + expectReduceUsesTooManyBuckets(new InternalHistogram( "h", org.elasticsearch.common.collect.List.of(), BucketOrder.key(true), @@ -112,28 +106,7 @@ public void testLargeReduce() { DocValueFormat.RAW, false, null - ); - InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction( - BigArrays.NON_RECYCLING_INSTANCE, - null, - new IntConsumer() { - int buckets; - - @Override - public 
void accept(int value) { - buckets += value; - if (buckets > 100000) { - throw new IllegalArgumentException("too big!"); - } - } - }, - PipelineTree.EMPTY - ); - Exception e = expectThrows( - IllegalArgumentException.class, - () -> histo.reduce(org.elasticsearch.common.collect.List.of(histo), reduceContext) - ); - assertThat(e.getMessage(), equalTo("too big!")); + ), 100000); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java index 7d6b384e1d31e..cd0055a766533 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java @@ -8,6 +8,7 @@ package org.elasticsearch.test; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -17,15 +18,18 @@ import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.IntConsumer; import java.util.function.Supplier; import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.equalTo; public abstract class InternalMultiBucketAggregationTestCase extends InternalAggregationTestCase { @@ -185,4 +189,31 @@ public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsu * No-op. 
*/ } + + /** + * Reduce an aggregation, expecting it to collect more than a certain number of buckets. + */ + protected static void expectReduceUsesTooManyBuckets(InternalAggregation agg, int bucketLimit) { + InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction( + BigArrays.NON_RECYCLING_INSTANCE, + null, + new IntConsumer() { + int buckets; + + @Override + public void accept(int value) { + buckets += value; + if (buckets > bucketLimit) { + throw new IllegalArgumentException("too big!"); + } + } + }, + PipelineTree.EMPTY + ); + Exception e = expectThrows( + IllegalArgumentException.class, + () -> agg.reduce(org.elasticsearch.common.collect.List.of(agg), reduceContext) + ); + assertThat(e.getMessage(), equalTo("too big!")); + } }