From ab5a9080be96a286c24bfb1082c6639945102683 Mon Sep 17 00:00:00 2001 From: kamcheungting-db <91572897+kamcheungting-db@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:47:24 -0700 Subject: [PATCH] [Spark] Introduce Redirect WriterOnly Feature (#3813) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This PR introduces a new writer-only table feature for table redirection, "redirectWriterOnly-preview". The feature redirects read and write queries from the table's current storage location to a new storage location, which is described by the value of the feature's table property (a JSON representation of TableRedirectConfiguration). ## How was this patch tested? Extended TableRedirectSuite so that the basic table redirect test runs for both RedirectReaderWriter and RedirectWriterOnly. ## Does this PR introduce _any_ user-facing changes? No. --- .../apache/spark/sql/delta/DeltaConfig.scala | 12 +++ .../apache/spark/sql/delta/TableFeature.scala | 14 ++- .../sql/delta/redirect/TableRedirect.scala | 4 +- .../spark/sql/delta/TableRedirectSuite.scala | 97 +++++++++++-------- 4 files changed, 84 insertions(+), 43 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala index 4445a8566c..3d35f02ca0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala @@ -425,6 +425,18 @@ trait DeltaConfigsBase extends DeltaLogging { "A JSON representation of the TableRedirectConfiguration class, which contains all " + "information of redirect reader writer feature.") + /** + * This table feature is same as REDIRECT_READER_WRITER except it is a writer only table feature. + */ + val REDIRECT_WRITER_ONLY: DeltaConfig[Option[String]] = + buildConfig[Option[String]]( + "redirectWriterOnly-preview", + null, + v => Option(v), + _ => true, + "A JSON representation of the TableRedirectConfiguration class, which contains all " + + "information of redirect writer only feature.") + /** * Enable auto compaction for a Delta table. 
When enabled, we will check if files already * written to a Delta table can leverage compaction after a commit. If so, we run a post-commit diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 33f1229674..d7910f0636 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.constraints.{Constraints, Invariants} import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils -import org.apache.spark.sql.delta.redirect.RedirectReaderWriter +import org.apache.spark.sql.delta.redirect.{RedirectReaderWriter, RedirectWriterOnly} import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -369,6 +369,7 @@ object TableFeature { if (DeltaUtils.isTesting && testingFeaturesEnabled) { features ++= Set( RedirectReaderWriterFeature, + RedirectWriterOnlyFeature, TestLegacyWriterFeature, TestLegacyReaderWriterFeature, TestWriterFeature, @@ -590,6 +591,17 @@ object RedirectReaderWriterFeature override def automaticallyUpdateProtocolOfExistingTables: Boolean = true } +object RedirectWriterOnlyFeature extends WriterFeature(name = "redirectWriterOnly-preview") + with FeatureAutomaticallyEnabledByMetadata { + override def metadataRequiresFeatureToBeEnabled( + protocol: Protocol, + metadata: Metadata, + spark: SparkSession + ): Boolean = RedirectWriterOnly.isFeatureSet(metadata) + + override def automaticallyUpdateProtocolOfExistingTables: Boolean = true +} + object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-preview") with FeatureAutomaticallyEnabledByMetadata { override def 
metadataRequiresFeatureToBeEnabled( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala index f4d2345a7c..c8ab87ec2e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala @@ -177,7 +177,7 @@ case class TableRedirectConfiguration( /** * This is the main class of the table redirect that interacts with other components. */ -class TableRedirect(config: DeltaConfig[Option[String]]) { +class TableRedirect(val config: DeltaConfig[Option[String]]) { /** * Determine whether the property of table redirect feature is set. */ @@ -309,3 +309,5 @@ class TableRedirect(config: DeltaConfig[Option[String]]) { } object RedirectReaderWriter extends TableRedirect(config = DeltaConfigs.REDIRECT_READER_WRITER) + +object RedirectWriterOnly extends TableRedirect(config = DeltaConfigs.REDIRECT_WRITER_ONLY) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala index b12fe6530f..228792988d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.redirect.{ RedirectReaderWriter, RedirectReady, RedirectState, + RedirectWriterOnly, TableRedirect } import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} @@ -45,14 +46,20 @@ class TableRedirectSuite extends QueryTest private def validateState( deltaLog: DeltaLog, redirectState: RedirectState, - destTablePath: File + destTablePath: File, + feature: TableRedirect ): Unit = { val snapshot = deltaLog.update() - assert(RedirectReaderWriter.isFeatureSet(snapshot.metadata)) - val redirectConfig = 
RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata).get - assert(snapshot.protocol.supportsReaderFeatures && snapshot.protocol.supportsWriterFeatures) - assert(snapshot.protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name)) - assert(snapshot.protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name)) + assert(feature.isFeatureSet(snapshot.metadata)) + val redirectConfig = feature.getRedirectConfiguration(snapshot.metadata).get + val protocol = snapshot.protocol + if (feature != RedirectWriterOnly) { + assert(protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name)) + assert(protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name)) + } else { + assert(!protocol.readerFeatureNames.contains(RedirectWriterOnlyFeature.name)) + assert(protocol.writerFeatureNames.contains(RedirectWriterOnlyFeature.name)) + } assert(redirectConfig.redirectState == redirectState) assert(redirectConfig.`type` == PathBasedRedirectSpec.REDIRECT_TYPE) val expectedSpecValue = s"""{"tablePath":"${destTablePath.getCanonicalPath}"}""" @@ -65,43 +72,51 @@ class TableRedirectSuite extends QueryTest val snapshot = deltaLog.update() val protocol = snapshot.protocol assert(!feature.isFeatureSet(snapshot.metadata)) - assert(protocol.supportsReaderFeatures && protocol.supportsWriterFeatures) - assert(protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name)) - assert(protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name)) + if (feature != RedirectWriterOnly) { + assert(protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name)) + assert(protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name)) + } else { + assert(!protocol.readerFeatureNames.contains(RedirectWriterOnlyFeature.name)) + assert(protocol.writerFeatureNames.contains(RedirectWriterOnlyFeature.name)) + } } - test("basic table redirect") { - withTempDir { sourceTablePath => - withTempDir { destTablePath => - val feature = 
RedirectReaderWriter - sql(s"CREATE external TABLE t1(c0 long, c1 long) USING delta LOCATION '$sourceTablePath';") - val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) - assert(!feature.isFeatureSet(deltaLog.update().metadata)) - val redirectSpec = new PathBasedRedirectSpec(destTablePath.getCanonicalPath) - val catalogTableOpt = Some(catalogTable) - val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE - // Step-1: Initiate table redirection and set to EnableRedirectInProgress state. - feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, destTablePath) - // Step-2: Complete table redirection and set to RedirectReady state. - feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec) - validateState(deltaLog, RedirectReady, destTablePath) - // Step-3: Start dropping table redirection and set to DropRedirectInProgress state. - feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec) - validateState(deltaLog, DropRedirectInProgress, destTablePath) - // Step-4: Finish dropping table redirection and remove the property completely. - feature.remove(deltaLog, Some(catalogTable)) - validateRemovedState(deltaLog, feature) - // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one - // more time. - withTempDir { destTablePath2 => - val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath) - feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, destTablePath2) - // Step-6: Finish dropping table redirection and remove the property completely. 
- feature.remove(deltaLog, Some(catalogTable)) - validateRemovedState(deltaLog, feature) + Seq(RedirectReaderWriter, RedirectWriterOnly).foreach { feature => + test(s"basic table redirect: ${feature.config.key}") { + withTempDir { sourceTablePath => + withTempDir { destTablePath => + withTable("t1") { + sql(s"CREATE external TABLE t1(c0 long)USING delta LOCATION '$sourceTablePath';") + val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) + val snapshot = deltaLog.update() + assert(!feature.isFeatureSet(snapshot.metadata)) + val redirectSpec = new PathBasedRedirectSpec(destTablePath.getCanonicalPath) + val catalogTableOpt = Some(catalogTable) + val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE + // Step-1: Initiate table redirection and set to EnableRedirectInProgress state. + feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) + validateState(deltaLog, EnableRedirectInProgress, destTablePath, feature) + // Step-2: Complete table redirection and set to RedirectReady state. + feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec) + validateState(deltaLog, RedirectReady, destTablePath, feature) + // Step-3: Start dropping table redirection and set to DropRedirectInProgress state. + feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec) + validateState(deltaLog, DropRedirectInProgress, destTablePath, feature) + // Step-4: Finish dropping table redirection and remove the property completely. + feature.remove(deltaLog, catalogTableOpt) + validateRemovedState(deltaLog, feature) + // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one + // more time. 
+ withTempDir { destTablePath2 => + val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath) + feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) + validateState(deltaLog, EnableRedirectInProgress, destTablePath2, feature) + // Step-6: Finish dropping table redirection and remove the property completely. + feature.remove(deltaLog, catalogTableOpt) + validateRemovedState(deltaLog, feature) + } + } } } }