From 7d41fb7bbf63af33ad228007dd6ba3800b4efe81 Mon Sep 17 00:00:00 2001
From: Arun Ravi M V
Date: Fri, 23 Feb 2024 18:36:59 -0800
Subject: [PATCH] Use inventory reservoir as source for all files and dirs

- Currently, users have large tables with daily/hourly partitions spanning many years. Among all
  these partitions, only the recent ones are subject to change due to job reruns, corrections,
  and late-arriving events.
- When VACUUM is run on these tables, file listing is performed across all partitions and can
  take several hours or even days. This duration grows as the tables grow, and VACUUM becomes a
  major overhead for customers, especially when they have hundreds or thousands of such Delta
  tables. The file system scan takes most of the time in a VACUUM of a large table, mostly due to
  the limited parallelism achievable and API throttling on object stores.
- This change lets users pass a reservoir of files generated externally (e.g. from inventory
  reports of cloud stores) as a Delta table or as a Spark SQL query (conforming to a predefined
  schema). When such a reservoir DataFrame is provided, the VACUUM operation skips the listing
  step and uses the reservoir as the source of all files in storage.

Resolves #1691.

- Unit testing (`build/sbt 'testOnly org.apache.spark.sql.delta.DeltaVacuumSuite'`)

Yes, this change adds an optional clause for passing an inventory:

`VACUUM table_name [USING INVENTORY <reservoir-delta-table>] [RETAIN num HOURS] [DRY RUN]`
`VACUUM table_name [USING INVENTORY (<reservoir-query>)] [RETAIN num HOURS] [DRY RUN]`

e.g. `VACUUM test_db.table USING INVENTORY (SELECT * FROM reservoir_table) RETAIN 168 HOURS DRY RUN`

Closes delta-io/delta#2257

Co-authored-by: Arun Ravi M V
Signed-off-by: Bart Samwel
GitOrigin-RevId: 2bc824e524c677dd5f3a7ed787762df60c3b6d86
---
 .../io/delta/sql/parser/DeltaSqlBase.g4 | 13 +-
 .../resources/error/delta-error-classes.json | 6 +
 .../io/delta/sql/parser/DeltaSqlParser.scala | 10 +-
 .../tables/execution/VacuumTableCommand.scala | 20 +-
 .../apache/spark/sql/delta/DeltaErrors.scala | 7 +-
 .../sql/delta/commands/VacuumCommand.scala | 82 ++++++--
 .../sql/parser/DeltaSqlParserSuite.scala | 19 +-
 .../spark/sql/delta/DeltaVacuumSuite.scala | 185 ++++++++++++++++++
 8 files changed, 310 insertions(+), 32 deletions(-)

diff --git a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
index 495252921e5..da42fc03536 100644
--- a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
+++ b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -73,6 +73,7 @@ singleStatement
 // If you add keywords here that should not be reserved, add them to 'nonReserved' list.
 statement
     : VACUUM (path=STRING | table=qualifiedName)
+        (USING INVENTORY (inventoryTable=qualifiedName | LEFT_PAREN inventoryQuery=subQuery RIGHT_PAREN))?
         (RETAIN number HOURS)? (DRY RUN)? #vacuumTable
     | (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName) #describeDeltaDetail
     | GENERATE modeName=identifier FOR TABLE table=qualifiedName #generate
@@ -214,6 +215,14 @@ predicateToken
     : .+?
     ;

+// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
+// parsing them to later. Although this is the same as `exprToken` and `predicateToken`, we have to
+// re-define it to work around an ANTLR issue (https://github.com/delta-io/delta/issues/1205).
+// Should we remove this after https://github.com/delta-io/delta/pull/1800?
+subQuery
+    : .+?
+ ; + // We don't have an expression rule in our grammar here, so we just grab the tokens and defer // parsing them to later. exprToken @@ -223,7 +232,7 @@ exprToken // Add keywords here so that people's queries don't break if they have a column name as one of // these tokens nonReserved - : VACUUM | RETAIN | HOURS | DRY | RUN + : VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN | CONVERT | TO | DELTA | PARTITIONED | BY | DESC | DESCRIBE | LIMIT | DETAIL | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE @@ -266,6 +275,7 @@ HISTORY: 'HISTORY'; HOURS: 'HOURS'; ICEBERG_COMPAT_VERSION: 'ICEBERG_COMPAT_VERSION'; IF: 'IF'; +INVENTORY: 'INVENTORY'; LEFT_PAREN: '('; LIMIT: 'LIMIT'; LOCATION: 'LOCATION'; @@ -296,6 +306,7 @@ TO: 'TO'; TRUE: 'TRUE'; UNIFORM: 'UNIFORM'; UPGRADE: 'UPGRADE'; +USING: 'USING'; VACUUM: 'VACUUM'; VERSION: 'VERSION'; WHERE: 'WHERE'; diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index 18bf160f83e..3363d5c53fc 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -1176,6 +1176,12 @@ ], "sqlState" : "22006" }, + "DELTA_INVALID_INVENTORY_SCHEMA" : { + "message" : [ + "The schema for the specified INVENTORY does not contain all of the required fields. Required fields are: " + ], + "sqlState" : "42000" + }, "DELTA_INVALID_ISOLATION_LEVEL" : { "message" : [ "invalid isolation level ''" diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index 0381477ec21..d4ece43b8ce 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -319,10 +319,12 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { */ override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) { VacuumTableCommand( - Option(ctx.path).map(string), - Option(ctx.table).map(visitTableIdentifier), - Option(ctx.number).map(_.getText.toDouble), - ctx.RUN != null) + path = Option(ctx.path).map(string), + table = Option(ctx.table).map(visitTableIdentifier), + inventoryTable = Option(ctx.inventoryTable).map(visitTableIdentifier), + inventoryQuery = Option(ctx.inventoryQuery).map(extractRawText), + horizonHours = Option(ctx.number).map(_.getText.toDouble), + dryRun = ctx.RUN != null) } /** Provides a list of unresolved attributes for multi dimensional clustering. 
*/ diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 3d7933467c3..6f12986c676 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -20,6 +20,8 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedTable +import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, DeltaTableUtils, UnresolvedDeltaPathOrIdentifier} import org.apache.spark.sql.delta.commands.DeltaCommand import org.apache.spark.sql.delta.commands.VacuumCommand @@ -30,12 +32,16 @@ import org.apache.spark.sql.types.StringType /** * The `vacuum` command implementation for Spark SQL. Example SQL: * {{{ - * VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN]; + * VACUUM ('/path/to/dir' | delta.`/path/to/dir`) + * [USING INVENTORY (delta.`/path/to/dir`| ( sub_query ))] + * [RETAIN number HOURS] [DRY RUN]; * }}} */ case class VacuumTableCommand( override val child: LogicalPlan, horizonHours: Option[Double], + inventoryTable: Option[LogicalPlan], + inventoryQuery: Option[String], dryRun: Boolean) extends RunnableCommand with UnaryNode with DeltaCommand { override val output: Seq[Attribute] = @@ -53,7 +59,11 @@ case class VacuumTableCommand( "VACUUM", DeltaTableIdentifier(path = Some(deltaTable.path.toString))) } - VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours).collect() + val inventory = inventoryTable.map(sparkSession.sessionState.analyzer.execute) + .map(p => Some(getDeltaTable(p, "VACUUM").toDf(sparkSession))) + .getOrElse(inventoryQuery.map(sparkSession.sql)) + VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours, + inventory).collect() } } @@ -61,9 +71,13 @@ object VacuumTableCommand { def apply( path: Option[String], table: Option[TableIdentifier], + inventoryTable: Option[TableIdentifier], + inventoryQuery: Option[String], horizonHours: Option[Double], dryRun: Boolean): VacuumTableCommand = { val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM") - VacuumTableCommand(child, horizonHours, dryRun) + val unresolvedInventoryTable = inventoryTable.map(rt => + UnresolvedTable(rt.nameParts, "VACUUM", relationTypeMismatchHint = None)) + VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index f333f872713..1125df4ea8d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -566,7 +566,12 @@ trait DeltaErrorsBase errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAME", messageParameters = Array(name)) } - + def invalidInventorySchema(expectedSchema: String): Throwable = { + new DeltaAnalysisException( + errorClass = "DELTA_INVALID_INVENTORY_SCHEMA", + messageParameters = Array(expectedSchema) + ) + } def invalidIsolationLevelException(s: String): Throwable = { new DeltaIllegalArgumentException( errorClass = 
"DELTA_INVALID_ISOLATION_LEVEL", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala index 45cc79cd782..d529f1f1c00 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala @@ -20,9 +20,7 @@ package org.apache.spark.sql.delta.commands import java.net.URI import java.util.Date import java.util.concurrent.TimeUnit - import scala.collection.JavaConverters._ - import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile} import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -31,12 +29,12 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} - import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric -import org.apache.spark.sql.functions.{col, count, sum} +import org.apache.spark.sql.functions.{col, count, lit, replace, startswith, substr, sum} +import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType} import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock} /** @@ -51,6 +49,21 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock} object VacuumCommand extends VacuumCommandImpl with Serializable { case class FileNameAndSize(path: String, length: Long) + + /** + * path : fully qualified uri + * length: size in bytes + * isDir: boolean indicating if it is a directory + * modificationTime: file update time in milliseconds + */ + val INVENTORY_SCHEMA = StructType( + Seq( + StructField("path", StringType), + StructField("length", LongType), + StructField("isDir", BooleanType), + StructField("modificationTime", LongType) + )) + /** * Additional check on retention duration to prevent people from shooting themselves in the foot. 
   */
@@ -125,19 +138,55 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     }.toDF("path")
   }
 
+  def getFilesFromInventory(basePath: String,
+                            partitionColumns: Seq[String],
+                            inventory: DataFrame): Dataset[SerializableFileStatus] = {
+    implicit val fileNameAndSizeEncoder: Encoder[SerializableFileStatus] =
+      org.apache.spark.sql.Encoders.product[SerializableFileStatus]
+
+    // Validate that the provided inventory DF contains the required fields with the expected types
+    val inventorySchema = StructType(
+      inventory.schema.fields.filter(f => INVENTORY_SCHEMA.fields.map(_.name).contains(f.name))
+    )
+    if (inventorySchema != INVENTORY_SCHEMA) {
+      throw DeltaErrors.invalidInventorySchema(INVENTORY_SCHEMA.treeString)
+    }
+
+    inventory
+      .filter(startswith(col("path"), lit(s"$basePath/")))
+      .select(
+        substr(col("path"), lit(basePath.length + 2)).as("path"),
+        col("length"), col("isDir"), col("modificationTime")
+      )
+      .flatMap {
+        row =>
+          val path = row.getString(0)
+          if (!DeltaTableUtils.isHiddenDirectory(partitionColumns, path)) {
+            Seq(SerializableFileStatus(path,
+              row.getLong(1), row.getBoolean(2), row.getLong(3)))
+          } else {
+            None
+          }
+      }
+  }
+
   /**
-   * Clears all untracked files and folders within this table. First lists all the files and
-   * directories in the table, and gets the relative paths with respect to the base of the
-   * table. Then it gets the list of all tracked files for this table, which may or may not
-   * be within the table base path, and gets the relative paths of all the tracked files with
-   * respect to the base of the table. Files outside of the table path will be ignored.
-   * Then we take a diff of the files and delete directories that were already empty, and all files
-   * that are within the table that are no longer tracked.
+   * Clears all untracked files and folders within this table. If an inventory is not provided,
+   * the command first lists all the files and directories in the table; if an inventory is
+   * provided, it is used instead of a listing to identify the files and directories within the
+   * table. The command then gets the relative paths with respect to the base of the table. Next,
+   * it gets the list of all tracked files for this table, which may or may not be within the
+   * table base path, and gets the relative paths of all the tracked files with respect to the
+   * base of the table. Files outside of the table path will be ignored. Then we take a diff of
+   * the files and delete directories that were already empty, and all files that are within the
+   * table that are no longer tracked.
    *
    * @param dryRun If set to true, no files will be deleted. Instead, we will list all files and
    *               directories that will be cleared.
    * @param retentionHours An optional parameter to override the default Delta tombstone retention
    *                       period
+   * @param inventory An optional DataFrame of files and directories within the table, generated
+   *                  from sources like a blob store inventory report.
    * @return A Dataset containing the paths of the files/folders to delete in dryRun mode. Otherwise
    *         returns the base path of the table.
*/ @@ -146,6 +195,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { deltaLog: DeltaLog, dryRun: Boolean = true, retentionHours: Option[Double] = None, + inventory: Option[DataFrame] = None, clock: Clock = new SystemClock): DataFrame = { recordDeltaOperation(deltaLog, "delta.gc") { @@ -189,8 +239,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { val partitionColumns = snapshot.metadata.partitionSchema.fieldNames val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism - - val allFilesAndDirs = DeltaFileOperations.recursiveListDirs( + val allFilesAndDirsWithDuplicates = inventory match { + case Some(inventoryDF) => getFilesFromInventory(basePath, partitionColumns, inventoryDF) + case None => DeltaFileOperations.recursiveListDirs( spark, Seq(basePath), hadoopConf, @@ -198,7 +249,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _), fileListingParallelism = Option(parallelism) ) - .groupByKey(_.path) + } + val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path) .mapGroups { (k, v) => val duplicates = v.toSeq // of all the duplicates we can return the newest file. diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index ecdbe0d1856..6d60358e6d7 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -40,27 +40,30 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`. val parser = new DeltaSqlParser(null) assert(parser.parsePlan("vacuum 123_") === - VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM", None), None, None, None, false)) assert(parser.parsePlan("vacuum 1a.123_") === - VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM", None), + None, None, None, false)) assert(parser.parsePlan("vacuum a.123A") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM", None), + None, None, None, false)) assert(parser.parsePlan("vacuum a.123E3_column") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM", None), + None, None, None, false)) assert(parser.parsePlan("vacuum a.123D_column") === VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM", None), - None, false)) + None, None, None, false)) assert(parser.parsePlan("vacuum a.123BD_column") === VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM", None), - None, false)) + None, None, None, false)) assert(parser.parsePlan("vacuum delta.`/tmp/table`") === VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM", None), - None, false)) + None, None, None, false)) assert(parser.parsePlan("vacuum \"/tmp/table\"") === VacuumTableCommand( - UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, false)) + UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, false)) } test("Restore command is parsed 
as expected") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala index 6a3d21c6e37..f2310ac3de2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala @@ -124,6 +124,9 @@ trait DeltaVacuumSuiteBase extends QueryTest dryRun: Boolean, expectedDf: Seq[String], retentionHours: Option[Double] = None) extends Operation + case class GCByInventory(dryRun: Boolean, expectedDf: Seq[String], + retentionHours: Option[Double] = None, + inventory: Option[DataFrame] = Option.empty[DataFrame]) extends Operation /** Garbage collect the reservoir. */ case class ExecuteVacuumInScala( deltaTable: io.delta.tables.DeltaTable, @@ -224,6 +227,11 @@ trait DeltaVacuumSuiteBase extends QueryTest val result = VacuumCommand.gc(spark, deltaLog, dryRun, retention, clock = clock) val qualified = expectedDf.map(p => fs.makeQualified(new Path(p)).toString) checkDatasetUnorderly(result.as[String], qualified: _*) + case GCByInventory(dryRun, expectedDf, retention, inventory) => + Given("*** Garbage collecting using inventory") + val result = VacuumCommand.gc(spark, deltaLog, dryRun, retention, inventory, clock = clock) + val qualified = expectedDf.map(p => fs.makeQualified(new Path(p)).toString) + checkDatasetUnorderly(result.as[String], qualified: _*) case ExecuteVacuumInScala(deltaTable, expectedDf, retention) => Given("*** Garbage collecting Reservoir using Scala") val result = if (retention.isDefined) { @@ -578,6 +586,183 @@ class DeltaVacuumSuite } } + test("schema validation for vacuum by using inventory dataframe") { + withEnvironment { (tempDir, clock) => + val deltaLog = DeltaLog.forTable(spark, tempDir, clock) + val txn = deltaLog.startTransaction() + val schema = new StructType().add("_underscore_col_", IntegerType).add("n", IntegerType) + val metadata = + Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_")) + txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true)) + val inventorySchema = StructType( + Seq( + StructField("file", StringType), + StructField("size", LongType), + StructField("isDir", BooleanType), + StructField("modificationTime", LongType) + )) + val inventory = spark.createDataFrame( + spark.sparkContext.parallelize(Seq.empty[Row]), inventorySchema) + gcTest(deltaLog, clock)( + ExpectFailure( + GCByInventory(dryRun = false, expectedDf = Seq(tempDir), inventory = Some(inventory)), + classOf[DeltaAnalysisException], + Seq( "The schema for the specified INVENTORY", + "does not contain all of the required fields.", + "Required fields are:", + s"${VacuumCommand.INVENTORY_SCHEMA.treeString}") + ) + ) + } + } + + test("run vacuum by using inventory dataframe") { + withEnvironment { (tempDir, clock) => + val deltaLog = DeltaLog.forTable(spark, tempDir, clock) + val txn = deltaLog.startTransaction() + val schema = new StructType().add("_underscore_col_", IntegerType).add("n", IntegerType) + + // Vacuum should consider partition folders even for clean up even though it starts with `_` + val metadata = + Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_")) + txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true)) + // Create a Seq of Rows containing the data + val data = Seq( + Row(s"${deltaLog.dataPath}", 300000L, true, 0L), + Row(s"${deltaLog.dataPath}/file1.txt", 300000L, false, 0L), + 
Row(s"${deltaLog.dataPath}/file2.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/_underscore_col_=10/test.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/_underscore_col_=10/test2.txt", 300000L, false, 0L), + // Below file is not within Delta table path and should be ignored by vacuum + Row(s"/tmp/random/_underscore_col_=10/test2.txt", 300000L, false, 0L), + // Below are Delta table root location and vacuum must safely handle them + Row(s"${deltaLog.dataPath}", 300000L, true, 0L) + ) + val inventory = spark.createDataFrame(spark.sparkContext.parallelize(data), + VacuumCommand.INVENTORY_SCHEMA) + gcTest(deltaLog, clock)( + CreateFile("file1.txt", commitToActionLog = true, Map("_underscore_col_" -> "10")), + CreateFile("file2.txt", commitToActionLog = false, Map("_underscore_col_" -> "10")), + CreateFile("_underscore_col_=10/test.txt", true, Map("_underscore_col_" -> "10")), + CreateFile("_underscore_col_=10/test2.txt", false, Map("_underscore_col_" -> "10")), + CheckFiles(Seq("file1.txt", "_underscore_col_=10", "file2.txt")), + LogicallyDeleteFile("_underscore_col_=10/test.txt"), + AdvanceClock(defaultTombstoneInterval + 1000), + GCByInventory(dryRun = true, expectedDf = Seq( + s"${deltaLog.dataPath}/file2.txt", + s"${deltaLog.dataPath}/_underscore_col_=10/test.txt", + s"${deltaLog.dataPath}/_underscore_col_=10/test2.txt" + ), inventory = Some(inventory)), + GCByInventory(dryRun = false, expectedDf = Seq(tempDir), inventory = Some(inventory)), + CheckFiles(Seq("file1.txt")), + CheckFiles(Seq("file2.txt", "_underscore_col_=10/test.txt", + "_underscore_col_=10/test2.txt"), exist = false) + ) + } + } + + test("vacuum using inventory delta table and should not touch hidden files") { + withSQLConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") { + withEnvironment { (tempDir, clock) => + import testImplicits._ + val path = s"""${tempDir.getCanonicalPath}_data""" + val inventoryPath = s"""${tempDir.getCanonicalPath}_inventory""" + + // Define test delta table + val data = Seq( + (10, 1, "a"), + (10, 2, "a"), + (10, 3, "a"), + (10, 4, "a"), + (10, 5, "a") + ) + data.toDF("v1", "v2", "v3") + .write + .partitionBy("v1", "v2") + .format("delta") + .save(path) + val deltaLog = DeltaLog.forTable(spark, path) + val reservoirData = Seq( + Row(s"${deltaLog.dataPath}/file1.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/file2.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/_underscore_col_=10/test.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/_underscore_col_=10/test2.txt", 300000L, false, 0L) + ) + spark.createDataFrame( + spark.sparkContext.parallelize(reservoirData), VacuumCommand.INVENTORY_SCHEMA) + .write + .format("delta") + .save(inventoryPath) + gcTest(deltaLog, clock)( + CreateFile("file1.txt", commitToActionLog = false), + CreateFile("file2.txt", commitToActionLog = false), + // Delta marks dirs starting with `_` as hidden unless specified as partition folder + CreateFile("_underscore_col_=10/test.txt", false), + CreateFile("_underscore_col_=10/test2.txt", false), + AdvanceClock(defaultTombstoneInterval + 1000) + ) + sql(s"vacuum delta.`$path` using inventory delta.`$inventoryPath` retain 0 hours") + gcTest(deltaLog, clock)( + CheckFiles(Seq("file1.txt", "file2.txt"), exist = false), + // hidden files must not be dropped + CheckFiles(Seq("_underscore_col_=10/test.txt", "_underscore_col_=10/test2.txt")) + ) + } + } + } + + test("vacuum using inventory query and should not touch hidden files") { + 
withSQLConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") { + withEnvironment { (tempDir, clock) => + import testImplicits._ + val path = s"""${tempDir.getCanonicalPath}_data""" + val reservoirPath = s"""${tempDir.getCanonicalPath}_reservoir""" + + // Define test delta table + val data = Seq( + (10, 1, "a"), + (10, 2, "a"), + (10, 3, "a"), + (10, 4, "a"), + (10, 5, "a") + ) + data.toDF("v1", "v2", "v3") + .write + .partitionBy("v1", "v2") + .format("delta") + .save(path) + val deltaLog = DeltaLog.forTable(spark, path) + val reservoirData = Seq( + Row(s"${deltaLog.dataPath}/file1.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/file2.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/_underscore_col_=10/test.txt", 300000L, false, 0L), + Row(s"${deltaLog.dataPath}/_underscore_col_=10/test2.txt", 300000L, false, 0L) + ) + spark.createDataFrame( + spark.sparkContext.parallelize(reservoirData), VacuumCommand.INVENTORY_SCHEMA) + .write + .format("delta") + .save(reservoirPath) + gcTest(deltaLog, clock)( + CreateFile("file1.txt", commitToActionLog = false), + CreateFile("file2.txt", commitToActionLog = false), + // Delta marks dirs starting with `_` as hidden unless specified as partition folder + CreateFile("_underscore_col_=10/test.txt", false), + CreateFile("_underscore_col_=10/test2.txt", false) + ) + sql(s"""vacuum delta.`$path` + |using inventory (select * from delta.`$reservoirPath`) + |retain 0 hours""".stripMargin) + gcTest(deltaLog, clock)( + AdvanceClock(defaultTombstoneInterval + 1000), + CheckFiles(Seq("file1.txt", "file2.txt"), exist = false), + // hidden files must not be dropped + CheckFiles(Seq("_underscore_col_=10/test.txt", "_underscore_col_=10/test2.txt")) + ) + } + } + } + test("multiple levels of empty directory deletion") { withEnvironment { (tempDir, clock) => val deltaLog = DeltaLog.forTable(spark, tempDir, clock)
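For reference, a minimal Scala sketch (not part of the patch) of how the `inventory` parameter added to `VacuumCommand.gc` above might be exercised. It assumes an active `SparkSession` named `spark`; the table and inventory locations are hypothetical, and the inventory is expected to conform to `VacuumCommand.INVENTORY_SCHEMA`.

```scala
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.commands.VacuumCommand

val tablePath = "/data/events"                 // hypothetical Delta table location
val inventoryPath = "/data/events_inventory"   // hypothetical externally generated reservoir

// The inventory must expose the columns required by VacuumCommand.INVENTORY_SCHEMA:
// path (string, fully qualified), length (long), isDir (boolean), modificationTime (long).
val inventoryDF = spark.read.format("delta").load(inventoryPath)
  .select("path", "length", "isDir", "modificationTime")

val deltaLog = DeltaLog.forTable(spark, tablePath)

// Dry run: returns the paths that would be deleted, without recursively listing the file system.
VacuumCommand.gc(
  spark,
  deltaLog,
  dryRun = true,
  retentionHours = Some(168.0),
  inventory = Some(inventoryDF)
).show(truncate = false)
```

The SQL form goes through the same code path: `getFilesFromInventory` validates the schema and raises `DELTA_INVALID_INVENTORY_SCHEMA` on a mismatch, so a malformed inventory fails fast instead of vacuuming against an incomplete file list.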