diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 208c6f0cc090c..caa85ebe57dee 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.util.{Collections, Map => JMap} -import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit} +import java.util.concurrent.{Executors, RejectedExecutionException, ScheduledExecutorService, TimeUnit} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal @@ -61,6 +61,7 @@ private[hive] class SparkExecuteStatementOperation( queryTimeout } } + private var timeoutExecutor: ScheduledExecutorService = _ private val forceCancel = session.sessionState.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL) @@ -135,7 +136,7 @@ private[hive] class SparkExecuteStatementOperation( setHasResultSet(true) // avoid no resultset for async run if (timeout > 0) { - val timeoutExecutor = Executors.newSingleThreadScheduledExecutor() + timeoutExecutor = Executors.newSingleThreadScheduledExecutor() timeoutExecutor.schedule(new Runnable { override def run(): Unit = { try { @@ -319,6 +320,11 @@ private[hive] class SparkExecuteStatementOperation( if (statementId != null) { session.sparkContext.cancelJobGroup(statementId) } + // Shut down the timeout thread, if any, while cleaning up this operation + if (timeoutExecutor != null && + getStatus.getState != OperationState.TIMEDOUT && getStatus.getState.isTerminal) { + timeoutExecutor.shutdownNow() + } } }