diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index fbde234af1..4a0e54be6e 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -516,6 +516,12 @@ ], "sqlState" : "2200G" }, + "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE" : { + "message" : [ + "Cannot handle commit of table within redirect table state '<state>'." + ], + "sqlState" : "42P01" + }, "DELTA_COMPLEX_TYPE_COLUMN_CONTAINS_NULL_TYPE" : { "message" : [ " Found nested NullType in column which is of . Delta doesn't support writing NullType in complex types." @@ -1890,6 +1896,13 @@ ], "sqlState" : "KD006" }, + "DELTA_NO_REDIRECT_RULES_VIOLATED" : { + "message" : [ + "Operation not allowed: <operation> cannot be performed on a table with redirect feature.", + "The no redirect rules are not satisfied <noRedirectRules>." + ], + "sqlState" : "42P01" + }, "DELTA_NO_RELATION_TABLE" : { "message" : [ "Table not found" diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 9dd2323ca6..82fa0028b5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.hooks.AutoCompactType import org.apache.spark.sql.delta.hooks.PostCommitHook import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.redirect.NoRedirectRule import org.apache.spark.sql.delta.redirect.RedirectState import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo} import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -362,6 +363,23 @@ trait DeltaErrorsBase ) } + def invalidCommitIntermediateRedirectState(state: RedirectState): 
Throwable = { + throw new DeltaIllegalStateException ( + errorClass = "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE", + messageParameters = Array(state.name) + ) + } + + def noRedirectRulesViolated( + op: DeltaOperations.Operation, + noRedirectRules: Set[NoRedirectRule]): Throwable = { + throw new DeltaIllegalStateException ( + errorClass = "DELTA_NO_REDIRECT_RULES_VIOLATED", + messageParameters = + Array(op.name, noRedirectRules.map("\"" + _ + "\"").mkString("[", ",\n", "]")) + ) + } + def incorrectLogStoreImplementationException( sparkConf: SparkConf, cause: Throwable): Throwable = { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 1ad915555e..54ac20ebbd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -735,7 +735,7 @@ object DeltaOperations { case class VacuumStart( retentionCheckEnabled: Boolean, specifiedRetentionMillis: Option[Long], - defaultRetentionMillis: Long) extends Operation("VACUUM START") { + defaultRetentionMillis: Long) extends Operation(VacuumStart.OPERATION_NAME) { override val parameters: Map[String, Any] = Map( "retentionCheckEnabled" -> retentionCheckEnabled, "defaultRetentionMillis" -> defaultRetentionMillis @@ -749,10 +749,14 @@ object DeltaOperations { override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } + object VacuumStart { + val OPERATION_NAME = "VACUUM START" + } + /** * @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED" */ - case class VacuumEnd(status: String) extends Operation(s"VACUUM END") { + case class VacuumEnd(status: String) extends Operation(VacuumEnd.OPERATION_NAME) { override val parameters: Map[String, Any] = Map( "status" -> status ) @@ -765,6 +769,10 @@ object DeltaOperations { override val isInPlaceFileMetadataUpdate: Option[Boolean] = 
Some(false) } + object VacuumEnd { + val OPERATION_NAME = "VACUUM END" + } + /** Recorded when running REORG on the table. */ case class Reorg( predicate: Seq[Expression], diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 6463630010..340848ec40 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.delta.hooks.ChecksumHook import org.apache.spark.sql.delta.implicits.addFileEncoder import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.redirect.{RedirectFeature, TableRedirectConfiguration} import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.stats._ @@ -55,6 +56,7 @@ import org.apache.spark.internal.{MDC, MessageWithContext} import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.UnsetTableProperties import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, ResolveDefaultColumns} import org.apache.spark.sql.types.{StructField, StructType} @@ -1246,6 +1248,56 @@ trait OptimisticTransactionImpl extends TransactionalWrite } } + /** + * This method goes through all no-redirect-rules inside redirect feature to determine + * whether the current operation is valid to run on this table. 
+ */ + private def performNoRedirectRulesCheck( + op: DeltaOperations.Operation, + redirectConfig: TableRedirectConfiguration + ): Unit = { + // Find all rules that apply to the current application. + // A rule without an appName applies to every application. + // A rule with an appName applies only when that appName matches + // "spark.app.name" (compared case-insensitively). + val rulesOfMatchedApps = redirectConfig.noRedirectRules.filter { rule => + rule.appName.forall(_.equalsIgnoreCase(spark.conf.get("spark.app.name"))) + } + // Determine whether any matching rule allows the given operation. + val noRuleSatisfied = !rulesOfMatchedApps.exists(_.allowedOperations.contains(op.name)) + // If no matching rule allows the operation, block it. + if (noRuleSatisfied) { + throw DeltaErrors.noRedirectRulesViolated(op, redirectConfig.noRedirectRules) + } + } + + /** + * This method determines whether `op` is valid when the table redirect feature is + * set on the current table. + * 1. If the table redirect is in an in-progress state, no DML/DDL is allowed to execute. + * 2. If a user tries to access the redirect source table, only the allowed operations listed + * inside no-redirect-rules are valid. + */ + protected def performRedirectCheck(op: DeltaOperations.Operation): Unit = { + // If SKIP_REDIRECT_FEATURE is enabled, skip all remaining validations. + if (spark.conf.get(DeltaSQLConf.SKIP_REDIRECT_FEATURE)) return + // If the table does not support a redirect feature, skip validation. + if (!RedirectFeature.isFeatureSupported(snapshot)) return + // If this transaction sets or unsets the redirect property, skip validation. + if (RedirectFeature.isUpdateProperty(snapshot, op)) return + // Get the redirect configuration from current snapshot. 
+ val redirectConfigOpt = RedirectFeature.getRedirectConfiguration(snapshot) + redirectConfigOpt.foreach { redirectConfig => + // If the redirect state is in EnableRedirectInProgress or DropRedirectInProgress, + // all DML and DDL operation should be aborted. + if (redirectConfig.isInProgressState) { + throw DeltaErrors.invalidCommitIntermediateRedirectState(redirectConfig.redirectState) + } + // Validates the no redirect rules on the transactions that access redirect source table. + performNoRedirectRulesCheck(op, redirectConfig) + } + } + @throws(classOf[ConcurrentModificationException]) protected def commitImpl( actions: Seq[Action], @@ -1255,6 +1307,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite commitStartNano = System.nanoTime() val (version, postCommitSnapshot, actualCommittedActions) = try { + // Check for satisfaction of no redirect rules + performRedirectCheck(op) + // Check for CDC metadata columns performCdcMetadataCheck() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala index c8ab87ec2e..4de0a92e21 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala @@ -18,7 +18,16 @@ package org.apache.spark.sql.delta.redirect import scala.reflect.ClassTag -import org.apache.spark.sql.delta.{DeltaConfig, DeltaConfigs, DeltaErrors, DeltaLog, DeltaOperations} +import org.apache.spark.sql.delta.{ + DeltaConfig, + DeltaConfigs, + DeltaErrors, + DeltaLog, + DeltaOperations, + RedirectReaderWriterFeature, + RedirectWriterOnlyFeature, + Snapshot +} import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.util.JsonUtils import com.fasterxml.jackson.annotation.JsonIgnore @@ -140,13 +149,41 @@ object RedirectSpec { } } +/** + * This class defines the rule of allowing transaction to access redirect 
source table. + * @param appName The name of the application that is allowed to commit the operations listed + * in the `allowedOperations` set. If a rule's appName is empty, the rule applies + * to every application. + * @param allowedOperations The set of operation names that are allowed to commit on the + * redirect source table. + * An example usage of NoRedirectRule: + * { + * "type": "PathBasedRedirect", + * "state": "REDIRECT-READY", + * "spec": { + * "tablePath": "s3://<bucket>/tables/<table-name>" + * }, + * "noRedirectRules": [ + * {"allowedOperations": ["Write", "Delete", "Refresh"] }, + * {"appName": "maintenance-job", "allowedOperations": ["Refresh"] } + * ] + * } + */ +case class NoRedirectRule( + @JsonProperty("appName") + appName: Option[String], + @JsonProperty("allowedOperations") + allowedOperations: Set[String] +) + /** * This class stores all values defined inside table redirection property. * @param type: The type of redirection. * @param state: The current state of the redirection: * ENABLE-REDIRECT-IN-PROGRESS, REDIRECT-READY, DROP-REDIRECT-IN-PROGRESS. * @param specValue: The specification of accessing redirect destination table. - * + * @param noRedirectRules: The set of rules that applications should fulfill to access + * redirect source table. * This class would be serialized into a JSON string during commit. 
One example of its JSON * presentation is: * PathBasedRedirect: @@ -155,16 +192,23 @@ object RedirectSpec { * "state": "DROP-REDIRECT-IN-PROGRESS", * "spec": { * "tablePath": "s3:///tables/" - * } + * }, + * "noRedirectRules": [ + * {"allowedOperations": ["Write", "Refresh"] }, + * {"appName": "maintenance-job", "allowedOperations": ["Refresh"] } + * ] * } */ case class TableRedirectConfiguration( `type`: String, state: String, @JsonProperty("spec") - specValue: String) { + specValue: String, + @JsonProperty("noRedirectRules") + noRedirectRules: Set[NoRedirectRule] = Set.empty) { @JsonIgnore val spec: RedirectSpec = RedirectSpec.getDeserializeModule(`type`).deserialize(specValue) + @JsonIgnore val redirectState: RedirectState = state match { case EnableRedirectInProgress.name => EnableRedirectInProgress @@ -172,6 +216,11 @@ case class TableRedirectConfiguration( case DropRedirectInProgress.name => DropRedirectInProgress case _ => throw new IllegalArgumentException(s"Unrecognizable Table Redirect State: $state") } + + @JsonIgnore + val isInProgressState: Boolean = { + redirectState == EnableRedirectInProgress || redirectState == DropRedirectInProgress + } } /** @@ -199,15 +248,17 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { * Generate the key-value pair of the table redirect property. Its key is the table redirect * property name and its name is the JSON string of TableRedirectConfiguration. 
*/ - private def generateRedirectMetadata( + def generateRedirectMetadata( redirectType: String, state: RedirectState, - redirectSpec: RedirectSpec + redirectSpec: RedirectSpec, + noRedirectRules: Set[NoRedirectRule] ): Map[String, String] = { val redirectConfiguration = TableRedirectConfiguration( redirectType, state.name, - JsonUtils.toJson(redirectSpec) + JsonUtils.toJson(redirectSpec), + noRedirectRules ) val redirectJson = JsonUtils.toJson(redirectConfiguration) Map(config.key -> redirectJson) @@ -227,7 +278,8 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { deltaLog: DeltaLog, catalogTableOpt: Option[CatalogTable], state: RedirectState, - spec: RedirectSpec + spec: RedirectSpec, + noRedirectRules: Set[NoRedirectRule] = Set.empty[NoRedirectRule] ): Unit = { val txn = deltaLog.startTransaction(catalogTableOpt) val deltaMetadata = txn.snapshot.metadata @@ -239,19 +291,20 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { } val currentConfig = currentConfigOpt.get + val redirectState = currentConfig.redirectState state match { case RedirectReady => - if (currentConfig.redirectState != EnableRedirectInProgress) { - DeltaErrors.invalidRedirectStateTransition(tableIdent, currentConfig.redirectState, state) + if (redirectState != EnableRedirectInProgress && redirectState != RedirectReady) { + DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state) } case DropRedirectInProgress => - if (currentConfig.redirectState != RedirectReady) { - DeltaErrors.invalidRedirectStateTransition(tableIdent, currentConfig.redirectState, state) + if (redirectState != RedirectReady) { + DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state) } case _ => - DeltaErrors.invalidRedirectStateTransition(tableIdent, currentConfig.redirectState, state) + DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state) } - val properties = generateRedirectMetadata(currentConfig.`type`, state, spec) + val 
properties = generateRedirectMetadata(currentConfig.`type`, state, spec, noRedirectRules) val newConfigs = txn.metadata.configuration ++ properties val newMetadata = txn.metadata.copy(configuration = newConfigs) txn.updateMetadata(newMetadata) @@ -272,7 +325,8 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { deltaLog: DeltaLog, catalogTableOpt: Option[CatalogTable], redirectType: String, - spec: RedirectSpec + spec: RedirectSpec, + noRedirectRules: Set[NoRedirectRule] = Set.empty[NoRedirectRule] ): Unit = { val txn = deltaLog.startTransaction(catalogTableOpt) val snapshot = txn.snapshot @@ -283,7 +337,12 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { EnableRedirectInProgress ) } - val properties = generateRedirectMetadata(redirectType, EnableRedirectInProgress, spec) + val properties = generateRedirectMetadata( + redirectType, + EnableRedirectInProgress, + spec, + noRedirectRules + ) val newConfigs = txn.metadata.configuration ++ properties val newMetadata = txn.metadata.copy(configuration = newConfigs) txn.updateMetadata(newMetadata) @@ -308,6 +367,62 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { } } -object RedirectReaderWriter extends TableRedirect(config = DeltaConfigs.REDIRECT_READER_WRITER) +object RedirectReaderWriter extends TableRedirect(config = DeltaConfigs.REDIRECT_READER_WRITER) { + /** True if `snapshot` enables redirect-reader-writer feature. */ + def isFeatureSupported(snapshot: Snapshot): Boolean = { + snapshot.protocol.isFeatureSupported(RedirectReaderWriterFeature) + } + + /** True if the update property command tries to set/unset redirect-reader-writer feature. 
*/ + def isUpdateProperty(snapshot: Snapshot, propKeys: Seq[String]): Boolean = { + propKeys.contains(DeltaConfigs.REDIRECT_READER_WRITER.key) && isFeatureSupported(snapshot) + } +} + +object RedirectWriterOnly extends TableRedirect(config = DeltaConfigs.REDIRECT_WRITER_ONLY) { + /** True if `snapshot` enables redirect-writer-only feature. */ + def isFeatureSupported(snapshot: Snapshot): Boolean = { + snapshot.protocol.isFeatureSupported(RedirectWriterOnlyFeature) + } + + /** True if the update property command tries to set/unset redirect-writer-only feature. */ + def isUpdateProperty(snapshot: Snapshot, propKeys: Seq[String]): Boolean = { + propKeys.contains(DeltaConfigs.REDIRECT_WRITER_ONLY.key) && isFeatureSupported(snapshot) + } +} -object RedirectWriterOnly extends TableRedirect(config = DeltaConfigs.REDIRECT_WRITER_ONLY) +object RedirectFeature { + /** + * Determine whether the redirect-reader-writer or the redirect-writer-only feature is supported. + */ + def isFeatureSupported(snapshot: Snapshot): Boolean = { + RedirectReaderWriter.isFeatureSupported(snapshot) || + RedirectWriterOnly.isFeatureSupported(snapshot) + } + + /** + * Determine whether the operation `op` updates the existing redirect-reader-writer or + * redirect-writer-only table property of a table with `snapshot`. + */ + def isUpdateProperty(snapshot: Snapshot, op: DeltaOperations.Operation): Boolean = { + op match { + case _ @ DeltaOperations.SetTableProperties(properties) => + val propertyKeys = properties.keySet.toSeq + RedirectReaderWriter.isUpdateProperty(snapshot, propertyKeys) || + RedirectWriterOnly.isUpdateProperty(snapshot, propertyKeys) + case _ @ DeltaOperations.UnsetTableProperties(propertyKeys, _) => + RedirectReaderWriter.isUpdateProperty(snapshot, propertyKeys) || + RedirectWriterOnly.isUpdateProperty(snapshot, propertyKeys) + case _ => false + } + } + + /** Get the current `TableRedirectConfiguration` object from the snapshot. 
*/ + def getRedirectConfiguration(snapshot: Snapshot): Option[TableRedirectConfiguration] = { + if (RedirectWriterOnly.isFeatureSupported(snapshot)) { + RedirectWriterOnly.getRedirectConfiguration(snapshot.metadata) + } else { + RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata) + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 4257c8b8dd..5e6450f920 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1969,6 +1969,13 @@ trait DeltaSQLConfBase { .intConf .createWithDefault(50000000) + val SKIP_REDIRECT_FEATURE = + buildConf("skipRedirectFeature") + .doc("True if skipping the redirect feature.") + .internal() + .booleanConf + .createWithDefault(false) + val DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS = buildConf("optimizeWrite.maxShufflePartitions") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala index 228792988d..f8ccc1ee70 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.redirect.{ DropRedirectInProgress, EnableRedirectInProgress, + NoRedirectRule, PathBasedRedirectSpec, RedirectReaderWriter, RedirectReady, @@ -29,11 +30,13 @@ import org.apache.spark.sql.delta.redirect.{ RedirectWriterOnly, TableRedirect } +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} import org.apache.hadoop.fs.Path -import org.apache.spark.sql.QueryTest +import 
org.apache.spark.sql.{QueryTest, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.test.SharedSparkSession class TableRedirectSuite extends QueryTest @@ -81,44 +84,214 @@ class TableRedirectSuite extends QueryTest } } - Seq(RedirectReaderWriter, RedirectWriterOnly).foreach { feature => - test(s"basic table redirect: ${feature.config.key}") { + def redirectTest(label: String)(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = { + test(s"basic table redirect: $label") { withTempDir { sourceTablePath => withTempDir { destTablePath => withTable("t1") { sql(s"CREATE external TABLE t1(c0 long)USING delta LOCATION '$sourceTablePath';") val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) - val snapshot = deltaLog.update() - assert(!feature.isFeatureSet(snapshot.metadata)) - val redirectSpec = new PathBasedRedirectSpec(destTablePath.getCanonicalPath) - val catalogTableOpt = Some(catalogTable) - val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE - // Step-1: Initiate table redirection and set to EnableRedirectInProgress state. - feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, destTablePath, feature) - // Step-2: Complete table redirection and set to RedirectReady state. - feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec) - validateState(deltaLog, RedirectReady, destTablePath, feature) - // Step-3: Start dropping table redirection and set to DropRedirectInProgress state. - feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec) - validateState(deltaLog, DropRedirectInProgress, destTablePath, feature) - // Step-4: Finish dropping table redirection and remove the property completely. 
- feature.remove(deltaLog, catalogTableOpt) - validateRemovedState(deltaLog, feature) - // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one - // more time. - withTempDir { destTablePath2 => - val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath) - feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) - validateState(deltaLog, EnableRedirectInProgress, destTablePath2, feature) - // Step-6: Finish dropping table redirection and remove the property completely. - feature.remove(deltaLog, catalogTableOpt) - validateRemovedState(deltaLog, feature) - } + f(deltaLog, sourceTablePath, destTablePath, catalogTable) } } } } } + + Seq(RedirectReaderWriter, RedirectWriterOnly).foreach { feature => + val featureName = feature.config.key + redirectTest(s"basic redirect: $featureName") { case (deltaLog, _, dest, catalogTable) => + val snapshot = deltaLog.update() + assert(!feature.isFeatureSet(snapshot.metadata)) + val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath) + val catalogTableOpt = Some(catalogTable) + val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE + // Step-1: Initiate table redirection and set to EnableRedirectInProgress state. + feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) + validateState(deltaLog, EnableRedirectInProgress, dest, feature) + // Step-2: Complete table redirection and set to RedirectReady state. + feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec) + validateState(deltaLog, RedirectReady, dest, feature) + // Step-3: Start dropping table redirection and set to DropRedirectInProgress state. + feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec) + validateState(deltaLog, DropRedirectInProgress, dest, feature) + // Step-4: Finish dropping table redirection and remove the property completely. 
+ feature.remove(deltaLog, catalogTableOpt) + validateRemovedState(deltaLog, feature) + // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one + // more time. + withTempDir { destTablePath2 => + val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath) + feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) + validateState(deltaLog, EnableRedirectInProgress, destTablePath2, feature) + // Step-6: Finish dropping table redirection and remove the property completely. + feature.remove(deltaLog, catalogTableOpt) + validateRemovedState(deltaLog, feature) + } + } + + redirectTest(s"Redirect $featureName: empty no redirect rules") { + case (deltaLog, source, dest, catalogTable) => + val snapshot = deltaLog.update() + assert(!feature.isFeatureSet(snapshot.metadata)) + val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath) + val catalogTableOpt = Some(catalogTable) + val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE + // 0. Initialize table redirection by setting table to EnableRedirectInProgress state. + feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) + validateState(deltaLog, EnableRedirectInProgress, dest, feature) + + // 1. INSERT should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in + // EnableRedirectInProgress, which doesn't allow any DML and DDL. + val exception1 = intercept[DeltaIllegalStateException] { + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + } + assert(exception1.getErrorClass == "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE") + + // 2. DDL should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in + // EnableRedirectInProgress, which doesn't allow any DML and DDL. + val exception2 = intercept[DeltaIllegalStateException] { + sql(s"alter table delta.`$source` add column c3 long") + } + assert(exception2.getErrorClass == "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE") + + // 3. Move to RedirectReady state. 
+ feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec) + + // 4. INSERT should hit DELTA_NO_REDIRECT_RULES_VIOLATED since the + // no-redirect-rules is empty. + validateState(deltaLog, RedirectReady, dest, feature) + val exception3 = intercept[DeltaIllegalStateException] { + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + } + assert(exception3.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 5. DDL should hit DELTA_NO_REDIRECT_RULES_VIOLATED since the + // no-redirect-rules is empty. + val exception4 = intercept[DeltaIllegalStateException] { + sql(s"alter table delta.`$source` add column c3 long") + } + assert(exception4.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 6. Move to DropRedirectInProgress state. + feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec) + + // 7. INSERT should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in + // DropRedirectInProgress, which doesn't allow any DML and DDL. + validateState(deltaLog, DropRedirectInProgress, dest, feature) + val exception5 = intercept[DeltaIllegalStateException] { + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + } + assert(exception5.getErrorClass == "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE") + + // 8. DDL should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in + // DropRedirectInProgress, which doesn't allow any DML and DDL. 
+ val exception6 = intercept[DeltaIllegalStateException] { + sql(s"alter table delta.`$source` add column c3 long") + } + assert(exception6.getErrorClass == "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE") + } + + redirectTest(s"Redirect $featureName: no redirect rules") { + case (deltaLog, source, dest, catalogTable) => + val snapshot = deltaLog.update() + assert(!feature.isFeatureSet(snapshot.metadata)) + val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath) + val catalogTableOpt = Some(catalogTable) + val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec) + validateState(deltaLog, EnableRedirectInProgress, dest, feature) + // 1. Move table redirect to RedirectReady state with no redirect rules that + // allows WRITE, DELETE, UPDATE. + var noRedirectRules = Set( + NoRedirectRule( + appName = None, + allowedOperations = Set( + DeltaOperations.Write(SaveMode.Append).name, + DeltaOperations.Delete(Seq.empty).name, + DeltaOperations.Update(None).name + ) + ) + ) + feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules) + validateState(deltaLog, RedirectReady, dest, feature) + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + sql(s"update delta.`$source` set c0 = 100") + sql(s"delete from delta.`$source` where c0 = 1") + + // 2. Move table redirect to RedirectReady state with no-redirect-rules that + // allows UPDATE. + noRedirectRules = Set( + NoRedirectRule( + appName = None, allowedOperations = Set(DeltaOperations.Update(None).name) + ) + ) + feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules) + validateState(deltaLog, RedirectReady, dest, feature) + // 2.1. WRITE should be aborted because no-redirect-rules only allow UPDATE. 
+ val exception1 = intercept[DeltaIllegalStateException] { + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + } + assert(exception1.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 2.2. UPDATE should pass because no-redirect-rules are fulfilled. + sql(s"update delta.`$source` set c0 = 100") + + // 2.3. DELETE should be aborted because no-redirect-rules only allow UPDATE. + val exception3 = intercept[DeltaIllegalStateException] { + sql(s"delete from delta.`$source` where c0 = 1") + } + assert(exception3.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 2.4. Enabling SKIP_REDIRECT_FEATURE should allow all DMLs to pass. + withSQLConf(DeltaSQLConf.SKIP_REDIRECT_FEATURE.key -> "true") { + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + sql(s"delete from delta.`$source` where c0 = 1") + } + + // 3. Move table redirect to RedirectReady state with no-redirect-rules that + // allow Write for appName "etl". + noRedirectRules = Set( + NoRedirectRule( + appName = Some("etl"), + allowedOperations = Set(DeltaOperations.Write(SaveMode.Append).name) + ) + ) + feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules) + validateState(deltaLog, RedirectReady, dest, feature) + + // 3.1. The WRITE of appName "dummy" would be aborted because no-redirect-rules + // only allow WRITE on application "etl". + val exception4 = intercept[DeltaIllegalStateException] { + spark.conf.set("spark.app.name", "dummy") + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + } + assert(exception4.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 3.1b. WRITE should pass once spark.app.name is "etl". + spark.conf.set("spark.app.name", "etl") + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + + // 3.2. UPDATE should be aborted because no-redirect-rules only allow WRITE. 
+ val exception5 = intercept[DeltaIllegalStateException] { + sql(s"update delta.`$source` set c0 = 100") + } + assert(exception5.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 3.3. DELETE should be aborted because no-redirect-rules only allow WRITE. + val exception6 = intercept[DeltaIllegalStateException] { + sql(s"delete from delta.`$source` where c0 = 1") + } + assert(exception6.getErrorClass == "DELTA_NO_REDIRECT_RULES_VIOLATED") + + // 3.4. Enabling SKIP_REDIRECT_FEATURE should allow all DMLs to pass. + withSQLConf(DeltaSQLConf.SKIP_REDIRECT_FEATURE.key -> "true") { + sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)") + sql(s"update delta.`$source` set c0 = 100") + sql(s"delete from delta.`$source` where c0 = 1") + } + } + } }