Skip to content

Commit

Permalink
[SPARK-47590][SQL] Hive-thriftserver: Migrate logWarn with variables …
Browse files Browse the repository at this point in the history
…to structured logging framework

### What changes were proposed in this pull request?

This PR proposes to migrate `logWarning` with variables of Hive-thriftserver module to structured logging framework.

### Why are the changes needed?

To improve the existing logging system by migrating into structured logging.

### Does this PR introduce _any_ user-facing change?

No API changes, but the Hive-thriftserver logs will contain MDC (Mapped Diagnostic Context) from now on.

### How was this patch tested?

Ran Scala auto-formatting and style checks. Also, the existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45923 from itholic/hive-ts-logwarn.

Lead-authored-by: Haejoon Lee <[email protected]>
Co-authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
  • Loading branch information
2 people authored and gengliangwang committed Apr 16, 2024
1 parent 6919feb commit f7440f3
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object LogKey extends Enumeration {
val FUNCTION_PARAMETER = Value
val GROUP_ID = Value
val HADOOP_VERSION = Value
val HISTORY_DIR = Value
val HIVE_OPERATION_STATE = Value
val HIVE_OPERATION_TYPE = Value
val HOST = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ private[hive] class SparkExecuteStatementOperation(
val currentState = getStatus().getState()
if (currentState.isTerminal) {
// This may happen if the execution was cancelled, and then closed from another thread.
logWarning(s"Ignore exception in terminal state with $statementId: $e")
logWarning(
log"Ignore exception in terminal state with ${MDC(STATEMENT_ID, statementId)}", e
)
} else {
logError(log"Error executing query with ${MDC(STATEMENT_ID, statementId)}, " +
log"currentState ${MDC(HIVE_OPERATION_STATE, currentState)}, ", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import sun.misc.{Signal, SignalHandler}
import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.ERROR
import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.util.SQLKeywordUtils
Expand Down Expand Up @@ -232,14 +232,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val historyFile = historyDirectory + File.separator + ".hivehistory"
reader.setHistory(new FileHistory(new File(historyFile)))
} else {
logWarning("WARNING: Directory for Hive history file: " + historyDirectory +
" does not exist. History will not be available during this session.")
logWarning(
log"Directory for Hive history file: ${MDC(HISTORY_DIR, historyDirectory)}" +
log" does not exist. History will not be available during this session.")
}
} catch {
case e: Exception =>
logWarning("WARNING: Encountered an error while trying to initialize Hive's " +
"history file. History will not be available during this session.")
logWarning(e.getMessage)
logWarning("Encountered an error while trying to initialize Hive's " +
"history file. History will not be available during this session.", e)
}

// add shutdown hook to flush the history to history file
Expand All @@ -250,7 +250,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
h.flush()
} catch {
case e: IOException =>
logWarning("WARNING: Failed to write command history file: " + e.getMessage)
logWarning(
log"Failed to write command history file: ${MDC(ERROR, e.getMessage)}")
}
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
import org.apache.hive.service.server.HiveServer2

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.scheduler._
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
Expand Down Expand Up @@ -140,7 +141,9 @@ private[thriftserver] class HiveThriftServer2Listener(
sessionData.finishTimestamp = e.finishTime
updateStoreWithTriggerEnabled(sessionData)
sessionList.remove(e.sessionId)
case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
case None => logWarning(
log"onSessionClosed called with unknown session id: ${MDC(SESSION_ID, e.sessionId)}"
)
}

private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
Expand All @@ -160,8 +163,9 @@ private[thriftserver] class HiveThriftServer2Listener(
case Some(sessionData) =>
sessionData.totalExecution += 1
updateLiveStore(sessionData)
case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." +
s"Regardless, the operation has been registered.")
case None => logWarning(
log"onOperationStart called with unknown session id: ${MDC(SESSION_ID, e.sessionId)}." +
log"Regardless, the operation has been registered.")
}
}

Expand All @@ -171,7 +175,9 @@ private[thriftserver] class HiveThriftServer2Listener(
executionData.executePlan = e.executionPlan
executionData.state = ExecutionState.COMPILED
updateLiveStore(executionData)
case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
case None => logWarning(
log"onOperationParsed called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}"
)
}

private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
Expand All @@ -180,7 +186,9 @@ private[thriftserver] class HiveThriftServer2Listener(
executionData.finishTimestamp = e.finishTime
executionData.state = ExecutionState.CANCELED
updateLiveStore(executionData)
case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
case None => logWarning(
log"onOperationCanceled called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}"
)
}

private def onOperationTimeout(e: SparkListenerThriftServerOperationTimeout): Unit =
Expand All @@ -189,7 +197,9 @@ private[thriftserver] class HiveThriftServer2Listener(
executionData.finishTimestamp = e.finishTime
executionData.state = ExecutionState.TIMEDOUT
updateLiveStore(executionData)
case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
case None => logWarning(
log"onOperationCanceled called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}"
)
}

private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
Expand All @@ -199,7 +209,9 @@ private[thriftserver] class HiveThriftServer2Listener(
executionData.detail = e.errorMsg
executionData.state = ExecutionState.FAILED
updateLiveStore(executionData)
case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
case None => logWarning(
log"onOperationError called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}"
)
}

private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
Expand All @@ -208,7 +220,9 @@ private[thriftserver] class HiveThriftServer2Listener(
executionData.finishTimestamp = e.finishTime
executionData.state = ExecutionState.FINISHED
updateLiveStore(executionData)
case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
case None => logWarning(
log"onOperationFinished called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}"
)
}

private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
Expand All @@ -218,7 +232,9 @@ private[thriftserver] class HiveThriftServer2Listener(
executionData.state = ExecutionState.CLOSED
updateStoreWithTriggerEnabled(executionData)
executionList.remove(e.id)
case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
case None => logWarning(
log"onOperationClosed called with unknown operation id: ${MDC(STATEMENT_ID, e.id)}"
)
}

// Update both live and history stores. Trigger is enabled by default, hence
Expand Down

0 comments on commit f7440f3

Please sign in to comment.