Save memory when significant_text is not on top (elastic#58145)
This merges the aggregator for `significant_text` into
`significant_terms`, applying the optimization built in elastic#55873 to save
memory when the aggregation is not on top. The `significant_text`
aggregation is pretty memory intensive all on its own and this doesn't
particularly help with that, but it'll help with the memory usage of any
sub-aggregations.
nik9000 committed Jun 18, 2020
1 parent b8fa901 commit c9068ce
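
For context, "not on top" means `significant_text` runs as a sub-aggregation of another bucketing aggregation rather than at the root of the request. A minimal sketch of such a request using the `AggregationBuilders` helpers from the Elasticsearch Java API (the aggregation names and field names are made up for illustration, not taken from this commit):

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

// Illustrative only: significant_text nested under a terms aggregation, so it
// collects from many owning buckets (collectsFromSingleBucket is false) and the
// optimization this commit brings over from significant_terms applies, helping
// the memory usage of any sub-aggregations under it.
TermsAggregationBuilder agg = AggregationBuilders.terms("categories")
    .field("category")
    .subAggregation(AggregationBuilders.significantText("significant_words", "body"));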
Showing 9 changed files with 354 additions and 423 deletions.
@@ -25,20 +25,18 @@
* A Trie structure for analysing byte streams for duplicate sequences. Bytes
* from a stream are added one at a time using the addByte method and the number
* of times it has been seen as part of a sequence is returned.
*
* <p>
* The minimum required length for a duplicate sequence detected is 6 bytes.
*
* <p>
* The design goals are to maximize speed of lookup while minimizing the space
* required to do so. This has led to a hybrid solution for representing the
* bytes that make up a sequence in the trie.
*
* <p>
* If we have 6 bytes in sequence e.g. abcdef then they are represented as
* object nodes in the tree as follows:
* <p>
* (a)-(b)-(c)-(def as an int)
* <p>
*
*
* {@link RootTreeNode} objects are used for the first two levels of the tree
* (representing bytes a and b in the example sequence). The combinations of
* objects at these 2 levels are few so internally these objects allocate an
@@ -61,11 +59,9 @@
* reached
* <li>halting any growth of the tree
* </ol>
*
* Tests on real-world-text show that the size of the tree is a multiple of the
* input text where that multiplier varies between 10 and 5 times as the content
* size increased from 10 to 100 megabytes of content.
*
*/
public class DuplicateByteSequenceSpotter {
public static final int TREE_DEPTH = 6;
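Before the next file: the javadoc above describes the trie `significant_text` uses to spot duplicate byte sequences when filtering duplicate text. A usage sketch, assuming a no-argument constructor and a `startNewSequence()` reset method alongside the `addByte` method the javadoc mentions:

// Sketch only: stream bytes through the spotter; once the trailing
// TREE_DEPTH (6) bytes form a sequence seen earlier, addByte reports a count > 0.
DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
byte[] bytes = "abcdefabcdef".getBytes(java.nio.charset.StandardCharsets.UTF_8);
for (byte b : bytes) {
    short timesSeen = spotter.addByte(b);
    if (timesSeen > 0) {
        // the last 6 bytes match a sequence the spotter has already recorded
    }
}
spotter.startNewSequence(); // assumed reset of the rolling window, e.g. between field values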
@@ -47,22 +47,23 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.LongConsumer;

/**
* An aggregator of string values that hashes the strings on the fly rather
* than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
*/
public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
private final CollectorSource collectorSource;
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource valuesSource;
private final BytesKeyedBucketOrds bucketOrds;
private final IncludeExclude.StringFilter includeExclude;

public MapStringTermsAggregator(
String name,
AggregatorFactories factories,
CollectorSource collectorSource,
Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
ValuesSource valuesSource,
BucketOrder order,
DocValueFormat format,
BucketCountThresholds bucketCountThresholds,
@@ -75,56 +76,39 @@ public MapStringTermsAggregator(
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
this.collectorSource = collectorSource;
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
this.includeExclude = includeExclude;
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}

@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
if (collectorSource.needsScores()) {
return ScoreMode.COMPLETE;
}
return super.scoreMode();
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
final BytesRefBuilder previous = new BytesRefBuilder();

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();

// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
final BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return resultStrategy.wrapCollector(
collectorSource.getLeafCollector(
includeExclude,
ctx,
sub,
this::addRequestCircuitBreakerBytes,
(s, doc, owningBucketOrd, bytes) -> {
long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
collectExistingBucket(s, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
collectBucket(s, doc, bucketOrdinal);
}
previous.copyBytes(bytes);
}
}
});
)
);
}

@Override
@@ -146,7 +130,82 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {

@Override
public void doClose() {
Releasables.close(bucketOrds, resultStrategy);
Releasables.close(collectorSource, resultStrategy, bucketOrds);
}

/**
* Abstraction on top of building collectors to fetch values.
*/
public interface CollectorSource extends Releasable {
boolean needsScores();

LeafBucketCollector getLeafCollector(
IncludeExclude.StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException;
}
@FunctionalInterface
public interface CollectConsumer {
void accept(LeafBucketCollector sub, int doc, long owningBucketOrd, BytesRef bytes) throws IOException;
}

/**
* Fetch values from a {@link ValuesSource}.
*/
public static class ValuesSourceCollectorSource implements CollectorSource {
private final ValuesSource valuesSource;

public ValuesSourceCollectorSource(ValuesSource valuesSource) {
this.valuesSource = valuesSource;
}

@Override
public boolean needsScores() {
return valuesSource.needsScores();
}

@Override
public LeafBucketCollector getLeafCollector(
IncludeExclude.StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
final BytesRefBuilder previous = new BytesRefBuilder();

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();

// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
previous.copyBytes(bytes);
consumer.accept(sub, doc, owningBucketOrd, bytes);
}
}
};
}

@Override
public void close() {}
}
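
`ValuesSourceCollectorSource` above keeps the existing `terms`/`significant_terms` behavior, the idea being that `significant_text` (which produces its values by re-analyzing text rather than reading doc values) can plug in its own `CollectorSource` without duplicating the bucket bookkeeping. A minimal sketch of a hypothetical custom source, purely to show the contract (the per-document value array is invented for illustration; imports match those already used by this file):

// Sketch only: hands each document's (assumed unique) values to the consumer;
// bucket ords and sub-aggregation collection happen in the aggregator's callback.
public static class FixedValuesCollectorSource implements CollectorSource {
    private final BytesRef[][] valuesPerDoc; // hypothetical: valuesPerDoc[doc] = values for that doc

    public FixedValuesCollectorSource(BytesRef[][] valuesPerDoc) {
        this.valuesPerDoc = valuesPerDoc;
    }

    @Override
    public boolean needsScores() {
        return false; // nothing here reads scores
    }

    @Override
    public LeafBucketCollector getLeafCollector(
        IncludeExclude.StringFilter includeExclude,
        LeafReaderContext ctx,
        LeafBucketCollector sub,
        LongConsumer addRequestCircuitBreakerBytes,
        CollectConsumer consumer
    ) throws IOException {
        return new LeafBucketCollectorBase(sub, null) {
            @Override
            public void collect(int doc, long owningBucketOrd) throws IOException {
                for (BytesRef bytes : valuesPerDoc[doc]) {
                    if (includeExclude != null && false == includeExclude.accept(bytes)) {
                        continue;
                    }
                    consumer.accept(sub, doc, owningBucketOrd, bytes);
                }
            }
        };
    }

    @Override
    public void close() {}
}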

/**
@@ -270,6 +329,12 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
* Builds results for the standard {@code terms} aggregation.
*/
class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket> {
private final ValuesSource valuesSource;

StandardTermsResults(ValuesSource valuesSource) {
this.valuesSource = valuesSource;
}

@Override
String describe() {
return "terms";
@@ -36,10 +36,11 @@
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

import java.io.IOException;

@@ -62,14 +63,17 @@ interface BackgroundFrequencyForLong extends Releasable {
}

private final QueryShardContext context;
private final ValuesSourceConfig config;
private final MappedFieldType fieldType;
private final DocValueFormat format;
private final Query backgroundFilter;
private final int supersetNumDocs;
private TermsEnum termsEnum;

SignificanceLookup(QueryShardContext context, ValuesSourceConfig config, QueryBuilder backgroundFilter) throws IOException {
SignificanceLookup(QueryShardContext context, MappedFieldType fieldType, DocValueFormat format, QueryBuilder backgroundFilter)
throws IOException {
this.context = context;
this.config = config;
this.fieldType = fieldType;
this.format = format;
this.backgroundFilter = backgroundFilter == null ? null : backgroundFilter.toQuery(context);
/*
* We need to use a superset size that includes deleted docs or we
@@ -129,7 +133,7 @@ public void close() {
* Get the background frequency of a {@link BytesRef} term.
*/
private long getBackgroundFrequency(BytesRef term) throws IOException {
return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
}

/**
@@ -174,7 +178,7 @@ public void close() {
* Get the background frequency of a {@code long} term.
*/
private long getBackgroundFrequency(long term) throws IOException {
return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
}

private long getBackgroundFrequency(Query query) throws IOException {
@@ -201,7 +205,7 @@ private TermsEnum getTermsEnum(String field) throws IOException {
return termsEnum;
}
IndexReader reader = context.getIndexReader();
termsEnum = new FilterableTermsEnum(reader, config.fieldContext().field(), PostingsEnum.NONE, backgroundFilter);
termsEnum = new FilterableTermsEnum(reader, fieldType.name(), PostingsEnum.NONE, backgroundFilter);
return termsEnum;
}

@@ -227,7 +227,13 @@ protected Aggregator doCreateInternal(SearchContext searchContext,
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}

SignificanceLookup lookup = new SignificanceLookup(queryShardContext, config, backgroundFilter);
SignificanceLookup lookup = new SignificanceLookup(
queryShardContext,
config.fieldContext().fieldType(),
config.format(),
backgroundFilter
);

return sigTermsAggregatorSupplier.build(name, factories, config.getValuesSource(), config.format(),
bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
significanceHeuristic, lookup, collectsFromSingleBucket, metadata);
@@ -255,8 +261,8 @@ Aggregator create(String name,
return new MapStringTermsAggregator(
name,
factories,
new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource),
a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
valuesSource,
null,
format,
bucketCountThresholds,
Expand Down