[SPARK-47599][MLLIB] MLLib: Migrate logWarn with variables to structured logging framework

### What changes were proposed in this pull request?
This PR aims to migrate `logWarn` calls with variables in the `MLLib` module to the structured logging framework.
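
A minimal sketch of the pattern, shown here with the existing `LogKeys.NUM_ITERATIONS` key (the class and message below are illustrative only, not code from this PR):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Illustrative class; only the two logWarning calls matter.
class TrainingExample extends Logging {
  def warnOnStop(numIterations: Int): Unit = {
    // Before: plain string interpolation, the variable is baked into the message text.
    logWarning(s"Training stopped after $numIterations iterations.")

    // After: the log"..." interpolator tags the variable with a LogKey via MDC,
    // so structured logging output can surface it as a separate field.
    logWarning(log"Training stopped after " +
      log"${MDC(LogKeys.NUM_ITERATIONS, numIterations)} iterations.")
  }
}
```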

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA (GitHub Actions).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46527 from panbingkun/SPARK-47599.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
panbingkun authored and gengliangwang committed May 15, 2024
1 parent 42c1c8f commit 3ae78c4
Showing 30 changed files with 164 additions and 108 deletions.
18 changes: 18 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -82,6 +82,7 @@ object LogKeys {
case object CHECKPOINT_TIME extends LogKey
case object CHECKSUM_FILE_NUM extends LogKey
case object CHOSEN_WATERMARK extends LogKey
case object CLASSIFIER extends LogKey
case object CLASS_LOADER extends LogKey
case object CLASS_NAME extends LogKey
case object CLASS_PATH extends LogKey
@@ -157,12 +158,14 @@ object LogKeys {
case object DEPRECATED_KEY extends LogKey
case object DESCRIPTION extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
case object DESIRED_TREE_DEPTH extends LogKey
case object DESTINATION_PATH extends LogKey
case object DFS_FILE extends LogKey
case object DIFF_DELTA extends LogKey
case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
case object DRIVER_ID extends LogKey
case object DRIVER_LIBRARY_PATH_KEY extends LogKey
case object DRIVER_MEMORY_SIZE extends LogKey
case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
case object DURATION extends LogKey
@@ -196,6 +199,7 @@ object LogKeys {
case object EXECUTOR_IDS extends LogKey
case object EXECUTOR_LAUNCH_COMMANDS extends LogKey
case object EXECUTOR_LAUNCH_COUNT extends LogKey
case object EXECUTOR_MEMORY_SIZE extends LogKey
case object EXECUTOR_RESOURCES extends LogKey
case object EXECUTOR_SHUFFLE_INFO extends LogKey
case object EXECUTOR_STATE extends LogKey
@@ -217,6 +221,7 @@ object LogKeys {
case object FALLBACK_VERSION extends LogKey
case object FEATURE_COLUMN extends LogKey
case object FEATURE_DIMENSION extends LogKey
case object FEATURE_NAME extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
@@ -309,6 +314,7 @@ object LogKeys {
case object LOADED_VERSION extends LogKey
case object LOAD_FACTOR extends LogKey
case object LOAD_TIME extends LogKey
case object LOCALE extends LogKey
case object LOCAL_SCRATCH_DIR extends LogKey
case object LOCATION extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
@@ -332,8 +338,10 @@ object LogKeys {
case object MAX_LOG_NUM_POLICY extends LogKey
case object MAX_MEMORY_SIZE extends LogKey
case object MAX_METHOD_CODE_SIZE extends LogKey
case object MAX_NUM_BINS extends LogKey
case object MAX_NUM_CHUNKS extends LogKey
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
@@ -342,6 +350,7 @@ object LogKeys {
case object MEMORY_CONSUMER extends LogKey
case object MEMORY_POOL_NAME extends LogKey
case object MEMORY_SIZE extends LogKey
case object MEMORY_THRESHOLD_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
case object MESSAGE extends LogKey
case object METADATA_DIRECTORY extends LogKey
@@ -351,6 +360,7 @@ object LogKeys {
case object METHOD_PARAMETER_TYPES extends LogKey
case object METRICS_JSON extends LogKey
case object METRIC_NAME extends LogKey
case object MINI_BATCH_FRACTION extends LogKey
case object MIN_COMPACTION_BATCH_ID extends LogKey
case object MIN_FREQUENT_PATTERN_COUNT extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
@@ -387,8 +397,10 @@ object LogKeys {
case object NUM_BYTES_TO_FREE extends LogKey
case object NUM_BYTES_TO_WARN extends LogKey
case object NUM_BYTES_USED extends LogKey
case object NUM_CATEGORIES extends LogKey
case object NUM_CHUNKS extends LogKey
case object NUM_CLASSES extends LogKey
case object NUM_COEFFICIENTS extends LogKey
case object NUM_COLUMNS extends LogKey
case object NUM_CONCURRENT_WRITER extends LogKey
case object NUM_CORES extends LogKey
@@ -410,12 +422,14 @@ object LogKeys {
case object NUM_FREQUENT_ITEMS extends LogKey
case object NUM_INDEX_FILES extends LogKey
case object NUM_ITERATIONS extends LogKey
case object NUM_LEADING_SINGULAR_VALUES extends LogKey
case object NUM_LEFT_PARTITION_VALUES extends LogKey
case object NUM_LOADED_ENTRIES extends LogKey
case object NUM_LOCAL_DIRS extends LogKey
case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey
case object NUM_MERGER_LOCATIONS extends LogKey
case object NUM_META_FILES extends LogKey
case object NUM_NODES extends LogKey
case object NUM_PARTITIONS extends LogKey
case object NUM_PARTITIONS2 extends LogKey
case object NUM_PARTITION_VALUES extends LogKey
@@ -434,6 +448,7 @@ object LogKeys {
case object NUM_RESOURCE_SLOTS extends LogKey
case object NUM_RETRIES extends LogKey
case object NUM_RIGHT_PARTITION_VALUES extends LogKey
case object NUM_ROWS extends LogKey
case object NUM_SEQUENCES extends LogKey
case object NUM_SLOTS extends LogKey
case object NUM_SPILL_INFOS extends LogKey
@@ -487,6 +502,7 @@ object LogKeys {
case object POD_SHARED_SLOT_COUNT extends LogKey
case object POD_STATE extends LogKey
case object POD_TARGET_COUNT extends LogKey
case object POINT_OF_CENTER extends LogKey
case object POLICY extends LogKey
case object POOL_NAME extends LogKey
case object PORT extends LogKey
@@ -594,6 +610,7 @@ object LogKeys {
case object SHUFFLE_SERVICE_CONF_OVERLAY_URL extends LogKey
case object SHUFFLE_SERVICE_METRICS_NAMESPACE extends LogKey
case object SHUFFLE_SERVICE_NAME extends LogKey
case object SIGMAS_LENGTH extends LogKey
case object SIGNAL extends LogKey
case object SIZE extends LogKey
case object SLEEP_TIME extends LogKey
@@ -665,6 +682,7 @@ object LogKeys {
case object THREAD_POOL_KEEPALIVE_TIME extends LogKey
case object THREAD_POOL_SIZE extends LogKey
case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
case object THRESHOLD extends LogKey
case object TID extends LogKey
case object TIME extends LogKey
case object TIMEOUT extends LogKey
5 changes: 3 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ml

import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
@@ -192,8 +193,8 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
if ($(predictionCol).nonEmpty) {
transformImpl(dataset)
} else {
this.logWarning(s"$uid: Predictor.transform() does nothing" +
" because no output columns were set.")
logWarning(log"${MDC(LogKeys.UUID, uid)}: Predictor.transform() does nothing because " +
log"no output columns were set.")
dataset.toDF()
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.ml.classification

import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param.ParamMap
@@ -149,8 +150,8 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
}

if (numColsOutput == 0) {
logWarning(s"$uid: ClassificationModel.transform() does nothing" +
" because no output columns were set.")
logWarning(log"${MDC(LogKeys.UUID, uid)}: ClassificationModel.transform() does nothing " +
log"because no output columns were set.")
}
outputData.toDF()
}
@@ -179,8 +179,8 @@ class LinearSVC @Since("2.2.0") (
maxBlockSizeInMB)

if (dataset.storageLevel != StorageLevel.NONE) {
instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
s"then cached during training. Be careful of double caching!")
instr.logWarning("Input instances will be standardized, blockified to blocks, and " +
"then cached during training. Be careful of double caching!")
}

val instances = dataset.select(
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{COUNT, RANGE}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.impl.Utils
@@ -847,9 +847,11 @@ class LogisticRegression @Since("1.2.0") (
(_initialModel.interceptVector.size == numCoefficientSets) &&
(_initialModel.getFitIntercept == $(fitIntercept))
if (!modelIsValid) {
instr.logWarning(s"Initial coefficients will be ignored! Its dimensions " +
s"(${providedCoefs.numRows}, ${providedCoefs.numCols}) did not match the " +
s"expected size ($numCoefficientSets, $numFeatures)")
instr.logWarning(log"Initial coefficients will be ignored! Its dimensions " +
log"(${MDC(LogKeys.NUM_ROWS, providedCoefs.numRows)}}, " +
log"${MDC(LogKeys.NUM_COLUMNS, providedCoefs.numCols)}) did not match the " +
log"expected size (${MDC(LogKeys.NUM_COEFFICIENTS, numCoefficientSets)}, " +
log"${MDC(LogKeys.NUM_FEATURES, numFeatures)})")
}
modelIsValid
case None => false
@@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml._
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.{Vector, Vectors}
@@ -180,8 +181,8 @@ final class OneVsRestModel private[ml] (
val outputSchema = transformSchema(dataset.schema, logging = true)

if (getPredictionCol.isEmpty && getRawPredictionCol.isEmpty) {
logWarning(s"$uid: OneVsRestModel.transform() does nothing" +
" because no output columns were set.")
logWarning(log"${MDC(LogKeys.UUID, uid)}: OneVsRestModel.transform() does nothing " +
log"because no output columns were set.")
return dataset.toDF()
}

@@ -400,7 +401,8 @@ final class OneVsRest @Since("1.4.0") (
getClassifier match {
case _: HasWeightCol => true
case c =>
instr.logWarning(s"weightCol is ignored, as it is not supported by $c now.")
instr.logWarning(log"weightCol is ignored, as it is not supported by " +
log"${MDC(LogKeys.CLASSIFIER, c)} now.")
false
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.ml.classification

import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.linalg.{DenseVector, Vector, VectorUDT}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
@@ -154,8 +155,8 @@ abstract class ProbabilisticClassificationModel[
}

if (numColsOutput == 0) {
this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() does nothing" +
" because no output columns were set.")
this.logWarning(log"${MDC(LogKeys.UUID, uid)}: ProbabilisticClassificationModel.transform()" +
log" does nothing because no output columns were set.")
}
outputData.toDF()
}
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.impl.Utils.{unpackUpperTriangular, EPSILON}
import org.apache.spark.ml.linalg._
@@ -142,8 +143,8 @@ class GaussianMixtureModel private[ml] (
}

if (numColsOutput == 0) {
this.logWarning(s"$uid: GaussianMixtureModel.transform() does nothing" +
" because no output columns were set.")
this.logWarning(log"${MDC(LogKeys.UUID, uid)}: GaussianMixtureModel.transform() does " +
log"nothing because no output columns were set.")
}
outputData.toDF()
}
@@ -451,8 +451,8 @@ class KMeans @Since("1.5.0") (

private def trainWithBlock(dataset: Dataset[_], instr: Instrumentation) = {
if (dataset.storageLevel != StorageLevel.NONE) {
instr.logWarning(s"Input vectors will be blockified to blocks, and " +
s"then cached during training. Be careful of double caching!")
instr.logWarning("Input vectors will be blockified to blocks, and " +
"then cached during training. Be careful of double caching!")
}

val initStartTime = System.currentTimeMillis
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
import scala.collection.mutable.ArrayBuilder

import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg._
@@ -139,8 +140,9 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
}.apply(col(colName))

case _: VectorUDT if td < 0 =>
this.logWarning(s"Binarization operations on sparse dataset with negative threshold " +
s"$td will build a dense output, so take care when applying to sparse input.")
logWarning(log"Binarization operations on sparse dataset with negative threshold " +
log"${MDC(LogKeys.THRESHOLD, td)} will build a dense output, so take care when " +
log"applying to sparse input.")
udf { vector: Vector =>
val values = Array.fill(vector.size)(1.0)
var nnz = vector.size
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
import java.util.Locale

import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasInputCols, HasOutputCol, HasOutputCols}
@@ -129,9 +130,9 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String
if (Locale.getAvailableLocales.contains(Locale.getDefault)) {
Locale.getDefault
} else {
logWarning(s"Default locale set was [${Locale.getDefault.toString}]; however, it was " +
"not found in available locales in JVM, falling back to en_US locale. Set param `locale` " +
"in order to respect another locale.")
logWarning(log"Default locale set was [${MDC(LogKeys.LOCALE, Locale.getDefault)}]; " +
log"however, it was not found in available locales in JVM, falling back to en_US locale. " +
log"Set param `locale` in order to respect another locale.")
Locale.US
}
}
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.ml.{Estimator, Model, Transformer}
import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
import org.apache.spark.ml.param._
@@ -431,8 +432,8 @@ class StringIndexerModel (
val labels = labelsArray(i)

if (!dataset.schema.fieldNames.contains(inputColName)) {
logWarning(s"Input column ${inputColName} does not exist during transformation. " +
"Skip StringIndexerModel for this column.")
logWarning(log"Input column ${MDC(LogKeys.COLUMN_NAME, inputColName)} does not exist " +
log"during transformation. Skip StringIndexerModel for this column.")
outputColNames(i) = null
} else {
val filteredLabels = getHandleInvalid match {
@@ -124,13 +124,13 @@ private[ml] class WeightedLeastSquares(
if (rawBStd == 0) {
if (fitIntercept || rawBBar == 0.0) {
if (rawBBar == 0.0) {
instr.logWarning(s"Mean and standard deviation of the label are zero, so the " +
s"coefficients and the intercept will all be zero; as a result, training is not " +
s"needed.")
instr.logWarning("Mean and standard deviation of the label are zero, so the " +
"coefficients and the intercept will all be zero; as a result, training is not " +
"needed.")
} else {
instr.logWarning(s"The standard deviation of the label is zero, so the coefficients " +
s"will be zeros and the intercept will be the mean of the label; as a result, " +
s"training is not needed.")
instr.logWarning("The standard deviation of the label is zero, so the coefficients " +
"will be zeros and the intercept will be the mean of the label; as a result, " +
"training is not needed.")
}
val coefficients = new DenseVector(Array.ofDim(numFeatures))
val intercept = rawBBar
@@ -139,8 +139,8 @@ private[ml] class WeightedLeastSquares(
} else {
require(!(regParam > 0.0 && standardizeLabel), "The standard deviation of the label is " +
"zero. Model cannot be regularized when labels are standardized.")
instr.logWarning(s"The standard deviation of the label is zero. Consider setting " +
s"fitIntercept=true.")
instr.logWarning("The standard deviation of the label is zero. Consider setting " +
"fitIntercept=true.")
}
}

@@ -33,7 +33,8 @@ import org.json4s.JsonDSL._

import org.apache.spark.{Partitioner, SparkException}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.PATH
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.BLAS
import org.apache.spark.ml.param._
@@ -1027,7 +1028,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true)
} catch {
case e: IOException =>
logWarning(s"Cannot delete checkpoint file $file:", e)
logWarning(log"Cannot delete checkpoint file ${MDC(PATH, file)}:", e)
}
}
