[SPARK-49333][SQL] Shutdown timeout thread while cleaning up SparkExecuteStatementOperation

### What changes were proposed in this pull request?

Shutdown timeout thread while cleaning up `SparkExecuteStatementOperation`.

### Why are the changes needed?

Avoid a Spark driver memory leak when a query timeout is configured. For example, a `jmap` heap histogram shows 4127 `SparkExecuteStatementOperation` instances retained in the Spark driver:
```
jmap  -histo 398 | grep SparkExecuteStatementOperation

 308:          4127        1122544  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
 563:          4127         363176  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$ErrRowCountType$
 876:          4127         132064  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1
2101:           333           7992  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$5
3106:            32           1024  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2
3303:            32            768  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$3755/0x00000008021fe800
3304:            32            768  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3
3961:             9            360  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5398/0x0000000802523900
3962:             9            360  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5399/0x0000000802523bd8
20239:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$
20240:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$$Lambda$5397/0x000000080251e180
20241:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11228/0x000000080306ba38
20242:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11230/0x00000008032962d8
20243:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$11231/0x00000008032966b8
20244:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5363/0x0000000802509470
20245:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$5367/0x000000080250a618
20246:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$6475/0x00000008026fda40
20247:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$7355/0x00000008028aa180
20248:             1             16  org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$Lambda$7356/0x00000008028aa560
```
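The retained instances come from the pattern the patch removes: each operation creates a single-thread scheduled executor for its query timeout and never shuts it down, so the worker thread stays alive and the queued timeout `Runnable` (which captures the operation) pins it in memory until the delay expires. A minimal, self-contained sketch of that pattern, with a hypothetical `Operation` class standing in for `SparkExecuteStatementOperation`:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object TimeoutLeakSketch {
  // Hypothetical stand-in for an operation that arms a query timeout the
  // way SparkExecuteStatementOperation did before this fix.
  final class Operation(timeoutSeconds: Long) {
    val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()

    def run(): Unit = {
      // The scheduled Runnable captures `this`; with a long timeout it sits
      // in the executor's queue, keeping the whole Operation reachable long
      // after the statement has finished.
      timeoutExecutor.schedule(new Runnable {
        override def run(): Unit = cancel()
      }, timeoutSeconds, TimeUnit.SECONDS)
      // ... execute the statement; before the fix, nothing ever called
      // timeoutExecutor.shutdown*() on the normal completion path.
    }

    def cancel(): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val op = new Operation(timeoutSeconds = 3600)
    op.run()
    // Shutting the executor down discards the pending timeout task and lets
    // its worker thread terminate, releasing the reference to `op`.
    op.timeoutExecutor.shutdownNow()
    op.timeoutExecutor.awaitTermination(5, TimeUnit.SECONDS)
    println("executor terminated: " + op.timeoutExecutor.isTerminated)
  }
}
```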

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47826 from wangyum/SPARK-49333.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
wangyum committed Aug 23, 2024
1 parent b57d863 commit 853731d
Showing 1 changed file with 8 additions and 2 deletions.
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

 import java.security.PrivilegedExceptionAction
 import java.util.{Collections, Map => JMap}
-import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
+import java.util.concurrent.{Executors, RejectedExecutionException, ScheduledExecutorService, TimeUnit}

 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
@@ -61,6 +61,7 @@ private[hive] class SparkExecuteStatementOperation(
       queryTimeout
     }
   }
+  private var timeoutExecutor: ScheduledExecutorService = _

   private val forceCancel = session.sessionState.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)

@@ -135,7 +136,7 @@ private[hive] class SparkExecuteStatementOperation(
     setHasResultSet(true) // avoid no resultset for async run

     if (timeout > 0) {
-      val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
+      timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
       timeoutExecutor.schedule(new Runnable {
         override def run(): Unit = {
           try {
@@ -319,6 +320,11 @@ private[hive] class SparkExecuteStatementOperation(
     if (statementId != null) {
       session.sparkContext.cancelJobGroup(statementId)
     }
+    // Shutdown the timeout thread if any, while cleaning up this operation
+    if (timeoutExecutor != null &&
+      getStatus.getState != OperationState.TIMEDOUT && getStatus.getState.isTerminal) {
+      timeoutExecutor.shutdownNow()
+    }
   }
 }
```
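The cleanup guard in the last hunk shuts the executor down only when the operation has reached a terminal state other than `TIMEDOUT` (in the timeout case the timeout thread is already winding itself down). A standalone sketch of the same guard, with hypothetical `State` and `Operation` names in place of the Hive `OperationState` machinery:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object CleanupGuardSketch {
  // Hypothetical operation states mirroring the guard in the patch.
  sealed trait State { def isTerminal: Boolean }
  case object Running  extends State { def isTerminal = false }
  case object Finished extends State { def isTerminal = true }
  case object TimedOut extends State { def isTerminal = true }

  final class Operation {
    private var timeoutExecutor: ScheduledExecutorService = _
    var state: State = Running

    // Arm the query timeout, as in SparkExecuteStatementOperation.run().
    def arm(timeoutSeconds: Long): Unit = {
      timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
      timeoutExecutor.schedule(new Runnable {
        override def run(): Unit = {
          // On an actual timeout, the timeout thread cancels the statement
          // and then shuts its own executor down.
          state = TimedOut
          timeoutExecutor.shutdown()
        }
      }, timeoutSeconds, TimeUnit.SECONDS)
    }

    // Mirrors the patched cleanup(): stop the timeout thread when the
    // operation ended for any reason other than the timeout itself.
    def cleanup(): Unit = {
      if (timeoutExecutor != null && state != TimedOut && state.isTerminal) {
        timeoutExecutor.shutdownNow()
      }
    }

    def isTimeoutThreadStopped: Boolean =
      timeoutExecutor == null || timeoutExecutor.isShutdown
  }
}
```

With this guard, an operation that finishes (or is cancelled or closed) before its timeout fires releases the scheduled thread immediately instead of holding it, and the operation it references, for the remainder of the timeout.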
