Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve string terms aggregation performance using Collector#setWeight #11643

Merged
merged 30 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a9b519b
Use Collector.setWeight to improve aggregation performance
sandeshkr419 Dec 19, 2023
21761c9
Add case for no deleted docs
sandeshkr419 Jan 23, 2024
0ed5c0d
Minor refactoring - reverting making a field public
sandeshkr419 Jan 24, 2024
554e82d
Fixing weight and declaring it a s final
sandeshkr419 Jan 24, 2024
050519a
Revert "Fixing weight and declaring it a s final"
sandeshkr419 Jan 25, 2024
7ed030b
Bypassing doc iteration
sandeshkr419 Jan 25, 2024
d7db848
Refactoring and changelog addition
sandeshkr419 Feb 3, 2024
7aa424a
Add case for weight.count == 0
sandeshkr419 Feb 3, 2024
d52c578
minor refactoring
sandeshkr419 Feb 3, 2024
f5b7643
fixing weight null check
sandeshkr419 Feb 3, 2024
70571a7
spotless apply
sandeshkr419 Feb 5, 2024
3e8f96d
Lowcardinality case rectified - was not collecing buckets correctly
sandeshkr419 Feb 5, 2024
98a33cd
Fix test failures for termintate_early cases
sandeshkr419 Feb 8, 2024
85e9fe2
Optimize only for Standard Term Results
sandeshkr419 Feb 14, 2024
2673a1f
spotless fix
sandeshkr419 Feb 14, 2024
ab3f9f9
fix low cardinality case
sandeshkr419 Feb 15, 2024
52fd948
Remove extra checks
sandeshkr419 Feb 15, 2024
ebf4ce5
Minor optimization
sandeshkr419 Feb 15, 2024
b14c03e
Revert "Minor optimization"
sandeshkr419 Feb 16, 2024
318e4ea
fix significant terms aggregator condition
sandeshkr419 Feb 16, 2024
1c7b400
Add UTs for optimization
sandeshkr419 Feb 23, 2024
f3187ea
invalid comment correction
sandeshkr419 Feb 23, 2024
6b70df5
refactor
sandeshkr419 Feb 23, 2024
ca820f0
typo fix
sandeshkr419 Feb 23, 2024
9a82b34
Increase code coverage and minor refactoring
sandeshkr419 Feb 26, 2024
3c98cb2
Spotless in framework
sandeshkr419 Feb 27, 2024
9be19a0
Test cases improvement
sandeshkr419 Mar 8, 2024
128db47
test typos
sandeshkr419 Mar 9, 2024
7954045
Add test case for _doc_count
sandeshkr419 Mar 11, 2024
cdc4204
Rewording for changelog and minor documentation changes
sandeshkr419 Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
Expand All @@ -46,6 +51,7 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
import org.opensearch.search.aggregations.Aggregator;
Expand Down Expand Up @@ -73,6 +79,7 @@

import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* An aggregator of string values that relies on global ordinals in order to build buckets.
Expand All @@ -85,6 +92,8 @@

private final LongPredicate acceptedGlobalOrdinals;
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
protected int segmentsWithSingleValuedOrds = 0;
Expand Down Expand Up @@ -136,16 +145,105 @@
return new DenseGlobalOrds();
});
}
this.fieldName = (valuesSource instanceof ValuesSource.Bytes.WithOrdinals.FieldData)
? ((ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource).getIndexFieldName()
: null;
}

String descriptCollectionStrategy() {
return collectionStrategy.describe();
}

public void setWeight(Weight weight) {
this.weight = weight;
}

/**
Read doc frequencies directly from indexed terms in the segment to skip iterating through individual documents
@param ctx The LeafReaderContext to collect terms from
@param globalOrds The SortedSetDocValues for the field's ordinals
@param ordCountConsumer A consumer to accept collected term frequencies
@return A LeafBucketCollector implementation with collection termination, since collection is complete
@throws IOException If an I/O error occurs during reading
*/
LeafBucketCollector termDocFreqCollector(
LeafReaderContext ctx,
SortedSetDocValues globalOrds,
BiConsumer<Long, Integer> ordCountConsumer
) throws IOException {
if (weight == null) {
// Weight not assigned - cannot use this optimization
return null;
msfroh marked this conversation as resolved.
Show resolved Hide resolved
} else {
if (weight.count(ctx) == 0) {
// No documents matches top level query on this segment, we can skip the segment entirely
return LeafBucketCollector.NO_OP_COLLECTOR;
} else if (weight.count(ctx) != ctx.reader().maxDoc()) {
// weight.count(ctx) == ctx.reader().maxDoc() implies there are no deleted documents and
// top-level query matches all docs in the segment
return null;
msfroh marked this conversation as resolved.
Show resolved Hide resolved
}
}

Terms segmentTerms = ctx.reader().terms(this.fieldName);
sandeshkr419 marked this conversation as resolved.
Show resolved Hide resolved
if (segmentTerms == null) {
// Field is not indexed.
return null;
}

NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
if (docCountValues.nextDoc() != NO_MORE_DOCS) {
// This segment has at least one document with the _doc_count field.
return null;
}

TermsEnum indexTermsEnum = segmentTerms.iterator();
msfroh marked this conversation as resolved.
Show resolved Hide resolved
BytesRef indexTerm = indexTermsEnum.next();
TermsEnum globalOrdinalTermsEnum = globalOrds.termsEnum();
BytesRef ordinalTerm = globalOrdinalTermsEnum.next();

// Iterate over the terms in the segment, look for matches in the global ordinal terms,
// and increment bucket count when segment terms match global ordinal terms.
while (indexTerm != null && ordinalTerm != null) {
int compare = indexTerm.compareTo(ordinalTerm);
if (compare == 0) {
if (acceptedGlobalOrdinals.test(globalOrdinalTermsEnum.ord())) {
ordCountConsumer.accept(globalOrdinalTermsEnum.ord(), indexTermsEnum.docFreq());
}
indexTerm = indexTermsEnum.next();
ordinalTerm = globalOrdinalTermsEnum.next();
} else if (compare < 0) {
indexTerm = indexTermsEnum.next();

Check warning on line 216 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java#L216

Added line #L216 was not covered by tests
} else {
ordinalTerm = globalOrdinalTermsEnum.next();

Check warning on line 218 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java#L218

Added line #L218 was not covered by tests
}
}
return new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
throw new CollectionTerminatedException();
}
};
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
collectionStrategy.globalOrdsReady(globalOrds);

