[Table Redirect] Introduce Redirect ReaderWriter Feature (#3801)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


This PR introduces a new reader-writer table feature, "redirection". The feature redirects
read and write queries from the table's current storage location to a new storage location
described in the value of the table feature.

Redirection proceeds through several phases to avoid anomalies. To label these phases, we
introduce four states:

0. NO-REDIRECT: Redirection is not enabled on the table.
1. ENABLE-REDIRECT-IN-PROGRESS: The redirect process is still in progress. No DML or DDL
transaction can be committed to the table while it is in this state.
2. REDIRECT-READY: The redirect process is complete. All queries are redirected to the table
specified in the RedirectSpec object.
3. DROP-REDIRECT-IN-PROGRESS: Redirection is being withdrawn and the redirection property is
about to be removed from the Delta table. In this state, the Delta client stops redirecting
new queries to the redirect destination table and only accepts read-only queries against the
redirect source table.

To avoid undefined behavior, the only valid state transitions are (a minimal sketch of this
state machine follows the list):

0. NO-REDIRECT -> ENABLE-REDIRECT-IN-PROGRESS
1. ENABLE-REDIRECT-IN-PROGRESS -> REDIRECT-READY
2. REDIRECT-READY -> DROP-REDIRECT-IN-PROGRESS
3. DROP-REDIRECT-IN-PROGRESS -> NO-REDIRECT
4. ENABLE-REDIRECT-IN-PROGRESS -> NO-REDIRECT
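
The sketch below models these states and the transition whitelist. It is illustrative only,
assuming simple case objects; the PR's actual `RedirectState` implementation lives in
`org.apache.spark.sql.delta.redirect` and may differ:

```scala
// Illustrative sketch, not the PR's implementation: the four redirect states and
// a whitelist of the transitions listed above.
sealed abstract class RedirectState(val name: String)
case object NoRedirect extends RedirectState("NO-REDIRECT")
case object EnableRedirectInProgress extends RedirectState("ENABLE-REDIRECT-IN-PROGRESS")
case object RedirectReady extends RedirectState("REDIRECT-READY")
case object DropRedirectInProgress extends RedirectState("DROP-REDIRECT-IN-PROGRESS")

object RedirectStateMachine {
  // The only transitions that keep the table in a well-defined state.
  private val validTransitions: Set[(RedirectState, RedirectState)] = Set(
    NoRedirect -> EnableRedirectInProgress,
    EnableRedirectInProgress -> RedirectReady,
    RedirectReady -> DropRedirectInProgress,
    DropRedirectInProgress -> NoRedirect,
    EnableRedirectInProgress -> NoRedirect)

  /** Throws if the requested transition is not one of the five allowed above. */
  def validateTransition(table: String, oldState: RedirectState, newState: RedirectState): Unit = {
    if (!validTransitions.contains(oldState -> newState)) {
      throw new IllegalStateException(
        s"Table '$table' cannot change redirect state from '${oldState.name}' to '${newState.name}'.")
    }
  }
}
```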


The protocol RFC document is in #3702.

## How was this patch tested?

Unit tests covering transitions between the different redirection states.


## Does this PR introduce _any_ user-facing changes?
No
kamcheungting-db authored Oct 25, 2024
1 parent 85db7e4 commit f6a771d
Showing 6 changed files with 486 additions and 0 deletions.
13 changes: 13 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2371,6 +2371,19 @@
],
"sqlState" : "XXKDS"
},
"DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION" : {
"message" : [
"Unable to update table redirection state: Invalid state transition attempted.",
"The Delta table '<table>' cannot change from '<oldState>' to '<newState>'."
],
"sqlState" : "KD007"
},
"DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT" : {
"message" : [
"Unable to remove table redirection for <table> due to its invalid state: <currentState>."
],
"sqlState" : "KD007"
},
"DELTA_TABLE_LOCATION_MISMATCH" : {
"message" : [
"The location of the existing table <tableName> is <existingTableLocation>. It doesn't match the specified location <tableLocation>."
20 changes: 20 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -406,6 +406,26 @@ trait DeltaConfigsBase extends DeltaLogging {
_ > 0,
"needs to be a positive integer.")

  /**
   * This property describes the table redirection detail. It is a JSON string following the
   * format of the `TableRedirectConfiguration` class, which includes the following attributes:
   * - type (String): The type of redirection.
   * - state (String): The current state of the redirection:
   *   ENABLE-REDIRECT-IN-PROGRESS, REDIRECT-READY, or DROP-REDIRECT-IN-PROGRESS.
   * - spec (JSON String): The specification for accessing the redirect destination table. This
   *   is a free-form JSON object; each Delta service provider can customize its own
   *   implementation. In Databricks, it is an object that contains an attribute named `Table`
   *   of type Map[String, String].
   */
  val REDIRECT_READER_WRITER: DeltaConfig[Option[String]] =
    buildConfig[Option[String]](
      "redirectReaderWriter-preview",
      null,
      v => Option(v),
      _ => true,
      "A JSON representation of the TableRedirectConfiguration class, which contains all " +
        "information of the redirect reader-writer feature.")

/**
* Enable auto compaction for a Delta table. When enabled, we will check if files already
* written to a Delta table can leverage compaction after a commit. If so, we run a post-commit
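
A hedged illustration of the kind of value the `REDIRECT_READER_WRITER` property above might
hold and how it could be set. The `"PathBasedRedirect"` type string and the contents of `spec`
are made up for illustration; only the attribute names (`type`, `state`, `spec`) follow the doc
comment, and the `delta.` key prefix is the standard one added by `DeltaConfigs.buildConfig`:

```scala
// Illustrative only: a possible JSON value for the redirect configuration.
val exampleRedirectConfig: String =
  """{"type": "PathBasedRedirect", "state": "REDIRECT-READY", """ +
    """"spec": {"Table": {"catalog": "main", "schema": "sales", "name": "redirect_target"}}}"""

// Hypothetical usage, assuming a SparkSession named `spark`: store the value under the
// "delta."-prefixed table property that backs REDIRECT_READER_WRITER.
spark.sql(
  "ALTER TABLE redirect_source SET TBLPROPERTIES " +
    s"('delta.redirectReaderWriter-preview' = '$exampleRedirectConfig')")
```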
19 changes: 19 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
@@ -343,6 +344,24 @@ trait DeltaErrorsBase
)
}

  /** Thrown when a table attempts a redirect state transition that is not allowed. */
  def invalidRedirectStateTransition(
      table: String,
      oldState: RedirectState,
      newState: RedirectState): Unit = {
    throw new DeltaIllegalStateException(
      errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
      messageParameters = Array(table, oldState.name, newState.name)
    )
  }

  /** Thrown when table redirection cannot be removed from the table's current state. */
  def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
    throw new DeltaIllegalStateException(
      errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
      messageParameters = Array(table, currentState.name)
    )
  }

def incorrectLogStoreImplementationException(
sparkConf: SparkConf,
cause: Throwable): Throwable = {
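
A hypothetical call-site sketch for the two new error helpers (the PR's real callers sit in the
redirect package and are not part of this hunk; `transitionIsValid`, `removalIsValid`, and
`dropRequested` are invented parameters standing in for the PR's actual checks):

```scala
import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.redirect.RedirectState

// Hypothetical guards around a metadata commit that updates the redirect property.
def guardRedirectUpdate(
    table: String,
    oldState: RedirectState,
    newState: RedirectState,
    transitionIsValid: (RedirectState, RedirectState) => Boolean,
    removalIsValid: RedirectState => Boolean,
    dropRequested: Boolean): Unit = {
  if (!transitionIsValid(oldState, newState)) {
    DeltaErrors.invalidRedirectStateTransition(table, oldState, newState)
  }
  if (dropRequested && !removalIsValid(oldState)) {
    DeltaErrors.invalidRemoveTableRedirect(table, oldState)
  }
}
```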
14 changes: 14 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.redirect.RedirectReaderWriter
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -367,6 +368,7 @@ object TableFeature {
CoordinatedCommitsTableFeature)
if (DeltaUtils.isTesting && testingFeaturesEnabled) {
features ++= Set(
RedirectReaderWriterFeature,
TestLegacyWriterFeature,
TestLegacyReaderWriterFeature,
TestWriterFeature,
@@ -576,6 +578,18 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz
}
}

object RedirectReaderWriterFeature
  extends ReaderWriterFeature(name = "redirectReaderWriter-preview")
  with FeatureAutomaticallyEnabledByMetadata {
  override def metadataRequiresFeatureToBeEnabled(
      protocol: Protocol,
      metadata: Metadata,
      spark: SparkSession): Boolean = RedirectReaderWriter.isFeatureSet(metadata)

  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
}

object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-preview")
with FeatureAutomaticallyEnabledByMetadata {
override def metadataRequiresFeatureToBeEnabled(
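
For context on the `RedirectReaderWriterFeature` added above: `RedirectReaderWriter.isFeatureSet(metadata)`
is defined in the redirect package and is not shown in this diff. A plausible sketch, assuming it
simply checks whether the redirect table property carries a value:

```scala
import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.actions.Metadata

// Sketch only: treat the feature as metadata-enabled whenever the redirect property is set.
// DeltaConfig.fromMetaData reads the property from the table metadata; REDIRECT_READER_WRITER
// is the config added in this PR.
def isFeatureSetSketch(metadata: Metadata): Boolean =
  DeltaConfigs.REDIRECT_READER_WRITER.fromMetaData(metadata).isDefined
```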
