-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: View metadata implementation #7759
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
129 changes: 129 additions & 0 deletions
129
core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.view; | ||
|
||
import java.io.Serializable; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.apache.iceberg.Schema; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
import org.apache.iceberg.util.PropertyUtil; | ||
import org.immutables.value.Value; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
@Value.Immutable | ||
public interface ViewMetadata extends Serializable { | ||
Logger LOG = LoggerFactory.getLogger(ViewMetadata.class); | ||
int SUPPORTED_VIEW_FORMAT_VERSION = 1; | ||
|
||
int formatVersion(); | ||
|
||
String location(); | ||
|
||
Integer currentSchemaId(); | ||
|
||
List<Schema> schemas(); | ||
|
||
int currentVersionId(); | ||
|
||
List<ViewVersion> versions(); | ||
|
||
List<ViewHistoryEntry> history(); | ||
|
||
Map<String, String> properties(); | ||
|
||
default ViewVersion version(int versionId) { | ||
return versionsById().get(versionId); | ||
} | ||
|
||
default ViewVersion currentVersion() { | ||
return versionsById().get(currentVersionId()); | ||
} | ||
|
||
@Value.Derived | ||
default Map<Integer, ViewVersion> versionsById() { | ||
ImmutableMap.Builder<Integer, ViewVersion> builder = ImmutableMap.builder(); | ||
for (ViewVersion version : versions()) { | ||
builder.put(version.versionId(), version); | ||
} | ||
|
||
return builder.build(); | ||
} | ||
|
||
@Value.Derived | ||
default Map<Integer, Schema> schemasById() { | ||
ImmutableMap.Builder<Integer, Schema> builder = ImmutableMap.builder(); | ||
for (Schema schema : schemas()) { | ||
builder.put(schema.schemaId(), schema); | ||
} | ||
|
||
return builder.build(); | ||
} | ||
|
||
default Schema schema() { | ||
return schemasById().get(currentSchemaId()); | ||
} | ||
|
||
@Value.Check | ||
default ViewMetadata checkAndNormalize() { | ||
Preconditions.checkArgument( | ||
formatVersion() > 0 && formatVersion() <= ViewMetadata.SUPPORTED_VIEW_FORMAT_VERSION, | ||
"Unsupported format version: %s", | ||
formatVersion()); | ||
|
||
Preconditions.checkArgument(versions().size() > 0, "Invalid view versions: empty"); | ||
Preconditions.checkArgument(history().size() > 0, "Invalid view history: empty"); | ||
Preconditions.checkArgument(schemas().size() > 0, "Invalid schemas: empty"); | ||
|
||
Preconditions.checkArgument( | ||
versionsById().containsKey(currentVersionId()), | ||
"Cannot find current version %s in view versions: %s", | ||
currentVersionId(), | ||
versionsById().keySet()); | ||
|
||
Preconditions.checkArgument( | ||
schemasById().containsKey(currentSchemaId()), | ||
"Cannot find current schema with id %s in schemas: %s", | ||
currentSchemaId(), | ||
schemasById().keySet()); | ||
|
||
int versionHistorySizeToKeep = | ||
PropertyUtil.propertyAsInt( | ||
properties(), | ||
ViewProperties.VERSION_HISTORY_SIZE, | ||
ViewProperties.VERSION_HISTORY_SIZE_DEFAULT); | ||
|
||
if (versionHistorySizeToKeep <= 0) { | ||
LOG.warn( | ||
"{} must be positive but was {}", | ||
ViewProperties.VERSION_HISTORY_SIZE, | ||
versionHistorySizeToKeep); | ||
} else if (versions().size() > versionHistorySizeToKeep) { | ||
List<ViewVersion> versions = | ||
versions().subList(versions().size() - versionHistorySizeToKeep, versions().size()); | ||
List<ViewHistoryEntry> history = | ||
history().subList(history().size() - versionHistorySizeToKeep, history().size()); | ||
return ImmutableViewMetadata.builder().from(this).versions(versions).history(history).build(); | ||
} | ||
|
||
return this; | ||
} | ||
} |
176 changes: 176 additions & 0 deletions
176
core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.view; | ||
|
||
import com.fasterxml.jackson.core.JsonGenerator; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.io.OutputStreamWriter; | ||
import java.io.UncheckedIOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.apache.iceberg.Schema; | ||
import org.apache.iceberg.SchemaParser; | ||
import org.apache.iceberg.io.InputFile; | ||
import org.apache.iceberg.io.OutputFile; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.util.JsonUtil; | ||
|
||
public class ViewMetadataParser { | ||
|
||
static final String FORMAT_VERSION = "format-version"; | ||
static final String LOCATION = "location"; | ||
static final String CURRENT_VERSION_ID = "current-version-id"; | ||
static final String VERSIONS = "versions"; | ||
static final String VERSION_LOG = "version-log"; | ||
static final String PROPERTIES = "properties"; | ||
static final String SCHEMAS = "schemas"; | ||
static final String CURRENT_SCHEMA_ID = "current-schema-id"; | ||
|
||
private ViewMetadataParser() {} | ||
|
||
public static String toJson(ViewMetadata metadata) { | ||
return toJson(metadata, false); | ||
} | ||
|
||
public static String toJson(ViewMetadata metadata, boolean pretty) { | ||
return JsonUtil.generate(gen -> toJson(metadata, gen), pretty); | ||
} | ||
|
||
static void toJson(ViewMetadata metadata, JsonGenerator gen) throws IOException { | ||
Preconditions.checkArgument(null != metadata, "Invalid view metadata: null"); | ||
|
||
gen.writeStartObject(); | ||
|
||
gen.writeNumberField(FORMAT_VERSION, metadata.formatVersion()); | ||
gen.writeStringField(LOCATION, metadata.location()); | ||
JsonUtil.writeStringMap(PROPERTIES, metadata.properties(), gen); | ||
gen.writeNumberField(CURRENT_SCHEMA_ID, metadata.currentSchemaId()); | ||
|
||
gen.writeArrayFieldStart(SCHEMAS); | ||
for (Schema schema : metadata.schemas()) { | ||
SchemaParser.toJson(schema, gen); | ||
} | ||
gen.writeEndArray(); | ||
|
||
gen.writeNumberField(CURRENT_VERSION_ID, metadata.currentVersionId()); | ||
gen.writeArrayFieldStart(VERSIONS); | ||
for (ViewVersion version : metadata.versions()) { | ||
ViewVersionParser.toJson(version, gen); | ||
} | ||
gen.writeEndArray(); | ||
|
||
gen.writeArrayFieldStart(VERSION_LOG); | ||
for (ViewHistoryEntry viewHistoryEntry : metadata.history()) { | ||
ViewHistoryEntryParser.toJson(viewHistoryEntry, gen); | ||
} | ||
gen.writeEndArray(); | ||
|
||
gen.writeEndObject(); | ||
} | ||
|
||
public static ViewMetadata fromJson(String json) { | ||
Preconditions.checkArgument(json != null, "Cannot parse view metadata from null string"); | ||
return JsonUtil.parse(json, ViewMetadataParser::fromJson); | ||
} | ||
|
||
public static ViewMetadata fromJson(JsonNode json) { | ||
Preconditions.checkArgument(json != null, "Cannot parse view metadata from null object"); | ||
Preconditions.checkArgument( | ||
json.isObject(), "Cannot parse view metadata from non-object: %s", json); | ||
|
||
int formatVersion = JsonUtil.getInt(FORMAT_VERSION, json); | ||
String location = JsonUtil.getString(LOCATION, json); | ||
Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, json); | ||
|
||
int currentSchemaId = JsonUtil.getInt(CURRENT_SCHEMA_ID, json); | ||
JsonNode schemasNode = JsonUtil.get(SCHEMAS, json); | ||
|
||
Preconditions.checkArgument( | ||
schemasNode.isArray(), "Cannot parse schemas from non-array: %s", schemasNode); | ||
List<Schema> schemas = Lists.newArrayListWithExpectedSize(schemasNode.size()); | ||
|
||
for (JsonNode schemaNode : schemasNode) { | ||
schemas.add(SchemaParser.fromJson(schemaNode)); | ||
} | ||
|
||
int currentVersionId = JsonUtil.getInt(CURRENT_VERSION_ID, json); | ||
JsonNode versionsNode = JsonUtil.get(VERSIONS, json); | ||
Preconditions.checkArgument( | ||
versionsNode.isArray(), "Cannot parse versions from non-array: %s", versionsNode); | ||
List<ViewVersion> versions = Lists.newArrayListWithExpectedSize(versionsNode.size()); | ||
for (JsonNode versionNode : versionsNode) { | ||
versions.add(ViewVersionParser.fromJson(versionNode)); | ||
} | ||
|
||
JsonNode versionLogNode = JsonUtil.get(VERSION_LOG, json); | ||
Preconditions.checkArgument( | ||
versionLogNode.isArray(), "Cannot parse version-log from non-array: %s", versionLogNode); | ||
List<ViewHistoryEntry> historyEntries = | ||
Lists.newArrayListWithExpectedSize(versionLogNode.size()); | ||
for (JsonNode vLog : versionLogNode) { | ||
historyEntries.add(ViewHistoryEntryParser.fromJson(vLog)); | ||
} | ||
|
||
return ImmutableViewMetadata.builder() | ||
.location(location) | ||
.currentVersionId(currentVersionId) | ||
.properties(properties) | ||
.versions(versions) | ||
.schemas(schemas) | ||
.currentSchemaId(currentSchemaId) | ||
.history(historyEntries) | ||
.formatVersion(formatVersion) | ||
.build(); | ||
} | ||
|
||
public static void overwrite(ViewMetadata metadata, OutputFile outputFile) { | ||
internalWrite(metadata, outputFile, true); | ||
} | ||
|
||
public static void write(ViewMetadata metadata, OutputFile outputFile) { | ||
internalWrite(metadata, outputFile, false); | ||
} | ||
|
||
public static ViewMetadata read(InputFile file) { | ||
try (InputStream is = file.newStream()) { | ||
return fromJson(JsonUtil.mapper().readValue(is, JsonNode.class)); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(String.format("Failed to read json file: %s", file), e); | ||
} | ||
} | ||
|
||
private static void internalWrite( | ||
ViewMetadata metadata, OutputFile outputFile, boolean overwrite) { | ||
OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create(); | ||
try (OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8)) { | ||
JsonGenerator generator = JsonUtil.factory().createGenerator(writer); | ||
generator.useDefaultPrettyPrinter(); | ||
toJson(metadata, generator); | ||
generator.flush(); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException( | ||
String.format("Failed to write json to file: %s", outputFile), e); | ||
} | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
core/src/main/java/org/apache/iceberg/view/ViewProperties.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.view; | ||
|
||
/**
 * Names and default values of view properties. These properties can be set during CREATE/REPLACE
 * view or through the updateProperties API.
 */
