From 85db7e471388a372e9529ecd3ed49d44203a503b Mon Sep 17 00:00:00 2001
From: Jiaheng Tang
Date: Thu, 24 Oct 2024 12:39:22 -0700
Subject: [PATCH] [Spark] Clean up log keys (#3802)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Rename a few log keys to make their intent clearer. Also remove unused log
keys. (A short illustrative sketch of the key-based logging pattern these
keys feed appears after the diff.)

## How was this patch tested?

Existing tests.

## Does this PR introduce _any_ user-facing changes?

No
---
 .../FileMetadataMaterializationTracker.scala  |  2 +-
 .../sql/delta/OptimisticTransaction.scala     |  2 +-
 .../spark/sql/delta/SnapshotManagement.scala  |  4 +--
 .../delta/commands/CloneTableCommand.scala    |  2 +-
 .../commands/ConvertToDeltaCommand.scala      |  2 +-
 .../ReorgTableForUpgradeUniformHelper.scala   |  2 +-
 .../sql/delta/commands/VacuumCommand.scala    |  2 +-
 .../merge/MergeIntoMaterializeSource.scala    |  6 ++---
 .../sql/delta/logging/DeltaLogKeys.scala      | 27 +++++--------------
 .../delta/storage/DelegatingLogStore.scala    |  2 +-
 .../delta/util/DeltaProgressReporter.scala    |  2 +-
 11 files changed, 20 insertions(+), 33 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala
index 318e7b90ece..0bc89f01920 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/FileMetadataMaterializationTracker.scala
@@ -208,7 +208,7 @@ object FileMetadataMaterializationTracker extends DeltaLogging {
       new FileMetadataMaterializationTracker()
     } else {
       logInfo(log"File metadata materialization tracking is disabled for this query." +
-        log" Please set ${MDC(DeltaLogKeys.CONFIG,
+        log" Please set ${MDC(DeltaLogKeys.CONFIG_KEY,
         DeltaSQLConf.DELTA_COMMAND_FILE_MATERIALIZATION_TRACKING_ENABLED.key)} " +
         log"to true to enable it.")
       noopTracker
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 0c39fa38874..4530bf43aaf 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -2468,7 +2468,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     assert(mismatch.isEmpty,
       s"Expected ${mismatch.map(_._1).mkString(",")} but got ${mismatch.map(_._2).mkString(",")}")
 
-    val logPrefix = log"[attempt ${MDC(DeltaLogKeys.ATTEMPT, attemptNumber)}] "
+    val logPrefix = log"[attempt ${MDC(DeltaLogKeys.NUM_ATTEMPT, attemptNumber)}] "
     val txnDetailsLog = {
       var adds = 0L
       var removes = 0L
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index 46f5d466c59..bdb5dfb589b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -910,7 +910,7 @@ trait SnapshotManagement { self: DeltaLog =>
             firstError = e
           }
           logWarning(log"Failed to create a snapshot from log segment " +
-            log"${MDC(DeltaLogKeys.SEGMENT, segment)}. Trying a different checkpoint.", e)
+            log"${MDC(DeltaLogKeys.LOG_SEGMENT, segment)}. Trying a different checkpoint.", e)
           segment = getLogSegmentWithMaxExclusiveCheckpointVersion(
             segment.version,
             segment.checkpointProvider.version,
@@ -922,7 +922,7 @@ trait SnapshotManagement { self: DeltaLog =>
           attempt += 1
         case e: SparkException if firstError != null =>
           logWarning(log"Failed to create a snapshot from log segment " +
-            log"${MDC(DeltaLogKeys.SEGMENT, segment)}", e)
+            log"${MDC(DeltaLogKeys.LOG_SEGMENT, segment)}", e)
           throw firstError
       }
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
index 3c30318c707..71c04af87a6 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
@@ -92,7 +92,7 @@ case class CloneTableCommand(
     }
 
     /** Log clone command information */
-    logInfo(log"Cloning ${MDC(DeltaLogKeys.TABLE_DESC, sourceTable.description)} to " +
+    logInfo(log"Cloning ${MDC(DeltaLogKeys.CLONE_SOURCE_DESC, sourceTable.description)} to " +
       log"${MDC(DeltaLogKeys.PATH, targetPath)}")
 
     // scalastyle:off deltahadoopconfiguration
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
index 541f7c55fa3..24ebf43e952 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
@@ -310,7 +310,7 @@ abstract class ConvertToDeltaCommandBase(
         numFiles += statsBatchSize
         performStatsCollection(spark, txn, adds)
       } else if (collectStats) {
-        logWarning(log"collectStats is set to true but ${MDC(DeltaLogKeys.CONFIG,
+        logWarning(log"collectStats is set to true but ${MDC(DeltaLogKeys.CONFIG_KEY,
           DeltaSQLConf.DELTA_COLLECT_STATS.key)}" + log" is false. Skip statistics collection")
         adds.toIterator
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala
index 1ae994e1e21..099e950a6d0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala
@@ -114,7 +114,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
       log"at version ${MDC(DeltaLogKeys.VERSION, snapshot.version)}, there are " +
       log"${MDC(DeltaLogKeys.NUM_FILES, numOfAddFiles)} addFiles, and " +
       log"${MDC(DeltaLogKeys.NUM_FILES2, numOfAddFilesWithTag)} addFiles with " +
-      log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.TAG, icebergCompatVersion)} tag.")
+      log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.VERSION2, icebergCompatVersion)} tag.")
     (numOfAddFiles, numOfAddFilesWithTag)
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 24ee3fdff2b..c9ea9a07ce0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -238,7 +238,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       case _ => snapshot.minFileRetentionTimestamp
     }
     logInfo(log"Starting garbage collection (dryRun = " +
-      log"${MDC(DeltaLogKeys.DRY_RUN, dryRun)}) of untracked " +
+      log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
       log"files older than ${MDC(DeltaLogKeys.DATE, new Date(deleteBeforeTimestamp).toGMTString)} in " +
       log"${MDC(DeltaLogKeys.PATH, path)}")
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
index c72c4ee46cf..13ee66cd8e7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
@@ -113,16 +113,16 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
         handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match {
           case RetryHandling.Retry =>
             logInfo(log"Retrying MERGE with materialized source. Attempt " +
-              log"${MDC(DeltaLogKeys.ATTEMPT, attempt)} failed.")
+              log"${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)} failed.")
             doRetry = true
             attempt += 1
           case RetryHandling.ExhaustedRetries =>
-            logError(log"Exhausted retries after ${MDC(DeltaLogKeys.ATTEMPT, attempt)}" +
+            logError(log"Exhausted retries after ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)}" +
               log" attempts in MERGE with materialized source. Logging latest exception.", ex)
             throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge
           case RetryHandling.RethrowException =>
             logError(log"Fatal error in MERGE with materialized source in " +
-              log"attempt ${MDC(DeltaLogKeys.ATTEMPT, attempt)}", ex)
+              log"attempt ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)}", ex)
             throw ex
         }
       } finally {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala
index b9ff0c3e964..6eda3beb2e0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala
@@ -46,22 +46,17 @@ import org.apache.spark.internal.LogKeyShims
  */
 trait DeltaLogKeysBase {
   case object APP_ID extends LogKeyShims
-  case object ATTEMPT extends LogKeyShims
   case object BATCH_ID extends LogKeyShims
   case object BATCH_SIZE extends LogKeyShims
-  case object CATALOG extends LogKeyShims
-  case object CLASS_NAME extends LogKeyShims
-  case object COLUMN_NAME extends LogKeyShims
-  case object COMPACTION_INFO_NEW extends LogKeyShims
-  case object COMPACTION_INFO_OLD extends LogKeyShims
+  case object CLONE_SOURCE_DESC extends LogKeyShims
   case object CONFIG extends LogKeyShims
+  case object CONFIG_KEY extends LogKeyShims
   case object COORDINATOR_CONF extends LogKeyShims
   case object COORDINATOR_NAME extends LogKeyShims
   case object COUNT extends LogKeyShims
   case object DATA_FILTER extends LogKeyShims
   case object DATE extends LogKeyShims
   case object DIR extends LogKeyShims
-  case object DRY_RUN extends LogKeyShims
   case object DURATION extends LogKeyShims
   case object END_INDEX extends LogKeyShims
   case object END_OFFSET extends LogKeyShims
@@ -69,21 +64,19 @@ trait DeltaLogKeysBase {
   case object ERROR extends LogKeyShims
   case object EXECUTOR_ID extends LogKeyShims
   case object EXPR extends LogKeyShims
-  case object FILE_ACTION_NEW extends LogKeyShims
-  case object FILE_ACTION_OLD extends LogKeyShims
   case object FILE_INDEX extends LogKeyShims
   case object FILE_NAME extends LogKeyShims
   case object FILE_STATUS extends LogKeyShims
   case object FILE_SYSTEM_SCHEME extends LogKeyShims
   case object FILTER extends LogKeyShims
   case object HOOK_NAME extends LogKeyShims
-  case object INDEX extends LogKeyShims
   case object ISOLATION_LEVEL extends LogKeyShims
+  case object IS_DRY_RUN extends LogKeyShims
   case object IS_INIT_SNAPSHOT extends LogKeyShims
   case object IS_PATH_TABLE extends LogKeyShims
   case object JOB_ID extends LogKeyShims
+  case object LOG_SEGMENT extends LogKeyShims
   case object MAX_SIZE extends LogKeyShims
-  case object MESSAGE extends LogKeyShims
   case object METADATA_ID extends LogKeyShims
   case object METADATA_NEW extends LogKeyShims
   case object METADATA_OLD extends LogKeyShims
@@ -91,6 +84,7 @@ trait DeltaLogKeysBase {
   case object MIN_SIZE extends LogKeyShims
   case object NUM_ACTIONS extends LogKeyShims
   case object NUM_ACTIONS2 extends LogKeyShims
+  case object NUM_ATTEMPT extends LogKeyShims
   case object NUM_BYTES extends LogKeyShims
   case object NUM_DIRS extends LogKeyShims
   case object NUM_FILES extends LogKeyShims
@@ -98,35 +92,28 @@ trait DeltaLogKeysBase {
   case object NUM_PARTITIONS extends LogKeyShims
   case object NUM_PREDICATES extends LogKeyShims
   case object NUM_RECORDS extends LogKeyShims
-  case object NUM_RECORDS_ACTUAL extends LogKeyShims
-  case object NUM_RECORDS_EXPECTED extends LogKeyShims
   case object NUM_SKIPPED extends LogKeyShims
   case object OFFSET extends LogKeyShims
   case object OPERATION extends LogKeyShims
-  case object OPERATION2 extends LogKeyShims
   case object OP_NAME extends LogKeyShims
-  case object OP_TYPE extends LogKeyShims
   case object PARTITION_FILTER extends LogKeyShims
   case object PATH extends LogKeyShims
   case object PATH2 extends LogKeyShims
   case object PATHS extends LogKeyShims
   case object PROTOCOL extends LogKeyShims
   case object QUERY_ID extends LogKeyShims
-  case object RDD_ID extends LogKeyShims
   case object SCHEMA extends LogKeyShims
   case object SCHEMA_DIFF extends LogKeyShims
-  case object SEGMENT extends LogKeyShims
-  case object SIZE extends LogKeyShims
   case object SNAPSHOT extends LogKeyShims
   case object START_INDEX extends LogKeyShims
   case object START_VERSION extends LogKeyShims
   case object STATS extends LogKeyShims
   case object STATUS extends LogKeyShims
-  case object TABLE_DESC extends LogKeyShims
+  case object STATUS_MESSAGE extends LogKeyShims
+  case object SYSTEM_CLASS_NAME extends LogKeyShims
   case object TABLE_FEATURES extends LogKeyShims
   case object TABLE_ID extends LogKeyShims
   case object TABLE_NAME extends LogKeyShims
-  case object TAG extends LogKeyShims
   case object TBL_PROPERTIES extends LogKeyShims
   case object THREAD_NAME extends LogKeyShims
   case object TIMESTAMP extends LogKeyShims
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala
index 6a38bbf4b67..4d38fc165c8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/storage/DelegatingLogStore.scala
@@ -74,7 +74,7 @@ class DelegatingLogStore(hadoopConf: Configuration)
       case lsa: LogStoreAdaptor => s"LogStoreAdapter(${lsa.logStoreImpl.getClass.getName})"
       case _ => logStore.getClass.getName
     }
 
-    logInfo(log"LogStore ${MDC(DeltaLogKeys.CLASS_NAME, actualLogStoreClassName)} " +
+    logInfo(log"LogStore ${MDC(DeltaLogKeys.SYSTEM_CLASS_NAME, actualLogStoreClassName)} " +
       log"is used for scheme ${MDC(DeltaLogKeys.FILE_SYSTEM_SCHEME, scheme)}")
     logStore
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala
index b888feb5aa4..82632cfeb85 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaProgressReporter.scala
@@ -31,7 +31,7 @@ trait DeltaProgressReporter extends LoggingShims {
       defaultMessage: String,
       data: Map[String, Any] = Map.empty)(body: => T): T = {
     logInfo(log"${MDC(DeltaLogKeys.STATUS, statusCode)}: " +
-      log"${MDC(DeltaLogKeys.MESSAGE, defaultMessage)}")
+      log"${MDC(DeltaLogKeys.STATUS_MESSAGE, defaultMessage)}")
     val t = withJobDescription(defaultMessage)(body)
     logInfo(log"${MDC(DeltaLogKeys.STATUS, statusCode)}: Done")
     t
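
Note for reviewers: the snippet below is a minimal, self-contained sketch of the key-based logging pattern these renames touch, not Delta or Spark code. The toy `LogKey`, `MDC`, and `log` interpolator are hypothetical stand-ins for Spark's `org.apache.spark.internal` structured-logging API and Delta's `LogKeyShims`; only the three key names mirror renames from this patch.

```scala
// Toy stand-ins for Spark's structured-logging API and Delta's LogKeyShims.
// Illustration only -- not the real implementation.
object LogKeySketch extends App {
  // Each key is a singleton, so every call site shares one stable,
  // greppable name per logged field.
  sealed trait LogKey
  case object CONFIG_KEY extends LogKey  // was CONFIG: the value is a conf *key*, not a config
  case object NUM_ATTEMPT extends LogKey // was ATTEMPT: the value is a count
  case object IS_DRY_RUN extends LogKey  // was DRY_RUN: boolean values get an IS_ prefix

  // Pairs a key with the value being logged, like Spark's MDC(key, value).
  final case class MDC(key: LogKey, value: Any)

  // A toy log"..." interpolator: renders the human-readable message and
  // appends the key/value pairs so the output stays machine-searchable.
  implicit class LogInterpolator(private val sc: StringContext) extends AnyVal {
    def log(args: MDC*): String = {
      val message = sc.parts.zipAll(args.map(_.value.toString), "", "")
        .map { case (part, value) => part + value }
        .mkString
      val fields = args.map(a => s"${a.key}=${a.value}").mkString(", ")
      s"$message [$fields]"
    }
  }

  // Prints: Starting garbage collection (dryRun = true) [IS_DRY_RUN=true]
  println(log"Starting garbage collection (dryRun = ${MDC(IS_DRY_RUN, true)})")
}
```

The renames in the diff all point at one convention: a key names the value it carries (`CONFIG_KEY` holds a conf key rather than a whole config, `NUM_ATTEMPT` a count, `IS_DRY_RUN` a boolean), which keeps the emitted fields consistent and searchable across call sites.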