Skip to content

Commit

Permalink
removes the CellIdSource abstraction from geo-grid aggs (#45307)
Browse files Browse the repository at this point in the history
CellIdSource is a helper ValuesSource that encodes GeoPoint
into a long-encoded representation of the grid bucket the point
is associated with. This complicates thing as usage evolves to
support shapes that are associated with more than one bucket ordinal.
  • Loading branch information
talevy authored Aug 8, 2019
1 parent eb22677 commit f114ef6
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 123 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
package org.elasticsearch.search.aggregations.bucket.geogrid;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand All @@ -45,14 +47,19 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke

protected final int requiredSize;
protected final int shardSize;
protected final CellIdSource valuesSource;
protected final ValuesSource.GeoPoint valuesSource;
protected final int precision;
protected final GeoPointLongEncoder longEncoder;
protected final LongHash bucketOrds;

GeoGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource,
GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.GeoPoint valuesSource,
int precision, GeoPointLongEncoder longEncoder,
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.valuesSource = valuesSource;
this.precision = precision;
this.longEncoder = longEncoder;
this.requiredSize = requiredSize;
this.shardSize = shardSize;
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
Expand All @@ -69,7 +76,7 @@ public ScoreMode scoreMode() {
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final SortedNumericDocValues values = valuesSource.longValues(ctx);
final MultiGeoPointValues values = valuesSource.geoPointValues(ctx);
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
Expand All @@ -79,7 +86,8 @@ public void collect(int doc, long bucket) throws IOException {

long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
final long val = values.nextValue();
final GeoPoint point = values.nextValue();
final long val = longEncoder.encode(point.getLon(), point.getLat(), precision);
if (previous != val || i == 0) {
long bucketOrdinal = bucketOrds.add(val);
if (bucketOrdinal < 0) { // already seen
Expand Down Expand Up @@ -189,4 +197,12 @@ public void doClose() {
Releasables.close(bucketOrds);
}

/**
* The encoder to use to convert a geopoint's (lon, lat, precision) into
* a long-encoded bucket key for aggregating.
*/
@FunctionalInterface
public interface GeoPointLongEncoder {
long encode(double lon, double lat, int precision);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand All @@ -33,10 +34,12 @@
*/
public class GeoHashGridAggregator extends GeoGridAggregator<InternalGeoHashGrid> {

GeoHashGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource,
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, pipelineAggregators, metaData);
GeoHashGridAggregator(String name, AggregatorFactories factories,
ValuesSource.GeoPoint valuesSource, int precision, GeoPointLongEncoder longEncoder,
SearchContext aggregationContext, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, int requiredSize, int shardSize) throws IOException {
super(name, factories, valuesSource, precision, longEncoder, requiredSize, shardSize, aggregationContext, parent,
pipelineAggregators, metaData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ protected Aggregator doCreateInternal(final ValuesSource.GeoPoint valuesSource,
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
CellIdSource cellIdSource = new CellIdSource(valuesSource, precision, Geohash::longEncode);
return new GeoHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, context, parent,
pipelineAggregators, metaData);
return new GeoHashGridAggregator(name, factories, valuesSource, precision, Geohash::longEncode, context, parent,
pipelineAggregators, metaData, requiredSize, shardSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand All @@ -34,10 +35,12 @@
*/
public class GeoTileGridAggregator extends GeoGridAggregator<InternalGeoTileGrid> {

GeoTileGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource,
GeoTileGridAggregator(String name, AggregatorFactories factories, ValuesSource.GeoPoint valuesSource,
int precision, GeoPointLongEncoder longEncoder,
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, pipelineAggregators, metaData);
super(name, factories, valuesSource, precision, longEncoder, requiredSize, shardSize, aggregationContext, parent,
pipelineAggregators, metaData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ protected Aggregator doCreateInternal(final ValuesSource.GeoPoint valuesSource,
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
CellIdSource cellIdSource = new CellIdSource(valuesSource, precision, GeoTileUtils::longEncode);
return new GeoTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, context, parent,
pipelineAggregators, metaData);
return new GeoTileGridAggregator(name, factories, valuesSource, precision, GeoTileUtils::longEncode, requiredSize,
shardSize, context, parent, pipelineAggregators, metaData);
}
}

0 comments on commit f114ef6

Please sign in to comment.