Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33364][SQL] Introduce the "purge" option in TableCatalog.dropTable for v2 catalog. #30267

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,29 @@ Table alterTable(
*/
boolean dropTable(Identifier ident);

/**
* Drop a table in the catalog with an option to purge.
* <p>
* If the catalog supports views and contains a view for the identifier and not a table, this
* must not drop the view and must return false.
* <p>
* If the catalog supports the option to purge a table, this method must be overridden.
* The default implementation falls back to {@link #dropTable(Identifier)} dropTable} if the
* purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}.
*
* @param ident a table identifier
* @param purge whether a table should be purged
* @return true if a table was deleted, false if no table exists for the identifier
*
* @since 3.1.0
*/
default boolean dropTable(Identifier ident, boolean purge) {
if (purge) {
throw new UnsupportedOperationException("Purge option is not supported.");
}
return dropTable(ident);
}

/**
* Renames a table in the catalog.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DescribeColumn(_: ResolvedTable, _, _) =>
throw new AnalysisException("Describing columns is not supported for v2 tables.")

case DropTable(r: ResolvedTable, ifExists, _) =>
DropTableExec(r.catalog, r.identifier, ifExists) :: Nil
case DropTable(r: ResolvedTable, ifExists, purge) =>
DropTableExec(r.catalog, r.identifier, ifExists, purge) :: Nil

case _: NoopDropTable =>
LocalTableScanExec(Nil, Nil) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
/**
* Physical plan node for dropping a table.
*/
case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean)
extends V2CommandExec {
case class DropTableExec(
catalog: TableCatalog,
ident: Identifier,
ifExists: Boolean,
purge: Boolean) extends V2CommandExec {

override def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
catalog.dropTable(ident)
catalog.dropTable(ident, purge)
} else if (!ifExists) {
throw new NoSuchTableException(ident)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,17 @@ class DataSourceV2SQLSuite
sql("DROP TABLE IF EXISTS testcat.db.notbl")
}

test("DropTable: purge option") {
withTable("testcat.ns.t") {
sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
val ex = intercept[UnsupportedOperationException] {
sql ("DROP TABLE testcat.ns.t PURGE")
}
// The default TableCatalog.dropTable implementation doesn't support the purge option.
assert(ex.getMessage.contains("Purge option is not supported"))
}
}

test("SPARK-33174: DROP TABLE should resolve to a temporary view first") {
withTable("testcat.ns.t") {
withTempView("t") {
Expand Down