public class ViewProperties {
  // Constants holder only; not meant to be instantiated.
  private ViewProperties() {}

  /** Property key for the number of version history entries to keep. */
  public static final String VERSION_HISTORY_SIZE = "version.history.num-entries";

  /** Value used when {@link #VERSION_HISTORY_SIZE} is not set. */
  public static final int VERSION_HISTORY_SIZE_DEFAULT = 10;
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is confusing. The builder should enforce the maximum number of versions, not the check method. It's awkward that this returns a copy of the view metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems to be the "official" way of normalizing an object: http://immutables.github.io/immutable.html#normalization
Not sure it's worth adding our own builder class for this particular case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is strange to me, and I'm not a big fan of having a `checkAndNormalize` in the interface, but okay I guess? It doesn't seem concerning enough to block using Immutables, but it is annoying that the pattern requires exposing additional methods that don't have a clear contract (like `validate`).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree here completely.
It would be great if there could be a better & shorter alternative to achieve this.
The (longer) alternative would be to have our own builder that internally uses `ImmutableViewMetadata.builder()` and does the "normalization", but that doesn't prevent anyone from using `ImmutableViewMetadata.builder()` directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, but if we're using our own builder then what's the point of immutables?
For now, I think we should just get this in to unblock the next PR. We should follow up with the API that adds a new view representation and version. If that goes smoothly then it will be fine. If it doesn't fit then we can remove immutables and go with a direct implementation.