From ec1560af251d2c3580f5bccfabc750f1c7af09df Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 23 Dec 2020 11:47:13 +0900
Subject: [PATCH] [SPARK-33364][SQL][FOLLOWUP] Refine the catalog v2 API to purge a table

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/30267

Inspired by https://github.com/apache/spark/pull/30886, it's better to have two methods, `def dropTable` and `def purgeTable`, than the overloads `def dropTable(ident)` and `def dropTable(ident, purge)`.

### Why are the changes needed?

1. It makes the APIs orthogonal. Previously, `def dropTable(ident, purge)` called `def dropTable(ident)` and was a superset of it.
2. It simplifies catalog implementations a little: the `if (purge) ... else ...` check is now done on the Spark side.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #30890 from cloud-fan/purgeTable.

Authored-by: Wenchen Fan
Signed-off-by: HyukjinKwon
---
 .../catalog/DelegatingCatalogExtension.java     |  5 +++++
 .../sql/connector/catalog/TableCatalog.java     | 17 +++++++----------
 .../connector/catalog/TableCatalogSuite.scala   |  5 +++++
 .../datasources/v2/DropTableExec.scala          |  2 +-
 .../execution/command/v2/DropTableSuite.scala   |  4 ++--
 5 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index d07d299d65a58..34f07b12b3666 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -99,6 +99,11 @@ public boolean dropTable(Identifier ident) {
     return asTableCatalog().dropTable(ident);
   }
 
+  @Override
+  public boolean purgeTable(Identifier ident) {
+    return asTableCatalog().purgeTable(ident);
+  }
+
   @Override
   public void renameTable(
       Identifier oldIdent,
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 52a74ab9dd9f5..4163d86bcc54b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -173,26 +173,23 @@ Table alterTable(
   boolean dropTable(Identifier ident);
 
   /**
-   * Drop a table in the catalog with an option to purge.
+   * Drop a table in the catalog and completely remove its data by skipping the trash, even if
+   * the catalog supports a trash mechanism.
    * <p>
    * If the catalog supports views and contains a view for the identifier and not a table, this
   * must not drop the view and must return false.
    * <p>
-   * If the catalog supports the option to purge a table, this method must be overridden.
-   * The default implementation falls back to {@link #dropTable(Identifier)} dropTable} if the
-   * purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}.
+   * If the catalog supports purging a table, this method should be overridden.
+   * The default implementation throws {@link UnsupportedOperationException}.
    *
    * @param ident a table identifier
-   * @param purge whether a table should be purged
    * @return true if a table was deleted, false if no table exists for the identifier
+   * @throws UnsupportedOperationException if table purging is not supported
    *
    * @since 3.1.0
    */
-  default boolean dropTable(Identifier ident, boolean purge) {
-    if (purge) {
-      throw new UnsupportedOperationException("Purge option is not supported.");
-    }
-    return dropTable(ident);
+  default boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Purge table is not supported.");
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
index dab20911bbdc7..ef342e7ec5539 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
@@ -643,6 +643,11 @@ class TableCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(testIdent))
   }
 
+  test("purgeTable") {
+    val catalog = newCatalog()
+    intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
+  }
+
   test("renameTable") {
     val catalog = newCatalog()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index f89b89096772a..100eaf9021863 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -35,7 +35,7 @@ case class DropTableExec(
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       invalidateCache()
-      catalog.dropTable(ident, purge)
+      if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
     } else if (!ifExists) {
       throw new NoSuchTableException(ident)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
index 16283d5ad6644..a272f649288f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
@@ -29,8 +29,8 @@ class DropTableSuite extends command.DropTableSuiteBase with CommandSuiteBase {
       val errMsg = intercept[UnsupportedOperationException] {
         sql(s"DROP TABLE $catalog.ns.tbl PURGE")
       }.getMessage
-      // The default TableCatalog.dropTable implementation doesn't support the purge option.
-      assert(errMsg.contains("Purge option is not supported"))
+      // The default TableCatalog.purgeTable implementation throws an exception.
+      assert(errMsg.contains("Purge table is not supported"))
     }
   }
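
The split also changes what third-party catalogs implement: instead of threading a `purge` flag through `dropTable`, a catalog that supports purging overrides `purgeTable` directly. Below is a minimal sketch of such an override, assuming a catalog built on `DelegatingCatalogExtension`; the `PurgingCatalog` class and the `deleteCompletely` helper are hypothetical, not Spark APIs.

```scala
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier}

// A minimal sketch, assuming the catalog delegates metadata operations and
// owns its table data. Only dropTable/purgeTable/loadTable come from the
// Spark API; PurgingCatalog and deleteCompletely are hypothetical.
class PurgingCatalog extends DelegatingCatalogExtension {

  override def purgeTable(ident: Identifier): Boolean = {
    // Resolve the data location before the metadata disappears.
    val location = loadTable(ident).properties.get("location")
    val dropped = super.dropTable(ident)
    if (dropped && location != null) {
      // Permanently remove the data, bypassing any trash mechanism.
      deleteCompletely(location)
    }
    dropped
  }

  // Hypothetical storage helper: delete the files at `location` without
  // moving them to a trash directory first.
  private def deleteCompletely(location: String): Unit = {
    // storage-specific
  }
}
```

With this shape the two methods stay fully orthogonal: `purgeTable` no longer falls back to `dropTable`, and the default implementation simply refuses, so catalogs that cannot purge fail loudly instead of silently dropping to trash.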
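On the SQL side, the `DropTableExec` change above means the two statements exercise the two methods independently. A hypothetical round trip (the `mycat` catalog and the table name are placeholders; `mycat` would have to be registered through the `spark.sql.catalog.mycat` configuration):

```scala
// Plain DROP TABLE is routed to TableCatalog.dropTable.
spark.sql("CREATE TABLE mycat.ns.tbl (id INT) USING parquet")
spark.sql("DROP TABLE mycat.ns.tbl")

// DROP TABLE ... PURGE is routed to TableCatalog.purgeTable, which throws
// UnsupportedOperationException unless the catalog overrides it.
spark.sql("CREATE TABLE mycat.ns.tbl (id INT) USING parquet")
spark.sql("DROP TABLE mycat.ns.tbl PURGE")
```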