
Save memory when geo aggregations are not on top (#57483) (#57551)
Saves memory when the `geotile_grid` and `geohash_grid` are not on the
top level by using the `LongKeyedBucketOrds` we built in #55873.
nik9000 authored Jun 2, 2020
1 parent 97a5127 commit 2a27c41
Showing 13 changed files with 144 additions and 76 deletions.
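
The heart of the change is the bucket-ordinal data structure. `LongHash` keys buckets by the geo cell hash alone, so a grid aggregation nested under another bucketing aggregation previously needed a whole aggregator instance (and its own hash table) per parent bucket. `LongKeyedBucketOrds` keys by the (owning bucket ordinal, cell hash) pair instead, so one instance serves every parent bucket. A minimal sketch of the pattern, using only methods that appear in the diff below — `bigArrays`, `collectsFromSingleBucket`, `owningBucketOrd`, and `cellHash` are stand-ins for values the aggregator gets from its context and its `collect` call:

```java
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

// true  -> the agg is at the top level; a cheap single-owner implementation is used.
// false -> the agg is nested; ordinals are keyed on the (owningBucketOrd, value) pair.
LongKeyedBucketOrds bucketOrds = LongKeyedBucketOrds.build(bigArrays, collectsFromSingleBucket);

long ord = bucketOrds.add(owningBucketOrd, cellHash);
if (ord < 0) {
    // Negative return: this (owningBucketOrd, cellHash) pair was already added;
    // -1 - ord recovers the existing ordinal.
    ord = -1 - ord;
}
```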
@@ -22,13 +22,13 @@
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
@@ -46,17 +46,16 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends BucketsAggregator
     protected final int requiredSize;
     protected final int shardSize;
     protected final ValuesSource.Numeric valuesSource;
-    protected final LongHash bucketOrds;
-    protected SortedNumericDocValues values;
+    protected final LongKeyedBucketOrds bucketOrds;

     GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
                       int requiredSize, int shardSize, SearchContext aggregationContext,
-                      Aggregator parent, Map<String, Object> metadata) throws IOException {
+                      Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
         super(name, factories, aggregationContext, parent, metadata);
         this.valuesSource = valuesSource;
         this.requiredSize = requiredSize;
         this.shardSize = shardSize;
-        bucketOrds = new LongHash(1, aggregationContext.bigArrays());
+        bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
     }

     @Override
@@ -70,19 +69,18 @@ public ScoreMode scoreMode() {
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
             final LeafBucketCollector sub) throws IOException {
-        values = valuesSource.longValues(ctx);
+        SortedNumericDocValues values = valuesSource.longValues(ctx);
         return new LeafBucketCollectorBase(sub, null) {
             @Override
-            public void collect(int doc, long bucket) throws IOException {
-                assert bucket == 0;
+            public void collect(int doc, long owningBucketOrd) throws IOException {
                 if (values.advanceExact(doc)) {
                     final int valuesCount = values.docValueCount();

                     long previous = Long.MAX_VALUE;
                     for (int i = 0; i < valuesCount; ++i) {
                         final long val = values.nextValue();
                         if (previous != val || i == 0) {
-                            long bucketOrdinal = bucketOrds.add(val);
+                            long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
                             if (bucketOrdinal < 0) { // already seen
                                 bucketOrdinal = -1 - bucketOrdinal;
                                 collectExistingBucket(sub, doc, bucketOrdinal);
@@ -108,31 +106,38 @@ public void collect(int doc, long bucket) throws IOException {

     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        final int size = (int) Math.min(bucketOrds.size(), shardSize);
-        consumeBucketsAndMaybeBreak(size);
-
-        BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
-        InternalGeoGridBucket spare = null;
-        for (long i = 0; i < bucketOrds.size(); i++) {
-            if (spare == null) {
-                spare = newEmptyBucket();
-            }
-
-            // need a special function to keep the source bucket
-            // up-to-date so it can get the appropriate key
-            spare.hashAsLong = bucketOrds.get(i);
-            spare.docCount = bucketDocCount(i);
-            spare.bucketOrd = i;
-            spare = ordered.insertWithOverflow(spare);
-        }
-
-        final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
-            list[i] = ordered.pop();
-        }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-        return new InternalAggregation[] {buildAggregation(name, requiredSize, Arrays.asList(list), metadata())};
+        InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
+            consumeBucketsAndMaybeBreak(size);
+
+            BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
+            InternalGeoGridBucket spare = null;
+            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+            while (ordsEnum.next()) {
+                if (spare == null) {
+                    spare = newEmptyBucket();
+                }
+
+                // need a special function to keep the source bucket
+                // up-to-date so it can get the appropriate key
+                spare.hashAsLong = ordsEnum.value();
+                spare.docCount = bucketDocCount(ordsEnum.ord());
+                spare.bucketOrd = ordsEnum.ord();
+                spare = ordered.insertWithOverflow(spare);
+            }
+
+            topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
+            for (int i = ordered.size() - 1; i >= 0; --i) {
+                topBucketsPerOrd[ordIdx][i] = ordered.pop();
+            }
+        }
+        buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata());
+        }
+        return results;
     }

     @Override
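
The case this optimizes is a grid that is not the top-level aggregation. A minimal sketch of such a request, mirroring the new `testAsSubAgg` test further down — the field names `t` and `location` are illustrative:

```java
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

// A geohash_grid nested under a terms agg: this exercises the
// collectsFromSingleBucket == false path that this commit rewires.
TermsAggregationBuilder request = new TermsAggregationBuilder("t")
    .field("t")
    .subAggregation(new GeoHashGridAggregationBuilder("gg").field("location").precision(4));
```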
@@ -35,8 +35,8 @@ public class GeoHashGridAggregator extends GeoGridAggregator<InternalGeoHashGrid>

     public GeoHashGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
                                  int requiredSize, int shardSize, SearchContext aggregationContext,
-                                 Aggregator parent, Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
+                                 Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
+        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
     }

     @Override
@@ -85,21 +85,18 @@ protected Aggregator doCreateInternal(final ValuesSource valuesSource,
             throw new AggregationExecutionException("Registry miss-match - expected "
                 + GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
         }
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
         return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
-            requiredSize, shardSize, searchContext, parent, metadata);
+            requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
     }

     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(GeoHashGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
             (GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
-                aggregationContext, parent, metadata) -> {
+                aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
                 CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
                     Geohash::longEncode);
                 return new GeoHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
-                    parent, metadata);
+                    parent, collectsFromSingleBucket, metadata);
             });
     }
 }
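
The deleted branch here (and its twin in the geotile factory below) is where the old memory overhead came from: `asMultiBucketAggregator` wraps the factory so that a brand-new aggregator is created for every parent bucket that collects into it. A simplified sketch of that behavior, not the wrapper's actual code — `aggregators`, `factory`, and `leafCollectorFor` are placeholders:

```java
// Roughly what the removed asMultiBucketAggregator path amounted to: every
// owning bucket ordinal lazily got its own full aggregator, each carrying its
// own LongHash and per-bucket doc counts -- the memory this commit reclaims.
public void collect(int doc, long owningBucketOrd) throws IOException {
    Aggregator perBucket = aggregators.get(owningBucketOrd); // placeholder cache
    if (perBucket == null) {
        perBucket = factory.create(parent, true); // one full aggregator per parent bucket
        perBucket.preCollection();
        aggregators.put(owningBucketOrd, perBucket);
    }
    leafCollectorFor(perBucket).collect(doc, 0); // each copy thinks it is bucket 0
}
```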
@@ -36,8 +36,8 @@ public class GeoTileGridAggregator extends GeoGridAggregator<InternalGeoTileGrid>

     public GeoTileGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
                                  int requiredSize, int shardSize, SearchContext aggregationContext,
-                                 Aggregator parent, Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
+                                 Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
+        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
     }

     @Override
@@ -83,21 +83,18 @@ protected Aggregator doCreateInternal(final ValuesSource valuesSource,
             throw new AggregationExecutionException("Registry miss-match - expected "
                 + GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
         }
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
         return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
-            requiredSize, shardSize, searchContext, parent, metadata);
+            requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
     }

     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(GeoTileGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
             (GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
-                aggregationContext, parent, metadata) -> {
+                aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
                 CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
                     GeoTileUtils::longEncode);
                 return new GeoTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
-                    parent, metadata);
+                    parent, collectsFromSingleBucket, metadata);
             });
     }
 }
@@ -32,9 +32,17 @@

 @FunctionalInterface
 public interface GeoGridAggregatorSupplier extends AggregatorSupplier {

-    GeoGridAggregator build(String name, AggregatorFactories factories, ValuesSource valuesSource,
-                            int precision, GeoBoundingBox geoBoundingBox, int requiredSize, int shardSize,
-                            SearchContext aggregationContext, Aggregator parent,
-                            Map<String, Object> metadata) throws IOException;
+    GeoGridAggregator build(
+        String name,
+        AggregatorFactories factories,
+        ValuesSource valuesSource,
+        int precision,
+        GeoBoundingBox geoBoundingBox,
+        int requiredSize,
+        int shardSize,
+        SearchContext aggregationContext,
+        Aggregator parent,
+        boolean collectsFromSingleBucket,
+        Map<String, Object> metadata
+    ) throws IOException;
 }
@@ -19,14 +19,17 @@
 package org.elasticsearch.search.aggregations.bucket.geogrid;

 import org.apache.lucene.document.LatLonDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.geo.GeoEncodingUtils;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.geo.GeoBoundingBox;
 import org.elasticsearch.common.geo.GeoBoundingBoxTests;
@@ -35,6 +38,8 @@
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;

 import java.io.IOException;
@@ -45,6 +50,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -109,18 +115,9 @@ public void testWithSeveralDocs() throws IOException {
         List<LatLonDocValuesField> points = new ArrayList<>();
         Set<String> distinctHashesPerDoc = new HashSet<>();
         for (int pointId = 0; pointId < numPoints; pointId++) {
-            double lat = (180d * randomDouble()) - 90d;
-            double lng = (360d * randomDouble()) - 180d;
-
-            // Precision-adjust longitude/latitude to avoid wrong bucket placement
-            // Internally, lat/lng get converted to 32 bit integers, loosing some precision.
-            // This does not affect geohashing because geohash uses the same algorithm,
-            // but it does affect other bucketing algos, thus we need to do the same steps here.
-            lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
-            lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
-
-            points.add(new LatLonDocValuesField(FIELD_NAME, lat, lng));
-            String hash = hashAsString(lng, lat, precision);
+            double[] latLng = randomLatLng();
+            points.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
+            String hash = hashAsString(latLng[1], latLng[0], precision);
             if (distinctHashesPerDoc.contains(hash) == false) {
                 expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0) + 1);
             }
@@ -137,6 +134,60 @@
         });
     }

+    public void testAsSubAgg() throws IOException {
+        int precision = randomPrecision();
+        Map<String, Map<String, Long>> expectedCountPerTPerGeoHash = new TreeMap<>();
+        List<List<IndexableField>> docs = new ArrayList<>();
+        for (int i = 0; i < 30; i++) {
+            String t = randomAlphaOfLength(1);
+            double[] latLng = randomLatLng();
+
+            List<IndexableField> doc = new ArrayList<>();
+            docs.add(doc);
+            doc.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
+            doc.add(new SortedSetDocValuesField("t", new BytesRef(t)));
+
+            String hash = hashAsString(latLng[1], latLng[0], precision);
+            Map<String, Long> expectedCountPerGeoHash = expectedCountPerTPerGeoHash.get(t);
+            if (expectedCountPerGeoHash == null) {
+                expectedCountPerGeoHash = new TreeMap<>();
+                expectedCountPerTPerGeoHash.put(t, expectedCountPerGeoHash);
+            }
+            expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0L) + 1);
+        }
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> iw.addDocuments(docs);
+        TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("t").field("t")
+            .size(expectedCountPerTPerGeoHash.size())
+            .subAggregation(createBuilder("gg").field(FIELD_NAME).precision(precision));
+        Consumer<StringTerms> verify = (terms) -> {
+            Map<String, Map<String, Long>> actual = new TreeMap<>();
+            for (StringTerms.Bucket tb : terms.getBuckets()) {
+                InternalGeoGrid<?> gg = tb.getAggregations().get("gg");
+                Map<String, Long> sub = new TreeMap<>();
+                for (InternalGeoGridBucket<?> ggb : gg.getBuckets()) {
+                    sub.put(ggb.getKeyAsString(), ggb.getDocCount());
+                }
+                actual.put(tb.getKeyAsString(), sub);
+            }
+            assertThat(actual, equalTo(expectedCountPerTPerGeoHash));
+        };
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, verify, keywordField("t"), geoPointField(FIELD_NAME));
+    }
+
+    private double[] randomLatLng() {
+        double lat = (180d * randomDouble()) - 90d;
+        double lng = (360d * randomDouble()) - 180d;
+
+        // Precision-adjust longitude/latitude to avoid wrong bucket placement
+        // Internally, lat/lng get converted to 32 bit integers, loosing some precision.
+        // This does not affect geohashing because geohash uses the same algorithm,
+        // but it does affect other bucketing algos, thus we need to do the same steps here.
+        lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
+        lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
+
+        return new double[] {lat, lng};
+    }
+
     public void testBounds() throws IOException {
         final int numDocs = randomIntBetween(64, 256);
         final GeoGridAggregationBuilder builder = createBuilder("_name");
@@ -300,13 +300,11 @@ public void testIncorrectFieldType() throws Exception {
         HistogramAggregationBuilder aggBuilder = new HistogramAggregationBuilder("my_agg")
             .field("field")
             .interval(5);
-        MappedFieldType fieldType = keywordField("field");
-        fieldType.setHasDocValues(true);
         try (IndexReader reader = w.getReader()) {
             IndexSearcher searcher = new IndexSearcher(reader);

             expectThrows(IllegalArgumentException.class, () -> {
-                search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
+                search(searcher, new MatchAllDocsQuery(), aggBuilder, keywordField("field"));
             });
         }
     }
