From 2e197f130765d91f201b6b649f30190a44304b29 Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Wed, 13 Mar 2024 08:10:32 -0700 Subject: [PATCH] [Spark]Add VacuumProtocolCheck ReaderWriter Table Feature (#2730) ## Description Add a new VacuumProtocolCheck ReaderWriter Table Feature so that Vacuum command on older DBR client and OSS clients fail. This is in follow-up to https://github.com/delta-io/delta/pull/2557 where protocol-check was added during the vacuum-write flow. ## How was this patch tested? UTs ## Does this PR introduce _any_ user-facing changes? No --- .../apache/spark/sql/delta/TableFeature.scala | 16 ++++++++++- .../sql/delta/DeltaTableFeatureSuite.scala | 27 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 91d32f4e0b5..8acb32d536e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -357,7 +357,8 @@ object TableFeature { // Row IDs are still under development and only available in testing. RowTrackingFeature, InCommitTimestampTableFeature, - TypeWideningTableFeature) + TypeWideningTableFeature, + VacuumProtocolCheckTableFeature) } val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap require(features.size == featureMap.size, "Lowercase feature names must not duplicate.") @@ -656,6 +657,19 @@ object InCommitTimestampTableFeature } } +/** + * A ReaderWriter table feature for VACUUM. If this feature is enabled: + * A writer should follow one of the following: + * 1. Non-Support for Vacuum: Writers can explicitly state that they do not support VACUUM for + * any table, regardless of whether the Vacuum Protocol Check Table feature exists. + * 2. Implement Writer Protocol Check: Ensure that the VACUUM implementation includes a writer + * protocol check before any file deletions occur. + * Readers don't need to understand or change anything new; they just need to acknowledge the + * feature exists + */ +object VacuumProtocolCheckTableFeature + extends ReaderWriterFeature(name = "vacuumProtocolCheck-dev") + /** * Features below are for testing only, and are being registered to the system only in the testing * environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala index 46f31d151d0..17067000616 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala @@ -459,6 +459,33 @@ class DeltaTableFeatureSuite } } + for(commandName <- Seq("ALTER", "CLONE", "REPLACE", "CREATE OR REPLACE")) { + test(s"Vacuum Protocol Check is disabled by default but can be enabled during $commandName") { + val table = "tbl" + withTable(table) { + spark.range(0).write.format("delta").saveAsTable(table) + val log = DeltaLog.forTable(spark, TableIdentifier(table)) + val protocol = log.update().protocol + assert(!protocol.readerAndWriterFeatureNames.contains(VacuumProtocolCheckTableFeature.name)) + + val tblProperties1 = Seq(s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION") + sql(buildTablePropertyModifyingCommand( + commandName, targetTableName = table, sourceTableName = table, tblProperties1)) + val newProtocol1 = log.update().protocol + assert(!newProtocol1.readerAndWriterFeatureNames.contains( + VacuumProtocolCheckTableFeature.name)) + + val tblProperties2 = Seq(s"'$FEATURE_PROP_PREFIX${VacuumProtocolCheckTableFeature.name}' " + + s"= 'supported', 'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION") + sql(buildTablePropertyModifyingCommand( + commandName, targetTableName = table, sourceTableName = table, tblProperties2)) + val newProtocol2 = log.update().protocol + assert(newProtocol2.readerAndWriterFeatureNames.contains( + VacuumProtocolCheckTableFeature.name)) + } + } + } + private def buildTablePropertyModifyingCommand( commandName: String, targetTableName: String,