From f7440f3841918f2cdb4a8e710cfe31d3fc85230c Mon Sep 17 00:00:00 2001 From: Haejoon Lee Date: Tue, 16 Apr 2024 13:56:03 -0700 Subject: [PATCH] [SPARK-47590][SQL] Hive-thriftserver: Migrate logWarn with variables to structured logging framework ### What changes were proposed in this pull request? This PR proposes to migrate `logWarning` with variables of Hive-thriftserver module to structured logging framework. ### Why are the changes needed? To improve the existing logging system by migrating to structured logging. ### Does this PR introduce _any_ user-facing change? No API changes, but the Hive-thriftserver logs will contain MDC (Mapped Diagnostic Context) from now on. ### How was this patch tested? Run Scala auto formatting and style check. Also the existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45923 from itholic/hive-ts-logwarn. Lead-authored-by: Haejoon Lee Co-authored-by: Haejoon Lee <44108233+itholic@users.noreply.github.com> Signed-off-by: Gengliang Wang --- .../org/apache/spark/internal/LogKey.scala | 1 + .../SparkExecuteStatementOperation.scala | 4 ++- .../hive/thriftserver/SparkSQLCLIDriver.scala | 15 ++++---- .../ui/HiveThriftServer2Listener.scala | 36 +++++++++++++------ 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 41289c6414242..bfeb733af30a4 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -94,6 +94,7 @@ object LogKey extends Enumeration { val FUNCTION_PARAMETER = Value val GROUP_ID = Value val HADOOP_VERSION = Value + val HISTORY_DIR = Value val HIVE_OPERATION_STATE = Value val HIVE_OPERATION_TYPE = Value val HOST = Value diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 628925007f7ed..f8f58cd422b67 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -256,7 +256,9 @@ private[hive] class SparkExecuteStatementOperation( val currentState = getStatus().getState() if (currentState.isTerminal) { // This may happen if the execution was cancelled, and then closed from another thread. - logWarning(s"Ignore exception in terminal state with $statementId: $e") + logWarning( + log"Ignore exception in terminal state with ${MDC(STATEMENT_ID, statementId)}", e + ) } else { logError(log"Error executing query with ${MDC(STATEMENT_ID, statementId)}, " + log"currentState ${MDC(HIVE_OPERATION_STATE, currentState)}, ", e) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 03d8fd0c8ff2a..888c086e90422 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -41,7 +41,7 @@ import sun.misc.{Signal, SignalHandler} import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkThrowable, SparkThrowableHelper} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.ERROR +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import 
org.apache.spark.sql.catalyst.util.SQLKeywordUtils @@ -232,14 +232,14 @@ private[hive] object SparkSQLCLIDriver extends Logging { val historyFile = historyDirectory + File.separator + ".hivehistory" reader.setHistory(new FileHistory(new File(historyFile))) } else { - logWarning("WARNING: Directory for Hive history file: " + historyDirectory + - " does not exist. History will not be available during this session.") + logWarning( + log"Directory for Hive history file: ${MDC(HISTORY_DIR, historyDirectory)}" + + log" does not exist. History will not be available during this session.") } } catch { case e: Exception => - logWarning("WARNING: Encountered an error while trying to initialize Hive's " + - "history file. History will not be available during this session.") - logWarning(e.getMessage) + logWarning("Encountered an error while trying to initialize Hive's " + + "history file. History will not be available during this session.", e) } // add shutdown hook to flush the history to history file @@ -250,7 +250,8 @@ private[hive] object SparkSQLCLIDriver extends Logging { h.flush() } catch { case e: IOException => - logWarning("WARNING: Failed to write command history file: " + e.getMessage) + logWarning( + log"Failed to write command history file: ${MDC(ERROR, e.getMessage)}") } case _ => } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 31f30f3d97ea4..8b7e9b00cb52b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import org.apache.hive.service.server.HiveServer2 import org.apache.spark.{SparkConf, SparkContext} -import 
org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD import org.apache.spark.scheduler._ import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState @@ -140,7 +141,9 @@ private[thriftserver] class HiveThriftServer2Listener( sessionData.finishTimestamp = e.finishTime updateStoreWithTriggerEnabled(sessionData) sessionList.remove(e.sessionId) - case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + case None => logWarning( + log"onSessionClosed called with unknown session id: ${MDC(SESSION_ID, e.sessionId)}" + ) } private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = { @@ -160,8 +163,9 @@ private[thriftserver] class HiveThriftServer2Listener( case Some(sessionData) => sessionData.totalExecution += 1 updateLiveStore(sessionData) - case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." + - s"Regardless, the operation has been registered.") + case None => logWarning( + log"onOperationStart called with unknown session id: ${MDC(SESSION_ID, e.sessionId)}." 
+ + log"Regardless, the operation has been registered.") } } @@ -171,7 +175,9 @@ private[thriftserver] class HiveThriftServer2Listener( executionData.executePlan = e.executionPlan executionData.state = ExecutionState.COMPILED updateLiveStore(executionData) - case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}") + case None => logWarning( + log"onOperationParsed called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}" + ) } private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = @@ -180,7 +186,9 @@ private[thriftserver] class HiveThriftServer2Listener( executionData.finishTimestamp = e.finishTime executionData.state = ExecutionState.CANCELED updateLiveStore(executionData) - case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") + case None => logWarning( + log"onOperationCanceled called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}" + ) } private def onOperationTimeout(e: SparkListenerThriftServerOperationTimeout): Unit = @@ -189,7 +197,9 @@ private[thriftserver] class HiveThriftServer2Listener( executionData.finishTimestamp = e.finishTime executionData.state = ExecutionState.TIMEDOUT updateLiveStore(executionData) - case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}") + case None => logWarning( + log"onOperationTimeout called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}" + ) } private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = @@ -199,7 +209,9 @@ private[thriftserver] class HiveThriftServer2Listener( executionData.finishTimestamp = e.finishTime executionData.detail = e.errorMsg executionData.state = ExecutionState.FAILED updateLiveStore(executionData) - case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}") + case None => logWarning( + log"onOperationError called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}" + ) } private def onOperationFinished(e: 
SparkListenerThriftServerOperationFinish): Unit = @@ -208,7 +220,9 @@ private[thriftserver] class HiveThriftServer2Listener( executionData.finishTimestamp = e.finishTime executionData.state = ExecutionState.FINISHED updateLiveStore(executionData) - case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}") + case None => logWarning( + log"onOperationFinished called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}" + ) } private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = @@ -218,7 +232,9 @@ private[thriftserver] class HiveThriftServer2Listener( executionData.state = ExecutionState.CLOSED updateStoreWithTriggerEnabled(executionData) executionList.remove(e.id) - case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}") + case None => logWarning( + log"onOperationClosed called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}" + ) } // Update both live and history stores. Trigger is enabled by default, hence