diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java index b7aed67396a5..7318d762edd8 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java @@ -38,9 +38,10 @@ *
    *
  1. The name of the latest metadata.json rewritten to staging location. After the files are * copied, this will be the root of the copied table. - *
  2. A list of all files added to the table between startVersion and endVersion, including their - * original and target paths under the target prefix. This list covers both original and - * rewritten files, allowing for copying to the target paths to form the copied table. + *
  2. A 'copy-plan'. This is a list of all files added to the table between startVersion and + * endVersion, including their original and target paths under the target prefix. This list + * covers both original and rewritten files, allowing for copying to the target paths to form the + * copied table. *
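
Illustration (not part of this patch): a hypothetical consumer of the returned copy plan. The plan is written by the Spark action as CSV part files with one "sourcePath,targetPath" pair per line; the class and method names below are invented for this sketch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.actions.RewriteTablePath;

public class CopyPlanExecutor {
  // Copy every file named in the plan: the source is either an untouched data/delete file
  // or a rewritten metadata file in the staging location, the target is under the new prefix.
  public static void copyAll(RewriteTablePath.Result result, Configuration conf) throws IOException {
    Path planDir = new Path(result.fileListLocation());
    FileSystem planFs = planDir.getFileSystem(conf);
    for (FileStatus part : planFs.listStatus(planDir)) {
      if (!part.isFile() || part.getLen() == 0) {
        continue; // skip _SUCCESS markers and empty part files
      }
      try (BufferedReader reader =
          new BufferedReader(new InputStreamReader(planFs.open(part.getPath())))) {
        String line;
        while ((line = reader.readLine()) != null) {
          String[] pair = line.split(",", 2);
          Path source = new Path(pair[0]);
          Path target = new Path(pair[1]);
          FileUtil.copy(
              source.getFileSystem(conf), source,
              target.getFileSystem(conf), target,
              false /* deleteSource */, conf);
        }
      }
    }
  }
}
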
*/ public interface RewriteTablePath extends Action { @@ -91,9 +92,10 @@ interface Result { String stagingLocation(); /** - * Path to a comma-separated list of source and target paths for all files added to the table - * between startVersion and endVersion, including original data files and metadata files - * rewritten to staging. + * Result file list location. This file contains a 'copy plan', a comma-separated list of all + * files added to the table between startVersion and endVersion, including their original and + * target paths under the target prefix. This list covers both original and rewritten files, + * allowing for copying to the target paths from the copied table. */ String fileListLocation(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java index f20a481cf25a..113cb32df3d9 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestLists.java +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -28,10 +28,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -class ManifestLists { +public class ManifestLists { private ManifestLists() {} - static List read(InputFile manifestList) { + public static List read(InputFile manifestList) { try (CloseableIterable files = Avro.read(manifestList) .rename("manifest_file", GenericManifestFile.class.getName()) @@ -50,7 +50,7 @@ static List read(InputFile manifestList) { } } - static ManifestListWriter write( + public static ManifestListWriter write( int formatVersion, OutputFile manifestListFile, long snapshotId, diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java new file mode 100644 index 000000000000..4eea28d956ed --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
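
Illustration (not part of this patch): the ManifestLists visibility change above exists so that the Spark action can rebuild manifest list files from outside the core package. A rough usage sketch; the class and method names are invented, and the real action additionally replaces each manifest path with the target prefix before adding it (see rewriteManifestList later in this patch).

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestLists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

public class ManifestListCopySketch {
  // Read one snapshot's manifest list and write a staged copy of it.
  static void copyManifestList(Table table, Snapshot snapshot, int formatVersion, OutputFile staged)
      throws IOException {
    List<ManifestFile> manifests =
        ManifestLists.read(table.io().newInputFile(snapshot.manifestListLocation()));
    try (FileAppender<ManifestFile> writer =
        ManifestLists.write(
            formatVersion, staged, snapshot.snapshotId(), snapshot.parentId(), snapshot.sequenceNumber())) {
      for (ManifestFile manifest : manifests) {
        writer.add(manifest);
      }
    }
  }
}
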
+ */ +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; + +public class RewriteTablePathUtil { + + public static List> rewriteManifest( + FileIO io, + int format, + PartitionSpec spec, + OutputFile outputFile, + ManifestFile manifestFile, + Map specsById, + String sourcePrefix, + String targetPrefix) + throws IOException { + try (ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId()); + ManifestReader reader = + ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) { + return StreamSupport.stream(reader.entries().spliterator(), false) + .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, writer)) + .collect(Collectors.toList()); + } + } + + public static List> rewriteDeleteManifest( + FileIO io, + int format, + PartitionSpec spec, + OutputFile outputFile, + ManifestFile manifestFile, + Map specsById, + String sourcePrefix, + String targetPrefix, + String stagingLocation, + PositionDeleteReaderWriter positionDeleteReaderWriter) + throws IOException { + try (ManifestWriter writer = + ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId()); + ManifestReader reader = + ManifestFiles.readDeleteManifest(manifestFile, io, specsById) + .select(Arrays.asList("*"))) { + return StreamSupport.stream(reader.entries().spliterator(), false) + .map( + entry -> { + try { + return newDeleteFile( + entry, + io, + spec, + sourcePrefix, + targetPrefix, + stagingLocation, + writer, + positionDeleteReaderWriter); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } + } + + private static Pair newDataFile( + ManifestEntry entry, + PartitionSpec spec, + String sourcePrefix, + String targetPrefix, + ManifestWriter writer) { + DataFile dataFile = entry.file(); + String sourceDataFilePath = dataFile.location(); + Preconditions.checkArgument( + sourceDataFilePath.startsWith(sourcePrefix), + "Encountered data file %s not under the source prefix %s", + sourceDataFilePath, + sourcePrefix); + String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, targetPrefix); + DataFile newDataFile = + DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build(); + appendEntryWithFile(entry, writer, newDataFile); + return Pair.of(sourceDataFilePath, newDataFile.location()); + } + + private static Pair newDeleteFile( + ManifestEntry entry, + FileIO io, + PartitionSpec spec, + String sourcePrefix, + String targetPrefix, + String stagingLocation, + ManifestWriter writer, + PositionDeleteReaderWriter posDeleteReaderWriter) + throws IOException { + + DeleteFile file = entry.file(); + + switch (file.content()) { + case POSITION_DELETES: + DeleteFile posDeleteFile = + rewritePositionDeleteFile( + io, file, spec, sourcePrefix, stagingLocation, targetPrefix, posDeleteReaderWriter); + String 
targetDeleteFilePath = newPath(file.location(), sourcePrefix, targetPrefix); + DeleteFile movedFile = + FileMetadata.deleteFileBuilder(spec) + .copy(posDeleteFile) + .withPath(targetDeleteFilePath) + .build(); + appendEntryWithFile(entry, writer, movedFile); + return Pair.of(posDeleteFile.location(), movedFile.location()); + case EQUALITY_DELETES: + DeleteFile eqDeleteFile = newEqualityDeleteFile(file, spec, sourcePrefix, targetPrefix); + appendEntryWithFile(entry, writer, eqDeleteFile); + return Pair.of(file.location(), eqDeleteFile.location()); + default: + throw new UnsupportedOperationException("Unsupported delete file type: " + file.content()); + } + } + + private static > void appendEntryWithFile( + ManifestEntry entry, ManifestWriter writer, F file) { + + switch (entry.status()) { + case ADDED: + writer.add(file); + break; + case EXISTING: + writer.existing( + file, entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber()); + break; + case DELETED: + writer.delete(file, entry.dataSequenceNumber(), entry.fileSequenceNumber()); + break; + } + } + + public interface PositionDeleteReaderWriter { + CloseableIterable reader(InputFile inputFile, FileFormat format, PartitionSpec spec); + + PositionDeleteWriter writer( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException; + } + + private static DeleteFile rewritePositionDeleteFile( + FileIO io, + DeleteFile current, + PartitionSpec spec, + String sourcePrefix, + String stagingLocation, + String targetPrefix, + PositionDeleteReaderWriter posDeleteReaderWriter) + throws IOException { + String path = current.location(); + if (!path.startsWith(sourcePrefix)) { + throw new UnsupportedOperationException( + "Expected delete file to be under the source prefix: " + + sourcePrefix + + " but was " + + path); + } + String newPath = stagingPath(path, stagingLocation); + + OutputFile targetFile = io.newOutputFile(newPath); + InputFile sourceFile = io.newInputFile(path); + + try (CloseableIterable reader = + posDeleteReaderWriter.reader(sourceFile, current.format(), spec)) { + org.apache.iceberg.data.Record record = null; + Schema rowSchema = null; + CloseableIterator recordIt = reader.iterator(); + + if (recordIt.hasNext()) { + record = recordIt.next(); + rowSchema = record.get(2) != null ? 
spec.schema() : null; + } + + PositionDeleteWriter writer = + posDeleteReaderWriter.writer( + targetFile, current.format(), spec, current.partition(), rowSchema); + + try (writer) { + if (record != null) { + writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix)); + } + + while (recordIt.hasNext()) { + record = recordIt.next(); + writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix)); + } + } + return writer.toDeleteFile(); + } + } + + private static PositionDelete newPositionDeleteRecord( + Record record, String sourcePrefix, String targetPrefix) { + PositionDelete delete = PositionDelete.create(); + String oldPath = (String) record.get(0); + String newPath = oldPath; + if (oldPath.startsWith(sourcePrefix)) { + newPath = newPath(oldPath, sourcePrefix, targetPrefix); + } + delete.set(newPath, (Long) record.get(1), record.get(2)); + return delete; + } + + private static DeleteFile newEqualityDeleteFile( + DeleteFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) { + String path = file.location(); + + if (!path.startsWith(sourcePrefix)) { + throw new UnsupportedOperationException( + "Expected delete file to be under the source prefix: " + + sourcePrefix + + " but was " + + path); + } + int[] equalityFieldIds = file.equalityFieldIds().stream().mapToInt(Integer::intValue).toArray(); + String newPath = newPath(path, sourcePrefix, targetPrefix); + return FileMetadata.deleteFileBuilder(spec) + .ofEqualityDeletes(equalityFieldIds) + .copy(file) + .withPath(newPath) + .withSplitOffsets(file.splitOffsets()) + .build(); + } + + private static String newPath(String path, String sourcePrefix, String targetPrefix) { + return combinePaths(targetPrefix, relativize(path, sourcePrefix)); + } + + private static String combinePaths(String absolutePath, String relativePath) { + String combined = absolutePath; + if (!combined.endsWith("/")) { + combined += "/"; + } + combined += relativePath; + return combined; + } + + private static String fileName(String path) { + String filename = path; + int lastIndex = path.lastIndexOf(File.separator); + if (lastIndex != -1) { + filename = path.substring(lastIndex + 1); + } + return filename; + } + + private static String relativize(String path, String prefix) { + String toRemove = prefix; + if (!toRemove.endsWith("/")) { + toRemove += "/"; + } + if (!path.startsWith(toRemove)) { + throw new IllegalArgumentException( + String.format("Path %s does not start with %s", path, toRemove)); + } + return path.substring(toRemove.length()); + } + + private static String stagingPath(String originalPath, String stagingLocation) { + return stagingLocation + fileName(originalPath); + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataUtil.java b/core/src/main/java/org/apache/iceberg/TableMetadataUtil.java new file mode 100644 index 000000000000..474e28b5e31e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/TableMetadataUtil.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.TableMetadata.MetadataLogEntry; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class TableMetadataUtil { + private TableMetadataUtil() {} + + public static TableMetadata replacePaths( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + String newLocation = newPath(metadata.location(), sourcePrefix, targetPrefix); + List newSnapshots = updatePathInSnapshots(metadata, sourcePrefix, targetPrefix); + List metadataLogEntries = + updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix); + long snapshotId = + metadata.currentSnapshot() == null ? -1 : metadata.currentSnapshot().snapshotId(); + Map properties = + updateProperties(metadata.properties(), sourcePrefix, targetPrefix); + + return new TableMetadata( + null, + metadata.formatVersion(), + metadata.uuid(), + newLocation, + metadata.lastSequenceNumber(), + metadata.lastUpdatedMillis(), + metadata.lastColumnId(), + metadata.currentSchemaId(), + metadata.schemas(), + metadata.defaultSpecId(), + metadata.specs(), + metadata.lastAssignedPartitionId(), + metadata.defaultSortOrderId(), + metadata.sortOrders(), + properties, + snapshotId, + newSnapshots, + null, + metadata.snapshotLog(), + metadataLogEntries, + metadata.refs(), + metadata.statisticsFiles(), + metadata.partitionStatisticsFiles(), + metadata.changes()); + } + + private static Map updateProperties( + Map tableProperties, String sourcePrefix, String targetPrefix) { + Map properties = Maps.newHashMap(tableProperties); + updatePathInProperty(properties, sourcePrefix, targetPrefix, TableProperties.OBJECT_STORE_PATH); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_DATA_LOCATION); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_METADATA_LOCATION); + + return properties; + } + + private static void updatePathInProperty( + Map properties, + String sourcePrefix, + String targetPrefix, + String propertyName) { + if (properties.containsKey(propertyName)) { + properties.put( + propertyName, newPath(properties.get(propertyName), sourcePrefix, targetPrefix)); + } + } + + private static List updatePathInMetadataLogs( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + List metadataLogEntries = + Lists.newArrayListWithCapacity(metadata.previousFiles().size()); + for (MetadataLogEntry metadataLog : metadata.previousFiles()) { + MetadataLogEntry newMetadataLog = + new MetadataLogEntry( + metadataLog.timestampMillis(), + newPath(metadataLog.file(), sourcePrefix, targetPrefix)); + metadataLogEntries.add(newMetadataLog); + } + return metadataLogEntries; + } + + private static List updatePathInSnapshots( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + List newSnapshots = 
Lists.newArrayListWithCapacity(metadata.snapshots().size()); + for (Snapshot snapshot : metadata.snapshots()) { + String newManifestListLocation = + newPath(snapshot.manifestListLocation(), sourcePrefix, targetPrefix); + Snapshot newSnapshot = + new BaseSnapshot( + snapshot.sequenceNumber(), + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.timestampMillis(), + snapshot.operation(), + snapshot.summary(), + snapshot.schemaId(), + newManifestListLocation); + newSnapshots.add(newSnapshot); + } + return newSnapshots; + } + + private static String newPath(String path, String sourcePrefix, String targetPrefix) { + return path.replaceFirst(sourcePrefix, targetPrefix); + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 53ce7418f3ec..b91331b0f70f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -139,6 +139,11 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) { return new BaseTable(ops, metadata.metadataFileLocation()); } + protected Table newStaticTable(String metadataFileLocation, FileIO io) { + StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io); + return new BaseTable(ops, metadataFileLocation); + } + protected Dataset contentFileDS(Table table) { return contentFileDS(table, null); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java new file mode 100644 index 000000000000..e3a64ad8a1a9 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
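
Illustration (not part of this patch): TableMetadataUtil.replacePaths above is the single entry point for rewriting one metadata.json. A minimal usage sketch; the class and method names are invented and the prefixes are placeholders.

import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableMetadataUtil;
import org.apache.iceberg.io.OutputFile;

public class ReplacePathsSketch {
  // Rewrites the table location, snapshot manifest-list locations, metadata-log entries and
  // location-related table properties (e.g. write.data.path), then stages a new metadata.json.
  static void stageRewrittenMetadata(Table table, OutputFile staged) {
    TableMetadata current = ((HasTableOperations) table).operations().current();
    TableMetadata rewritten =
        TableMetadataUtil.replacePaths(current, "s3://old-bucket/db/tbl", "s3://new-bucket/db/tbl");
    TableMetadataParser.overwrite(rewritten, staged);
  }
}
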
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestLists; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteTablePathUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadata.MetadataLogEntry; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableMetadataUtil; +import org.apache.iceberg.actions.ImmutableRewriteTablePath; +import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.util.Pair; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +public class RewriteTablePathSparkAction extends BaseSparkAction + implements RewriteTablePath { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class); + private static final String RESULT_LOCATION = "file-list"; + + private String sourcePrefix; + private String targetPrefix; + private String startVersionName; + private String endVersionName; + private String stagingDir; + + private final Table table; + + RewriteTablePathSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + } + + @Override + protected RewriteTablePath self() { + return this; + } + + @Override + public RewriteTablePath rewriteLocationPrefix(String sPrefix, String tPrefix) { + Preconditions.checkArgument( + sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be empty.", 
sPrefix); + this.sourcePrefix = sPrefix; + this.targetPrefix = tPrefix; + return this; + } + + @Override + public RewriteTablePath startVersion(String sVersion) { + Preconditions.checkArgument( + sVersion != null && !sVersion.trim().isEmpty(), + "Last copied version('%s') cannot be empty.", + sVersion); + this.startVersionName = sVersion; + return this; + } + + @Override + public RewriteTablePath endVersion(String eVersion) { + Preconditions.checkArgument( + eVersion != null && !eVersion.trim().isEmpty(), + "End version('%s') cannot be empty.", + eVersion); + this.endVersionName = eVersion; + return this; + } + + @Override + public RewriteTablePath stagingLocation(String stagingLocation) { + Preconditions.checkArgument( + stagingLocation != null && !stagingLocation.isEmpty(), + "Staging location('%s') cannot be empty.", + stagingLocation); + this.stagingDir = stagingLocation; + return this; + } + + @Override + public Result execute() { + validateInputs(); + JobGroupInfo info = newJobGroupInfo("COPY-TABLE", jobDesc()); + return withJobGroupInfo(info, this::doExecute); + } + + private Result doExecute() { + String resultLocation = rebuildMetadata(); + return ImmutableRewriteTablePath.Result.builder() + .stagingLocation(stagingDir) + .fileListLocation(resultLocation) + .latestVersion(fileName(endVersionName)) + .build(); + } + + private void validateInputs() { + Preconditions.checkArgument( + sourcePrefix != null && !sourcePrefix.isEmpty(), + "Source prefix('%s') cannot be empty.", + sourcePrefix); + Preconditions.checkArgument( + targetPrefix != null && !targetPrefix.isEmpty(), + "Target prefix('%s') cannot be empty.", + targetPrefix); + Preconditions.checkArgument( + !sourcePrefix.equals(targetPrefix), + "Source prefix cannot be the same as target prefix (%s)", + sourcePrefix); + + validateAndSetEndVersion(); + validateAndSetStartVersion(); + + if (stagingDir == null) { + stagingDir = getMetadataLocation(table) + "copy-table-staging-" + UUID.randomUUID() + "/"; + } else if (!stagingDir.endsWith("/")) { + stagingDir = stagingDir + "/"; + } + } + + private void validateAndSetEndVersion() { + TableMetadata tableMetadata = ((HasTableOperations) table).operations().current(); + + if (endVersionName == null) { + LOG.info("No end version specified. 
Will stage all files to the latest table version."); + Preconditions.checkNotNull(tableMetadata.metadataFileLocation()); + this.endVersionName = tableMetadata.metadataFileLocation(); + } else { + this.endVersionName = validateVersion(tableMetadata, endVersionName); + } + } + + private void validateAndSetStartVersion() { + TableMetadata tableMetadata = ((HasTableOperations) table).operations().current(); + + if (startVersionName != null) { + this.startVersionName = validateVersion(tableMetadata, startVersionName); + } + } + + private String validateVersion(TableMetadata tableMetadata, String versionFileName) { + String versionFile = versionFile(tableMetadata, versionFileName); + + Preconditions.checkNotNull( + versionFile, "Version file %s does not exist in metadata log.", versionFile); + Preconditions.checkArgument( + fileExist(versionFile), "Version file %s does not exist.", versionFile); + return versionFile; + } + + private String versionFile(TableMetadata metadata, String versionFileName) { + if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) { + return metadata.metadataFileLocation(); + } + + for (MetadataLogEntry log : metadata.previousFiles()) { + if (versionInFilePath(log.file(), versionFileName)) { + return log.file(); + } + } + return null; + } + + private boolean versionInFilePath(String path, String version) { + return fileName(path).equals(version); + } + + private String jobDesc() { + if (startVersionName != null) { + return String.format( + "Replacing path prefixes '%s' with '%s' in the metadata files of table %s," + + "up to version '%s'.", + sourcePrefix, targetPrefix, table.name(), endVersionName); + } else { + return String.format( + "Replacing path prefixes '%s' with '%s' in the metadata files of table %s," + + "from version '%s' to '%s'.", + sourcePrefix, targetPrefix, table.name(), startVersionName, endVersionName); + } + } + + /** + * + * + *
    + *
  • Rebuild version files to staging + *
  • Rebuild manifest list files to staging + *
  • Rebuild manifest to staging + *
  • Get all files needed to move + *
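
Illustration (not part of this patch): the steps above are driven through the public action API. The prefixes, staging path and version names below are placeholders.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RewriteTablePathUsage {
  // Full copy: rewrite all reachable metadata files to the staging location.
  static RewriteTablePath.Result fullCopy(SparkSession spark, Table table) {
    return SparkActions.get(spark)
        .rewriteTablePath(table)
        .rewriteLocationPrefix("s3://old-bucket/db/tbl", "s3://new-bucket/db/tbl")
        .stagingLocation("s3://old-bucket/db/tbl/metadata/staging/")
        .execute();
  }

  // Incremental copy: only rewrite metadata added after the last version already copied.
  static RewriteTablePath.Result incrementalCopy(SparkSession spark, Table table) {
    return SparkActions.get(spark)
        .rewriteTablePath(table)
        .rewriteLocationPrefix("s3://old-bucket/db/tbl", "s3://new-bucket/db/tbl")
        .startVersion("v2.metadata.json")
        .execute();
  }
}
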
+ */ + private String rebuildMetadata() { + TableMetadata startMetadata = + startVersionName != null + ? ((HasTableOperations) newStaticTable(startVersionName, table.io())) + .operations() + .current() + : null; + TableMetadata endMetadata = + ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current(); + + Preconditions.checkArgument( + endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(), + "Statistic files are not supported yet."); + + // rebuild version files + RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); + Set diffSnapshots = getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite); + + Set manifestsToRewrite = manifestsToRewrite(diffSnapshots, startMetadata); + Set validSnapshots = + Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata)); + + // rebuild manifest-list files + RewriteResult rewriteManifestListResult = + validSnapshots.stream() + .map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite)) + .reduce(new RewriteResult<>(), RewriteResult::new); + + // rebuild manifest files + Set> contentFilesToMove = + rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite); + + Set> movePlan = Sets.newHashSet(); + movePlan.addAll(rewriteVersionResult.copyPlan); + movePlan.addAll(rewriteManifestListResult.copyPlan); + movePlan.addAll(contentFilesToMove); + + return saveFileList(movePlan); + } + + private String saveFileList(Set> filesToMove) { + List> fileList = + filesToMove.stream() + .map(p -> Tuple2.apply(p.first(), p.second())) + .collect(Collectors.toList()); + Dataset> fileListDataset = + spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + String fileListPath = stagingDir + RESULT_LOCATION; + fileListDataset + .repartition(1) + .write() + .mode(SaveMode.Overwrite) + .format("csv") + .save(fileListPath); + return fileListPath; + } + + private Set getDiffSnapshotIds( + TableMetadata startMetadata, Set allSnapshots) { + if (startMetadata == null) { + return allSnapshots; + } else { + Set startSnapshotIds = + startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return allSnapshots.stream() + .filter(s -> !startSnapshotIds.contains(s.snapshotId())) + .collect(Collectors.toSet()); + } + } + + private RewriteResult rewriteVersionFiles(TableMetadata endMetadata) { + RewriteResult result = new RewriteResult<>(); + result.toRewrite.addAll(endMetadata.snapshots()); + result.copyPlan.add(rewriteVersionFile(endMetadata, endVersionName)); + + List versions = endMetadata.previousFiles(); + for (int i = versions.size() - 1; i >= 0; i--) { + String versionFilePath = versions.get(i).file(); + if (versionFilePath.equals(startVersionName)) { + break; + } + + Preconditions.checkArgument( + fileExist(versionFilePath), + String.format("Version file %s doesn't exist", versionFilePath)); + TableMetadata tableMetadata = + new StaticTableOperations(versionFilePath, table.io()).current(); + + result.toRewrite.addAll(tableMetadata.snapshots()); + result.copyPlan.add(rewriteVersionFile(tableMetadata, versionFilePath)); + } + + return result; + } + + private Pair rewriteVersionFile(TableMetadata metadata, String versionFilePath) { + String stagingPath = stagingPath(versionFilePath, stagingDir); + TableMetadata newTableMetadata = + TableMetadataUtil.replacePaths(metadata, sourcePrefix, targetPrefix); + TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath)); + return 
Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, targetPrefix)); + } + + /** + * Rewrite a manifest list representing a snapshot. + * + * @param snapshot snapshot represented by the manifest list + * @param tableMetadata metadata of table + */ + private RewriteResult rewriteManifestList( + Snapshot snapshot, TableMetadata tableMetadata, Set manifestsToRewrite) { + RewriteResult result = new RewriteResult<>(); + List manifestFiles = manifestFilesInSnapshot(snapshot); + String path = snapshot.manifestListLocation(); + String stagingPath = stagingPath(path, stagingDir); + OutputFile outputFile = table.io().newOutputFile(stagingPath); + try (FileAppender writer = + ManifestLists.write( + tableMetadata.formatVersion(), + outputFile, + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.sequenceNumber())) { + + for (ManifestFile file : manifestFiles) { + Preconditions.checkArgument( + file.path().startsWith(sourcePrefix), + "Encountered manifest file %s not under the source prefix %s", + file.path(), + sourcePrefix); + + ManifestFile newFile = file.copy(); + ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix)); + writer.add(newFile); + + // return the ManifestFile object for subsequent rewriting + if (manifestsToRewrite.contains(file.path())) { + result.toRewrite.add(file); + result.copyPlan.add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path())); + } + } + + result.copyPlan.add(Pair.of(stagingPath, newPath(path, sourcePrefix, targetPrefix))); + return result; + } catch (IOException e) { + throw new UncheckedIOException("Failed to rewrite the manifest list file " + path, e); + } + } + + private List manifestFilesInSnapshot(Snapshot snapshot) { + String path = snapshot.manifestListLocation(); + List manifestFiles = Lists.newLinkedList(); + try { + manifestFiles = ManifestLists.read(table.io().newInputFile(path)); + } catch (RuntimeIOException e) { + LOG.warn("Failed to read manifest list {}", path, e); + } + return manifestFiles; + } + + private Set manifestsToRewrite(Set diffSnapshots, TableMetadata startMetadata) { + try { + Table endStaticTable = newStaticTable(endVersionName, table.io()); + Dataset lastVersionFiles = manifestDS(endStaticTable).select("path"); + if (startMetadata == null) { + return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList()); + } else { + Set diffSnapshotIds = + diffSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return Sets.newHashSet( + lastVersionFiles + .distinct() + .filter(functions.column("added_snapshot_id").isInCollection(diffSnapshotIds)) + .as(Encoders.STRING()) + .collectAsList()); + } + } catch (Exception e) { + throw new UnsupportedOperationException( + "Failed to build the manifest files dataframe, the end version you are " + + "trying to copy may contain invalid snapshots, please a younger version that doesn't have invalid " + + "snapshots", + e); + } + } + + /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. 
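
Illustration (not part of this patch): the per-entry rewriting done inside these distributed jobs boils down to plain prefix replacement, as implemented by the private newPath/relativize/combinePaths helpers. The sketch below reimplements the same logic standalone, with a sample input and output.

public class PrefixRewriteSketch {
  // Strip the source prefix, then re-anchor the relative remainder under the target prefix.
  static String newPath(String path, String sourcePrefix, String targetPrefix) {
    String prefix = sourcePrefix.endsWith("/") ? sourcePrefix : sourcePrefix + "/";
    if (!path.startsWith(prefix)) {
      throw new IllegalArgumentException(
          String.format("Path %s does not start with %s", path, prefix));
    }
    String target = targetPrefix.endsWith("/") ? targetPrefix : targetPrefix + "/";
    return target + path.substring(prefix.length());
  }

  public static void main(String[] args) {
    // prints s3://new-bucket/db/tbl/data/00000-0-a.parquet
    System.out.println(
        newPath(
            "s3://old-bucket/db/tbl/data/00000-0-a.parquet",
            "s3://old-bucket/db/tbl",
            "s3://new-bucket/db/tbl"));
  }
}
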
*/ + private Set> rewriteManifests( + TableMetadata tableMetadata, Set toRewrite) { + if (toRewrite.isEmpty()) { + return Sets.newHashSet(); + } + + Encoder manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class); + Dataset manifestDS = + spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder); + + Broadcast serializableTable = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast> specsById = + sparkContext().broadcast(tableMetadata.specsById()); + + List> dataFiles = + manifestDS + .repartition(toRewrite.size()) + .mapPartitions( + toManifests( + serializableTable, + stagingDir, + tableMetadata.formatVersion(), + specsById, + sourcePrefix, + targetPrefix), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .collectAsList(); + + // duplicates are expected here as the same data file can have different statuses + // (e.g. added and deleted) + return dataFiles.stream().map(t -> Pair.of(t._1(), t._2())).collect(Collectors.toSet()); + } + + private static MapPartitionsFunction> toManifests( + Broadcast
tableBroadcast, + String stagingLocation, + int format, + Broadcast> specsById, + String sourcePrefix, + String targetPrefix) { + + return rows -> { + List> files = Lists.newArrayList(); + while (rows.hasNext()) { + ManifestFile manifestFile = rows.next(); + switch (manifestFile.content()) { + case DATA: + files.addAll( + writeDataManifest( + manifestFile, + tableBroadcast, + stagingLocation, + format, + specsById, + sourcePrefix, + targetPrefix)); + break; + case DELETES: + files.addAll( + writeDeleteManifest( + manifestFile, + tableBroadcast, + stagingLocation, + format, + specsById, + sourcePrefix, + targetPrefix)); + break; + default: + throw new UnsupportedOperationException( + "Unsupported manifest type: " + manifestFile.content()); + } + } + return files.iterator(); + }; + } + + private static List> writeDataManifest( + ManifestFile manifestFile, + Broadcast
tableBroadcast, + String stagingLocation, + int format, + Broadcast> specsByIdBroadcast, + String sourcePrefix, + String targetPrefix) + throws IOException { + String stagingPath = stagingPath(manifestFile.path(), stagingLocation); + FileIO io = tableBroadcast.getValue().io(); + OutputFile outputFile = io.newOutputFile(stagingPath); + Map specsById = specsByIdBroadcast.getValue(); + PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); + + return RewriteTablePathUtil.rewriteManifest( + io, format, spec, outputFile, manifestFile, specsById, sourcePrefix, targetPrefix) + .stream() + .map(p -> Tuple2.apply(p.first(), p.second())) + .collect(Collectors.toList()); + } + + private static List> writeDeleteManifest( + ManifestFile manifestFile, + Broadcast
tableBroadcast, + String stagingLocation, + int format, + Broadcast> specsByIdBroadcast, + String sourcePrefix, + String targetPrefix) + throws IOException { + String stagingPath = stagingPath(manifestFile.path(), stagingLocation); + FileIO io = tableBroadcast.getValue().io(); + OutputFile outputFile = io.newOutputFile(stagingPath); + Map specsById = specsByIdBroadcast.getValue(); + PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); + RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter = + new RewriteTablePathUtil.PositionDeleteReaderWriter() { + @Override + public CloseableIterable reader( + InputFile inputFile, FileFormat format, PartitionSpec spec) { + return positionDeletesReader(inputFile, format, spec); + } + + @Override + public PositionDeleteWriter writer( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException { + return positionDeletesWriter(outputFile, format, spec, partition, rowSchema); + } + }; + return RewriteTablePathUtil.rewriteDeleteManifest( + io, + format, + spec, + outputFile, + manifestFile, + specsById, + sourcePrefix, + targetPrefix, + stagingLocation, + posDeleteReaderWriter) + .stream() + .map(p -> Tuple2.apply(p.first(), p.second())) + .collect(Collectors.toList()); + } + + private static CloseableIterable positionDeletesReader( + InputFile inputFile, FileFormat format, PartitionSpec spec) { + Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(spec.schema()); + switch (format) { + case AVRO: + return Avro.read(inputFile) + .project(deleteSchema) + .reuseContainers() + .createReaderFunc(DataReader::create) + .build(); + + case PARQUET: + return Parquet.read(inputFile) + .project(deleteSchema) + .reuseContainers() + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) + .build(); + + case ORC: + return ORC.read(inputFile) + .project(deleteSchema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) + .build(); + + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } + + private static PositionDeleteWriter positionDeletesWriter( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET: + return Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC: + return ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } + + private Set snapshotSet(TableMetadata metadata) { + if (metadata == null) { + return Sets.newHashSet(); + } else { + return Sets.newHashSet(metadata.snapshots()); + } + } + + private boolean fileExist(String path) { + if (path == null || path.trim().isEmpty()) { + return false; + } + return table.io().newInputFile(path).exists(); + } + + private static String relativize(String path, String prefix) { + String toRemove = prefix; + if (!toRemove.endsWith("/")) { + 
toRemove += "/"; + } + if (!path.startsWith(toRemove)) { + throw new IllegalArgumentException( + String.format("Path %s does not start with %s", path, toRemove)); + } + return path.substring(toRemove.length()); + } + + private static String newPath(String path, String sourcePrefix, String targetPrefix) { + return combinePaths(targetPrefix, relativize(path, sourcePrefix)); + } + + private static String stagingPath(String originalPath, String stagingLocation) { + return stagingLocation + fileName(originalPath); + } + + private static String combinePaths(String absolutePath, String relativePath) { + String combined = absolutePath; + if (!combined.endsWith("/")) { + combined += "/"; + } + combined += relativePath; + return combined; + } + + private static String fileName(String path) { + String filename = path; + int lastIndex = path.lastIndexOf(File.separator); + if (lastIndex != -1) { + filename = path.substring(lastIndex + 1); + } + return filename; + } + + private String getMetadataLocation(Table tbl) { + String currentMetadataPath = + ((HasTableOperations) tbl).operations().current().metadataFileLocation(); + int lastIndex = currentMetadataPath.lastIndexOf(File.separator); + String metadataDir = ""; + if (lastIndex != -1) { + metadataDir = currentMetadataPath.substring(0, lastIndex + 1); + } + + Preconditions.checkArgument( + !metadataDir.isEmpty(), "Failed to get the metadata file root directory"); + return metadataDir; + } + + static class RewriteResult { + Set toRewrite = Sets.newHashSet(); + Set> copyPlan = Sets.newHashSet(); + + RewriteResult() {} + + RewriteResult(RewriteResult r1, RewriteResult r2) { + toRewrite.addAll(r1.toRewrite); + toRewrite.addAll(r2.toRewrite); + copyPlan.addAll(r1.copyPlan); + copyPlan.addAll(r2.copyPlan); + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index ba9fa2e7b4db..aa4ef987e788 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -108,4 +108,9 @@ public ComputeTableStats computeTableStats(Table table) { public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) { return new RemoveDanglingDeletesSparkAction(spark, table); } + + @Override + public RewriteTablePathSparkAction rewriteTablePath(Table table) { + return new RewriteTablePathSparkAction(spark, table); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java new file mode 100644 index 000000000000..bfe7be5b12ed --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -0,0 +1,974 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.actions.ActionsProvider; +import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import scala.Tuple2; + +public class TestRewriteTablePathsAction extends TestBase { + + @TempDir private Path staging; + @TempDir private Path tableDir; + @TempDir private Path newTableDir; + @TempDir private Path targetTableDir; + + protected ActionsProvider actions() { + return SparkActions.get(); + } + + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + protected static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + protected String tableLocation = null; + private Table table = null; + + private final String ns = "testns"; + private final String backupNs = "backupns"; + + @BeforeEach + public void setupTableLocation() throws Exception { + this.tableLocation = tableDir.toFile().toURI().toString(); + this.table = 
createATableWith2Snapshots(tableLocation); + createNameSpaces(); + } + + @AfterEach + public void cleanupTableSetup() throws Exception { + dropNameSpaces(); + } + + private Table createATableWith2Snapshots(String location) { + return createTableWithSnapshots(location, 2); + } + + private Table createTableWithSnapshots(String location, int snapshotNumber) { + return createTableWithSnapshots(location, snapshotNumber, Maps.newHashMap()); + } + + protected Table createTableWithSnapshots( + String location, int snapshotNumber, Map properties) { + Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + for (int i = 0; i < snapshotNumber; i++) { + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + } + + return newTable; + } + + private void createNameSpaces() { + sql("CREATE DATABASE IF NOT EXISTS %s", ns); + sql("CREATE DATABASE IF NOT EXISTS %s", backupNs); + } + + private void dropNameSpaces() { + sql("DROP DATABASE IF EXISTS %s CASCADE", ns); + sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs); + } + + @Test + public void testRewritePath() throws Exception { + String targetTableLocation = targetTableLocation(); + + // check the data file location before the rebuild + List validDataFiles = + spark + .read() + .format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + Assertions.assertEquals(2, validDataFiles.size(), "Should be 2 valid data files"); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, targetTableLocation) + .endVersion("v3.metadata.json") + .execute(); + + Assertions.assertEquals( + "v3.metadata.json", result.latestVersion(), "The latest version should be"); + + checkFileNum(3, 2, 2, 9, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // verify the data file path after the rebuild + List validDataFilesAfterRebuilt = + spark + .read() + .format("iceberg") + .load(targetTableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + Assertions.assertEquals(2, validDataFilesAfterRebuilt.size(), "Should be 2 valid data files"); + for (String item : validDataFilesAfterRebuilt) { + assertTrue( + item.startsWith(targetTableLocation), "Data file should point to the new location"); + } + + // verify data rows + Dataset resultDF = spark.read().format("iceberg").load(targetTableLocation); + List actualRecords = + resultDF.sort("c1", "c2", "c3").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList(); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + + Assertions.assertEquals(expectedRecords, actualRecords, "Rows must match"); + } + + @Test + public void testSameLocations() throws Exception { + assertThrows( + IllegalArgumentException.class, + () -> + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, tableLocation) + .endVersion("v1.metadata.json") + .execute(), + "Source prefix cannot be the same as target prefix"); + } + + @Test + public void testStartVersion() throws Exception { + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + 
.rewriteLocationPrefix(tableLocation, targetTableLocation()) + .startVersion("v2.metadata.json") + .execute(); + + checkFileNum(1, 1, 1, 4, result); + + List> paths = readPathPairList(result.fileListLocation()); + + String currentSnapshotId = String.valueOf(table.currentSnapshot().snapshotId()); + Assertions.assertEquals( + 1, + paths.stream().filter(c -> c._2().contains(currentSnapshotId)).count(), + "Should have the current snapshot file"); + + String parentSnapshotId = String.valueOf(table.currentSnapshot().parentId()); + Assertions.assertEquals( + 0, + paths.stream().filter(c -> c._2().contains(parentSnapshotId)).count(), + "Should NOT have the parent snapshot file"); + } + + @Test + public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2) + throws Exception { + String location = newTableLocation(); + Table tableWith3Snaps = createTableWithSnapshots(location, 3); + RewriteTablePath.Result result = + actions() + .rewriteTablePath(tableWith3Snaps) + .rewriteLocationPrefix(location, toAbsolute(location1)) + .startVersion("v2.metadata.json") + .execute(); + + checkFileNum(2, 2, 2, 8, result); + + // start from the first version + RewriteTablePath.Result result1 = + actions() + .rewriteTablePath(tableWith3Snaps) + .rewriteLocationPrefix(location, toAbsolute(location2)) + .startVersion("v1.metadata.json") + .execute(); + + checkFileNum(3, 3, 3, 12, result1); + } + + @Test + public void testFullTableRewritePath() throws Exception { + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, targetTableLocation()) + .execute(); + + checkFileNum(3, 2, 2, 9, result); + } + + @Test + public void testDeleteDataFile() throws Exception { + List validDataFiles = + spark + .read() + .format("iceberg") + .load(table.location() + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + + table.newDelete().deleteFile(validDataFiles.stream().findFirst().get()).commit(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .stagingLocation(stagingLocation()) + .execute(); + + checkFileNum(4, 3, 3, 12, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // verify data rows + Dataset resultDF = spark.read().format("iceberg").load(targetTableLocation()); + Assertions.assertEquals( + 1, + resultDF.as(Encoders.bean(ThreeColumnRecord.class)).count(), + "There are only one row left since we deleted a data file"); + } + + @Test + public void testPositionDeletes() throws Exception { + List> deletes = + Lists.newArrayList( + Pair.of( + table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(), + 0L)); + + File file = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + table, table.io().newOutputFile(file.toURI().toString()), deletes) + .first(); + + table.newRowDelta().addDeletes(positionDeletes).commit(); + + Assertions.assertEquals(1, spark.read().format("iceberg").load(table.location()).count()); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .execute(); + + // We have one more snapshot, an additional manifest list, and a new (delete) manifest, + // and an additional position delete + checkFileNum(4, 3, 3, 13, result); + + // copy the metadata 
files and data files + copyTableFiles(result); + + // Positional delete affects a single row, so only one row must remain + Assertions.assertEquals(1, spark.read().format("iceberg").load(targetTableLocation()).count()); + } + + @Test + public void testEqualityDeletes() throws Exception { + Table sourceTable = createTableWithSnapshots(newTableLocation(), 1); + + // Add more varied data + List records = + Lists.newArrayList( + new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"), + new ThreeColumnRecord(3, "BBBBBBBBBB", "BBBB"), + new ThreeColumnRecord(4, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(5, "DDDDDDDDDD", "DDDD")); + spark + .createDataFrame(records, ThreeColumnRecord.class) + .coalesce(1) + .select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(newTableLocation()); + + Schema deleteRowSchema = sourceTable.schema().select("c2"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList( + dataDelete.copy("c2", "AAAAAAAAAA"), dataDelete.copy("c2", "CCCCCCCCCC")); + File file = new File(removePrefix(sourceTable.location()) + "/data/deeply/nested/file.parquet"); + DeleteFile equalityDeletes = + FileHelpers.writeDeleteFile( + sourceTable, + sourceTable.io().newOutputFile(file.toURI().toString()), + TestHelpers.Row.of(0), + dataDeletes, + deleteRowSchema); + sourceTable.newRowDelta().addDeletes(equalityDeletes).commit(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .execute(); + + // We have four metadata files: for the table creation, for the initial snapshot, for the + // second append here, and for commit with equality deletes. Thus, we have three manifest lists. 
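
Illustration (not part of this patch): the position-delete and equality-delete tests in this file exercise two different rewrite paths. Equality delete files identify rows by column values, so only the file's own location is re-prefixed; position delete files embed data file paths in their rows and are rewritten row by row. The sketch below mirrors the idea behind newPositionDeleteRecord in RewriteTablePathUtil but skips its stricter prefix handling; the class and method names are invented.

import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;

public class PositionDeleteRemapSketch {
  // A position delete row stores the referenced data file path in column 0 and the row
  // position in column 1, so the stored path itself has to be re-prefixed when the table moves.
  static PositionDelete<Record> remap(Record row, String sourcePrefix, String targetPrefix) {
    PositionDelete<Record> delete = PositionDelete.create();
    String dataFilePath = (String) row.get(0);
    if (dataFilePath.startsWith(sourcePrefix)) {
      dataFilePath = targetPrefix + dataFilePath.substring(sourcePrefix.length());
    }
    delete.set(dataFilePath, (Long) row.get(1), (Record) row.get(2));
    return delete;
  }
}
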
+ // We have a data file for each snapshot (two with data, one with equality deletes) + checkFileNum(4, 3, 3, 13, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // Equality deletes affect three rows, so just two rows must remain + Assertions.assertEquals(2, spark.read().format("iceberg").load(targetTableLocation()).count()); + } + + @Test + public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { + String location = newTableLocation(); + Table sourceTable = createTableWithSnapshots(location, 2); + // expire the first snapshot + Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io()); + actions() + .expireSnapshots(sourceTable) + .expireSnapshotId(staticTable.currentSnapshot().snapshotId()) + .execute(); + + // create 100 more snapshots + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + for (int i = 0; i < 100; i++) { + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + } + sourceTable.refresh(); + + // v1/v2/v3.metadata.json has been deleted in v104.metadata.json, and there is no way to find + // the first snapshot + // from the version file history + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(location, targetTableLocation()) + .execute(); + + checkFileNum(101, 101, 101, 406, result); + } + + @Test + public void testRewritePathWithoutSnapshot() throws Exception { + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, newTableLocation()) + .endVersion("v1.metadata.json") + .execute(); + + // the only rebuilt file is v1.metadata.json since it contains no snapshot + checkFileNum(1, 0, 0, 1, result); + } + + @Test + public void testExpireSnapshotBeforeRewrite() throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .execute(); + + checkFileNum(4, 1, 2, 9, result); + } + + @Test + public void testStartSnapshotWithoutValidSnapshot() throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + Assertions.assertEquals( + 1, ((List) table.snapshots()).size(), "1 out 2 snapshot has been removed"); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .stagingLocation(stagingLocation()) + .startVersion("v2.metadata.json") + .execute(); + + // 2 metadata.json, 1 manifest list file, 1 manifest files + checkFileNum(2, 1, 1, 5, result); + } + + @Test + public void testMoveTheVersionExpireSnapshot() throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + // only move version v4, which is the version generated by snapshot expiration + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .stagingLocation(stagingLocation()) + .startVersion("v3.metadata.json") + .execute(); + + // only 
+    checkFileNum(1, 0, 0, 1, result);
+  }
+
+  @Test
+  public void testMoveVersionWithInvalidSnapshots() throws Exception {
+    // expire one snapshot
+    actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
+
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            actions()
+                .rewriteTablePath(table)
+                .rewriteLocationPrefix(table.location(), newTableLocation())
+                .stagingLocation(stagingLocation())
+                .endVersion("v3.metadata.json")
+                .execute(),
+        "Copying a version with invalid snapshots isn't allowed");
+  }
+
+  @Test
+  public void testRollBack() throws Exception {
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // roll back to the first snapshot (v2)
+    table.manageSnapshots().setCurrentSnapshot(table.currentSnapshot().parentId()).commit();
+
+    // add a new snapshot
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());
+
+    table.refresh();
+
+    // roll forward again to the second snapshot (v3)
+    table.manageSnapshots().setCurrentSnapshot(secondSnapshotId).commit();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), newTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+    checkFileNum(6, 3, 3, 15, result);
+  }
+
+  @Test
+  public void testWriteAuditPublish() throws Exception {
+    // enable WAP
+    table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();
+    spark.conf().set("spark.wap.id", "1");
+
+    // add a new snapshot without changing the current snapshot of the table
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());
+
+    table.refresh();
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), newTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+
+    // There are 3 snapshots in total, although the current snapshot is the second one.
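+    // The staged WAP snapshot is still referenced by the table metadata, so its manifest list,
+    // manifest, and data file are included in the copy plan as well.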
+    checkFileNum(5, 3, 3, 14, result);
+  }
+
+  @Test
+  public void testSchemaChange() throws Exception {
+    // change the schema
+    table.updateSchema().addColumn("c4", Types.StringType.get()).commit();
+
+    // copy table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(table.location(), newTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+
+    // check the result
+    checkFileNum(4, 2, 2, 10, result);
+  }
+
+  @Test
+  public void testSnapshotIdInheritanceEnabled() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true");
+
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, properties);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .execute();
+
+    checkFileNum(3, 2, 2, 9, result);
+  }
+
+  @Test
+  public void testMetadataCompression() throws Exception {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, properties);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .endVersion("v2.gz.metadata.json")
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result);
+
+    result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+            .startVersion("v1.gz.metadata.json")
+            .execute();
+
+    checkFileNum(2, 2, 2, 8, result);
+  }
+
+  @Test
+  public void testInvalidArgs() {
+    RewriteTablePath actions = actions().rewriteTablePath(table);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.rewriteLocationPrefix("", null),
+        "Source prefix('') cannot be empty");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.rewriteLocationPrefix(null, null),
+        "Source prefix('null') cannot be empty");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.stagingLocation(""),
+        "Staging location('') cannot be empty");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.stagingLocation(null),
+        "Staging location('null') cannot be empty");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.startVersion(null),
+        "Start version('null') cannot be empty");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.endVersion(" "),
+        "End version cannot be empty");
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> actions.endVersion(null),
+        "End version cannot be empty");
+  }
+
+  @Test
+  public void testStatisticFile() throws IOException {
+    String sourceTableLocation = newTableLocation();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("format-version", "2");
+    String tableName = "v2tblwithstats";
+    Table sourceTable =
+        createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);
+
+    TableMetadata metadata = currentMetadata(sourceTable);
+    TableMetadata withStatistics =
+        TableMetadata.buildFrom(metadata)
+            .setStatistics(
+                43,
+                new GenericStatisticsFile(
+                    43, "/some/path/to/stats/file", 128, 27, ImmutableList.of()))
+            .build();
+
+    OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
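+    // overwrite the current metadata file in place so the table now references a statistics file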
+    TableMetadataParser.overwrite(withStatistics, file);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          actions()
+              .rewriteTablePath(sourceTable)
+              .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
+              .execute();
+        },
+        "Should fail to copy a table with the statistics field");
+  }
+
+  @Test
+  public void testMetadataCompressionWithMetastoreTable() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    Table sourceTable =
+        createMetastoreTable(
+            newTableLocation(), properties, "default", "testMetadataCompression", 2);
+
+    TableMetadata currentMetadata = currentMetadata(sourceTable);
+
+    // set the second version as the endVersion
+    String endVersion = fileName(currentMetadata.previousFiles().get(1).file());
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .endVersion(endVersion)
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result);
+
+    // set the first version as the lastCopiedVersion
+    String firstVersion = fileName(currentMetadata.previousFiles().get(0).file());
+    result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .startVersion(firstVersion)
+            .execute();
+
+    checkFileNum(2, 2, 2, 8, result);
+  }
+
+  // Metastore table tests
+  @Test
+  public void testMetadataLocationChange() throws Exception {
+    Table sourceTable =
+        createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1);
+    String metadataFilePath = currentMetadata(sourceTable).metadataFileLocation();
+
+    String newMetadataDir = "new-metadata-dir";
+    sourceTable
+        .updateProperties()
+        .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + newMetadataDir)
+        .commit();
+
+    spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
+    sourceTable.refresh();
+
+    // copy table
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .execute();
+
+    checkFileNum(4, 2, 2, 10, result);
+
+    // pick up a version from the old metadata dir as the end version
+    RewriteTablePath.Result result1 =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .endVersion(fileName(metadataFilePath))
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result1);
+
+    // pick up a version from the old metadata dir as the last copied version
+    RewriteTablePath.Result result2 =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .startVersion(fileName(metadataFilePath))
+            .execute();
+
+    checkFileNum(2, 1, 1, 5, result2);
+  }
+
+  @Test
+  public void testV2Table() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("format-version", "2");
+    properties.put("write.delete.mode", "merge-on-read");
+    String tableName = "v2tbl";
+    Table sourceTable =
+        createMetastoreTable(newTableLocation(), properties, "default", tableName, 0);
+    // ingest data
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+            new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
+            new ThreeColumnRecord(3, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .saveAsTable("hive.default." + tableName);
+    sourceTable.refresh();
+
+    // generate position delete files
+    spark.sql(String.format("delete from hive.default.%s where c1 = 1", tableName));
+    sourceTable.refresh();
+
+    List<Object[]> originalData =
+        rowsToJava(
+            spark
+                .read()
+                .format("iceberg")
+                .load("hive.default." + tableName)
+                .sort("c1", "c2", "c3")
+                .collectAsList());
+    // two rows
+    Assertions.assertEquals(2, originalData.size());
+
+    // copy table and check the results
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
+            .execute();
+
+    checkFileNum(3, 2, 2, 9, result);
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // register table
+    String metadataLocation = currentMetadata(sourceTable).metadataFileLocation();
+    String versionFile = fileName(metadataLocation);
+    String targetTableName = "copiedV2Table";
+    TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName);
+    catalog.registerTable(tableIdentifier, targetTableLocation() + "/metadata/" + versionFile);
+
+    List<Object[]> copiedData =
+        rowsToJava(
+            spark
+                .read()
+                .format("iceberg")
+                .load("hive.default." + targetTableName)
+                .sort("c1", "c2", "c3")
+                .collectAsList());
+
+    assertEquals("Rows must match", originalData, copiedData);
+  }
+
+  protected void checkFileNum(
+      int versionFileCount,
+      int manifestListCount,
+      int manifestFileCount,
+      int totalCount,
+      RewriteTablePath.Result result) {
+    List<String> filesToMove =
+        spark
+            .read()
+            .format("text")
+            .load(result.fileListLocation())
+            .as(Encoders.STRING())
+            .collectAsList();
+    Assertions.assertEquals(totalCount, filesToMove.size(), "Wrong total file count");
+    Assertions.assertEquals(
+        versionFileCount,
+        filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count(),
+        "Wrong rebuilt version file count");
+    Assertions.assertEquals(
+        manifestListCount,
+        filesToMove.stream().filter(f -> f.contains("snap-")).count(),
+        "Wrong rebuilt manifest list file count");
+    Assertions.assertEquals(
+        manifestFileCount,
+        filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count(),
+        "Wrong rebuilt manifest file count");
+  }
+
+  protected String newTableLocation() throws IOException {
+    return toAbsolute(newTableDir);
+  }
+
+  protected String targetTableLocation() throws IOException {
+    return toAbsolute(targetTableDir);
+  }
+
+  protected String stagingLocation() throws IOException {
+    return toAbsolute(staging);
+  }
+
+  protected String toAbsolute(Path relative) throws IOException {
+    return relative.toFile().toURI().toString();
+  }
+
+  private void copyTableFiles(RewriteTablePath.Result result) throws Exception {
+    List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation());
+
+    for (Tuple2<String, String> pathPair : filesToMove) {
+      FileUtils.copyFile(new File(URI.create(pathPair._1())), new File(URI.create(pathPair._2())));
+    }
+  }
+
+  private String removePrefix(String path) {
+    return path.substring(path.lastIndexOf(":") + 1);
+  }
+
+  protected Table newStaticTable(String metadataFileLocation, FileIO io) {
+    StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io);
+    return new BaseTable(ops, metadataFileLocation);
+  }
+
+  private List<Tuple2<String, String>> readPathPairList(String path) {
+    Encoder<Tuple2<String, String>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING());
+    return spark
+        .read()
+        .format("csv")
+        .schema(encoder.schema())
+        .load(path)
+        .as(encoder)
+        .collectAsList();
+  }
+
+  private Table createMetastoreTable(
+      String location,
+      Map<String, String> properties,
+      String namespace,
+      String tableName,
+      int snapshotNumber) {
+    spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
+    spark.conf().set("spark.sql.catalog.hive.type", "hive");
+    spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
+    spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");
+
+    StringBuilder propertiesStr = new StringBuilder();
+    properties.forEach((k, v) -> propertiesStr.append("'" + k + "'='" + v + "',"));
+    String tblProperties =
+        propertiesStr.substring(0, propertiesStr.length() > 0 ? propertiesStr.length() - 1 : 0);
+
+    sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName);
+    if (tblProperties.isEmpty()) {
+      String sqlStr =
+          String.format(
+              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
+      if (!location.isEmpty()) {
+        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
+      }
+      sql(sqlStr);
+    } else {
+      String sqlStr =
+          String.format(
+              "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName);
+      if (!location.isEmpty()) {
+        sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location);
+      }
+
+      sqlStr = String.format("%s TBLPROPERTIES (%s)", sqlStr, tblProperties);
+      sql(sqlStr);
+    }
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i);
+    }
+    return catalog.loadTable(TableIdentifier.of(namespace, tableName));
+  }
+
+  private static String fileName(String path) {
+    String filename = path;
+    int lastIndex = path.lastIndexOf(File.separator);
+    if (lastIndex != -1) {
+      filename = path.substring(lastIndex + 1);
+    }
+    return filename;
+  }
+
+  private TableMetadata currentMetadata(Table tbl) {
+    return ((HasTableOperations) tbl).operations().current();
+  }
+}
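Note: the test helpers above read the copy plan with Spark, but the plan written to RewriteTablePath.Result#fileListLocation() is just comma-separated "source,target" path pairs. As a minimal illustrative sketch (not part of this change), assuming the plan is a single local CSV file of file: URIs, it could be consumed without Spark roughly as follows; the CopyPlanRunner class and copyAll method names are made up for the example:

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical helper, not part of this PR: copies every source/target pair in the copy plan.
public class CopyPlanRunner {

  public static void copyAll(String fileListLocation) throws IOException {
    // fileListLocation would come from RewriteTablePath.Result#fileListLocation()
    Path planFile = Paths.get(URI.create(fileListLocation));
    List<String> lines = Files.readAllLines(planFile);
    for (String line : lines) {
      // each line is "<source path>,<target path>"
      String[] pair = line.split(",", 2);
      Path source = Paths.get(URI.create(pair[0]));
      Path target = Paths.get(URI.create(pair[1]));
      Files.createDirectories(target.getParent());
      Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
    }
  }
}

The tests do the equivalent with Spark's CSV reader plus FileUtils.copyFile in copyTableFiles/readPathPairList; in practice the copy step can use whatever bulk-copy tool fits the target storage.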