Skip to content

Commit

Permalink
[Spark] Clean up log keys (#3802)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
Rename a few log keys to make their intent clearer. Also remove
unused log keys.
## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy-and-pasteable, so
that other reviewers can test and check, and future maintainers can
verify).
If the changes were not tested, please explain why.
-->
Existing tests.
## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No
  • Loading branch information
zedtang authored Oct 24, 2024
1 parent ebdb35b commit 85db7e4
Show file tree
Hide file tree
Showing 11 changed files with 20 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ object FileMetadataMaterializationTracker extends DeltaLogging {
new FileMetadataMaterializationTracker()
} else {
logInfo(log"File metadata materialization tracking is disabled for this query." +
log" Please set ${MDC(DeltaLogKeys.CONFIG,
log" Please set ${MDC(DeltaLogKeys.CONFIG_KEY,
DeltaSQLConf.DELTA_COMMAND_FILE_MATERIALIZATION_TRACKING_ENABLED.key)} " +
log"to true to enable it.")
noopTracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2468,7 +2468,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
assert(mismatch.isEmpty,
s"Expected ${mismatch.map(_._1).mkString(",")} but got ${mismatch.map(_._2).mkString(",")}")

val logPrefix = log"[attempt ${MDC(DeltaLogKeys.ATTEMPT, attemptNumber)}] "
val logPrefix = log"[attempt ${MDC(DeltaLogKeys.NUM_ATTEMPT, attemptNumber)}] "
val txnDetailsLog = {
var adds = 0L
var removes = 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ trait SnapshotManagement { self: DeltaLog =>
firstError = e
}
logWarning(log"Failed to create a snapshot from log segment " +
log"${MDC(DeltaLogKeys.SEGMENT, segment)}. Trying a different checkpoint.", e)
log"${MDC(DeltaLogKeys.LOG_SEGMENT, segment)}. Trying a different checkpoint.", e)
segment = getLogSegmentWithMaxExclusiveCheckpointVersion(
segment.version,
segment.checkpointProvider.version,
Expand All @@ -922,7 +922,7 @@ trait SnapshotManagement { self: DeltaLog =>
attempt += 1
case e: SparkException if firstError != null =>
logWarning(log"Failed to create a snapshot from log segment " +
log"${MDC(DeltaLogKeys.SEGMENT, segment)}", e)
log"${MDC(DeltaLogKeys.LOG_SEGMENT, segment)}", e)
throw firstError
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ case class CloneTableCommand(
}

/** Log clone command information */
logInfo(log"Cloning ${MDC(DeltaLogKeys.TABLE_DESC, sourceTable.description)} to " +
logInfo(log"Cloning ${MDC(DeltaLogKeys.CLONE_SOURCE_DESC, sourceTable.description)} to " +
log"${MDC(DeltaLogKeys.PATH, targetPath)}")

// scalastyle:off deltahadoopconfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ abstract class ConvertToDeltaCommandBase(
numFiles += statsBatchSize
performStatsCollection(spark, txn, adds)
} else if (collectStats) {
logWarning(log"collectStats is set to true but ${MDC(DeltaLogKeys.CONFIG,
logWarning(log"collectStats is set to true but ${MDC(DeltaLogKeys.CONFIG_KEY,
DeltaSQLConf.DELTA_COLLECT_STATS.key)}" +
log" is false. Skip statistics collection")
adds.toIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
log"at version ${MDC(DeltaLogKeys.VERSION, snapshot.version)}, there are " +
log"${MDC(DeltaLogKeys.NUM_FILES, numOfAddFiles)} addFiles, and " +
log"${MDC(DeltaLogKeys.NUM_FILES2, numOfAddFilesWithTag)} addFiles with " +
log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.TAG, icebergCompatVersion)} tag.")
log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.VERSION2, icebergCompatVersion)} tag.")
(numOfAddFiles, numOfAddFilesWithTag)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
case _ => snapshot.minFileRetentionTimestamp
}
logInfo(log"Starting garbage collection (dryRun = " +
log"${MDC(DeltaLogKeys.DRY_RUN, dryRun)}) of untracked " +
log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
log"files older than ${MDC(DeltaLogKeys.DATE,
new Date(deleteBeforeTimestamp).toGMTString)} in " +
log"${MDC(DeltaLogKeys.PATH, path)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match {
case RetryHandling.Retry =>
logInfo(log"Retrying MERGE with materialized source. Attempt " +
log"${MDC(DeltaLogKeys.ATTEMPT, attempt)} failed.")
log"${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)} failed.")
doRetry = true
attempt += 1
case RetryHandling.ExhaustedRetries =>
logError(log"Exhausted retries after ${MDC(DeltaLogKeys.ATTEMPT, attempt)}" +
logError(log"Exhausted retries after ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)}" +
log" attempts in MERGE with materialized source. Logging latest exception.", ex)
throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge
case RetryHandling.RethrowException =>
logError(log"Fatal error in MERGE with materialized source in " +
log"attempt ${MDC(DeltaLogKeys.ATTEMPT, attempt)}", ex)
log"attempt ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)}", ex)
throw ex
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,87 +46,74 @@ import org.apache.spark.internal.LogKeyShims
*/
trait DeltaLogKeysBase {
case object APP_ID extends LogKeyShims
case object ATTEMPT extends LogKeyShims
case object BATCH_ID extends LogKeyShims
case object BATCH_SIZE extends LogKeyShims
case object CATALOG extends LogKeyShims
case object CLASS_NAME extends LogKeyShims
case object COLUMN_NAME extends LogKeyShims
case object COMPACTION_INFO_NEW extends LogKeyShims
case object COMPACTION_INFO_OLD extends LogKeyShims
case object CLONE_SOURCE_DESC extends LogKeyShims
case object CONFIG extends LogKeyShims
case object CONFIG_KEY extends LogKeyShims
case object COORDINATOR_CONF extends LogKeyShims
case object COORDINATOR_NAME extends LogKeyShims
case object COUNT extends LogKeyShims
case object DATA_FILTER extends LogKeyShims
case object DATE extends LogKeyShims
case object DIR extends LogKeyShims
case object DRY_RUN extends LogKeyShims
case object DURATION extends LogKeyShims
case object END_INDEX extends LogKeyShims
case object END_OFFSET extends LogKeyShims
case object END_VERSION extends LogKeyShims
case object ERROR extends LogKeyShims
case object EXECUTOR_ID extends LogKeyShims
case object EXPR extends LogKeyShims
case object FILE_ACTION_NEW extends LogKeyShims
case object FILE_ACTION_OLD extends LogKeyShims
case object FILE_INDEX extends LogKeyShims
case object FILE_NAME extends LogKeyShims
case object FILE_STATUS extends LogKeyShims
case object FILE_SYSTEM_SCHEME extends LogKeyShims
case object FILTER extends LogKeyShims
case object HOOK_NAME extends LogKeyShims
case object INDEX extends LogKeyShims
case object ISOLATION_LEVEL extends LogKeyShims
case object IS_DRY_RUN extends LogKeyShims
case object IS_INIT_SNAPSHOT extends LogKeyShims
case object IS_PATH_TABLE extends LogKeyShims
case object JOB_ID extends LogKeyShims
case object LOG_SEGMENT extends LogKeyShims
case object MAX_SIZE extends LogKeyShims
case object MESSAGE extends LogKeyShims
case object METADATA_ID extends LogKeyShims
case object METADATA_NEW extends LogKeyShims
case object METADATA_OLD extends LogKeyShims
case object METRICS extends LogKeyShims
case object MIN_SIZE extends LogKeyShims
case object NUM_ACTIONS extends LogKeyShims
case object NUM_ACTIONS2 extends LogKeyShims
case object NUM_ATTEMPT extends LogKeyShims
case object NUM_BYTES extends LogKeyShims
case object NUM_DIRS extends LogKeyShims
case object NUM_FILES extends LogKeyShims
case object NUM_FILES2 extends LogKeyShims
case object NUM_PARTITIONS extends LogKeyShims
case object NUM_PREDICATES extends LogKeyShims
case object NUM_RECORDS extends LogKeyShims
case object NUM_RECORDS_ACTUAL extends LogKeyShims
case object NUM_RECORDS_EXPECTED extends LogKeyShims
case object NUM_SKIPPED extends LogKeyShims
case object OFFSET extends LogKeyShims
case object OPERATION extends LogKeyShims
case object OPERATION2 extends LogKeyShims
case object OP_NAME extends LogKeyShims
case object OP_TYPE extends LogKeyShims
case object PARTITION_FILTER extends LogKeyShims
case object PATH extends LogKeyShims
case object PATH2 extends LogKeyShims
case object PATHS extends LogKeyShims
case object PROTOCOL extends LogKeyShims
case object QUERY_ID extends LogKeyShims
case object RDD_ID extends LogKeyShims
case object SCHEMA extends LogKeyShims
case object SCHEMA_DIFF extends LogKeyShims
case object SEGMENT extends LogKeyShims
case object SIZE extends LogKeyShims
case object SNAPSHOT extends LogKeyShims
case object START_INDEX extends LogKeyShims
case object START_VERSION extends LogKeyShims
case object STATS extends LogKeyShims
case object STATUS extends LogKeyShims
case object TABLE_DESC extends LogKeyShims
case object STATUS_MESSAGE extends LogKeyShims
case object SYSTEM_CLASS_NAME extends LogKeyShims
case object TABLE_FEATURES extends LogKeyShims
case object TABLE_ID extends LogKeyShims
case object TABLE_NAME extends LogKeyShims
case object TAG extends LogKeyShims
case object TBL_PROPERTIES extends LogKeyShims
case object THREAD_NAME extends LogKeyShims
case object TIMESTAMP extends LogKeyShims
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DelegatingLogStore(hadoopConf: Configuration)
case lsa: LogStoreAdaptor => s"LogStoreAdapter(${lsa.logStoreImpl.getClass.getName})"
case _ => logStore.getClass.getName
}
logInfo(log"LogStore ${MDC(DeltaLogKeys.CLASS_NAME, actualLogStoreClassName)} " +
logInfo(log"LogStore ${MDC(DeltaLogKeys.SYSTEM_CLASS_NAME, actualLogStoreClassName)} " +
log"is used for scheme ${MDC(DeltaLogKeys.FILE_SYSTEM_SCHEME, scheme)}")

logStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait DeltaProgressReporter extends LoggingShims {
defaultMessage: String,
data: Map[String, Any] = Map.empty)(body: => T): T = {
logInfo(log"${MDC(DeltaLogKeys.STATUS, statusCode)}: " +
log"${MDC(DeltaLogKeys.MESSAGE, defaultMessage)}")
log"${MDC(DeltaLogKeys.STATUS_MESSAGE, defaultMessage)}")
val t = withJobDescription(defaultMessage)(body)
logInfo(log"${MDC(DeltaLogKeys.STATUS, statusCode)}: Done")
t
Expand Down

0 comments on commit 85db7e4

Please sign in to comment.