From 2c02fd62b252b441306b9d2800da4e660c81863a Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 29 Jul 2023 08:25:59 -0700 Subject: [PATCH 1/4] Parquet: cache codecs by name and level --- .../iceberg/parquet/ParquetCodecFactory.java | 111 ++++++++---------- .../apache/iceberg/parquet/ParquetWriter.java | 3 +- 2 files changed, 51 insertions(+), 63 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java index 47b9d158c509..ca400c74115d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java @@ -18,88 +18,75 @@ */ package org.apache.iceberg.parquet; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.parquet.bytes.BytesInput; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.CodecFactory; -import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** * This class implements a codec factory that is used when reading from Parquet. It adds a - * workaround for memory issues encountered when reading from zstd-compressed files. This is no - * longer used, as Parquet 1.13 includes this fix. - * - * @deprecated will be removed in 1.5.0 + * workaround to cache codecs by name and level, not just by name. This can be removed + * when this change is made to Parquet. */ -@Deprecated public class ParquetCodecFactory extends CodecFactory { public ParquetCodecFactory(Configuration configuration, int pageSize) { super(configuration, pageSize); } - /** Copied and modified from CodecFactory.HeapBytesDecompressor */ - class HeapBytesDecompressor extends BytesDecompressor { - - private final CompressionCodec codec; - private final Decompressor decompressor; - - HeapBytesDecompressor(CompressionCodecName codecName) { - this.codec = getCodec(codecName); - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - } else { - decompressor = null; - } - } - - @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - final BytesInput decompressed; - if (codec != null) { - if (decompressor != null) { - decompressor.reset(); - } - if (codec instanceof ZstandardCodec) { - // we need to close the zstd input stream ASAP to free up native resources, so - // read everything into a buffer and then close it - try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) { - decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); - } - } else { - InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); - } - } else { - decompressed = bytes; - } - return decompressed; + /** + * This is copied from {@link CodecFactory} and modified to include the level in the cache key. + */ + @Override + protected CompressionCodec getCodec(CompressionCodecName codecName) { + String codecClassName = codecName.getHadoopCompressionCodecClassName(); + if (codecClassName == null) { + return null; } - - @Override - public void decompress( - ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) - throws IOException { - ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); - output.put(decompressed); + String cacheKey = cacheKey(codecName); + CompressionCodec codec = CODEC_BY_NAME.get(cacheKey); + if (codec != null) { + return codec; } - @Override - public void release() { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); + try { + Class codecClass; + try { + codecClass = Class.forName(codecClassName); + } catch (ClassNotFoundException e) { + // Try to load the class using the job classloader + codecClass = configuration.getClassLoader().loadClass(codecClassName); } + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration); + CODEC_BY_NAME.put(cacheKey, codec); + return codec; + } catch (ClassNotFoundException e) { + throw new BadConfigurationException("Class " + codecClassName + " was not found", e); } } - @Override - protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { - return new HeapBytesDecompressor(codecName); + private String cacheKey(CompressionCodecName codecName) { + String level = null; + switch (codecName) { + case GZIP: + level = configuration.get("zlib.compress.level"); + break; + case BROTLI: + level = configuration.get("compression.brotli.quality"); + break; + case ZSTD: + // keep "io.compression.codec.zstd.level" for backwards compatibility + level = configuration.get("io.compression.codec.zstd.level"); + if (level == null) { + level = configuration.get("parquet.compression.codec.zstd.level"); + } + break; + default: + // compression level is not supported; ignore it + } + String codecClass = codecName.getHadoopCompressionCodecClassName(); + return level == null ? codecClass : codecClass + "-" + level; } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index 577004993711..099cffc33bb8 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -86,7 +86,8 @@ class ParquetWriter implements FileAppender, Closeable { this.targetRowGroupSize = rowGroupSize; this.props = properties; this.metadata = ImmutableMap.copyOf(metadata); - this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec); + this.compressor = + new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec); this.parquetSchema = ParquetSchemaUtil.convert(schema, "table"); this.model = (ParquetValueWriter) createWriterFunc.apply(parquetSchema); this.metricsConfig = metricsConfig; From 84817767274cda617e561acd37b53ec55a0bc3d7 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 29 Jul 2023 08:47:02 -0700 Subject: [PATCH 2/4] spotless --- .../java/org/apache/iceberg/parquet/ParquetCodecFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java index ca400c74115d..f04b88b7f9f5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java @@ -27,8 +27,8 @@ /** * This class implements a codec factory that is used when reading from Parquet. It adds a - * workaround to cache codecs by name and level, not just by name. This can be removed - * when this change is made to Parquet. + * workaround to cache codecs by name and level, not just by name. This can be removed when this + * change is made to Parquet. */ public class ParquetCodecFactory extends CodecFactory { From d478f0587e0e4b9104541c1f948b8f84b60906ff Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 29 Jul 2023 14:40:18 -0700 Subject: [PATCH 3/4] change delimiter --- .../java/org/apache/iceberg/parquet/ParquetCodecFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java index f04b88b7f9f5..beef07a570b2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java @@ -87,6 +87,6 @@ private String cacheKey(CompressionCodecName codecName) { // compression level is not supported; ignore it } String codecClass = codecName.getHadoopCompressionCodecClassName(); - return level == null ? codecClass : codecClass + "-" + level; + return level == null ? codecClass : codecClass + ":" + level; } } From e85937bfb0ece9450aeff65b41f24017bda28aee Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sun, 30 Jul 2023 07:59:31 -0700 Subject: [PATCH 4/4] prefer newer zstd config --- .../org/apache/iceberg/parquet/ParquetCodecFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java index beef07a570b2..bfcece6259a6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java @@ -77,10 +77,10 @@ private String cacheKey(CompressionCodecName codecName) { level = configuration.get("compression.brotli.quality"); break; case ZSTD: - // keep "io.compression.codec.zstd.level" for backwards compatibility - level = configuration.get("io.compression.codec.zstd.level"); + level = configuration.get("parquet.compression.codec.zstd.level"); if (level == null) { - level = configuration.get("parquet.compression.codec.zstd.level"); + // keep "io.compression.codec.zstd.level" for backwards compatibility + level = configuration.get("io.compression.codec.zstd.level"); } break; default: