Skip to content

Commit

Permalink
Core: View metadata implementations
Browse files Browse the repository at this point in the history
Co-authored-by: John Zhuge <[email protected]>
  • Loading branch information
amogh-jahagirdar and jzhuge committed May 7, 2023
1 parent 9fb90ac commit 7ea7bfb
Show file tree
Hide file tree
Showing 11 changed files with 1,097 additions and 0 deletions.
112 changes: 112 additions & 0 deletions core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.view;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.immutables.value.Value;

@Value.Immutable
public abstract class ViewMetadata implements Serializable {
static final int DEFAULT_VIEW_FORMAT_VERSION = 1;
static final int SUPPORTED_VIEW_FORMAT_VERSION = 1;

public static ImmutableViewMetadata.Builder buildFrom(ViewMetadata metadata) {
return ImmutableViewMetadata.builder()
.formatVersion(DEFAULT_VIEW_FORMAT_VERSION)
.location(metadata.location())
.properties(metadata.properties())
.currentVersionId(metadata.currentVersionId())
.versions(metadata.versions())
.schemas(metadata.schemas())
.currentSchemaId(metadata.currentSchemaId())
.history(metadata.history());
}

@Value.Check
void check() {
Preconditions.checkState(versions().size() > 0, "Expecting non-empty version log");
}

public abstract int formatVersion();

public abstract String location();

public abstract Map<String, String> properties();

public abstract int currentVersionId();

public abstract List<ViewVersion> versions();

public abstract List<ViewHistoryEntry> history();

public abstract List<Schema> schemas();

public abstract Integer currentSchemaId();

public ViewVersion version(int versionId) {
return versionsById().get(versionId);
}

@Value.Derived
public ViewVersion currentVersion() {
return versionsById().get(currentVersionId());
}

@Value.Derived
public Map<Integer, ViewVersion> versionsById() {
return indexVersions(versions());
}

@Value.Derived
public Map<Integer, Schema> schemasById() {
return indexSchemas(schemas());
}

@Value.Derived
public Schema schema() {
return schemasById().get(currentSchemaId());
}

private static Map<Integer, ViewVersion> indexVersions(List<ViewVersion> versions) {
ImmutableMap.Builder<Integer, ViewVersion> builder = ImmutableMap.builder();
for (ViewVersion version : versions) {
builder.put(version.versionId(), version);
}

return builder.build();
}

private static Map<Integer, Schema> indexSchemas(List<Schema> schemas) {
if (schemas == null) {
return ImmutableMap.of();
}

ImmutableMap.Builder<Integer, Schema> builder = ImmutableMap.builder();
for (Schema schema : schemas) {
builder.put(schema.schemaId(), schema);
}

return builder.build();
}
}
227 changes: 227 additions & 0 deletions core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.view;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.PropertyUtil;

