Skip to content

Commit

Permalink
Make parent and child aggregator more obvious (elastic#57490)
Browse files Browse the repository at this point in the history
Pulls the way that the `ParentJoinAggregator` collects global ordinals
into a strategy object. This makes the aggregator a little simpler to
reason about, and it will make it simpler to save memory by removing
`asMultiBucketAggregator` in the future.

Relates to elastic#56487
  • Loading branch information
nik9000 committed Jun 2, 2020
1 parent 97c0681 commit 3e454df
Showing 1 changed file with 79 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
Expand All @@ -52,16 +54,11 @@ public abstract class ParentJoinAggregator extends BucketsAggregator implements
private final Weight inFilter;
private final Weight outFilter;
private final ValuesSource.Bytes.WithOrdinals valuesSource;
private final boolean singleAggregator;

/**
* If this aggregator is nested under another aggregator we allocate a long hash per bucket.
* Strategy for collecting results.
*/
private final LongHash ordsHash;
/**
* Otherwise we use a dense bit array to record the global ordinals.
*/
private final BitArray ordsBit;
private final CollectionStrategy collectionStrategy;

public ParentJoinAggregator(String name,
AggregatorFactories factories,
Expand All @@ -83,21 +80,9 @@ public ParentJoinAggregator(String name,
this.inFilter = context.searcher().createWeight(context.searcher().rewrite(inFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.outFilter = context.searcher().createWeight(context.searcher().rewrite(outFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.valuesSource = valuesSource;
this.singleAggregator = parent == null;
this.ordsBit = singleAggregator ? new BitArray((int) maxOrd, context.bigArrays()) : null;
this.ordsHash = singleAggregator ? null : new LongHash(1, context.bigArrays());
}

private void addGlobalOrdinal(int globalOrdinal) {
if (singleAggregator) {
ordsBit.set(globalOrdinal);
} else {
ordsHash.add(globalOrdinal);
}
}

private boolean existsGlobalOrdinal(int globalOrdinal) {
return singleAggregator ? ordsBit.get(globalOrdinal): ordsHash.find(globalOrdinal) >= 0;
boolean singleAggregator = parent == null;
collectionStrategy = singleAggregator ?
new DenseCollectionStrategy(maxOrd, context.bigArrays()) : new SparseCollectionStrategy(context.bigArrays());
}

@Override
Expand All @@ -115,7 +100,7 @@ public void collect(int docId, long bucket) throws IOException {
if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
int globalOrdinal = (int) globalOrdinals.nextOrd();
assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
addGlobalOrdinal(globalOrdinal);
collectionStrategy.addGlobalOrdinal(globalOrdinal);
}
}
};
Expand Down Expand Up @@ -155,7 +140,7 @@ public int docID() {
if (globalOrdinals.advanceExact(docId)) {
int globalOrdinal = (int) globalOrdinals.nextOrd();
assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
if (existsGlobalOrdinal(globalOrdinal)) {
if (collectionStrategy.existsGlobalOrdinal(globalOrdinal)) {
collectBucket(sub, docId, 0);
}
}
Expand All @@ -165,6 +150,75 @@ public int docID() {

@Override
protected void doClose() {
Releasables.close(ordsBit, ordsHash);
Releasables.close(collectionStrategy);
}

/**
* Strategy for collecting the global ordinals of the join field for all
* docs that match the {@code ParentJoinAggregator#inFilter} and then
* checking which of the docs that match the
* {@code ParentJoinAggregator#outFilter} also have one of those ordinals.
* Implementations are {@link Releasable} and must be closed when the
* aggregator is closed.
*/
protected interface CollectionStrategy extends Releasable {
/**
* Records that the given global ordinal has been seen.
*/
void addGlobalOrdinal(int globalOrdinal);
/**
* Returns whether the given global ordinal was previously recorded with
* {@link #addGlobalOrdinal(int)}.
*/
boolean existsGlobalOrdinal(int globalOrdinal);
}

/**
 * Uses a dense, bit per ordinal representation of the join field in the
 * docs that match {@code ParentJoinAggregator#inFilter}. Its memory usage
 * is proportional to the maximum ordinal so it is only a good choice if
 * most docs match.
 */
protected class DenseCollectionStrategy implements CollectionStrategy {
    private final BitArray ordsBits;

    /**
     * Builds the strategy.
     *
     * @param maxOrd    value used to size the bit array; it is narrowed to
     *                  an {@code int} before being handed to {@link BitArray}
     *                  (NOTE(review): confirm callers never pass a value
     *                  above {@code Integer.MAX_VALUE})
     * @param bigArrays allocator used to back the bit array
     */
    public DenseCollectionStrategy(long maxOrd, BigArrays bigArrays) {
        // Allocate from the BigArrays we were handed rather than reaching
        // into the enclosing aggregator's context; previously the parameter
        // was accepted but silently ignored.
        ordsBits = new BitArray((int) maxOrd, bigArrays);
    }

    /**
     * Sets the bit for the given global ordinal.
     */
    @Override
    public void addGlobalOrdinal(int globalOrdinal) {
        ordsBits.set(globalOrdinal);
    }

    /**
     * Returns whether the bit for the given global ordinal is set.
     */
    @Override
    public boolean existsGlobalOrdinal(int globalOrdinal) {
        return ordsBits.get(globalOrdinal);
    }

    /**
     * Releases the underlying bit array.
     */
    @Override
    public void close() {
        ordsBits.close();
    }
}

/**
 * Tracks, in a hash, which global ordinals of the join field occur in the
 * docs that match {@code ParentJoinAggregator#inFilter}. Its memory usage
 * is proportional to the number of matching global ordinals so it is used
 * when only some docs might match.
 */
protected class SparseCollectionStrategy implements CollectionStrategy {
    private final LongHash seenOrdinals;

    /**
     * Builds the strategy with a hash of initial capacity 1.
     *
     * @param bigArrays allocator used to back the hash
     */
    public SparseCollectionStrategy(BigArrays bigArrays) {
        this.seenOrdinals = new LongHash(1, bigArrays);
    }

    /**
     * Adds the given global ordinal to the hash.
     */
    @Override
    public void addGlobalOrdinal(int globalOrdinal) {
        seenOrdinals.add(globalOrdinal);
    }

    /**
     * Returns whether the given global ordinal is in the hash.
     */
    @Override
    public boolean existsGlobalOrdinal(int globalOrdinal) {
        // The ordinal counts as present when the lookup yields a non-negative value.
        final long slot = seenOrdinals.find(globalOrdinal);
        return slot >= 0;
    }

    /**
     * Releases the underlying hash.
     */
    @Override
    public void close() {
        seenOrdinals.close();
    }
}
}

0 comments on commit 3e454df

Please sign in to comment.