Skip to content

Commit

Permalink
rebase of file formats with merge
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jul 17, 2024
1 parent b74ef59 commit edf929e
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public Long toLongValue(Double value) {
}

@Override
public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
public Double toStarTreeNumericTypeValue(Long value) {
try {
return type.getDoubleValue(value);
return VALUE_AGGREGATOR_TYPE.getDoubleValue(value);
} catch (Exception e) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public Long toLongValue(Double value) {
}

@Override
public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
public Double toStarTreeNumericTypeValue(Long value) {
try {
return type.getDoubleValue(value);
return VALUE_AGGREGATOR_TYPE.getDoubleValue(value);
} catch (Exception e) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.index.compositeindex.datacube.startree.aggregators.ValueAggregator;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode;
import org.opensearch.index.fielddata.IndexNumericFieldData;
import org.opensearch.index.mapper.Mapper;
Expand Down Expand Up @@ -176,12 +177,17 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricType : metric.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat != MetricStat.COUNT) {
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
} else {
metricReader = new SequentialDocValuesIterator();
}

metricReaders.add(metricReader);
}
}
Expand Down Expand Up @@ -215,7 +221,6 @@ public void build(
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(
totalSegmentDocs,
dimensionReaders,
metricReaders
);
Expand Down Expand Up @@ -463,7 +468,6 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
* Sorts and aggregates all the documents in the segment as per the configuration, and returns a star-tree document iterator for all the
* aggregated star-tree documents.
*
* @param numDocs number of documents in the given segment
* @param dimensionReaders List of docValues readers to read dimensions from the segment
* @param metricReaders List of docValues readers to read metrics from the segment
* @return Iterator for the aggregated star-tree document
Expand Down Expand Up @@ -694,12 +698,12 @@ public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOExce
logger.debug("No documents found in the segment");
return;
}
List<SequentialDocValuesIterator> metricReaders = getMetricReaders(state, fieldProducerMap);
List<SequentialDocValuesIterator> metricReaders = getMetricReaders(writeState, fieldProducerMap);
List<Dimension> dimensionsSplitOrder = starTreeField.getDimensionsOrder();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()];
for (int i = 0; i < numDimensions; i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
FieldInfo dimensionFieldInfo = state.fieldInfos.fieldInfo(dimension);
FieldInfo dimensionFieldInfo = writeState.fieldInfos.fieldInfo(dimension);
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
);
Expand All @@ -710,31 +714,6 @@ public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOExce
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
}

/**
* Generates the configuration required to perform aggregation for all the metrics on a field
*
* @return list of MetricAggregatorInfo
*/
public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState state, Map<String, DocValuesProducer> fieldProducerMap)
throws IOException {
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricType : metric.getMetrics()) {
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
// TODO
// if (metricType != MetricStat.COUNT) {
// Need not initialize the metric reader for COUNT metric type
SequentialDocValuesIterator metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
// }

metricReaders.add(metricReader);
}
}
return metricReaders;
}

/**
* Builds the star tree using Star-Tree Document
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
for (int currentDocId = 0; currentDocId < totalSegmentDocs; currentDocId++) {
starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId, dimensionReaders, metricReaders);
}
return sortAndAggregateStarTreeDocuments(starTreeDocuments);
return sortAndAggregateStarTreeDocuments(starTreeDocuments, false);
}

@Override
Expand Down Expand Up @@ -179,48 +179,11 @@ StarTreeDocument[] mergeStarTreeValues(List<StarTreeValues> starTreeValuesSubs)
return starTreeDocuments.toArray(starTreeDocumentsArr);
}

@Override
public StarTreeDocument getStarTreeDocument(int docId) throws IOException {
return starTreeDocuments.get(docId);
}

@Override
public StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException {
return starTreeDocuments.get(docId);
}

@Override
public List<StarTreeDocument> getStarTreeDocuments() {
return starTreeDocuments;
}

@Override
public Long getDimensionValue(int docId, int dimensionId) throws IOException {
return starTreeDocuments.get(docId).dimensions[dimensionId];
}

/**
* Sorts and aggregates all the documents of the segment based on dimension and metrics configuration
*
* @param numDocs number of documents in the given segment
* @param dimensionReaders List of docValues readers to read dimensions from the segment
* @param metricReaders List of docValues readers to read metrics from the segment
* @return Iterator of star-tree documents
*
*/
@Override
public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
int numDocs,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
) throws IOException {
StarTreeDocument[] starTreeDocuments = new StarTreeDocument[numDocs];
for (int currentDocId = 0; currentDocId < numDocs; currentDocId++) {
starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId, dimensionReaders, metricReaders);
}
return sortAndAggregateStarTreeDocuments(starTreeDocuments);
}