if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
&& sub == LeafBucketCollector.NO_OP_COLLECTOR) {
LeafBucketCollector termDocFreqCollector = termDocFreqCollector(
ctx,
globalOrds,
(ord, docCount) -> incrementBucketDocCount(collectionStrategy.globalOrdToBucketOrd(0, ord), docCount)
);
if (termDocFreqCollector != null) {
msfroh marked this conversation as resolved.
Show resolved Hide resolved
return termDocFreqCollector;
}
}

SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
if (singleValues != null) {
segmentsWithSingleValuedOrds++;
Expand Down Expand Up @@ -343,9 +441,20 @@
final SortedSetDocValues segmentOrds = valuesSource.ordinalsValues(ctx);
segmentDocCounts = context.bigArrays().grow(segmentDocCounts, 1 + segmentOrds.getValueCount());
assert sub == LeafBucketCollector.NO_OP_COLLECTOR;
final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
mapping = valuesSource.globalOrdinalsMapping(ctx);
// Dense mode doesn't support include/exclude so we don't have to check it here.

if (this.resultStrategy instanceof StandardTermsResults) {
LeafBucketCollector termDocFreqCollector = this.termDocFreqCollector(
ctx,
sandeshkr419 marked this conversation as resolved.
Show resolved Hide resolved
segmentOrds,
(ord, docCount) -> incrementBucketDocCount(mapping.applyAsLong(ord), docCount)
);
if (termDocFreqCollector != null) {
return termDocFreqCollector;
}
}

final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
if (singleValues != null) {
segmentsWithSingleValuedOrds++;
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public FieldData(IndexOrdinalsFieldData indexFieldData) {
this.indexFieldData = indexFieldData;
}

public String getIndexFieldName() {
return this.indexFieldData.getFieldName();
}

@Override
public SortedBinaryDocValues bytesValues(LeafReaderContext context) {
final LeafOrdinalsFieldData atomicFieldData = indexFieldData.load(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
return null;
}
}

@Override
public int count(LeafReaderContext context) throws IOException {
sandeshkr419 marked this conversation as resolved.
Show resolved Hide resolved
return weight.count(context);
}
};
} else {
return weight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
Expand All @@ -41,7 +40,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.TriConsumer;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.AggregatorTestCase;
Expand All @@ -57,6 +56,8 @@
public class KeywordTermsAggregatorTests extends AggregatorTestCase {
private static final String KEYWORD_FIELD = "keyword";

private static final Consumer<TermsAggregationBuilder> CONFIGURE_KEYWORD_FIELD = agg -> agg.field(KEYWORD_FIELD);

private static final List<String> dataset;
static {
List<String> d = new ArrayList<>(45);
Expand All @@ -68,51 +69,63 @@ public class KeywordTermsAggregatorTests extends AggregatorTestCase {
dataset = d;
}

private static final Consumer<InternalMappedTerms> VERIFY_MATCH_ALL_DOCS = agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
};

private static final Consumer<InternalMappedTerms> VERIFY_MATCH_NO_DOCS = agg -> { assertEquals(0, agg.getBuckets().size()); };

private static final Query MATCH_ALL_DOCS_QUERY = new MatchAllDocsQuery();

private static final Query MATCH_NO_DOCS_QUERY = new MatchNoDocsQuery();

public void testMatchNoDocs() throws IOException {
testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
null // without type hint
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_NO_DOCS,
null // without type hint
);

testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
ValueType.STRING // with type hint
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_NO_DOCS,
ValueType.STRING // with type hint
);
}

public void testMatchAllDocs() throws IOException {
Query query = new MatchAllDocsQuery();

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
null // without type hint
testSearchCase(
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_ALL_DOCS_QUERY,
dataset,
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_ALL_DOCS,
null // without type hint
);

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
ValueType.STRING // with type hint
testSearchCase(
ADD_SORTED_SET_FIELD_NOT_INDEXED,
MATCH_ALL_DOCS_QUERY,
dataset,
CONFIGURE_KEYWORD_FIELD,
VERIFY_MATCH_ALL_DOCS,
ValueType.STRING // with type hint
);
}

private void testSearchCase(
TriConsumer<Document, String, String> addField,
Query query,
List<String> dataset,
Consumer<TermsAggregationBuilder> configure,
Expand All @@ -123,7 +136,7 @@ private void testSearchCase(
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (String value : dataset) {
document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(value)));
addField.apply(document, KEYWORD_FIELD, value);
indexWriter.addDocument(document);
document.clear();
}
Expand All @@ -147,5 +160,4 @@ private void testSearchCase(
}
}
}

}
Loading
Loading