From 51ae1351680478e01e9ed24f1545eaca5cdb7cb5 Mon Sep 17 00:00:00 2001
From: Jiatao Tao <245915794@qq.com>
Date: Fri, 8 Nov 2019 23:34:52 +0800
Subject: [PATCH] #72 expose interface to add blacklist manually.

---
 .../spark/scheduler/BlacklistTracker.scala    | 110 ++++++++++++++----
 .../spark/scheduler/TaskSchedulerImpl.scala   |  41 ++++++-
 .../CoarseGrainedSchedulerBackend.scala       |  14 +++
 3 files changed, 142 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index ef6d02d85c27b..0f42dbeccb5d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -226,6 +226,62 @@ private[scheduler] class BlacklistTracker (
     }
   }
 
+  def getBlacklist: (Array[String], Array[String]) = {
+    (executorIdToBlacklistStatus.keySet.toArray, nodeIdToBlacklistExpiryTime.keySet.toArray)
+  }
+
+  def addExecutorToBlackListManually(executorId: String, node: String): Unit = {
+    if (!executorIdToBlacklistStatus.contains(executorId)) {
+      logInfo(s"Blacklisting executor $executorId manually.")
+      val now = clock.getTimeMillis()
+      putExecToBlacklist(now, executorId, -1, node)
+    } else {
+      logInfo(s"Executor $executorId already in blacklist.")
+    }
+  }
+
+  def addNodeToBlackListManually(node: String): Unit = {
+    if (!nodeIdToBlacklistExpiryTime.contains(node)) {
+      logInfo(s"Blacklisting node $node manually.")
+      val now = clock.getTimeMillis()
+      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+      val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
+      putNodeToBlacklist(now, node, expiryTimeForNewBlacklists, blacklistedExecsOnNode)
+    } else {
+      logInfo(s"Node $node already in blacklist.")
+    }
+  }
+
+  def removeExecutorFromBlackListManually(executorId: String): Unit = {
+    if (executorIdToBlacklistStatus.contains(executorId)) {
+      logInfo(s"Remove executor $executorId from blacklist manually.")
+      val now = clock.getTimeMillis()
+      val status = executorIdToBlacklistStatus.remove(executorId).get
+      val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+      listenerBus.post(SparkListenerExecutorUnblacklisted(now, executorId))
+      failedExecsOnNode.remove(executorId)
+      if (failedExecsOnNode.isEmpty) {
+        nodeToBlacklistedExecs.remove(status.node)
+      }
+      updateNextExpiryTime()
+    } else {
+      logInfo(s"Executor $executorId not in blacklist.")
+    }
+  }
+
+  def removeNodeFromBlackListManually(node: String): Unit = {
+    if (nodeIdToBlacklistExpiryTime.contains(node)) {
+      logInfo(s"Remove node $node from blacklist manually.")
+      val now = clock.getTimeMillis()
+      nodeIdToBlacklistExpiryTime.remove(node)
+      listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
+      _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+      updateNextExpiryTime()
+    } else {
+      logInfo(s"Node $node not in blacklist.")
+    }
+  }
+
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
       stageAttemptId: Int,
@@ -240,7 +296,6 @@ private[scheduler] class BlacklistTracker (
       appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
       val newTotal = appFailuresOnExecutor.numUniqueTaskFailures
 
-      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
       // If this pushes the total number of failures over the threshold, blacklist the executor.
       // If its already blacklisted, we avoid "re-blacklisting" (which can happen if there were
       // other tasks already running in another taskset when it got blacklisted), because it makes
@@ -250,31 +305,42 @@ private[scheduler] class BlacklistTracker (
         logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
           s" task failures in successful task sets")
         val node = failuresInTaskSet.node
-        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
-        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
-        executorIdToFailureList.remove(exec)
-        updateNextExpiryTime()
-        killBlacklistedExecutor(exec)
-
-        // In addition to blacklisting the executor, we also update the data for failures on the
-        // node, and potentially put the entire node into a blacklist as well.
-        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
-        blacklistedExecsOnNode += exec
-        // If the node is already in the blacklist, we avoid adding it again with a later expiry
-        // time.
-        if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
-            !nodeIdToBlacklistExpiryTime.contains(node)) {
-          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
-            s"executors blacklisted: ${blacklistedExecsOnNode}")
-          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
-          listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
-          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
-          killExecutorsOnBlacklistedNode(node)
-        }
+        putExecToBlacklist(now, exec, newTotal, node)
       }
     }
   }
 
+  private def putExecToBlacklist(now: Long, exec: String, newTotal: Int, node: String): Unit = {
+    val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+    executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+    listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+    executorIdToFailureList.remove(exec)
+    updateNextExpiryTime()
+    killBlacklistedExecutor(exec)
+
+    // In addition to blacklisting the executor, we also update the data for failures on the
+    // node, and potentially put the entire node into a blacklist as well.
+    val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
+    blacklistedExecsOnNode += exec
+    // If the node is already in the blacklist, we avoid adding it again with a later expiry
+    // time.
+    if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
+        !nodeIdToBlacklistExpiryTime.contains(node)) {
+      logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
+        s"executors blacklisted: ${blacklistedExecsOnNode}")
+      putNodeToBlacklist(now, node, expiryTimeForNewBlacklists, blacklistedExecsOnNode)
+    }
+  }
+
+  private def putNodeToBlacklist(now: Long, node: String,
+      expiryTimeForNewBlacklists: Long,
+      blacklistedExecsOnNode: HashSet[String]): Unit = {
+    nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+    listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
+    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+    killExecutorsOnBlacklistedNode(node)
+  }
+
   def isExecutorBlacklisted(executorId: String): Boolean = {
     executorIdToBlacklistStatus.contains(executorId)
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 24d77f88db982..df18c6247c05c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -849,8 +849,47 @@ private[spark] class TaskSchedulerImpl(
     }
   }
-}
+  def getBlacklist: (Array[String], Array[String]) = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.getBlacklist
+    } else {
+      logWarning("Blacklist is off, cannot get blacklist.")
+      (Array.empty, Array.empty)
+    }
+  }
+  def addExecutorToBlackListManually(executorId: String, node: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.addExecutorToBlackListManually(executorId, node)
+    } else {
+      logWarning("Blacklist is off, cannot add executor to blacklist.")
+    }
+  }
+
+  def addNodeToBlackListManually(node: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.addNodeToBlackListManually(node)
+    } else {
+      logWarning("Blacklist is off, cannot add node to blacklist.")
+    }
+  }
+
+  def removeExecutorFromBlackListManually(executorId: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.removeExecutorFromBlackListManually(executorId)
+    } else {
+      logWarning("Blacklist is off, cannot remove executor from blacklist.")
+    }
+  }
+
+  def removeNodeFromBlackListManually(node: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.removeNodeFromBlackListManually(node)
+    } else {
+      logWarning("Blacklist is off, cannot remove node from blacklist.")
+    }
+  }
+}
 
 private[spark] object TaskSchedulerImpl {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index de7c0d813ae65..45aed63aa55d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -496,6 +496,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  def getHostByExecutor(executorId: String): String = synchronized {
+    val maybeExecutorData = executorDataMap.get(executorId)
+    if (maybeExecutorData.isDefined) {
+      maybeExecutorData.get.executorHost
+    } else {
logWarning(s"Can not get host by executor $executorId.") + "" + } + } + + def getHosts: Set[String] = synchronized { + executorDataMap.values.map(_.executorHost).toSet + } + override def maxNumConcurrentTasks(): Int = { executorDataMap.values.map { executor => executor.totalCores / scheduler.CPUS_PER_TASK