# [Spark] Introduce Redirect WriterOnly Feature (#3813)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR introduces a new writer-only table feature, "redirection" (`redirectWriterOnly-preview`). When the corresponding table property is set, read and write queries are redirected from the table's current storage location to the new storage location described by the property's JSON value.
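
For orientation, here is a minimal sketch of how the feature is driven through the `TableRedirect` API added in this PR. It assumes an active `spark` session; the table paths are placeholders, and passing `None` for the catalog table is a simplification for brevity (the test suite below passes the actual `CatalogTable`):

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.redirect.{PathBasedRedirectSpec, RedirectReady, RedirectWriterOnly}

// Placeholder paths; in practice these point at real table locations.
val deltaLog = DeltaLog.forTable(spark, new Path("/tmp/source-table"))
val spec = new PathBasedRedirectSpec("/tmp/destination-table")

// Begin redirection: writes the table property and enters the
// EnableRedirectInProgress state. `None` stands in for the catalog table.
RedirectWriterOnly.add(deltaLog, None, PathBasedRedirectSpec.REDIRECT_TYPE, spec)
// Mark the redirect as ready to serve queries.
RedirectWriterOnly.update(deltaLog, None, RedirectReady, spec)
```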


## How was this patch tested?

The existing "basic table redirect" unit test in `TableRedirectSuite` is parameterized to run against both `RedirectReaderWriter` and `RedirectWriterOnly`.

## Does this PR introduce _any_ user-facing changes?
No.
kamcheungting-db authored Oct 28, 2024
1 parent 6599b40 commit ab5a908
Showing 4 changed files with 84 additions and 43 deletions.
12 changes: 12 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -425,6 +425,18 @@ trait DeltaConfigsBase extends DeltaLogging {
       "A JSON representation of the TableRedirectConfiguration class, which contains all " +
       "information of redirect reader writer feature.")
 
+  /**
+   * This table feature is the same as REDIRECT_READER_WRITER, except that it is a writer-only table feature.
+   */
+  val REDIRECT_WRITER_ONLY: DeltaConfig[Option[String]] =
+    buildConfig[Option[String]](
+      "redirectWriterOnly-preview",
+      null,
+      v => Option(v),
+      _ => true,
+      "A JSON representation of the TableRedirectConfiguration class, which contains all " +
+      "information of the redirect writer-only feature.")
+
   /**
    * Enable auto compaction for a Delta table. When enabled, we will check if files already
    * written to a Delta table can leverage compaction after a commit. If so, we run a post-commit
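
As a quick sketch of how such a config is consumed, using `fromMetaData`, the standard accessor on `DeltaConfig` (the table path is a placeholder and `spark` is an active session):

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog}

val snapshot = DeltaLog.forTable(spark, new Path("/tmp/source-table")).update()
// Option[String] holding the raw JSON redirect configuration, or None when
// the property is unset (the config's default is null).
val rawRedirectJson: Option[String] =
  DeltaConfigs.REDIRECT_WRITER_ONLY.fromMetaData(snapshot.metadata)
```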
spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
-import org.apache.spark.sql.delta.redirect.RedirectReaderWriter
+import org.apache.spark.sql.delta.redirect.{RedirectReaderWriter, RedirectWriterOnly}
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -369,6 +369,7 @@ object TableFeature {
     if (DeltaUtils.isTesting && testingFeaturesEnabled) {
       features ++= Set(
         RedirectReaderWriterFeature,
+        RedirectWriterOnlyFeature,
         TestLegacyWriterFeature,
         TestLegacyReaderWriterFeature,
         TestWriterFeature,
@@ -590,6 +591,17 @@ object RedirectReaderWriterFeature
   override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
 }
 
+object RedirectWriterOnlyFeature extends WriterFeature(name = "redirectWriterOnly-preview")
+  with FeatureAutomaticallyEnabledByMetadata {
+  override def metadataRequiresFeatureToBeEnabled(
+      protocol: Protocol,
+      metadata: Metadata,
+      spark: SparkSession
+  ): Boolean = RedirectWriterOnly.isFeatureSet(metadata)
+
+  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
+}
+
 object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-preview")
   with FeatureAutomaticallyEnabledByMetadata {
   override def metadataRequiresFeatureToBeEnabled(
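
Because `RedirectWriterOnlyFeature` extends `WriterFeature` rather than `ReaderWriterFeature`, enabling it adds the feature to the protocol's writer feature set only, so readers on older clients are not blocked. A small sketch of what this means at the protocol level, mirroring the assertions the updated test suite makes below (placeholder path, active `spark` session assumed):

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.delta.{DeltaLog, RedirectWriterOnlyFeature}

val protocol = DeltaLog.forTable(spark, new Path("/tmp/source-table")).update().protocol
// The feature appears in the writer feature set only; the reader set is untouched.
assert(protocol.writerFeatureNames.contains(RedirectWriterOnlyFeature.name))
assert(!protocol.readerFeatureNames.contains(RedirectWriterOnlyFeature.name))
```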
spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
@@ -177,7 +177,7 @@ case class TableRedirectConfiguration(
 /**
  * This is the main class of the table redirect that interacts with other components.
  */
-class TableRedirect(config: DeltaConfig[Option[String]]) {
+class TableRedirect(val config: DeltaConfig[Option[String]]) {
   /**
    * Determine whether the property of table redirect feature is set.
    */
@@ -309,3 +309,5 @@ class TableRedirect(config: DeltaConfig[Option[String]]) {
 }
 
 object RedirectReaderWriter extends TableRedirect(config = DeltaConfigs.REDIRECT_READER_WRITER)
+
+object RedirectWriterOnly extends TableRedirect(config = DeltaConfigs.REDIRECT_WRITER_ONLY)
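
Promoting the constructor parameter to `val config` makes the backing `DeltaConfig` publicly readable, which is what lets code be written once against `TableRedirect` and parameterized over both singletons. A sketch of the pattern the updated suite uses:

```scala
import org.apache.spark.sql.delta.redirect.{RedirectReaderWriter, RedirectWriterOnly, TableRedirect}

// Both singletons share the TableRedirect machinery; only the backing config differs.
Seq[TableRedirect](RedirectReaderWriter, RedirectWriterOnly).foreach { feature =>
  println(s"redirect feature property key: ${feature.config.key}")
}
```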
spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.redirect.{
   RedirectReaderWriter,
   RedirectReady,
   RedirectState,
+  RedirectWriterOnly,
   TableRedirect
 }
 import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
@@ -45,14 +46,20 @@ class TableRedirectSuite extends QueryTest
   private def validateState(
       deltaLog: DeltaLog,
       redirectState: RedirectState,
-      destTablePath: File
+      destTablePath: File,
+      feature: TableRedirect
   ): Unit = {
     val snapshot = deltaLog.update()
-    assert(RedirectReaderWriter.isFeatureSet(snapshot.metadata))
-    val redirectConfig = RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata).get
-    assert(snapshot.protocol.supportsReaderFeatures && snapshot.protocol.supportsWriterFeatures)
-    assert(snapshot.protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name))
-    assert(snapshot.protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name))
+    assert(feature.isFeatureSet(snapshot.metadata))
+    val redirectConfig = feature.getRedirectConfiguration(snapshot.metadata).get
+    val protocol = snapshot.protocol
+    if (feature != RedirectWriterOnly) {
+      assert(protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name))
+      assert(protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name))
+    } else {
+      assert(!protocol.readerFeatureNames.contains(RedirectWriterOnlyFeature.name))
+      assert(protocol.writerFeatureNames.contains(RedirectWriterOnlyFeature.name))
+    }
     assert(redirectConfig.redirectState == redirectState)
     assert(redirectConfig.`type` == PathBasedRedirectSpec.REDIRECT_TYPE)
     val expectedSpecValue = s"""{"tablePath":"${destTablePath.getCanonicalPath}"}"""
@@ -65,43 +72,51 @@ class TableRedirectSuite extends QueryTest
     val snapshot = deltaLog.update()
     val protocol = snapshot.protocol
     assert(!feature.isFeatureSet(snapshot.metadata))
-    assert(protocol.supportsReaderFeatures && protocol.supportsWriterFeatures)
-    assert(protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name))
-    assert(protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name))
+    if (feature != RedirectWriterOnly) {
+      assert(protocol.readerFeatureNames.contains(RedirectReaderWriterFeature.name))
+      assert(protocol.writerFeatureNames.contains(RedirectReaderWriterFeature.name))
+    } else {
+      assert(!protocol.readerFeatureNames.contains(RedirectWriterOnlyFeature.name))
+      assert(protocol.writerFeatureNames.contains(RedirectWriterOnlyFeature.name))
+    }
   }
 
-  test("basic table redirect") {
-    withTempDir { sourceTablePath =>
-      withTempDir { destTablePath =>
-        val feature = RedirectReaderWriter
-        sql(s"CREATE external TABLE t1(c0 long, c1 long) USING delta LOCATION '$sourceTablePath';")
-        val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath))
-        assert(!feature.isFeatureSet(deltaLog.update().metadata))
-        val redirectSpec = new PathBasedRedirectSpec(destTablePath.getCanonicalPath)
-        val catalogTableOpt = Some(catalogTable)
-        val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE
-        // Step-1: Initiate table redirection and set to EnableRedirectInProgress state.
-        feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
-        validateState(deltaLog, EnableRedirectInProgress, destTablePath)
-        // Step-2: Complete table redirection and set to RedirectReady state.
-        feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec)
-        validateState(deltaLog, RedirectReady, destTablePath)
-        // Step-3: Start dropping table redirection and set to DropRedirectInProgress state.
-        feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec)
-        validateState(deltaLog, DropRedirectInProgress, destTablePath)
-        // Step-4: Finish dropping table redirection and remove the property completely.
-        feature.remove(deltaLog, Some(catalogTable))
-        validateRemovedState(deltaLog, feature)
-        // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one
-        // more time.
-        withTempDir { destTablePath2 =>
-          val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath)
-          feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
-          validateState(deltaLog, EnableRedirectInProgress, destTablePath2)
-          // Step-6: Finish dropping table redirection and remove the property completely.
-          feature.remove(deltaLog, Some(catalogTable))
-          validateRemovedState(deltaLog, feature)
+  Seq(RedirectReaderWriter, RedirectWriterOnly).foreach { feature =>
+    test(s"basic table redirect: ${feature.config.key}") {
+      withTempDir { sourceTablePath =>
+        withTempDir { destTablePath =>
+          withTable("t1") {
+            sql(s"CREATE external TABLE t1(c0 long) USING delta LOCATION '$sourceTablePath';")
+            val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+            val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath))
+            val snapshot = deltaLog.update()
+            assert(!feature.isFeatureSet(snapshot.metadata))
+            val redirectSpec = new PathBasedRedirectSpec(destTablePath.getCanonicalPath)
+            val catalogTableOpt = Some(catalogTable)
+            val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE
+            // Step-1: Initiate table redirection and set to EnableRedirectInProgress state.
+            feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
+            validateState(deltaLog, EnableRedirectInProgress, destTablePath, feature)
+            // Step-2: Complete table redirection and set to RedirectReady state.
+            feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec)
+            validateState(deltaLog, RedirectReady, destTablePath, feature)
+            // Step-3: Start dropping table redirection and set to DropRedirectInProgress state.
+            feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec)
+            validateState(deltaLog, DropRedirectInProgress, destTablePath, feature)
+            // Step-4: Finish dropping table redirection and remove the property completely.
+            feature.remove(deltaLog, catalogTableOpt)
+            validateRemovedState(deltaLog, feature)
+            // Step-5: Initiate table redirection and set to EnableRedirectInProgress state one
+            // more time.
+            withTempDir { destTablePath2 =>
+              val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath)
+              feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
+              validateState(deltaLog, EnableRedirectInProgress, destTablePath2, feature)
+              // Step-6: Finish dropping table redirection and remove the property completely.
+              feature.remove(deltaLog, catalogTableOpt)
+              validateRemovedState(deltaLog, feature)
+            }
+          }
         }
       }
     }
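
For completeness, here is the teardown half of the lifecycle the test walks through, as a sketch under the same assumptions as the earlier one (`deltaLog` and `spec` set up as before, `None` standing in for the catalog table):

```scala
import org.apache.spark.sql.delta.redirect.{DropRedirectInProgress, RedirectWriterOnly}

// Signal that redirection is being torn down...
RedirectWriterOnly.update(deltaLog, None, DropRedirectInProgress, spec)
// ...then remove the table property entirely.
RedirectWriterOnly.remove(deltaLog, None)
```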
