Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Star tree codec changes #14514

Merged
merged 7 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.composite.CompositeCodecFactory;
import org.opensearch.index.mapper.MapperService;

import java.util.Map;
Expand All @@ -63,6 +64,7 @@ public class CodecService {
* the raw unfiltered lucene default. useful for testing
*/
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
private final CompositeCodecFactory compositeCodecFactory = new CompositeCodecFactory();

public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
Expand All @@ -73,10 +75,16 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
codecs.put(BEST_COMPRESSION_CODEC, new Lucene99Codec(Mode.BEST_COMPRESSION));
codecs.put(ZLIB, new Lucene99Codec(Mode.BEST_COMPRESSION));
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
// CompositeCodec still delegates to PerFieldMappingPostingFormatCodec
// We can still support all the compression codecs when composite index is present
if (mapperService.isCompositeIndexPresent()) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
codecs.putAll(compositeCodecFactory.getCompositeIndexCodecs(mapperService, logger));
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
}
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;

/**
* Extends the Codec to support new file formats for composite indices eg: star tree index
* based on the mappings.
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99Codec extends FilterCodec {
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
public static final String COMPOSITE_INDEX_CODEC_NAME = "Composite99Codec";
private final MapperService mapperService;

// needed for SPI - this is used in reader path
public Composite99Codec() {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
this(COMPOSITE_INDEX_CODEC_NAME, new Lucene99Codec(), null);
}

public Composite99Codec(Lucene99Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
Copy link
Collaborator

@reta reta Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharath-techie @msfroh I think we have an issue here with PerFieldMappingPostingFormatCodec, as per invariants, PerFieldMappingPostingFormatCodec always uses the latest available codec


    static {
        assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class)
            : "PerFieldMappingPostingFormatCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
    }

but Composite99Codec is explicitly bound to Lucene 9.9 (the latest for the current Apache Lucene version). I think we need to make changes here while it is not too late (Apache Lucene 9.12 comes with new codecs)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reta I believe that should be fine.

Lets consider the case when we upgrade to Lucene-912.

  • The parametrized constructor for Composite99Codec breaks at compile time as PerFieldMappingPostingFormatCodec will accept only Lucene-912 Compression mode and the parameteized constructor in Composite99Codec will hence be removed (This is expected and preferred as we won't use this codec for any writes after upgrade).
  • For writes which have happened using Composite99Codec, the reads will use the SPI constructor which is hardcoded to point to Lucene99Codec and does not cause issues.
  • The new writes will happen via Composite9-12Codec which is obtained through CompositeCodecFactory as the implementation will switch to use Composite9-12Codec. The parametrized constructor will now only need to be present in this new codec.

For reference, this is how the custom-codecs plugin handles this as well.

this(COMPOSITE_INDEX_CODEC_NAME, new PerFieldMappingPostingFormatCodec(compressionMode, mapperService, logger), mapperService);
}

/**
* Sole constructor. When subclassing this codec, create a no-arg ctor and pass the delegate codec and a unique name to
* this ctor.
*
* @param name name of the codec
* @param delegate codec delegate
* @param mapperService mapper service instance
*/
protected Composite99Codec(String name, Codec delegate, MapperService mapperService) {
super(name, delegate);
this.mapperService = mapperService;
}

@Override
public DocValuesFormat docValuesFormat() {
return new Composite99DocValuesFormat(mapperService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.MapperService;

import java.io.IOException;

/**
* DocValues format to handle composite indices
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99DocValuesFormat extends DocValuesFormat {
/**
* Creates a new docvalues format.
*
* <p>The provided name will be written into the index segment in some configurations (such as
* when using {@code PerFieldDocValuesFormat}): in such configurations, for the segment to be read
* this class should be registered with Java's SPI mechanism (registered in META-INF/ of your jar
* file, etc).
*/
private final DocValuesFormat delegate;
private final MapperService mapperService;

// needed for SPI
public Composite99DocValuesFormat() {
this(new Lucene90DocValuesFormat(), null);
}

Check warning on line 43 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java#L42-L43

Added lines #L42 - L43 were not covered by tests

public Composite99DocValuesFormat(MapperService mapperService) {
this(new Lucene90DocValuesFormat(), mapperService);
}

public Composite99DocValuesFormat(DocValuesFormat delegate, MapperService mapperService) {
super(delegate.getName());
this.delegate = delegate;
this.mapperService = mapperService;
}

@Override
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
return new Composite99DocValuesWriter(delegate.fieldsConsumer(state), state, mapperService);
}

@Override
public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException {
return new Composite99DocValuesReader(delegate.fieldsProducer(state), state);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;

/**
* Reader for star tree index and star tree doc values from the segments
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader {
private DocValuesProducer delegate;

public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException {
this.delegate = producer;
// TODO : read star tree files
}

@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
return delegate.getNumeric(field);
}

@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
return delegate.getBinary(field);
}

@Override
public SortedDocValues getSorted(FieldInfo field) throws IOException {
return delegate.getSorted(field);
}

@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
return delegate.getSortedNumeric(field);
}

@Override
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
return delegate.getSortedSet(field);
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
// Todo : check integrity of composite index related [star tree] files
}

@Override
public void close() throws IOException {
delegate.close();
// Todo: close composite index related files [star tree] files
}

@Override
public List<String> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
throw new UnsupportedOperationException();

Check warning on line 79 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java#L79

Added line #L79 was not covered by tests

}

@Override
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files
throw new UnsupportedOperationException();

Check warning on line 87 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java#L87

Added line #L87 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class write the star tree index and star tree doc values
* based on the doc values structures of the original index
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99DocValuesWriter extends DocValuesConsumer {
private final DocValuesConsumer delegate;
private final SegmentWriteState state;
private final MapperService mapperService;
AtomicReference<MergeState> mergeState = new AtomicReference<>();
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
private final Set<String> compositeFieldSet;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

this.delegate = delegate;
this.state = segmentWriteState;
this.mapperService = mapperService;
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
compositeFieldSet = new HashSet<>();
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
compositeFieldSet.addAll(type.fields());
}
}

@Override
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addNumericField(field, valuesProducer);
}

@Override
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addBinaryField(field, valuesProducer);
}

@Override
public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedField(field, valuesProducer);
}

@Override
public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedNumericField(field, valuesProducer);
// Perform this only during flush flow
if (mergeState.get() == null) {
createCompositeIndicesIfPossible(valuesProducer, field);
}
}

@Override
public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedSetField(field, valuesProducer);
}

@Override
public void close() throws IOException {
delegate.close();
}

private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, FieldInfo field) throws IOException {
if (compositeFieldSet.isEmpty()) return;
if (compositeFieldSet.contains(field.name)) {
fieldProducerMap.put(field.name, valuesProducer);
compositeFieldSet.remove(field.name);
}
// we have all the required fields to build composite fields
if (compositeFieldSet.isEmpty()) {
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
// TODO : Call StarTree builder
}
}
}
}

@Override
public void merge(MergeState mergeState) throws IOException {
this.mergeState.compareAndSet(null, mergeState);
super.merge(mergeState);
// TODO : handle merge star tree
// mergeStarTreeFields(mergeState);
}
}
Loading
Loading