Skip to content

Commit

Permalink
Precompute max compressed block size
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Sep 5, 2024
1 parent e82d669 commit 2d799f9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,21 @@
*/
package io.trino.execution.buffer;

import io.airlift.compress.v3.lz4.Lz4Compressor;
import io.airlift.compress.v3.zstd.ZstdCompressor;

import java.util.OptionalInt;

public enum CompressionCodec
{
NONE, LZ4, ZSTD
NONE, LZ4, ZSTD;

public OptionalInt maxCompressedLength(int inputSize)
{
return switch (this) {
case NONE -> OptionalInt.empty();
case LZ4 -> OptionalInt.of(Lz4Compressor.create().maxCompressedLength(inputSize));
case ZSTD -> OptionalInt.of(ZstdCompressor.create().maxCompressedLength(inputSize));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@
import java.util.Optional;
import java.util.OptionalInt;

import static io.trino.execution.buffer.CompressionCodec.LZ4;
import static io.trino.execution.buffer.CompressionCodec.NONE;
import static io.trino.execution.buffer.CompressionCodec.ZSTD;
import static java.util.Objects.requireNonNull;

public class PagesSerdeFactory
{
private static final int SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES = 64 * 1024;

private static final Map<CompressionCodec, OptionalInt> MAX_COMPRESSED_LENGTH = Map.of(
CompressionCodec.NONE, OptionalInt.empty(),
CompressionCodec.LZ4, OptionalInt.of(Lz4Compressor.create().maxCompressedLength(SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES)),
CompressionCodec.ZSTD, OptionalInt.of(ZstdCompressor.create().maxCompressedLength(SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES)));
NONE, NONE.maxCompressedLength(SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES),
LZ4, LZ4.maxCompressedLength(SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES),
ZSTD, ZSTD.maxCompressedLength(SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES));

private final BlockEncodingSerde blockEncodingSerde;
private final CompressionCodec compressionCodec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ private void testRoundTrip(List<Type> types, List<Page> pages, boolean encryptio
{
Optional<SecretKey> encryptionKey = encryptionEnabled ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty();
for (CompressionCodec compressionCodec : CompressionCodec.values()) {
PageSerializer serializer = new PageSerializer(blockEncodingSerde, createCompressor(compressionCodec), encryptionKey, blockSizeInBytes);
PageDeserializer deserializer = new PageDeserializer(blockEncodingSerde, createDecompressor(compressionCodec), encryptionKey, blockSizeInBytes);
PageSerializer serializer = new PageSerializer(blockEncodingSerde, createCompressor(compressionCodec), encryptionKey, blockSizeInBytes, compressionCodec.maxCompressedLength(blockSizeInBytes));
PageDeserializer deserializer = new PageDeserializer(blockEncodingSerde, createDecompressor(compressionCodec), encryptionKey, blockSizeInBytes, compressionCodec.maxCompressedLength(blockSizeInBytes));
for (Page page : pages) {
Slice serialized = serializer.serialize(page);
Page deserialized = deserializer.deserialize(serialized);
Expand Down Expand Up @@ -270,8 +270,8 @@ private void testDeserializationWithRollover(boolean encryptionEnabled, int numb
RolloverBlockSerde blockSerde = new RolloverBlockSerde();
Optional<SecretKey> encryptionKey = encryptionEnabled ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty();
for (CompressionCodec compressionCodec : CompressionCodec.values()) {
PageSerializer serializer = new PageSerializer(blockSerde, createCompressor(compressionCodec), encryptionKey, blockSize);
PageDeserializer deserializer = new PageDeserializer(blockSerde, createDecompressor(compressionCodec), encryptionKey, blockSize);
PageSerializer serializer = new PageSerializer(blockSerde, createCompressor(compressionCodec), encryptionKey, blockSize, compressionCodec.maxCompressedLength(blockSize));
PageDeserializer deserializer = new PageDeserializer(blockSerde, createDecompressor(compressionCodec), encryptionKey, blockSize, compressionCodec.maxCompressedLength(blockSize));

Page page = createTestPage(numberOfEntries);
Slice serialized = serializer.serialize(page);
Expand Down

0 comments on commit 2d799f9

Please sign in to comment.