Skip to content

Commit

Permalink
Core: Support view metadata compression
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 29, 2023
1 parent 6172f5c commit 8553226
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchViewException;
Expand Down Expand Up @@ -147,8 +148,14 @@ protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
}

private String newMetadataFilePath(ViewMetadata metadata, int newVersion) {
String codecName =
metadata
.properties()
.getOrDefault(
ViewProperties.METADATA_COMPRESSION, ViewProperties.METADATA_COMPRESSION_DEFAULT);
String fileExtension = TableMetadataParser.getFileExtension(codecName);
return metadataFileLocation(
metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), ".metadata.json"));
metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
}

private String metadataFileLocation(ViewMetadata metadata, String filename) {
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableMetadataParser.Codec;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -163,7 +166,9 @@ public static void write(ViewMetadata metadata, OutputFile outputFile) {
}

public static ViewMetadata read(InputFile file) {
try (InputStream is = file.newStream()) {
Codec codec = Codec.fromFileName(file.location());
try (InputStream is =
codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) {
return fromJson(file.location(), JsonUtil.mapper().readValue(is, JsonNode.class));
} catch (IOException e) {
throw new UncheckedIOException(String.format("Failed to read json file: %s", file), e);
Expand All @@ -172,8 +177,11 @@ public static ViewMetadata read(InputFile file) {

private static void internalWrite(
ViewMetadata metadata, OutputFile outputFile, boolean overwrite) {
boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
try (OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8)) {
try (OutputStreamWriter writer =
new OutputStreamWriter(
isGzip ? new GZIPOutputStream(stream) : stream, StandardCharsets.UTF_8)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ public class ViewProperties {
public static final String VERSION_HISTORY_SIZE = "version.history.num-entries";
public static final int VERSION_HISTORY_SIZE_DEFAULT = 10;

public static final String METADATA_COMPRESSION = "write.metadata.compression-codec";
public static final String METADATA_COMPRESSION_DEFAULT = "gzip";

private ViewProperties() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@
*/
package org.apache.iceberg.view;

import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadataParser.Codec;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestViewMetadataParser {

Expand All @@ -39,6 +51,8 @@ public class TestViewMetadataParser {
Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
Types.NestedField.required(3, "z", Types.LongType.get()));

@TempDir private Path tmp;

@Test
public void nullAndEmptyCheck() {
assertThatThrownBy(() -> ViewMetadataParser.fromJson((String) null))
Expand Down Expand Up @@ -308,4 +322,57 @@ public void replaceViewMetadataWithMultipleSQLsForDialect() throws Exception {

assertThat(replaced.currentVersion()).isEqualTo(viewVersion);
}

@ParameterizedTest
@ValueSource(strings = {"none", "gzip"})
public void metadataCompression(String codecName) throws IOException {
Codec codec = Codec.fromName(codecName);
String location = Paths.get(tmp.toString(), "v1" + getFileExtension(codec)).toString();
OutputFile outputFile = org.apache.iceberg.Files.localOutput(location);

Schema schema = new Schema(Types.NestedField.required(1, "x", Types.LongType.get()));
ViewVersion viewVersion =
ImmutableViewVersion.builder()
.schemaId(0)
.versionId(1)
.timestampMillis(23L)
.putSummary("operation", "create")
.defaultNamespace(Namespace.of("ns"))
.build();

ViewMetadata metadata =
ViewMetadata.buildFrom(
ViewMetadata.builder()
.setLocation(location)
.addSchema(schema)
.setProperties(ImmutableMap.of(ViewProperties.METADATA_COMPRESSION, codecName))
.addVersion(viewVersion)
.setCurrentVersionId(1)
.build())
.setMetadataLocation(outputFile.location())
.build();

ViewMetadataParser.write(metadata, outputFile);
assertThat(Codec.GZIP == codec).isEqualTo(isCompressed(location));

ViewMetadata actualMetadata =
ViewMetadataParser.read(org.apache.iceberg.Files.localInput(location));

assertThat(actualMetadata)
.usingRecursiveComparison()
.ignoringFieldsOfTypes(Schema.class)
.isEqualTo(metadata);
}

private boolean isCompressed(String path) throws IOException {
try (InputStream ignored = new GZIPInputStream(Files.newInputStream(new File(path).toPath()))) {
return true;
} catch (ZipException e) {
if (e.getMessage().equals("Not in GZIP format")) {
return false;
} else {
throw e;
}
}
}
}

0 comments on commit 8553226

Please sign in to comment.