From 18669b7d04d69023a7f6b9251d55a3c42f3dca31 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 5 Nov 2020 11:51:15 -0800 Subject: [PATCH 1/3] initial commit --- .../sql/connector/catalog/TableCatalog.java | 23 +++++++++++++++++++ .../datasources/v2/DataSourceV2Strategy.scala | 4 ++-- .../datasources/v2/DropTableExec.scala | 9 +++++--- .../sql/connector/DataSourceV2SQLSuite.scala | 11 +++++++++ 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index b818515adf9c0..f95c0f26f92c2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -162,6 +162,29 @@ Table alterTable( */ boolean dropTable(Identifier ident); + /** + * Drop a table in the catalog with an option to purge. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must not drop the view and must return false. + *

+ * If the purge option is set to false, the default implementation falls back to + * {@link #dropTable(Identifier) dropTable}. Otherwise, it throws + * {@link UnsupportedOperationException}. + * + * @param ident a table identifier + * @param purge whether a table should be purged + * @return true if a table was deleted, false if no table exists for the identifier + * + * @since 3.1.0 + */ + default boolean dropTable(Identifier ident, boolean purge) { + if (purge) { + throw new UnsupportedOperationException("Purge option is not supported."); + } + return dropTable(ident); + } + /** * Renames a table in the catalog. *&#10;

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 4bb58142b3d19..648929eaa33ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -228,8 +228,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case DescribeColumn(_: ResolvedTable, _, _) => throw new AnalysisException("Describing columns is not supported for v2 tables.") - case DropTable(r: ResolvedTable, ifExists, _) => - DropTableExec(r.catalog, r.identifier, ifExists) :: Nil + case DropTable(r: ResolvedTable, ifExists, purge) => + DropTableExec(r.catalog, r.identifier, ifExists, purge) :: Nil case _: NoopDropTable => LocalTableScanExec(Nil, Nil) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala index 967613f77577c..1fd0cd177478b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala @@ -25,12 +25,15 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} /** * Physical plan node for dropping a table. 
*/ -case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean) - extends V2CommandExec { +case class DropTableExec( + catalog: TableCatalog, + ident: Identifier, + ifExists: Boolean, + purge: Boolean) extends V2CommandExec { override def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { - catalog.dropTable(ident) + catalog.dropTable(ident, purge) } else if (!ifExists) { throw new NoSuchTableException(ident) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 893ee5f130cda..444daf8233c67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -751,6 +751,17 @@ class DataSourceV2SQLSuite sql("DROP TABLE IF EXISTS testcat.db.notbl") } + test("DropTable: purge option") { + withTable("testcat.ns.t") { + sql("CREATE TABLE testcat.ns.t (id bigint) USING foo") + val ex = intercept[UnsupportedOperationException] { + sql("DROP TABLE testcat.ns.t PURGE") + } + // The default TableCatalog.dropTable implementation doesn't support the purge option. &#10;
+ assert(ex.getMessage.contains("Purge option is not supported")) + } + } + test("SPARK-33174: DROP TABLE should resolve to a temporary view first") { withTable("testcat.ns.t") { withTempView("t") { From c63560fa351502dea9541a86ec746e22af454d26 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 5 Nov 2020 21:03:45 -0800 Subject: [PATCH 2/3] address comment --- .../apache/spark/sql/connector/catalog/TableCatalog.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index f95c0f26f92c2..4c9cfc6b03524 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -168,9 +168,9 @@ Table alterTable( * If the catalog supports views and contains a view for the identifier and not a table, this * must not drop the view and must return false. *

- * If the purge option is set to false, the default implementation falls back to - * {@link #dropTable(Identifier) dropTable}. Otherwise, it throws - * {@link UnsupportedOperationException}. + * If the catalog supports the option to purge a table, this method must be overwritten. + * The default implementation falls back to {@link #dropTable(Identifier) dropTable} if the + * purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}. * * @param ident a table identifier * @param purge whether a table should be purged From b36af8de7dd8b66b70978d5292985f6a74ac926d Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 5 Nov 2020 21:37:51 -0800 Subject: [PATCH 3/3] Fix comment --- .../org/apache/spark/sql/connector/catalog/TableCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 4c9cfc6b03524..92079d127b1e3 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -168,7 +168,7 @@ Table alterTable( * If the catalog supports views and contains a view for the identifier and not a table, this * must not drop the view and must return false. *&#10;

- * If the catalog supports the option to purge a table, this method must be overwritten. + * If the catalog supports the option to purge a table, this method must be overridden. * The default implementation falls back to {@link #dropTable(Identifier) dropTable} if the * purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}. *