class ViewMetadataParser {

static final String FORMAT_VERSION = "format-version";
static final String LOCATION = "location";
static final String CURRENT_VERSION_ID = "current-version-id";
static final String VERSIONS = "versions";
static final String VERSION_LOG = "version-log";
static final String PROPERTIES = "properties";
static final String SCHEMAS = "schemas";
static final String CURRENT_SCHEMA_ID = "current-schema-id";

public static void overwrite(ViewMetadata metadata, OutputFile outputFile) {
internalWrite(metadata, outputFile, true);
}

public static void write(ViewMetadata metadata, OutputFile outputFile) {
internalWrite(metadata, outputFile, false);
}

public static void toJson(ViewMetadata metadata, JsonGenerator generator) throws IOException {
generator.writeStartObject();

generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
generator.writeStringField(LOCATION, metadata.location());

JsonUtil.writeStringMap(PROPERTIES, metadata.properties(), generator);
generator.writeNumberField(CURRENT_SCHEMA_ID, metadata.currentSchemaId());
generator.writeArrayFieldStart(SCHEMAS);
for (Schema schema : metadata.schemas()) {
SchemaParser.toJson(schema, generator);
}

generator.writeEndArray();

generator.writeNumberField(CURRENT_VERSION_ID, metadata.currentVersionId());
generator.writeArrayFieldStart(VERSIONS);
for (ViewVersion version : metadata.versions()) {
ViewVersionParser.toJson(version, generator);
}

generator.writeEndArray();

generator.writeArrayFieldStart(VERSION_LOG);
for (ViewHistoryEntry viewHistoryEntry : metadata.history()) {
ViewHistoryEntryParser.toJson(viewHistoryEntry, generator);
}

generator.writeEndArray();
generator.writeEndObject();
}

static String toJson(ViewMetadata viewMetadata) {
return JsonUtil.generate(gen -> toJson(viewMetadata, gen), false);
}

public static ViewMetadata read(InputFile file) {
try (InputStream is = file.newStream()) {
return fromJson(JsonUtil.mapper().readValue(is, JsonNode.class));
} catch (IOException e) {
throw new UncheckedIOException(String.format("Failed to read json file: %s", file), e);
}
}

public static ViewMetadata fromJson(JsonNode node) {
Preconditions.checkArgument(node != null, "Cannot parse view metadata from null json");
Preconditions.checkArgument(
node.isObject(), "Cannot parse metadata from a non-object: %s", node);

int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
Preconditions.checkArgument(
formatVersion <= ViewMetadata.SUPPORTED_VIEW_FORMAT_VERSION,
"Cannot read unsupported version %s",
formatVersion);

String location = JsonUtil.getString(LOCATION, node);
int currentVersionId = JsonUtil.getInt(CURRENT_VERSION_ID, node);
Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);

JsonNode versionsListNode = JsonUtil.get(VERSIONS, node);
Preconditions.checkArgument(
versionsListNode.isArray(), "Cannot parse versions from non-array: %s", versionsListNode);

List<ViewVersion> versions = Lists.newArrayListWithExpectedSize(versionsListNode.size());
ViewVersion currentVersion = null;
for (JsonNode versionNode : versionsListNode) {
ViewVersion version = ViewVersionParser.fromJson(versionNode);
if (version.versionId() == currentVersionId) {
currentVersion = version;
}

versions.add(version);
}

Preconditions.checkArgument(
currentVersion != null,
"Cannot find version with %s=%s from %s",
CURRENT_VERSION_ID,
currentVersionId,
VERSIONS);

JsonNode versionLogNode = JsonUtil.get(VERSION_LOG, node);
Preconditions.checkArgument(
versionLogNode.isArray(), "Cannot parse version-log from non-array: %s", versionLogNode);
List<ViewHistoryEntry> historyEntries =
Lists.newArrayListWithExpectedSize(versionLogNode.size());
Iterator<JsonNode> versionLogIterator = versionLogNode.elements();
while (versionLogIterator.hasNext()) {
historyEntries.add(ViewHistoryEntryParser.fromJson(versionLogIterator.next()));
}

List<Schema> schemas;
Integer currentSchemaId;
Schema currentSchema = null;
JsonNode schemaArray = node.get(SCHEMAS);

Preconditions.checkArgument(
schemaArray.isArray(), "Cannot parse schemas from non-array: %s", schemaArray);
currentSchemaId = JsonUtil.getInt(CURRENT_SCHEMA_ID, node);

ImmutableList.Builder<Schema> builder = ImmutableList.builder();
for (JsonNode schemaNode : schemaArray) {
Schema schema = SchemaParser.fromJson(schemaNode);
if (schema.schemaId() == currentSchemaId) {
currentSchema = schema;
}

builder.add(schema);
}

Preconditions.checkArgument(
currentSchema != null,
"Cannot find schema with %s=%s from %s",
CURRENT_SCHEMA_ID,
currentSchemaId,
SCHEMAS);

schemas = builder.build();

int numVersionsToKeep =
PropertyUtil.propertyAsInt(
properties,
ViewProperties.VERSION_HISTORY_SIZE,
ViewProperties.VERSION_HISTORY_SIZE_DEFAULT);

Preconditions.checkArgument(
numVersionsToKeep >= 1, "%s must be positive", ViewProperties.VERSION_HISTORY_SIZE);

if (versions.size() > numVersionsToKeep) {
versions = versions.subList(versions.size() - numVersionsToKeep, versions.size());
historyEntries =
historyEntries.subList(historyEntries.size() - numVersionsToKeep, historyEntries.size());
}

return ImmutableViewMetadata.builder()
.location(location)
.currentVersionId(currentVersionId)
.properties(properties)
.versions(versions)
.schemas(schemas)
.currentSchemaId(currentSchemaId)
.history(historyEntries)
.formatVersion(formatVersion)
.build();
}

static ViewMetadata fromJson(String json) {
Preconditions.checkArgument(json != null, "Cannot parse view metadata from null string");
return JsonUtil.parse(json, ViewMetadataParser::fromJson);
}

private static void internalWrite(
ViewMetadata metadata, OutputFile outputFile, boolean overwrite) {
OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
try (OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
generator.flush();
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to write json to file: %s", outputFile), e);
}
}

private ViewMetadataParser() {}
}
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/iceberg/view/ViewProperties.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.view;

/** View properties that can be set during CREATE/REPLACE view or using updateProperties API. */
public class ViewProperties {
public static final String VERSION_HISTORY_SIZE = "version.history.num-entries";
public static final int VERSION_HISTORY_SIZE_DEFAULT = 10;

private ViewProperties() {}
}
Loading

0 comments on commit 7ea7bfb

Please sign in to comment.