Skip to content

Commit

Permalink
Merge pull request apache#2 from qiuxin2012/fixworker
Browse files Browse the repository at this point in the history
use the same authhelper
  • Loading branch information
qiuxin2012 authored Nov 24, 2021
2 parents 958e50d + 8c34c3a commit 6acc3e4
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
value
}.getOrElse("pyspark.worker")

private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
private val authHelper = getAuthHelper(SparkEnv.get.conf)

@GuardedBy("self")
private var daemon: Process = null
Expand Down Expand Up @@ -448,6 +448,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
protected object PythonWorkerFactory {
val simpleWorkerBuffer = mutable.HashMap[Int, (Socket, Process, ServerSocket)]()
var simpleWorkerIter: Iterator[Int] = null
var authHelper: SocketAuthHelper = null
def keysIterator(): Iterator[Int] = {
if (simpleWorkerIter == null || !simpleWorkerIter.hasNext) {
simpleWorkerIter = simpleWorkerBuffer.keysIterator
Expand All @@ -456,6 +457,17 @@ protected object PythonWorkerFactory {
}
val maxSimpleWorker = 1

def getAuthHelper(conf: SparkConf): SocketAuthHelper = {
this.synchronized {
if (authHelper == null) {
authHelper = new SocketAuthHelper(conf)
}
}
authHelper
}



val PROCESS_WAIT_TIMEOUT_MS = 10000000
val IDLE_WORKER_TIMEOUT_NS = TimeUnit.MINUTES.toNanos(1) // kill idle workers after 1 minute
}

0 comments on commit 6acc3e4

Please sign in to comment.