/**
* Sort, aggregates and merges the star-tree documents
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,49 @@ public SequentialDocValuesIterator(DocIdSetIterator docIdSetIterator) {
this.docIdSetIterator = docIdSetIterator;
}

/**
* Constructs a new SequentialDocValuesIterator instance with the given SortedNumericDocValues.
*
*/
public SequentialDocValuesIterator() {
this.docIdSetIterator = new SortedNumericDocValues() {
@Override
public long nextValue() throws IOException {
return 0;
}

@Override
public int docValueCount() {
return 0;
}

@Override
public boolean advanceExact(int i) throws IOException {
return false;
}

@Override
public int docID() {
return 0;
}

@Override
public int nextDoc() throws IOException {
return 0;
}

@Override
public int advance(int i) throws IOException {
return 0;
}

@Override
public long cost() {
return 0;
}
};
}

/**
* Returns the value associated with the latest document.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@

import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Util class for building star tree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class StarTreeDataSerializer {
* @return the total size in bytes of the serialized star-tree data
* @throws IOException if an I/O error occurs while writing the star-tree data
*/
public static long serializeStarTree(IndexOutput indexOutput, StarTreeBuilderUtils.TreeNode rootNode, int numNodes) throws IOException {
public static long serializeStarTree(IndexOutput indexOutput, TreeNode rootNode, int numNodes) throws IOException {
int headerSizeInBytes = computeStarTreeDataHeaderByteSize();
long totalSizeInBytes = headerSizeInBytes + (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES;

Expand Down Expand Up @@ -87,20 +87,20 @@ private static void writeStarTreeHeader(IndexOutput output, int numNodes) throws
* @param rootNode the root node of the star-tree
* @throws IOException if an I/O error occurs while writing the nodes
*/
private static void writeStarTreeNodes(IndexOutput output, StarTreeBuilderUtils.TreeNode rootNode) throws IOException {
Queue<StarTreeBuilderUtils.TreeNode> queue = new LinkedList<>();
private static void writeStarTreeNodes(IndexOutput output, TreeNode rootNode) throws IOException {
Queue<TreeNode> queue = new LinkedList<>();
queue.add(rootNode);

int currentNodeId = 0;
while (!queue.isEmpty()) {
StarTreeBuilderUtils.TreeNode node = queue.remove();
TreeNode node = queue.remove();

if (node.children == null) {
writeStarTreeNode(output, node, ALL, ALL);
} else {

// Sort all children nodes based on dimension value
List<StarTreeBuilderUtils.TreeNode> sortedChildren = new ArrayList<>(node.children.values());
List<TreeNode> sortedChildren = new ArrayList<>(node.children.values());
sortedChildren.sort(Comparator.comparingLong(o -> o.dimensionValue));

int firstChildId = currentNodeId + queue.size() + 1;
Expand All @@ -123,7 +123,7 @@ private static void writeStarTreeNodes(IndexOutput output, StarTreeBuilderUtils.
* @param lastChildId the ID of the last child node
* @throws IOException if an I/O error occurs while writing the node
*/
private static void writeStarTreeNode(IndexOutput output, StarTreeBuilderUtils.TreeNode node, int firstChildId, int lastChildId)
private static void writeStarTreeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId)
throws IOException {
output.writeInt(node.dimensionId);
output.writeLong(node.dimensionValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,33 @@
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Rounding;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.MapperTestUtils;
import org.opensearch.index.codec.composite.Composite99Codec;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;
import org.opensearch.indices.IndicesModule;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.NumericDimension;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STAR_TREE_INDEX;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public void testToLongValue() {
}

public void testToStarTreeNumericTypeValue() {
assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L, StarTreeNumericType.DOUBLE), 0.0);
assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L), 0.0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public void testToLongValue() {
}

public void testToStarTreeNumericTypeValue() {
assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L, StarTreeNumericType.DOUBLE), 0.0);
assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L), 0.0);
}
}
Loading

0 comments on commit edf929e

Please sign in to comment.