From 285ed6bd2434ac760dba61399a8156ecdb4c1c47 Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Tue, 29 Oct 2024 13:06:24 -0700
Subject: [PATCH] Skip getFileStatus call during Iceberg to Delta clone (#3825)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Skip the per-file `getFileStatus` call when cloning an Iceberg table to Delta:
use the Iceberg snapshot timestamp as the Delta `AddFile` modificationTime and
take the file size from the Iceberg manifest, instead of issuing one filesystem
call per data file.

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

No
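In short, the manifest now builds each file's status from metadata Iceberg already tracks (the file size from the manifest entry, the modification time from the current snapshot's commit timestamp), falling back to the filesystem only when the conf is off or the table has no current snapshot. A minimal standalone sketch of that decision logic, with `FileEntry`, `FileStatusLite`, and `statFromFs` as hypothetical stand-ins for the real Iceberg and Delta types:

```scala
// Hypothetical stand-ins for Iceberg's per-file manifest info and Delta's
// SerializableFileStatus; only the decision logic mirrors this patch.
case class FileEntry(path: String, sizeInBytes: Long)
case class FileStatusLite(path: String, length: Long, isDir: Boolean, modificationTime: Long)

def toFileStatus(
    entry: FileEntry,
    skipGetFileStatus: Boolean,
    snapshotTimestamp: Option[Long],
    statFromFs: String => FileStatusLite): FileStatusLite = {
  (skipGetFileStatus, snapshotTimestamp) match {
    // Fast path: no filesystem RPC; the size is taken from the Iceberg
    // manifest, the modificationTime from the snapshot's commit timestamp.
    case (true, Some(ts)) => FileStatusLite(entry.path, entry.sizeInBytes, isDir = false, ts)
    // Fallback: conf disabled, or the table has no current snapshot (e.g. an empty table).
    case _ => statFromFs(entry.path)
  }
}
```

The trade-off is that every converted file carries the snapshot's commit timestamp rather than its exact mtime, in exchange for eliminating one namenode/object-store call per data file.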
---
 .../spark/sql/delta/IcebergFileManifest.scala | 27 ++++++++++++-------
 .../delta/commands/CloneTableCommand.scala    |  2 +-
 .../sql/delta/sources/DeltaSQLConf.scala      |  9 +++++++
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala
index 653430582bc..8521aba7c70 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala
@@ -109,12 +109,16 @@ class IcebergFileManifest(
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_UNSAFE_MOR_TABLE_ENABLE)
 
     var numFiles = 0L
+    val skipGetFileStatus = spark.sessionState.conf.getConf(
+      DeltaSQLConf.DELTA_CLONE_ICEBERG_SKIP_GETFILESTATUS)
+    val snapshotTimestamp: Option[Long] = Option(table.currentSnapshot()).map(_.timestampMillis())
+
     val res = table.newScan().planFiles().iterator().asScala.grouped(schemaBatchSize).map { batch =>
       logInfo(log"Getting file statuses for a batch of " +
         log"${MDC(DeltaLogKeys.BATCH_SIZE, batch.size)} of files; " +
         log"finished ${MDC(DeltaLogKeys.NUM_FILES, numFiles)} files so far")
       numFiles += batch.length
-      val filePathWithPartValues = batch.map { fileScanTask =>
+      val filePathWithPartValuesAndSize = batch.map { fileScanTask =>
         val filePath = fileScanTask.file().path().toString
         // If an Iceberg table has deletion file associated with the data file (Supported in
         // Iceberg V2, either position deletes or equality deletes), we could not convert directly.
@@ -129,18 +133,23 @@ class IcebergFileManifest(
           Some(convertIcebergPartitionToPartitionValues(
             fileScanTask.file().partition()))
         } else None
-        (filePath, partitionValues)
+        (filePath, partitionValues, fileScanTask.file.fileSizeInBytes())
       }
-      val numParallelism = Math.min(Math.max(filePathWithPartValues.size, 1),
+      val numParallelism = Math.min(Math.max(filePathWithPartValuesAndSize.size, 1),
         spark.sparkContext.defaultParallelism)
 
-      val rdd = spark.sparkContext.parallelize(filePathWithPartValues, numParallelism)
+      val rdd = spark.sparkContext.parallelize(filePathWithPartValuesAndSize, numParallelism)
         .mapPartitions { iterator =>
-          iterator.map { case (filePath, partValues) =>
-            val path = new Path(filePath)
-            val fs = path.getFileSystem(conf.value.value)
-            val fileStatus = fs.getFileStatus(path)
-            ConvertTargetFile(SerializableFileStatus.fromStatus(fileStatus), partValues)
+          iterator.map { case (filePath, partValues, size) =>
+            val serializableFileStatus = (skipGetFileStatus, snapshotTimestamp) match {
+              case (true, Some(ts)) =>
+                SerializableFileStatus(filePath, size, isDir = false, ts)
+              case _ =>
+                val path = new Path(filePath)
+                val fs = path.getFileSystem(conf.value.value)
+                SerializableFileStatus.fromStatus(fs.getFileStatus(path))
+            }
+            ConvertTargetFile(serializableFileStatus, partValues)
           }
         }
       spark.createDataset(rdd)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
index 71c04af87a6..9eb87181e91 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
@@ -243,7 +243,7 @@ abstract class CloneConvertedSource(spark: SparkSession) extends CloneSource {
       val basePath = new Path(baseDir)
       val fs = basePath.getFileSystem(conf.value.value)
       targetFile.map(ConvertUtils.createAddFile(
-        _, basePath, fs, SQLConf.get, Some(partitionSchema)))
+        _, basePath, fs, SQLConf.get, Some(partitionSchema), useAbsolutePath = true))
     }
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 5e6450f920f..323e377247d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1705,6 +1705,15 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_CLONE_ICEBERG_SKIP_GETFILESTATUS = {
+    buildConf("clone.IcebergSkipGetFileStatus")
+      .internal()
+      .doc("Whether clone from an Iceberg source can skip getFileStatus and " +
+        "use the snapshot timestamp as the modificationTime for the Delta AddFile.")
+      .booleanConf
+      .createWithDefault(true)
+  }
+
   val DELTA_OPTIMIZE_METADATA_QUERY_ENABLED =
     buildConf("optimizeMetadataQuery.enabled")
       .internal()
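For reference, the new flag defaults to `true`. Assuming the standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends to conf keys, restoring the old per-file `getFileStatus` behavior would look like:

```scala
// Re-enable per-file getFileStatus calls during CLONE from an Iceberg source.
// The key assumes DeltaSQLConf's usual "spark.databricks.delta." prefix.
spark.conf.set("spark.databricks.delta.clone.IcebergSkipGetFileStatus", "false")
```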