Parquet: Cache codecs by name and level #8182

Merged 4 commits on Aug 1, 2023
ParquetCodecFactory.java

@@ -18,88 +18,75 @@
  */
 package org.apache.iceberg.parquet;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.parquet.bytes.BytesInput;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 /**
  * This class implements a codec factory that is used when reading from Parquet. It adds a
- * workaround for memory issues encountered when reading from zstd-compressed files. This is no
- * longer used, as Parquet 1.13 includes this fix.
- *
- * @deprecated will be removed in 1.5.0
+ * workaround to cache codecs by name and level, not just by name. This can be removed when this
+ * change is made to Parquet.
  */
-@Deprecated
 public class ParquetCodecFactory extends CodecFactory {
 
   public ParquetCodecFactory(Configuration configuration, int pageSize) {
     super(configuration, pageSize);
   }
 
-  /** Copied and modified from CodecFactory.HeapBytesDecompressor */
-  class HeapBytesDecompressor extends BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        if (codec instanceof ZstandardCodec) {
-          // we need to close the zstd input stream ASAP to free up native resources, so
-          // read everything into a buffer and then close it
-          try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
-            decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
-          }
-        } else {
-          InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
-    }
-
-    @Override
-    public void decompress(
-        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
-        throws IOException {
-      ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
-      output.put(decompressed);
-    }
-
-    @Override
-    public void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-  }
-
-  @Override
-  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
-    return new HeapBytesDecompressor(codecName);
+  /**
+   * This is copied from {@link CodecFactory} and modified to include the level in the cache key.
+   */
+  @Override
+  protected CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+
+    String cacheKey = cacheKey(codecName);
+    CompressionCodec codec = CODEC_BY_NAME.get(cacheKey);
+    if (codec != null) {
+      return codec;
+    }
+
+    try {
+      Class<?> codecClass;
+      try {
+        codecClass = Class.forName(codecClassName);
+      } catch (ClassNotFoundException e) {
+        // Try to load the class using the job classloader
+        codecClass = configuration.getClassLoader().loadClass(codecClassName);
+      }
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      CODEC_BY_NAME.put(cacheKey, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
+    }
+  }
+
+  private String cacheKey(CompressionCodecName codecName) {
+    String level = null;
+    switch (codecName) {
+      case GZIP:
+        level = configuration.get("zlib.compress.level");
+        break;
+      case BROTLI:
+        level = configuration.get("compression.brotli.quality");
+        break;
+      case ZSTD:
+        level = configuration.get("parquet.compression.codec.zstd.level");
+        if (level == null) {
+          // keep "io.compression.codec.zstd.level" for backwards compatibility
+          level = configuration.get("io.compression.codec.zstd.level");
+        }
+        break;
+      default:
+        // compression level is not supported; ignore it
+    }
+    String codecClass = codecName.getHadoopCompressionCodecClassName();
+    return level == null ? codecClass : codecClass + ":" + level;
   }
 }
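For context, Parquet's CodecFactory keeps constructed codecs in a static CODEC_BY_NAME map keyed only by the Hadoop codec class name, so two writers configured with different zstd levels would share one cached codec and the second level would be silently ignored. The sketch below is not part of this PR; CodecCacheSketch is a hypothetical class placed in the org.apache.iceberg.parquet package so it can call the protected getCodec method, and it only illustrates how the level-aware cache key keeps the two lookups separate.

package org.apache.iceberg.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CodecCacheSketch {
  public static void main(String[] args) {
    // Factory configured for zstd level 3.
    Configuration conf = new Configuration();
    conf.set("parquet.compression.codec.zstd.level", "3");
    ParquetCodecFactory factory = new ParquetCodecFactory(conf, 8192);
    CompressionCodec first = factory.getCodec(CompressionCodecName.ZSTD);

    // Second factory configured for zstd level 19. With the stock CodecFactory,
    // whose static CODEC_BY_NAME map is keyed only by the codec class name, this
    // lookup would return the codec that was built for level 3.
    Configuration conf2 = new Configuration();
    conf2.set("parquet.compression.codec.zstd.level", "19");
    ParquetCodecFactory factory2 = new ParquetCodecFactory(conf2, 8192);
    CompressionCodec second = factory2.getCodec(CompressionCodecName.ZSTD);

    // With the level included in the cache key, the lookups hit separate entries.
    System.out.println(first == second); // expected: false
  }
}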
ParquetWriter.java

@@ -86,7 +86,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
-    this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
+    this.compressor =
+        new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
     this.metricsConfig = metricsConfig;
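For reference, a hedged usage sketch of how a table might be configured so that this writer path sees a specific codec and level. write.parquet.compression-codec and write.parquet.compression-level are standard Iceberg table properties; exactly how the level is propagated into the Hadoop configuration keys read by cacheKey() is not shown in this diff, and the configure method and catalog wiring below are illustrative only.

import org.apache.iceberg.Table;

public class CompressionSettingsSketch {
  // Assumes a Table handle obtained from some catalog elsewhere.
  static void configure(Table table) {
    table
        .updateProperties()
        .set("write.parquet.compression-codec", "zstd")
        .set("write.parquet.compression-level", "9")
        .commit();
  }
}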