Skip to content

Commit

Permalink
Add aircompressor bridge
Browse files Browse the repository at this point in the history
ORC/Parquet tests are using hadoop code to verify compatibility

Iceberg puffin format uses Zstd compressor/decompressor but cannot be upgraded to aircompressor as it requires JDK 22
  • Loading branch information
wendigo committed Jul 19, 2024
1 parent b4ce9a2 commit 512ed7b
Show file tree
Hide file tree
Showing 34 changed files with 1,117 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/trino-orc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-aircompressor-bridge</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions lib/trino-parquet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-aircompressor-bridge</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-aircompressor-bridge</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress;

import java.nio.ByteBuffer;

public interface Compressor
{
int maxCompressedLength(int uncompressedSize);

/**
* @return number of bytes written to the output
*/
int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength);

void compress(ByteBuffer input, ByteBuffer output);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress;

import java.nio.ByteBuffer;

public interface Decompressor
{
/**
* @return number of bytes written to the output
*/
int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
throws MalformedInputException;

void decompress(ByteBuffer input, ByteBuffer output)
throws MalformedInputException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress;

public class MalformedInputException
extends RuntimeException
{
private final long offset;

public MalformedInputException(long offset)
{
this(offset, "Malformed input");
}

public MalformedInputException(long offset, String reason)
{
super(reason + ": offset=" + offset);
this.offset = offset;
}

public long getOffset()
{
return offset;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.zstd;

import io.airlift.compress.Compressor;
import io.airlift.compress.v2.zstd.ZstdJavaCompressor;
import io.airlift.compress.v2.zstd.ZstdNativeCompressor;

import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

import static java.lang.Math.toIntExact;

public class ZstdCompressor
implements Compressor
{
private static final boolean NATIVE_ENABLED = ZstdNativeCompressor.isEnabled();
private final io.airlift.compress.v2.zstd.ZstdCompressor compressor;

public ZstdCompressor()
{
this.compressor = NATIVE_ENABLED ? new ZstdNativeCompressor() : new ZstdJavaCompressor();
}

@Override
public int maxCompressedLength(int uncompressedSize)
{
return compressor.maxCompressedLength(uncompressedSize);
}

@Override
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
{
MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength);
MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength);

return toIntExact(compressor.compress(inputSegment, outputSegment));
}

@Override
public void compress(ByteBuffer input, ByteBuffer output)
{
MemorySegment inputSegment = MemorySegment.ofBuffer(input);
MemorySegment outputSegment = MemorySegment.ofBuffer(output);

int written = compressor.compress(inputSegment, outputSegment);
output.position(output.position() + written);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.zstd;

import io.airlift.compress.Decompressor;
import io.airlift.compress.MalformedInputException;
import io.airlift.compress.v2.zstd.ZstdJavaDecompressor;
import io.airlift.compress.v2.zstd.ZstdNativeDecompressor;

import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

import static java.lang.Math.toIntExact;

public class ZstdDecompressor
implements Decompressor
{
private static final boolean NATIVE_ENABLED = ZstdNativeDecompressor.isEnabled();
private final io.airlift.compress.v2.zstd.ZstdDecompressor decompressor;

public ZstdDecompressor()
{
this.decompressor = NATIVE_ENABLED ? new ZstdNativeDecompressor() : new ZstdJavaDecompressor();
}

@Override
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
throws MalformedInputException
{
MemorySegment inputSegment = MemorySegment.ofArray(input).asSlice(inputOffset, inputLength);
MemorySegment outputSegment = MemorySegment.ofArray(output).asSlice(outputOffset, maxOutputLength);

return toIntExact(decompressor.decompress(inputSegment, outputSegment));
}

@Override
public void decompress(ByteBuffer input, ByteBuffer output)
throws MalformedInputException
{
MemorySegment inputSegment = MemorySegment.ofBuffer(input);
MemorySegment outputSegment = MemorySegment.ofBuffer(output);

int written = decompressor.decompress(inputSegment, outputSegment);
output.position(output.position() + written);
}

public static long getDecompressedSize(byte[] input, int offset, int length)
{
if (NATIVE_ENABLED) {
return new ZstdNativeDecompressor().getDecompressedSize(input, offset, length);
}
return new ZstdJavaDecompressor().getDecompressedSize(input, offset, length);
}
}
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
<module>plugin/trino-tpch</module>
<module>service/trino-proxy</module>
<module>service/trino-verifier</module>
<module>testing/trino-aircompressor-bridge</module>
<module>testing/trino-benchmark-queries</module>
<module>testing/trino-benchto-benchmarks</module>
<module>testing/trino-faulttolerant-tests</module>
Expand Down Expand Up @@ -943,6 +944,12 @@
<version>${dep.swagger.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-aircompressor-bridge</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-array</artifactId>
Expand Down
50 changes: 50 additions & 0 deletions testing/trino-aircompressor-bridge/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>453-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>trino-aircompressor-bridge</artifactId>
<description>Trino - Aircompressor v1 to v2 bridge</description>

<properties>
<air.compiler.fail-warnings>true</air.compiler.fail-warnings>
</properties>

<dependencies>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress;

import java.nio.ByteBuffer;

public interface Compressor
{
int maxCompressedLength(int uncompressedSize);

/**
* @return number of bytes written to the output
*/
int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength);

void compress(ByteBuffer input, ByteBuffer output);
}
Loading

0 comments on commit 512ed7b

Please sign in to comment.