Reintroduce compression for binary doc_values #112416

Open · wants to merge 9 commits into base: main
5 changes: 5 additions & 0 deletions docs/changelog/112416.yaml
@@ -0,0 +1,5 @@
pr: 112416
summary: Reintroduce compression for binary `doc_values`
area: TSDB
type: enhancement
issues: []
29 changes: 29 additions & 0 deletions server/src/main/java/org/elasticsearch/index/codec/tsdb/BinaryDVCompressionMode.java
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.codec.tsdb;

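/**
 * Compression modes for binary doc values. The {@link #code} byte is written to the doc values
 * metadata by the consumer and mapped back to a mode via {@link #fromMode(byte)} when reading.
 */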
enum BinaryDVCompressionMode {

NO_COMPRESS((byte) 0),
COMPRESSED_WITH_LZ4((byte) 1);

final byte code;

BinaryDVCompressionMode(byte code) {
this.code = code;
}

static BinaryDVCompressionMode fromMode(byte mode) {
return switch (mode) {
case 0 -> NO_COMPRESS;
case 1 -> COMPRESSED_WITH_LZ4;
default -> throw new IllegalStateException("unknown compression mode [" + mode + "]");
};
}
}
server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java
@@ -8,11 +8,13 @@

package org.elasticsearch.index.codec.tsdb;

import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
@@ -28,6 +30,8 @@
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
@@ -39,6 +43,7 @@
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.core.IOUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;

@@ -50,9 +55,19 @@ final class ES87TSDBDocValuesConsumer extends DocValuesConsumer {
IndexOutput data, meta;
final int maxDoc;
private byte[] termsDictBuffer;

ES87TSDBDocValuesConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension)
throws IOException {
final SegmentWriteState state;
final BinaryDVCompressionMode binaryDVCompressionMode;

ES87TSDBDocValuesConsumer(
BinaryDVCompressionMode binaryDVCompressionMode,
SegmentWriteState state,
String dataCodec,
String dataExtension,
String metaCodec,
String metaExtension
) throws IOException {
this.binaryDVCompressionMode = binaryDVCompressionMode;
this.state = state;
this.termsDictBuffer = new byte[1 << 14];
boolean success = false;
try {
@@ -194,7 +209,14 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
meta.writeInt(field.number);
meta.writeByte(ES87TSDBDocValuesFormat.BINARY);
meta.writeByte(binaryDVCompressionMode.code);
switch (binaryDVCompressionMode) {
case NO_COMPRESS -> doAddUncompressedBinary(field, valuesProducer);
case COMPRESSED_WITH_LZ4 -> doAddCompressedBinary(field, valuesProducer);
}
}

Contributor: Does it make sense to compress with zstd?

Contributor: I see the numbers in the description. As usual, zstd seems to offer a substantial improvement over lz4; I wonder how much risk that would bring here, though.

Author (Member): Yes, I'm not sure why we're keeping zstd behind the feature flag. If we're okay with it, I can switch to zstd.

@martijnvg (Member, Sep 2, 2024): zstd usage in stored fields is behind a feature flag, and more specifically for get-by-id performance in the best speed scenario. Hopefully we can remove the feature flag soon, after we have done a few more experiments with different settings for best speed mode. I think in the case of binary doc values we should use zstd instead of lz4?

Author (Member): Thanks @martijnvg. I will switch this to zstd.

Contributor: It would be worth checking how it affects queries/aggs that need binary doc values, e.g. maybe the geoshape track?

private void doAddUncompressedBinary(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
@@ -258,6 +280,214 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
}
}

// BEGIN: Copied from LUCENE-9211
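// Writes the field's values as LZ4-compressed blocks via CompressedBinaryBlockWriter, then records
// the data offset/length, the IndexedDISI docs-with-field structure, the doc count and the
// min/max value lengths in the meta file, followed by the block-address table.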
private void doAddCompressedBinary(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
try (CompressedBinaryBlockWriter blockWriter = new CompressedBinaryBlockWriter()) {
BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
int numDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
numDocsWithField++;
BytesRef v = values.binaryValue();
blockWriter.addDoc(doc, v);
int length = v.length;
minLength = Math.min(length, minLength);
maxLength = Math.max(length, maxLength);
}
blockWriter.flushData();

assert numDocsWithField <= maxDoc;
meta.writeLong(data.getFilePointer() - start); // dataLength

if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getBinary(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}

meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);

blockWriter.writeMetaData();
}
}

static final int BINARY_BLOCK_SHIFT = 5;
static final int BINARY_DOCS_PER_COMPRESSED_BLOCK = 1 << BINARY_BLOCK_SHIFT;

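/**
 * Buffers up to {@code BINARY_DOCS_PER_COMPRESSED_BLOCK} values, LZ4-compresses each block into the
 * data file, and records each block's compressed size in a temporary offsets file so that
 * {@link #writeMetaData()} can rebuild and encode the block start pointers with {@link DirectMonotonicWriter}.
 */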
private class CompressedBinaryBlockWriter implements Closeable {
final LZ4.FastCompressionHashTable ht = new LZ4.FastCompressionHashTable();
int uncompressedBlockLength = 0;
int maxUncompressedBlockLength = 0;
int numDocsInCurrentBlock = 0;
final int[] docLengths = new int[BINARY_DOCS_PER_COMPRESSED_BLOCK];
byte[] block = BytesRef.EMPTY_BYTES;
int totalChunks = 0;
long maxPointer = 0;
final long blockAddressesStart;

final IndexOutput tempBinaryOffsets;

CompressedBinaryBlockWriter() throws IOException {
// Review comment (Contributor): We don't need to care about endianness here, do we?
tempBinaryOffsets = EndiannessReverserUtil.createTempOutput(
state.directory,
state.segmentInfo.name,
"binary_pointers",
state.context
);
boolean success = false;
try {
CodecUtil.writeHeader(
tempBinaryOffsets,
ES87TSDBDocValuesFormat.META_CODEC + "FilePointers",
ES87TSDBDocValuesFormat.VERSION_CURRENT
);
blockAddressesStart = data.getFilePointer();
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
}
}
}

void addDoc(int doc, BytesRef v) throws IOException {
docLengths[numDocsInCurrentBlock] = v.length;
block = ArrayUtil.grow(block, uncompressedBlockLength + v.length);
System.arraycopy(v.bytes, v.offset, block, uncompressedBlockLength, v.length);
uncompressedBlockLength += v.length;
numDocsInCurrentBlock++;
if (numDocsInCurrentBlock == BINARY_DOCS_PER_COMPRESSED_BLOCK) {
flushData();
}
}

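/**
 * Flushes the buffered block: writes the per-document lengths (a single VInt {@code (length << 1) | 1}
 * when all documents in the block share the same length, otherwise the first length shifted with a clear
 * low bit followed by the remaining lengths as plain VInts), then the LZ4-compressed payload, and appends
 * the block's compressed size to the temporary offsets file.
 */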
private void flushData() throws IOException {
if (numDocsInCurrentBlock > 0) {
// Write offset to this block to temporary offsets file
totalChunks++;
long thisBlockStartPointer = data.getFilePointer();

// Optimisation - check if all lengths are same
boolean allLengthsSame = true;
for (int i = 1; i < BINARY_DOCS_PER_COMPRESSED_BLOCK; i++) {
if (docLengths[i] != docLengths[i - 1]) {
allLengthsSame = false;
break;
}
}
if (allLengthsSame) {
// Only write one value shifted. Steal a bit to indicate all other lengths are the same
int onlyOneLength = (docLengths[0] << 1) | 1;
data.writeVInt(onlyOneLength);
} else {
for (int i = 0; i < BINARY_DOCS_PER_COMPRESSED_BLOCK; i++) {
if (i == 0) {
// Write first value shifted and steal a bit to indicate other lengths are to follow
int multipleLengths = (docLengths[0] << 1);
data.writeVInt(multipleLengths);
} else {
data.writeVInt(docLengths[i]);
}
}
}
maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength);
LZ4.compress(block, 0, uncompressedBlockLength, EndiannessReverserUtil.wrapDataOutput(data), ht);
numDocsInCurrentBlock = 0;
// Ensure initialized with zeroes because full array is always written
Arrays.fill(docLengths, 0);
uncompressedBlockLength = 0;
maxPointer = data.getFilePointer();
tempBinaryOffsets.writeVLong(maxPointer - thisBlockStartPointer);
}
}

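/**
 * Writes the block-addressing metadata: the block count, block shift and maximum uncompressed block
 * length, plus a {@link DirectMonotonicWriter}-encoded table of absolute block start pointers rebuilt
 * from the per-block sizes stored in the temporary offsets file.
 */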
void writeMetaData() throws IOException {
if (totalChunks == 0) {
return;
}

long startDMW = data.getFilePointer();
meta.writeLong(startDMW);

meta.writeVInt(totalChunks);
meta.writeVInt(BINARY_BLOCK_SHIFT);
meta.writeVInt(maxUncompressedBlockLength);
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);

CodecUtil.writeFooter(tempBinaryOffsets);
IOUtils.close(tempBinaryOffsets);
// write the compressed block offsets info to the meta file by reading from temp file
try (
ChecksumIndexInput filePointersIn = EndiannessReverserUtil.openChecksumInput(
state.directory,
tempBinaryOffsets.getName(),
IOContext.READONCE
)
) {
CodecUtil.checkHeader(
filePointersIn,
ES87TSDBDocValuesFormat.META_CODEC + "FilePointers",
ES87TSDBDocValuesFormat.VERSION_CURRENT,
ES87TSDBDocValuesFormat.VERSION_CURRENT
);
Throwable priorE = null;
try {
final DirectMonotonicWriter filePointers = DirectMonotonicWriter.getInstance(
meta,
data,
totalChunks,
ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);
long fp = blockAddressesStart;
for (int i = 0; i < totalChunks; ++i) {
filePointers.add(fp);
fp += filePointersIn.readVLong();
}
if (maxPointer < fp) {
throw new CorruptIndexException(
"File pointers don't add up (" + fp + " vs expected " + maxPointer + ")",
filePointersIn
);
}
filePointers.finish();
} catch (Throwable e) {
priorE = e;
} finally {
CodecUtil.checkFooter(filePointersIn, priorE);
}
}
// Write the length of the DMW block in the data
meta.writeLong(data.getFilePointer() - startDMW);
}

@Override
public void close() throws IOException {
if (tempBinaryOffsets != null) {
IOUtils.close(tempBinaryOffsets, () -> state.directory.deleteFile(tempBinaryOffsets.getName()));
}
}
}
// END: Copied from LUCENE-9211

@Override
public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
meta.writeInt(field.number);
server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java
@@ -27,7 +27,8 @@ public class ES87TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesF
static final String META_CODEC = "ES87TSDBDocValuesMetadata";
static final String META_EXTENSION = "dvm";
static final int VERSION_START = 0;
static final int VERSION_CURRENT = VERSION_START;
static final int VERSION_BINARY_DV_COMPRESSION = VERSION_START;
static final int VERSION_CURRENT = VERSION_BINARY_DV_COMPRESSION;
static final byte NUMERIC = 0;
static final byte BINARY = 1;
static final byte SORTED = 2;
@@ -42,13 +43,21 @@ public class ES87TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValuesF
static final int TERMS_DICT_REVERSE_INDEX_SIZE = 1 << TERMS_DICT_REVERSE_INDEX_SHIFT;
static final int TERMS_DICT_REVERSE_INDEX_MASK = TERMS_DICT_REVERSE_INDEX_SIZE - 1;

final BinaryDVCompressionMode binaryDVCompressionMode;

public ES87TSDBDocValuesFormat() {
this(BinaryDVCompressionMode.NO_COMPRESS);
}

@martijnvg (Member, Sep 2, 2024): Should this be changed to COMPRESSED_WITH_LZ4? Otherwise compression doesn't get used outside tests?

Author (Member): Good catch :)

// allow testing old format
ES87TSDBDocValuesFormat(BinaryDVCompressionMode binaryDVCompressionMode) {
super(CODEC_NAME);
this.binaryDVCompressionMode = binaryDVCompressionMode;
}

@Override
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
return new ES87TSDBDocValuesConsumer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION);
return new ES87TSDBDocValuesConsumer(binaryDVCompressionMode, state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION);
}

@Override
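For context, here is a minimal sketch (not part of this PR) of how the new package-private constructor could be exercised to index and read back LZ4-compressed binary doc values. The wrapping class, the field name, and the Lucene912Codec base codec are assumptions; only the ES87TSDBDocValuesFormat constructor and the BinaryDVCompressionMode enum come from this change, and the snippet has to live in the org.elasticsearch.index.codec.tsdb package to reach them.

package org.elasticsearch.index.codec.tsdb;

import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.lucene912.Lucene912Codec;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MultiDocValues;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;

public class BinaryDVCompressionSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            IndexWriterConfig config = new IndexWriterConfig().setCodec(new Lucene912Codec() {
                @Override
                public DocValuesFormat getDocValuesFormatForField(String field) {
                    // Force the TSDB doc values format with LZ4 block compression for every field.
                    return new ES87TSDBDocValuesFormat(BinaryDVCompressionMode.COMPRESSED_WITH_LZ4);
                }
            });
            try (IndexWriter writer = new IndexWriter(dir, config)) {
                Document doc = new Document();
                doc.add(new BinaryDocValuesField("payload", new BytesRef("some binary value")));
                writer.addDocument(doc);
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                BinaryDocValues dv = MultiDocValues.getBinaryValues(reader, "payload");
                if (dv.advanceExact(0)) {
                    // Values are decompressed transparently by the producer on read.
                    System.out.println(dv.binaryValue().utf8ToString());
                }
            }
        }
    }
}

The same setup with BinaryDVCompressionMode.NO_COMPRESS would exercise the existing uncompressed path for comparison.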