Skip to content

Commit

Permalink
[SPARK-32481] Support truncate table to move data to trash
Browse files Browse the repository at this point in the history
  • Loading branch information
Udbhav30 committed Aug 10, 2020
1 parent eb74d55 commit 266d0eb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
Expand Down Expand Up @@ -269,6 +269,24 @@ private[spark] object Utils extends Logging {
file.setExecutable(true, true)
}

/**
 * Remove `partitionPath` as part of truncate table, going through the Hadoop trash
 * when spark.sql.truncate.trash.interval is non-negative and deleting it directly
 * when the interval is negative.
 *
 * If the trash call reports that the path was not moved, the path is deleted
 * instead so that the truncate never silently leaves the data in place.
 *
 * @param fs file system used for the hard delete and handed to the trash call
 * @param partitionPath path whose data should be removed
 * @param trashInterval value of spark.sql.truncate.trash.interval; negative disables trash
 * @param hadoopConf Hadoop configuration; note that "fs.trash.interval" is overwritten
 *                   on this instance whenever the trash branch is taken
 */
def moveToTrashIfEnabled(
    fs: FileSystem,
    partitionPath: Path,
    trashInterval: Int,
    hadoopConf: Configuration): Unit = {
  if (trashInterval < 0) {
    // Trash disabled for truncate: recursive hard delete.
    fs.delete(partitionPath, true)
  } else {
    logDebug(s"will move data ${partitionPath.toString} to trash")
    hadoopConf.setInt("fs.trash.interval", trashInterval)
    val movedToTrash = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
    if (!movedToTrash) {
      // The result was previously ignored; a `false` here meant the data stayed in
      // place and the truncate had no effect. Fall back to a hard delete.
      logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
        "Fallback to hard deletion")
      fs.delete(partitionPath, true)
    }
  }
}

/**
* Create a directory given the abstract pathname
* @return true, if the directory is successfully created; otherwise, return false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Interval used when truncating a table; the default of -1 means files are deleted
// rather than moved to trash.
val TRUNCATE_TRASH_INTERVAL =
  buildConf("spark.sql.truncate.trash.interval")
    // The two literals are concatenated, so the first must end with its own
    // punctuation and a trailing space to keep the sentences apart.
    .doc("This configuration decides whether files are moved to trash on truncate table. " +
      "If -1, files will be deleted without moving to trash.")
    .intConf
    .createWithDefault(-1)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3311,6 +3318,8 @@ class SQLConf extends Serializable with Logging {
/** Returns the current value of the [[SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN]] entry. */
def optimizeNullAwareAntiJoin: Boolean =
getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)

/**
 * Returns the current value of [[SQLConf.TRUNCATE_TRASH_INTERVAL]]
 * (key "spark.sql.truncate.trash.interval", default -1).
 */
def truncateTrashInterval: Int = getConf(SQLConf.TRUNCATE_TRASH_INTERVAL)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
* A command to create a table with the same definition of the given existing table.
Expand Down Expand Up @@ -489,6 +490,7 @@ case class TruncateTableCommand(
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
val trashInterval = SQLConf.get.truncateTrashInterval
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
Expand All @@ -513,7 +515,7 @@ case class TruncateTableCommand(
}
}

fs.delete(path, true)
Utils.moveToTrashIfEnabled(fs, path, trashInterval, hadoopConf)

// We should keep original permission/acl of the path.
// For owner/group, only super-user can set it, for example on HDFS. Because
Expand Down

0 comments on commit 266d0eb

Please sign in to comment.