Skip to content

Commit

Permalink
OnHeap Star Tree Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jun 23, 2024
1 parent 950632f commit e1bfa95
Show file tree
Hide file tree
Showing 32 changed files with 2,592 additions and 10 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.apache.lucene.index;

/**
* A wrapper for {@link DocValuesWriter} that contains the {@link DocValuesType} of the doc
*/
public class StarTreeDocValuesWriter {

private final DocValuesType docValuesType;
private final DocValuesWriter<?> docValuesWriter;

public StarTreeDocValuesWriter(DocValuesType docValuesType, DocValuesWriter docValuesWriter) {
this.docValuesType = docValuesType;
this.docValuesWriter = docValuesWriter;
}

/**
* Get the doc values type
*/
public DocValuesType getDocValuesType() {
return docValuesType;
}

/**
* Get the doc values writer
*/
public DocValuesWriter<?> getDocValuesWriter() {
return docValuesWriter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public <T> Void visit(Binding<T> binding) {
if (!overriddenKeys.remove(binding.getKey())) {
super.visit(binding);

// Record when a scope instance is used in a binding
// record when a scope instance is used in a binding
Scope scope = getScopeInstanceOrNull(binding);
if (scope != null) {
scopeInstancesInUse.put(scope, binding.getSource());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public enum MetricStat {
AVG("avg"),
SUM("sum"),
MIN("min"),
MAX("max");
MAX("max"),
UNSUPPORTED("unsupported");

private final String typeName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -27,10 +27,10 @@
public class StarTreeFieldConfiguration implements ToXContent {

private final AtomicInteger maxLeafDocs = new AtomicInteger();
private final List<String> skipStarNodeCreationInDims;
private final Set<String> skipStarNodeCreationInDims;
private final StarTreeBuildMode buildMode;

public StarTreeFieldConfiguration(int maxLeafDocs, List<String> skipStarNodeCreationInDims, StarTreeBuildMode buildMode) {
public StarTreeFieldConfiguration(int maxLeafDocs, Set<String> skipStarNodeCreationInDims, StarTreeBuildMode buildMode) {
this.maxLeafDocs.set(maxLeafDocs);
this.skipStarNodeCreationInDims = skipStarNodeCreationInDims;
this.buildMode = buildMode;
Expand Down Expand Up @@ -87,7 +87,7 @@ public StarTreeBuildMode getBuildMode() {
return buildMode;
}

public List<String> getSkipStarNodeCreationInDims() {
public Set<String> getSkipStarNodeCreationInDims() {
return skipStarNodeCreationInDims;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;

import java.util.Comparator;

/**
* Builds aggregation function and doc values field pair to support various aggregations
* @opensearch.experimental
*/
public class MetricStatFieldPair implements Comparable<MetricStatFieldPair> {

public static final String DELIMITER = "__";
public static final String STAR = "*";
public static final MetricStatFieldPair COUNT_STAR = new MetricStatFieldPair(MetricStat.COUNT, STAR);

private final MetricStat metricStat;
private final String field;

/**
* Constructor for MetricStatFieldPair
*/
public MetricStatFieldPair(MetricStat metricStat, String field) {
this.metricStat = metricStat;
if (metricStat == MetricStat.COUNT) {
this.field = STAR;
} else {
this.field = field;
}
}

/**
* @return Metric Type
*/
public MetricStat getMetricStat() {
return metricStat;
}

/**
* @return field Name
*/
public String getField() {
return field;
}

/**
* @return field name with metric type and field
*/
public String toFieldName() {
return toFieldName(metricStat, field);
}

/**
* Builds field name with metric type and field
*/
public static String toFieldName(MetricStat metricType, String field) {
return metricType.getTypeName() + DELIMITER + field;
}

/**
* Builds MetricStatFieldPair from field name
*/
public static MetricStatFieldPair fromFieldName(String fieldName) {
String[] parts = fieldName.split(DELIMITER, 2);
return fromMetricAndFieldName(parts[0], parts[1]);
}

/**
* Builds MetricStatFieldPair from metric and field name
*/
private static MetricStatFieldPair fromMetricAndFieldName(String metricName, String fieldName) {
MetricStat metricType = MetricStat.fromTypeName(metricName);
if (metricType == MetricStat.COUNT) {
return COUNT_STAR;
} else {
return new MetricStatFieldPair(metricType, fieldName);
}
}

@Override
public int hashCode() {
return 31 * metricStat.hashCode() + field.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof MetricStatFieldPair) {
MetricStatFieldPair anotherPair = (MetricStatFieldPair) obj;
return metricStat == anotherPair.metricStat && field.equals(anotherPair.field);
}
return false;
}

@Override
public String toString() {
return toFieldName();
}

@Override
public int compareTo(MetricStatFieldPair other) {
return Comparator.comparing((MetricStatFieldPair o) -> o.field)
.thenComparing((MetricStatFieldPair o) -> o.metricStat)
.compare(this, other);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.apache.lucene.util.NumericUtils;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.data.DataType;
import org.opensearch.search.aggregations.metrics.CompensatedSum;

/**
* Sum value aggregator for star tree
*
* @opensearch.internal
*/
public class SumValueAggregator implements ValueAggregator<Double> {
public static final DataType AGGREGATED_VALUE_TYPE = DataType.DOUBLE;

@Override
public MetricStat getAggregationType() {
return MetricStat.SUM;
}

@Override
public DataType getAggregatedValueType() {
return AGGREGATED_VALUE_TYPE;
}

@Override
public Double getInitialAggregatedValue(Long rawValue, StarTreeNumericType starTreeNumericType) {
return starTreeNumericType.getDoubleValue(rawValue);
}

@Override
public Double applySegmentRawValue(Double value, Long rawValue, StarTreeNumericType starTreeNumericType) {
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
kahanSummation.add(value);
kahanSummation.add(starTreeNumericType.getDoubleValue(rawValue));
return kahanSummation.value();
}

@Override
public Double applyAggregatedValue(Double value, Double aggregatedValue) {
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
kahanSummation.add(value);
kahanSummation.add(aggregatedValue);
return kahanSummation.value();
}

@Override
public Double cloneAggregatedValue(Double value) {
return value;
}

@Override
public int getMaxAggregatedValueByteSize() {
return Double.BYTES;
}

@Override
public Long convertAggregationTypeToSortableLongValue(Double value) {
try {
return NumericUtils.doubleToSortableLong(value);
} catch (IllegalArgumentException | NullPointerException | IllegalStateException e) {
throw new IllegalArgumentException("Cannot convert " + value + " to sortable long", e);
}
}

@Override
public Double convertSortableLongToAggregatedTypeValue(Long value, StarTreeNumericType type) {
try {
return type.getDoubleValue(value);
} catch (IllegalArgumentException | NullPointerException | IllegalStateException e) {
throw new IllegalArgumentException("Cannot convert " + value + " to sortable aggregation type", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.data.DataType;

/**
* A value aggregator that pre-aggregates on the input values for a specific type of aggregation.
* @opensearch.experimental
*/
public interface ValueAggregator<A> {

/**
* Returns the type of the aggregation.
*/
MetricStat getAggregationType();

/**
* Returns the data type of the aggregated value.
*/
DataType getAggregatedValueType();

/**
* Returns the initial aggregated value.
*/
A getInitialAggregatedValue(Long rawValue, StarTreeNumericType starTreeNumericType);

/**
* Applies a raw value to the current aggregated value.
*/
A applySegmentRawValue(A value, Long rawValue, StarTreeNumericType starTreeNumericType);

/**
* Applies an aggregated value to the current aggregated value.
*/
A applyAggregatedValue(A value, A aggregatedValue);

/**
* Clones an aggregated value.
*/
A cloneAggregatedValue(A value);

/**
* Returns the maximum size in bytes of the aggregated values seen so far.
*/
int getMaxAggregatedValueByteSize();

/**
* Converts an aggregated value into a Long type.
*/
Long convertAggregationTypeToSortableLongValue(A value);

/**
* Converts an aggregated value from a Long type.
*/
A convertSortableLongToAggregatedTypeValue(Long rawValue, StarTreeNumericType type);
}
Loading

0 comments on commit e1bfa95

Please sign in to comment.