Skip to content

Commit

Permalink
no-redirect rules
Browse files Browse the repository at this point in the history
  • Loading branch information
kamcheungting-db committed Oct 29, 2024
1 parent 43853d0 commit 2cf6949
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 50 deletions.
13 changes: 13 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,12 @@
],
"sqlState" : "2200G"
},
"DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE" : {
"message" : [
"Cannot handle commit of table within redirect table state '<state>'."
],
"sqlState" : "42P01"
},
"DELTA_COMPLEX_TYPE_COLUMN_CONTAINS_NULL_TYPE" : {
"message" : [
" Found nested NullType in column <columName> which is of <dataType>. Delta doesn't support writing NullType in complex types."
Expand Down Expand Up @@ -1890,6 +1896,13 @@
],
"sqlState" : "KD006"
},
"DELTA_NO_REDIRECT_RULES_VIOLATED" : {
"message" : [
"Operation not allowed: <operation> cannot be performed on a table with redirect feature.",
"The no redirect rules are not satisfied <noRedirectRules>."
],
"sqlState" : "42P01"
},
"DELTA_NO_RELATION_TABLE" : {
"message" : [
"Table <tableIdent> not found"
Expand Down
18 changes: 18 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.NoRedirectRule
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -362,6 +363,23 @@ trait DeltaErrorsBase
)
}

/**
 * Builds the error reported when a commit is attempted while the table is in a redirect
 * state that does not accept commits.
 *
 * The exception is returned, not thrown: the declared result type is `Throwable` and
 * call sites are expected to write
 * `throw DeltaErrors.invalidCommitIntermediateRedirectState(...)`.
 *
 * @param state redirect state of the table; its `name` is the single message parameter
 *              of the `DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE` error class.
 * @return a `DeltaIllegalStateException` carrying that error class.
 */
def invalidCommitIntermediateRedirectState(state: RedirectState): Throwable = {
  new DeltaIllegalStateException(
    errorClass = "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE",
    messageParameters = Array(state.name)
  )
}

/**
 * Builds the error reported when an operation is not permitted by the no-redirect-rules
 * of a table that has the redirect feature.
 *
 * The exception is returned, not thrown: the declared result type is `Throwable` and
 * call sites are expected to write `throw DeltaErrors.noRedirectRulesViolated(...)`.
 *
 * @param op the rejected operation; its `name` is the first message parameter.
 * @param noRedirectRules the rules that were evaluated. Each rule is rendered via string
 *                        concatenation, wrapped in double quotes, and the results are
 *                        joined with ",\n" inside square brackets to form the second
 *                        message parameter.
 * @return a `DeltaIllegalStateException` with error class
 *         `DELTA_NO_REDIRECT_RULES_VIOLATED`.
 */
def noRedirectRulesViolated(
    op: DeltaOperations.Operation,
    noRedirectRules: Set[NoRedirectRule]): Throwable = {
  // Quote every rule so that individual entries stay distinguishable in the message.
  val renderedRules =
    noRedirectRules.map(rule => "\"" + rule + "\"").mkString("[", ",\n", "]")
  new DeltaIllegalStateException(
    errorClass = "DELTA_NO_REDIRECT_RULES_VIOLATED",
    messageParameters = Array(op.name, renderedRules)
  )
}

