Skip to content

Commit

Permalink
Write a version checksum on every commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvarya-db committed Oct 24, 2024
1 parent e3dd5b0 commit aeca630
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.coordinatedcommits.{CoordinatedCommitsUtils, TableCommitCoordinatorClient}
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, HudiConverterHook, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory}
import org.apache.spark.sql.delta.hooks.ChecksumHook
import org.apache.spark.sql.delta.implicits.addFileEncoder
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -358,6 +359,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}

protected val postCommitHooks = new ArrayBuffer[PostCommitHook]()
registerPostCommitHook(ChecksumHook)
catalogTable.foreach { ct =>
registerPostCommitHook(UpdateCatalogFactory.getUpdateCatalogHook(ct, spark))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.hooks

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransactionImpl, RecordChecksum, Snapshot}
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging

import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession

/** Write a new checksum at the version committed by the txn if possible. */
object ChecksumHook extends PostCommitHook with DeltaLogging {
// Helper that creates a RecordChecksum and uses it to write a checksum file
case class WriteChecksum(
override val spark: SparkSession,
override val deltaLog: DeltaLog,
txnId: String,
snapshot: Snapshot) extends RecordChecksum {
writeChecksumFile(txnId, snapshot)
}

override val name: String = "Post commit checksum trigger"

override def run(
spark: SparkSession,
txn: OptimisticTransactionImpl,
committedVersion: Long,
postCommitSnapshot: Snapshot,
committedActions: Seq[Action]): Unit = {
// Only write the checksum if the postCommitSnapshot matches the version that was committed.
if (postCommitSnapshot.version != committedVersion) return
logInfo(
log"Writing checksum file for table path ${MDC(DeltaLogKeys.PATH, txn.deltaLog.logPath)} " +
log"version ${MDC(DeltaLogKeys.VERSION, committedVersion)}")

writeChecksum(spark, txn, postCommitSnapshot)
}

private def writeChecksum(
spark: SparkSession,
txn: OptimisticTransactionImpl,
postCommitSnapshot: Snapshot): Unit = {
WriteChecksum(spark, txn.deltaLog, txn.txnId, postCommitSnapshot)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ trait DeltaSQLConfBase {
buildConf("writeChecksumFile.enabled")
.doc("Whether the checksum file can be written.")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)

val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
buildConf("checkpoint.exceptionThrowing.enabled")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.io.File

import org.apache.spark.sql.delta.DeltaTestUtils._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

class ChecksumSuite
extends QueryTest
with SharedSparkSession
with DeltaSQLCommandTest {

test(s"A Checksum should be written after every commit when " +
s"${DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key} is true") {
def testChecksumFile(writeChecksumEnabled: Boolean): Unit = {
withTempDir { tempDir =>
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> writeChecksumEnabled.toString) {
def checksumExists(deltaLog: DeltaLog, version: Long): Boolean = {
val checksumFile = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
checksumFile.exists()
}

// Setup the log
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
val initialTxn = log.startTransaction()
initialTxn.commitManually(createTestAddFile())

// Commit the txn
val txn = log.startTransaction()
val txnCommitVersion = txn.commit(Seq.empty, DeltaOperations.Truncate())
assert(checksumExists(log, txnCommitVersion) == writeChecksumEnabled)
}
}
}

testChecksumFile(writeChecksumEnabled = true)
testChecksumFile(writeChecksumEnabled = false)
}
}

0 comments on commit aeca630

Please sign in to comment.