Skip to content

Commit

Permalink
Revert "[SPARK-32481][CORE][SQL] Support truncate table to move data …
Browse files Browse the repository at this point in the history
…to trash"

### What changes were proposed in this pull request?

This reverts commit 065f173, which is not part of any released version. That is, this is an unreleased feature

### Why are the changes needed?

I like the concept of Trash, but I think this PR might just resolve a very specific issue by introducing a mechanism without a proper design doc. This could make the usage more complex.

I think we need to consider the big picture. Trash directory is an important concept. If we decide to introduce it, we should consider all the code paths of Spark SQL that could delete the data, instead of Truncate only. We also need to consider what is the current behavior if the underlying file system does not provide the API `Trash.moveToAppropriateTrash`. Is the exception good? How about the performance when users are using the object store instead of HDFS? Will it impact the GDPR compliance?

In sum, I think we should not merge the PR #29552 without the design doc and implementation plan. That is why I reverted it before the code freeze of Spark 3.1

### Does this PR introduce _any_ user-facing change?
Reverted the original commit

### How was this patch tested?
The existing tests.

Closes #30463 from gatorsmile/revertSpark-32481.

Authored-by: Xiao Li <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
gatorsmile authored and HyukjinKwon committed Nov 23, 2020
1 parent 84e7036 commit c891e02
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 119 deletions.
25 changes: 1 addition & 24 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
Expand Down Expand Up @@ -269,29 +269,6 @@ private[spark] object Utils extends Logging {
file.setExecutable(true, true)
}

/**
* Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
* delete the data permanently. If move data to trash failed fallback to hard deletion.
*/
def moveToTrashOrDelete(
fs: FileSystem,
partitionPath: Path,
isTrashEnabled: Boolean,
hadoopConf: Configuration): Boolean = {
if (isTrashEnabled) {
logDebug(s"Try to move data ${partitionPath.toString} to trash")
val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
if (!isSuccess) {
logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
"Fallback to hard deletion")
return fs.delete(partitionPath, true)
}
isSuccess
} else {
fs.delete(partitionPath, true)
}
}

/**
* Create a directory given the abstract pathname
* @return true, if the directory is successfully created; otherwise, return false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2913,18 +2913,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val TRUNCATE_TRASH_ENABLED =
buildConf("spark.sql.truncate.trash.enabled")
.doc("This configuration decides when truncating table, whether data files will be moved " +
"to trash directory or deleted permanently. The trash retention time is controlled by " +
"'fs.trash.interval', and in default, the server side configuration value takes " +
"precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
"this will be a no-op and log a warning message. If the data fails to be moved to " +
"trash, Spark will turn to delete it permanently.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

val DISABLED_JDBC_CONN_PROVIDER_LIST =
buildConf("spark.sql.sources.disabledJdbcConnProviderList")
.internal()
Expand Down Expand Up @@ -3577,8 +3565,6 @@ class SQLConf extends Serializable with Logging {

def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)

def disabledJdbcConnectionProviders: String = getConf(SQLConf.DISABLED_JDBC_CONN_PROVIDER_LIST)

/** ********************** SQLConf functionality methods ************ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
* A command to create a table with the same definition of the given existing table.
Expand Down Expand Up @@ -490,7 +489,6 @@ case class TruncateTableCommand(
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
val isTrashEnabled = SQLConf.get.truncateTrashEnabled
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
Expand All @@ -515,7 +513,7 @@ case class TruncateTableCommand(
}
}

Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)
fs.delete(path, true)

// We should keep original permission/acl of the path.
// For owner/group, only super-user can set it, for example on HDFS. Because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3104,84 +3104,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
assert(spark.sessionState.catalog.isRegisteredFunction(rand))
}
}

test("SPARK-32481 Move data to trash on truncate table if enabled") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val originalValue = hadoopConf.get(trashIntervalKey, "0")
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
assume(
fs.mkdirs(trashPath) && fs.delete(trashPath, false),
"Trash directory could not be created, skipping.")
assert(!fs.exists(trashPath))
try {
hadoopConf.set(trashIntervalKey, "5")
sql("TRUNCATE TABLE tab1")
} finally {
hadoopConf.set(trashIntervalKey, originalValue)
}
assert(fs.exists(trashPath))
fs.delete(trashPath, true)
}
}
}

test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val originalValue = hadoopConf.get(trashIntervalKey, "0")
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
assert(!fs.exists(trashPath))
try {
hadoopConf.set(trashIntervalKey, "0")
sql("TRUNCATE TABLE tab1")
} finally {
hadoopConf.set(trashIntervalKey, originalValue)
}
assert(!fs.exists(trashPath))
}
}
}

test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
val hadoopConf = spark.sessionState.newHadoopConf()
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
sql("TRUNCATE TABLE tab1")
assert(!fs.exists(trashPath))
}
}
}
}

object FakeLocalFsFileSystem {
Expand Down

0 comments on commit c891e02

Please sign in to comment.