def incorrectLogStoreImplementationException(
sparkConf: SparkConf,
cause: Throwable): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ object DeltaOperations {
case class VacuumStart(
retentionCheckEnabled: Boolean,
specifiedRetentionMillis: Option[Long],
defaultRetentionMillis: Long) extends Operation("VACUUM START") {
defaultRetentionMillis: Long) extends Operation(VacuumStart.OPERATION_NAME) {
override val parameters: Map[String, Any] = Map(
"retentionCheckEnabled" -> retentionCheckEnabled,
"defaultRetentionMillis" -> defaultRetentionMillis
Expand All @@ -749,10 +749,14 @@ object DeltaOperations {
override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Companion of the `VacuumStart` operation; holds its operation name. */
object VacuumStart {
  /** Name handed to the `Operation` constructor by the `VacuumStart` case class. */
  val OPERATION_NAME: String = "VACUUM START"
}

/**
* @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED"
*/
case class VacuumEnd(status: String) extends Operation(s"VACUUM END") {
case class VacuumEnd(status: String) extends Operation(VacuumEnd.OPERATION_NAME) {
override val parameters: Map[String, Any] = Map(
"status" -> status
)
Expand All @@ -765,6 +769,10 @@ object DeltaOperations {
override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Companion of the `VacuumEnd` operation; holds its operation name. */
object VacuumEnd {
  /** Name handed to the `Operation` constructor by the `VacuumEnd` case class. */
  val OPERATION_NAME: String = "VACUUM END"
}

/** Recorded when running REORG on the table. */
case class Reorg(
predicate: Seq[Expression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.delta.hooks.ChecksumHook
import org.apache.spark.sql.delta.implicits.addFileEncoder
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.{RedirectFeature, TableRedirectConfiguration}
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.stats._
Expand All @@ -55,6 +56,7 @@ import org.apache.spark.internal.{MDC, MessageWithContext}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.UnsetTableProperties
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, ResolveDefaultColumns}
import org.apache.spark.sql.types.{StructField, StructType}
Expand Down Expand Up @@ -1246,6 +1248,56 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
}

/**
 * Checks `op` against the no-redirect-rules of the given redirect configuration and
 * rejects the operation when none of the applicable rules allows it.
 *
 * A rule is applicable when it declares no application name, or when its application
 * name equals the session's "spark.app.name" ignoring case. The operation passes if at
 * least one applicable rule lists `op.name` among its allowed operations.
 *
 * @param op the operation about to be committed.
 * @param redirectConfig redirect configuration whose no-redirect-rules are evaluated.
 * @throws Throwable the error built by `DeltaErrors.noRedirectRulesViolated` when no
 *                   applicable rule allows `op`.
 */
private def performNoRedirectRulesCheck(
    op: DeltaOperations.Operation,
    redirectConfig: TableRedirectConfiguration
): Unit = {
  // Resolved at most once, and only if some rule actually names an application; rules
  // without an application name never trigger the configuration lookup.
  lazy val currentAppName = spark.conf.get("spark.app.name")
  // Keep the rules that apply to the current application.
  val applicableRules = redirectConfig.noRedirectRules.filter { rule =>
    rule.appName.forall(_.equalsIgnoreCase(currentAppName))
  }
  // The operation is permitted as soon as one applicable rule lists it.
  val operationAllowed = applicableRules.exists(_.allowedOperations.contains(op.name))
  if (!operationAllowed) {
    // The error reports the complete rule set, not only the applicable subset.
    throw DeltaErrors.noRedirectRulesViolated(op, redirectConfig.noRedirectRules)
  }
}

/**
 * Decides whether `op` may be committed when the table redirect feature is set on the
 * current table.
 *
 * Validation is bypassed entirely when any of the following holds (checked in order):
 *  - the `DeltaSQLConf.SKIP_REDIRECT_FEATURE` configuration is enabled;
 *  - `RedirectFeature.isFeatureSupported` reports false for the current snapshot;
 *  - `RedirectFeature.isUpdateProperty` reports true for the snapshot and `op`.
 *
 * Otherwise, if the snapshot carries a redirect configuration:
 *  1. an in-progress redirect state aborts the commit;
 *  2. any other state requires `op` to pass the no-redirect-rules check.
 */
protected def performRedirectCheck(op: DeltaOperations.Operation): Unit = {
  // Short-circuit evaluation keeps the checks in their original order, so later checks
  // are not evaluated once an earlier one already rules validation out.
  val validationRequired =
    !spark.conf.get(DeltaSQLConf.SKIP_REDIRECT_FEATURE) &&
      RedirectFeature.isFeatureSupported(snapshot) &&
      !RedirectFeature.isUpdateProperty(snapshot, op)

  if (validationRequired) {
    // Nothing to validate when the snapshot has no redirect configuration.
    for (config <- RedirectFeature.getRedirectConfiguration(snapshot)) {
      // While the redirect state is in progress, every DML and DDL commit is aborted.
      if (config.isInProgressState) {
        throw DeltaErrors.invalidCommitIntermediateRedirectState(config.redirectState)
      }
      // Transactions touching the redirect source table must satisfy the rules.
      performNoRedirectRulesCheck(op, config)
    }
  }
}

@throws(classOf[ConcurrentModificationException])
protected def commitImpl(
actions: Seq[Action],
Expand All @@ -1255,6 +1307,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commitStartNano = System.nanoTime()

val (version, postCommitSnapshot, actualCommittedActions) = try {
// Check for satisfaction of no redirect rules
performRedirectCheck(op)

// Check for CDC metadata columns
performCdcMetadataCheck()

Expand Down
Loading

0 comments on commit 2cf6949

Please sign in to comment.