diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 35d60bb514405..9040bd9012ce2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,24 @@ private[spark] object Utils extends Logging {
     file.setExecutable(true, true)
   }
 
+  /**
+   * Move the data at the given path to the trash on truncate table if
+   * 'spark.sql.truncate.trash.interval' is non-negative; otherwise delete it permanently.
+   */
+  def moveToTrashIfEnabled(
+      fs: FileSystem,
+      partitionPath: Path,
+      trashInterval: Int,
+      hadoopConf: Configuration): Unit = {
+    if (trashInterval < 0) {
+      fs.delete(partitionPath, true)
+    } else {
+      logDebug(s"Will move data at $partitionPath to the trash")
+      hadoopConf.setInt("fs.trash.interval", trashInterval)
+      Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
+    }
+  }
+
   /**
    * Create a directory given the abstract pathname
    * @return true, if the directory is successfully created; otherwise, return false.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 196971a22a44e..9308fbf64de8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2701,6 +2701,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val TRUNCATE_TRASH_INTERVAL =
+    buildConf("spark.sql.truncate.trash.interval")
+      .doc("Interval in minutes for which data removed by truncate table is kept in the " +
+        "trash. If negative (the default), the data is deleted permanently instead.")
+      .intConf
+      .createWithDefault(-1)
+
   /**
    * Holds information about keys that have been deprecated.
   *
@@ -3311,6 +3318,8 @@ class SQLConf extends Serializable with Logging {
 
   def optimizeNullAwareAntiJoin: Boolean = getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)
 
+  def truncateTrashInterval: Int = getConf(SQLConf.TRUNCATE_TRASH_INTERVAL)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7aebdddf1d59c..2b40291b26552 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils
 
 /**
  * A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
+    val trashInterval = SQLConf.get.truncateTrashInterval
     locations.foreach { location =>
      if (location.isDefined) {
        val path = new Path(location.get)
@@ -513,7 +515,7 @@ case class TruncateTableCommand(
            }
          }
 
-          fs.delete(path, true)
+          Utils.moveToTrashIfEnabled(fs, path, trashInterval, hadoopConf)
          // We should keep original permission/acl of the path.
          // For owner/group, only super-user can set it, for example on HDFS. Because
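
Usage sketch for reviewers, not part of the patch: the session setup and the table name `test_table` are hypothetical; only the conf key `spark.sql.truncate.trash.interval` and its default of -1 come from this diff.

    import org.apache.spark.sql.SparkSession

    // A minimal sketch, assuming a running cluster and an existing table.
    val spark = SparkSession.builder()
      .appName("truncate-trash-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Default behavior (-1): TRUNCATE TABLE deletes the files permanently,
    // exactly as before this change.
    spark.sql("TRUNCATE TABLE test_table")

    // With a non-negative interval, the files are instead moved to the
    // executing user's Hadoop trash (e.g. /user/<name>/.Trash on HDFS) and
    // retained for 'fs.trash.interval' minutes, here 1440 (one day).
    spark.sql("SET spark.sql.truncate.trash.interval=1440")
    spark.sql("TRUNCATE TABLE test_table")

Note on the design: `moveToTrashIfEnabled` sets `fs.trash.interval` only on the per-query Hadoop conf returned by `spark.sessionState.newHadoopConf()`, so the cluster-wide trash setting is left untouched.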