diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 0c39fa3887..876e5a73c5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.coordinatedcommits.{CoordinatedCommitsUtils, TableCommitCoordinatorClient} import org.apache.spark.sql.delta.files._ import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, HudiConverterHook, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory} +import org.apache.spark.sql.delta.hooks.ChecksumHook import org.apache.spark.sql.delta.implicits.addFileEncoder import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging @@ -358,6 +359,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite } protected val postCommitHooks = new ArrayBuffer[PostCommitHook]() + registerPostCommitHook(ChecksumHook) catalogTable.foreach { ct => registerPostCommitHook(UpdateCatalogFactory.getUpdateCatalogHook(ct, spark)) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/ChecksumHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/ChecksumHook.scala new file mode 100644 index 0000000000..d8230e7447 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/ChecksumHook.scala @@ -0,0 +1,63 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hooks + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransactionImpl, RecordChecksum, Snapshot} +import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.metering.DeltaLogging + +import org.apache.spark.internal.MDC +import org.apache.spark.sql.SparkSession + +/** Write a new checksum at the version committed by the txn if possible. */ +object ChecksumHook extends PostCommitHook with DeltaLogging { + // Helper that creates a RecordChecksum and uses it to write a checksum file + case class WriteChecksum( + override val spark: SparkSession, + override val deltaLog: DeltaLog, + txnId: String, + snapshot: Snapshot) extends RecordChecksum { + writeChecksumFile(txnId, snapshot) + } + + override val name: String = "Post commit checksum trigger" + + override def run( + spark: SparkSession, + txn: OptimisticTransactionImpl, + committedVersion: Long, + postCommitSnapshot: Snapshot, + committedActions: Seq[Action]): Unit = { + // Only write the checksum if the postCommitSnapshot matches the version that was committed. + if (postCommitSnapshot.version != committedVersion) return + logInfo( + log"Writing checksum file for table path ${MDC(DeltaLogKeys.PATH, txn.deltaLog.logPath)} " + + log"version ${MDC(DeltaLogKeys.VERSION, committedVersion)}") + + writeChecksum(spark, txn, postCommitSnapshot) + } + + private def writeChecksum( + spark: SparkSession, + txn: OptimisticTransactionImpl, + postCommitSnapshot: Snapshot): Unit = { + WriteChecksum(spark, txn.deltaLog, txn.txnId, postCommitSnapshot) + } + +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 58cff3af72..4257c8b8dd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1040,7 +1040,7 @@ trait DeltaSQLConfBase { buildConf("writeChecksumFile.enabled") .doc("Whether the checksum file can be written.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED = buildConf("checkpoint.exceptionThrowing.enabled") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala new file mode 100644 index 0000000000..148913121c --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala @@ -0,0 +1,63 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.io.File + +import org.apache.spark.sql.delta.DeltaTestUtils._ +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.util.FileNames +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +class ChecksumSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLCommandTest { + + test(s"A Checksum should be written after every commit when " + + s"${DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key} is true") { + def testChecksumFile(writeChecksumEnabled: Boolean): Unit = { + withTempDir { tempDir => + withSQLConf( + DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> writeChecksumEnabled.toString) { + def checksumExists(deltaLog: DeltaLog, version: Long): Boolean = { + val checksumFile = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri) + checksumFile.exists() + } + + // Setup the log + val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + val initialTxn = log.startTransaction() + initialTxn.commitManually(createTestAddFile()) + + // Commit the txn + val txn = log.startTransaction() + val txnCommitVersion = txn.commit(Seq.empty, DeltaOperations.Truncate()) + assert(checksumExists(log, txnCommitVersion) == writeChecksumEnabled) + } + } + } + + testChecksumFile(writeChecksumEnabled = true) + testChecksumFile(writeChecksumEnabled = false) + } +}