Skip to content

Commit

Permalink
the initial version of table redirect
Browse files Browse the repository at this point in the history
  • Loading branch information
kamcheungting-db committed Oct 25, 2024
1 parent 85db7e4 commit e5e0024
Show file tree
Hide file tree
Showing 6 changed files with 483 additions and 0 deletions.
13 changes: 13 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2371,6 +2371,19 @@
],
"sqlState" : "XXKDS"
},
"DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION" : {
"message" : [
"Unable to update table redirection state: Invalid state transition attempted.",
"The Delta table '<table>' cannot change from '<oldState>' to '<newState>'."
],
"sqlState" : "KD007"
},
"DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT" : {
"message" : [
"Unable to remove table redirection for <table> due to its invalid state: <currentState>."
],
"sqlState" : "KD007"
},
"DELTA_TABLE_LOCATION_MISMATCH" : {
"message" : [
"The location of the existing table <tableName> is <existingTableLocation>. It doesn't match the specified location <tableLocation>."
Expand Down
20 changes: 20 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,26 @@ trait DeltaConfigsBase extends DeltaLogging {
_ > 0,
"needs to be a positive integer.")

/**
 * This is the property that describes the table redirection detail. It is a JSON string format
 * of the `TableRedirectConfiguration` class, which includes the following attributes:
 * - type(String): The type of redirection.
 * - state(String): The current state of the redirection:
 *                  ENABLE-REDIRECT-IN-PROGRESS, REDIRECT-READY, DROP-REDIRECT-IN-PROGRESS.
 * - spec(JSON String): The specification of accessing redirect destination table. This is free
 *                      form json object. Each delta service provider can customize its own
 *                      implementation. In Databricks, it is an object that contains an
 *                      attribute named `Table` with Map[String, String] type.
 *
 * The default is `null`, which `v => Option(v)` maps to `None` — i.e. redirection is not
 * configured on the table. The raw JSON string is intentionally not validated here
 * (`_ => true`); parsing/validation presumably happens where the value is consumed —
 * TODO confirm against `TableRedirectConfiguration` deserialization.
 */
val REDIRECT_READER_WRITER: DeltaConfig[Option[String]] =
  buildConfig[Option[String]](
    "redirectReaderWriter-preview",
    null,
    v => Option(v),
    _ => true,
    "A JSON representation of the TableRedirectConfiguration class, which contains all " +
      "information of redirect reader writer feature.")

/**
* Enable auto compaction for a Delta table. When enabled, we will check if files already
* written to a Delta table can leverage compaction after a commit. If so, we run a post-commit
Expand Down
19 changes: 19 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
Expand Down Expand Up @@ -343,6 +344,24 @@ trait DeltaErrorsBase
)
}

/**
 * Raises a [[DeltaIllegalStateException]] when a table-redirect property attempts an
 * illegal state transition (e.g. skipping ENABLE-REDIRECT-IN-PROGRESS).
 *
 * @param table    fully qualified name of the redirected Delta table
 * @param oldState the redirect state currently recorded on the table
 * @param newState the redirect state the caller attempted to move to
 * @throws DeltaIllegalStateException always
 */
def invalidRedirectStateTransition(
    table: String,
    oldState: RedirectState,
    newState: RedirectState): Unit = {
  // Fix: the exception was previously constructed but never thrown, so this helper
  // silently did nothing. Keeping the Unit signature, we now throw it here.
  // Fix: the message template has three placeholders (<table>, <oldState>, <newState>),
  // but `table` was passed twice (four parameters); pass exactly three.
  throw new DeltaIllegalStateException(
    errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
    messageParameters = Array(table, oldState.name, newState.name)
  )
}

/**
 * Raises a [[DeltaIllegalStateException]] when a caller tries to remove the table-redirect
 * property while the table is in a state that does not allow removal.
 *
 * @param table        fully qualified name of the redirected Delta table
 * @param currentState the redirect state the table is currently in
 * @throws DeltaIllegalStateException always
 */
def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
  // Fix: the exception was previously constructed but never thrown, so this helper
  // silently did nothing. Keeping the Unit signature, we now throw it here.
  // Fix: the message template has two placeholders (<table>, <currentState>), but
  // `table` was passed twice (three parameters); pass exactly two.
  throw new DeltaIllegalStateException(
    errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
    messageParameters = Array(table, currentState.name)
  )
}

def incorrectLogStoreImplementationException(
sparkConf: SparkConf,
cause: Throwable): Throwable = {
Expand Down
14 changes: 14 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.redirect.RedirectReaderWriter
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -367,6 +368,7 @@ object TableFeature {
CoordinatedCommitsTableFeature)
if (DeltaUtils.isTesting && testingFeaturesEnabled) {
features ++= Set(
RedirectReaderWriterFeature,
TestLegacyWriterFeature,
TestLegacyReaderWriterFeature,
TestWriterFeature,
Expand Down Expand Up @@ -576,6 +578,18 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz
}
}

/**
 * Reader-writer table feature gating the (preview) table-redirect capability.
 * The feature is enabled automatically from table metadata: whenever the redirect
 * property is set on the table, the protocol is upgraded to require this feature.
 */
object RedirectReaderWriterFeature
    extends ReaderWriterFeature(name = "redirectReaderWriter-preview")
    with FeatureAutomaticallyEnabledByMetadata {

  // Tables created before this feature existed are upgraded in place once the
  // redirect metadata property appears.
  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

  /** Required exactly when the redirect property is present in `metadata`. */
  override def metadataRequiresFeatureToBeEnabled(
      protocol: Protocol,
      metadata: Metadata,
      spark: SparkSession): Boolean = {
    RedirectReaderWriter.isFeatureSet(metadata)
  }
}

object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-preview")
with FeatureAutomaticallyEnabledByMetadata {
override def metadataRequiresFeatureToBeEnabled(
Expand Down
Loading

0 comments on commit e5e0024

Please sign in to comment.