Quickly compute terms aggregations when the top-level query is functionally match-all for a segment (opensearch-project#11643)



---------

Signed-off-by: Sandesh Kumar <[email protected]>
sandeshkr419 committed Mar 13, 2024
1 parent 1235756 commit bd31dae
Showing 8 changed files with 431 additions and 82 deletions.
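
Before the per-file diffs, here is a minimal, self-contained sketch of the idea behind this change, using only standard Lucene APIs (illustrative only, not code added by the commit): when a segment has no deletions and the top-level query effectively matches every document in it, each term's bucket count is simply that term's document frequency in the segment's terms dictionary, so the aggregator can read counts from TermsEnum.docFreq() instead of visiting documents.

// Minimal sketch of the optimization's core idea using plain Lucene APIs (illustrative only,
// not the code added by this commit).
import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

final class DocFreqTermCounts {
    /** Prints per-term doc counts for one segment without iterating documents. */
    static void printTermCounts(LeafReader reader, String field) throws IOException {
        // Only valid when the query is effectively match-all for this segment and
        // there are no deletions (reader.numDocs() == reader.maxDoc()).
        if (reader.numDocs() != reader.maxDoc()) {
            return;
        }
        Terms terms = reader.terms(field); // segment-level terms dictionary
        if (terms == null) {
            return; // field is not indexed in this segment
        }
        TermsEnum termsEnum = terms.iterator();
        for (BytesRef term = termsEnum.next(); term != null; term = termsEnum.next()) {
            // docFreq() is the number of documents containing this term in the segment,
            // which equals the terms-aggregation bucket count in this situation.
            System.out.println(term.utf8ToString() + " -> " + termsEnum.docFreq());
        }
    }
}

The commit applies this idea inside GlobalOrdinalsStringTermsAggregator, guarded by Weight#count, the absence of _doc_count fields, and the other checks shown in the diffs below.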
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))

### Deprecated

GlobalOrdinalsStringTermsAggregator.java
@@ -35,8 +35,13 @@
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
@@ -46,6 +51,7 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
import org.opensearch.search.aggregations.Aggregator;
@@ -73,6 +79,7 @@

import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* An aggregator of string values that relies on global ordinals in order to build buckets.
@@ -85,6 +92,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr

private final LongPredicate acceptedGlobalOrdinals;
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
protected int segmentsWithSingleValuedOrds = 0;
@@ -136,16 +145,105 @@ public GlobalOrdinalsStringTermsAggregator(
return new DenseGlobalOrds();
});
}
this.fieldName = (valuesSource instanceof ValuesSource.Bytes.WithOrdinals.FieldData)
? ((ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource).getIndexFieldName()
: null;
}

String descriptCollectionStrategy() {
return collectionStrategy.describe();
}

public void setWeight(Weight weight) {
this.weight = weight;
}

/**
 * Read doc frequencies directly from indexed terms in the segment to skip iterating through individual documents.
 *
 * @param ctx The LeafReaderContext to collect terms from
 * @param globalOrds The SortedSetDocValues for the field's ordinals
 * @param ordCountConsumer A consumer to accept collected term frequencies
 * @return A LeafBucketCollector implementation with collection termination, since collection is complete
 * @throws IOException If an I/O error occurs during reading
 */
LeafBucketCollector termDocFreqCollector(
LeafReaderContext ctx,
SortedSetDocValues globalOrds,
BiConsumer<Long, Integer> ordCountConsumer
) throws IOException {
if (weight == null) {
// Weight not assigned - cannot use this optimization
return null;
} else {
if (weight.count(ctx) == 0) {
// No documents match the top-level query on this segment, so the segment can be skipped entirely
return LeafBucketCollector.NO_OP_COLLECTOR;
} else if (weight.count(ctx) != ctx.reader().maxDoc()) {
// weight.count(ctx) == ctx.reader().maxDoc() would imply that there are no deleted documents and
// that the top-level query matches every doc in the segment; since that is not the case here, the optimization does not apply
return null;
}
}

Terms segmentTerms = ctx.reader().terms(this.fieldName);
if (segmentTerms == null) {
// Field is not indexed.
return null;
}

NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
if (docCountValues.nextDoc() != NO_MORE_DOCS) {
// This segment has at least one document with the _doc_count field, so term docFreq does not
// reflect the effective doc counts and this optimization cannot be used
return null;
}

TermsEnum indexTermsEnum = segmentTerms.iterator();
BytesRef indexTerm = indexTermsEnum.next();
TermsEnum globalOrdinalTermsEnum = globalOrds.termsEnum();
BytesRef ordinalTerm = globalOrdinalTermsEnum.next();

// Iterate over the terms in the segment, look for matches in the global ordinal terms,
// and increment bucket count when segment terms match global ordinal terms.
while (indexTerm != null && ordinalTerm != null) {
int compare = indexTerm.compareTo(ordinalTerm);
if (compare == 0) {
if (acceptedGlobalOrdinals.test(globalOrdinalTermsEnum.ord())) {
ordCountConsumer.accept(globalOrdinalTermsEnum.ord(), indexTermsEnum.docFreq());
}
indexTerm = indexTermsEnum.next();
ordinalTerm = globalOrdinalTermsEnum.next();
} else if (compare < 0) {
indexTerm = indexTermsEnum.next();
} else {
ordinalTerm = globalOrdinalTermsEnum.next();
}
}
return new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
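// Counts were already taken from the terms dictionary above; throwing CollectionTerminatedException
// tells the searcher to stop collecting documents for this leaf.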
throw new CollectionTerminatedException();
}
};
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
collectionStrategy.globalOrdsReady(globalOrds);
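// Fast path: when possible, fill bucket counts straight from the segment's terms dictionary. This only
// applies with dense global-ordinal collection, plain term counts (StandardTermsResults), and no
// sub-aggregations (sub == NO_OP_COLLECTOR), because the per-document collect() calls are skipped.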

if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
&& sub == LeafBucketCollector.NO_OP_COLLECTOR) {
LeafBucketCollector termDocFreqCollector = termDocFreqCollector(
ctx,
globalOrds,
(ord, docCount) -> incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(0, ord), docCount)
);
if (termDocFreqCollector != null) {
return termDocFreqCollector;
}
}

SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
if (singleValues != null) {
segmentsWithSingleValuedOrds++;
@@ -343,9 +441,20 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
final SortedSetDocValues segmentOrds = valuesSource.ordinalsValues(ctx);
segmentDocCounts = context.bigArrays().grow(segmentDocCounts, 1 + segmentOrds.getValueCount());
assert sub == LeafBucketCollector.NO_OP_COLLECTOR;
final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
mapping = valuesSource.globalOrdinalsMapping(ctx);
// Dense mode doesn't support include/exclude so we don't have to check it here.
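// The docFreq-based fast path applies here as well; counts are keyed by segment ordinals and
// remapped to global ordinals via `mapping` before incrementing bucket doc counts.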

if (this.resultStrategy instanceof StandardTermsResults) {
LeafBucketCollector termDocFreqCollector = this.termDocFreqCollector(
ctx,
segmentOrds,
(ord, docCount) -> incrementBucketDocCount(mapping.applyAsLong(ord), docCount)
);
if (termDocFreqCollector != null) {
return termDocFreqCollector;
}
}

final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
if (singleValues != null) {
segmentsWithSingleValuedOrds++;
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
ValuesSource.java
@@ -244,6 +244,10 @@ public FieldData(IndexOrdinalsFieldData indexFieldData) {
this.indexFieldData = indexFieldData;
}

public String getIndexFieldName() {
return this.indexFieldData.getFieldName();
}

@Override
public SortedBinaryDocValues bytesValues(LeafReaderContext context) {
final LeafOrdinalsFieldData atomicFieldData = indexFieldData.load(context);
ContextIndexSearcher.java
@@ -387,6 +387,11 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
return null;
}
}

@Override
public int count(LeafReaderContext context) throws IOException {
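// Delegate to the wrapped weight so its per-segment match count (or -1 when it cannot be computed
// cheaply) remains visible, e.g. to the terms-aggregation fast path that calls Weight#count.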
return weight.count(context);
}
};
} else {
return weight;
Expand Down
KeywordTermsAggregatorTests.java
@@ -32,7 +32,6 @@
package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
@@ -41,7 +40,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.TriConsumer;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.AggregatorTestCase;
@@ -57,6 +56,8 @@
public class KeywordTermsAggregatorTests extends AggregatorTestCase {
private static final String KEYWORD_FIELD = "keyword";

private static final Consumer<TermsAggregationBuilder> CONFIGURE_KEYWORD_FIELD = agg -> agg.field(KEYWORD_FIELD);

private static final List<String> dataset;
static {
List<String> d = new ArrayList<>(45);
@@ -68,51 +69,63 @@ public class KeywordTermsAggregatorTests extends AggregatorTestCase {
dataset = d;
}

private static final Consumer<InternalMappedTerms> VERIFY_MATCH_ALL_DOCS = agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
};

private static final Consumer<InternalMappedTerms> VERIFY_MATCH_NO_DOCS = agg -> { assertEquals(0, agg.getBuckets().size()); };

private static final Query MATCH_ALL_DOCS_QUERY = new MatchAllDocsQuery();

private static final Query MATCH_NO_DOCS_QUERY = new MatchNoDocsQuery();

public void testMatchNoDocs() throws IOException {
testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
null // without type hint
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_NO_DOCS,
null // without type hint
);

testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
ValueType.STRING // with type hint
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_NO_DOCS,
ValueType.STRING // with type hint
);
}

public void testMatchAllDocs() throws IOException {
Query query = new MatchAllDocsQuery();

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
null // without type hint
testSearchCase(
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_ALL_DOCS_QUERY,
dataset,
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_ALL_DOCS,
null // without type hint
);

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
ValueType.STRING // with type hint
testSearchCase(
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_ALL_DOCS_QUERY,
dataset,
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_ALL_DOCS,
ValueType.STRING // with type hint
);
}

private void testSearchCase(
TriConsumer<Document, String, String> addField,
Query query,
List<String> dataset,
Consumer<TermsAggregationBuilder> configure,
@@ -123,7 +136,7 @@ private void testSearchCase(
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (String value : dataset) {
document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(value)));
addField.apply(document, KEYWORD_FIELD, value);
indexWriter.addDocument(document);
document.clear();
}
@@ -147,5 +160,4 @@
}
}
}

}
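
The rewritten tests above pass a TriConsumer that controls how the keyword field is written to each document, so the same cases can run with the field stored as doc values only or also indexed (ADD_SORTED_SET_FIELD_NOT_INDEXED is defined outside the hunks shown here). Below is a minimal sketch of what such helpers could look like; the names and definitions are assumptions for illustration, not the project's actual constants.

// Hypothetical sketch of TriConsumer-based field helpers (assumed shapes; the real
// ADD_SORTED_SET_FIELD_NOT_INDEXED constant is defined outside the hunks shown in this diff).
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.TriConsumer;

final class FieldWriterSketches {
    // Doc values only: the terms-dictionary fast path cannot be used, so the regular
    // per-document collection path is exercised.
    static final TriConsumer<Document, String, String> ADD_SORTED_SET_FIELD_NOT_INDEXED_SKETCH =
        (doc, field, value) -> doc.add(new SortedSetDocValuesField(field, new BytesRef(value)));

    // Indexed and doc values: terms exist in the segment's terms dictionary, so the
    // docFreq-based fast path can kick in for match-all segments.
    static final TriConsumer<Document, String, String> ADD_SORTED_SET_FIELD_INDEXED_SKETCH = (doc, field, value) -> {
        doc.add(new SortedSetDocValuesField(field, new BytesRef(value)));
        doc.add(new StringField(field, value, Field.Store.NO));
    };
}

With the field indexed as well, the docFreq-based fast path in GlobalOrdinalsStringTermsAggregator can be exercised; with doc values only, ctx.reader().terms(fieldName) returns null and the aggregator falls back to per-document collection.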