#72 expose interface to add blacklist manually.
Aaaaaaron committed Nov 8, 2019
1 parent 4b21f59 commit 51ae135
Showing 3 changed files with 142 additions and 23 deletions.
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala (88 additions, 22 deletions)
@@ -226,6 +226,62 @@ private[scheduler] class BlacklistTracker (
     }
   }
 
+  def getBlacklist: (Array[String], Array[String]) = {
+    (executorIdToBlacklistStatus.keySet.toArray, nodeIdToBlacklistExpiryTime.keySet.toArray)
+  }
+
+  def addExecutorToBlackListManually(executorId: String, node: String): Unit = {
+    if (!executorIdToBlacklistStatus.contains(executorId)) {
+      logInfo(s"Blacklisting executor $executorId manually.")
+      val now = clock.getTimeMillis()
+      putExecToBlacklist(now, executorId, -1, node)
+    } else {
+      logInfo(s"Executor $executorId is already in the blacklist.")
+    }
+  }
+
+  def addNodeToBlackListManually(node: String): Unit = {
+    if (!nodeIdToBlacklistExpiryTime.contains(node)) {
+      logInfo(s"Blacklisting node $node manually.")
+      val now = clock.getTimeMillis()
+      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+      val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
+      putNodeToBlacklist(now, node, expiryTimeForNewBlacklists, blacklistedExecsOnNode)
+    } else {
+      logInfo(s"Node $node is already in the blacklist.")
+    }
+  }
+
+  def removeExecutorFromBlackListManually(executorId: String): Unit = {
+    if (executorIdToBlacklistStatus.contains(executorId)) {
+      logInfo(s"Removing executor $executorId from the blacklist manually.")
+      val now = clock.getTimeMillis()
+      val status = executorIdToBlacklistStatus.remove(executorId).get
+      val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+      listenerBus.post(SparkListenerExecutorUnblacklisted(now, executorId))
+      failedExecsOnNode.remove(executorId)
+      if (failedExecsOnNode.isEmpty) {
+        nodeToBlacklistedExecs.remove(status.node)
+      }
+      updateNextExpiryTime()
+    } else {
+      logInfo(s"Executor $executorId is not in the blacklist.")
+    }
+  }
+
+  def removeNodeFromBlackListManually(node: String): Unit = {
+    if (nodeIdToBlacklistExpiryTime.contains(node)) {
+      logInfo(s"Removing node $node from the blacklist manually.")
+      val now = clock.getTimeMillis()
+      nodeIdToBlacklistExpiryTime.remove(node)
+      listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
+      _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+      updateNextExpiryTime()
+    } else {
+      logInfo(s"Node $node is not in the blacklist.")
+    }
+  }
+
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
       stageAttemptId: Int,
@@ -240,7 +296,6 @@
       appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
       val newTotal = appFailuresOnExecutor.numUniqueTaskFailures
 
-      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
       // If this pushes the total number of failures over the threshold, blacklist the executor.
       // If it's already blacklisted, we avoid "re-blacklisting" (which can happen if there were
       // other tasks already running in another taskset when it got blacklisted), because it makes
@@ -250,31 +305,42 @@
         logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
           s" task failures in successful task sets")
         val node = failuresInTaskSet.node
-        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
-        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
-        executorIdToFailureList.remove(exec)
-        updateNextExpiryTime()
-        killBlacklistedExecutor(exec)
-
-        // In addition to blacklisting the executor, we also update the data for failures on the
-        // node, and potentially put the entire node into a blacklist as well.
-        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
-        blacklistedExecsOnNode += exec
-        // If the node is already in the blacklist, we avoid adding it again with a later expiry
-        // time.
-        if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
-            !nodeIdToBlacklistExpiryTime.contains(node)) {
-          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
-            s"executors blacklisted: ${blacklistedExecsOnNode}")
-          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
-          listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
-          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
-          killExecutorsOnBlacklistedNode(node)
-        }
+        putExecToBlacklist(now, exec, newTotal, node)
       }
     }
   }
 
+  private def putExecToBlacklist(now: Long, exec: String, newTotal: Int, node: String): Unit = {
+    val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+    executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+    listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+    executorIdToFailureList.remove(exec)
+    updateNextExpiryTime()
+    killBlacklistedExecutor(exec)
+
+    // In addition to blacklisting the executor, we also update the data for failures on the
+    // node, and potentially put the entire node into a blacklist as well.
+    val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
+    blacklistedExecsOnNode += exec
+    // If the node is already in the blacklist, we avoid adding it again with a later expiry
+    // time.
+    if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
+        !nodeIdToBlacklistExpiryTime.contains(node)) {
+      logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
+        s"executors blacklisted: ${blacklistedExecsOnNode}")
+      putNodeToBlacklist(now, node, expiryTimeForNewBlacklists, blacklistedExecsOnNode)
+    }
+  }
+
+  private def putNodeToBlacklist(now: Long, node: String,
+      expiryTimeForNewBlacklists: Long,
+      blacklistedExecsOnNode: HashSet[String]): Unit = {
+    nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+    listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
+    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+    killExecutorsOnBlacklistedNode(node)
+  }
+
   def isExecutorBlacklisted(executorId: String): Boolean = {
     executorIdToBlacklistStatus.contains(executorId)
   }
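The manual add/remove paths above reuse the same bookkeeping as the failure-driven path through the extracted putExecToBlacklist/putNodeToBlacklist helpers. Below is a minimal round-trip sketch of the new tracker API, not part of this commit: it assumes Spark 2.x names (the BlacklistTracker(listenerBus, conf, allocationClient, clock) constructor, the spark.blacklist.enabled flag, ManualClock) and must live under org.apache.spark.scheduler because the class is private[scheduler].

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkConf
import org.apache.spark.util.ManualClock

// Hypothetical round-trip check, not part of this commit. Placed in
// org.apache.spark.scheduler because BlacklistTracker is private[scheduler];
// constructor and config names assume Spark 2.x.
object ManualBlacklistRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.blacklist.enabled", "true")
    val clock = new ManualClock(0)
    val tracker = new BlacklistTracker(new LiveListenerBus(conf), conf, None, clock)

    // Manually blacklist a node, observe it through getBlacklist, then lift it.
    tracker.addNodeToBlackListManually("host1")
    val (_, blacklistedNodes) = tracker.getBlacklist
    assert(blacklistedNodes.sameElements(Array("host1")))

    tracker.removeNodeFromBlackListManually("host1")
    assert(!tracker.isNodeBlacklisted("host1"))
  }
}
```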
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (40 additions, 1 deletion)
@@ -849,8 +849,47 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-}
+  def getBlacklist: (Array[String], Array[String]) = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.getBlacklist
+    } else {
+      logWarning("Blacklist is off, cannot get the blacklist.")
+      (Array.empty, Array.empty)
+    }
+  }
+
+  def addExecutorToBlackListManually(executorId: String, node: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.addExecutorToBlackListManually(executorId, node)
+    } else {
+      logWarning("Blacklist is off, cannot add executor to blacklist.")
+    }
+  }
+
+  def addNodeToBlackListManually(node: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.addNodeToBlackListManually(node)
+    } else {
+      logWarning("Blacklist is off, cannot add node to blacklist.")
+    }
+  }
+
+  def removeExecutorFromBlackListManually(executorId: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.removeExecutorFromBlackListManually(executorId)
+    } else {
+      logWarning("Blacklist is off, cannot remove executor from blacklist.")
+    }
+  }
+
+  def removeNodeFromBlackListManually(node: String): Unit = synchronized {
+    if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+      blacklistTrackerOpt.get.removeNodeFromBlackListManually(node)
+    } else {
+      logWarning("Blacklist is off, cannot remove node from blacklist.")
+    }
+  }
+}
 
 private[spark] object TaskSchedulerImpl {
 
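Each TaskSchedulerImpl wrapper checks spark.blacklist.enabled before delegating to the tracker, so callers get a warning rather than an exception when the feature is off. A hedged driver-side sketch, not from this commit: TaskSchedulerImpl and SparkContext.taskScheduler are private[spark], so this assumes code compiled into a package under org.apache.spark, and ManualBlacklistFromDriver/quarantineNode are illustrative names.

```scala
package org.apache.spark.example // hypothetical package, under org.apache.spark for access

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

// Sketch only: manually blacklist a node (until BLACKLIST_TIMEOUT_MILLIS elapses
// or it is removed manually), then inspect the current blacklist. Every call
// no-ops with a warning when spark.blacklist.enabled is false.
object ManualBlacklistFromDriver {
  def quarantineNode(sc: SparkContext, node: String): Unit = {
    val scheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
    scheduler.addNodeToBlackListManually(node)

    val (badExecutors, badNodes) = scheduler.getBlacklist
    println(s"executors: ${badExecutors.mkString(",")}; nodes: ${badNodes.mkString(",")}")
  }
}
```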
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (14 additions, 0 deletions)
@@ -496,6 +496,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  def getHostByExecutor(executorId: String): String = synchronized {
+    val maybeExecutorData = executorDataMap.get(executorId)
+    if (maybeExecutorData.isDefined) {
+      maybeExecutorData.get.executorHost
+    } else {
+      logWarning(s"Cannot get host for executor $executorId.")
+      ""
+    }
+  }
+
+  def getHosts: Set[String] = synchronized {
+    executorDataMap.values.map(_.executorHost).toSet
+  }
+
   override def maxNumConcurrentTasks(): Int = {
     executorDataMap.values.map { executor =>
       executor.totalCores / scheduler.CPUS_PER_TASK
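addExecutorToBlackListManually takes both an executor id and its node, and getHostByExecutor exists so a caller can resolve that node first; getHosts enumerates the candidate nodes. A sketch pairing the two (same visibility caveats as above; ManualExecutorQuarantine and quarantineExecutor are illustrative names):

```scala
package org.apache.spark.example // hypothetical package, under org.apache.spark for access

import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

object ManualExecutorQuarantine {
  // Resolve the executor's host first, since addExecutorToBlackListManually
  // needs both the executor id and its node.
  def quarantineExecutor(
      scheduler: TaskSchedulerImpl,
      backend: CoarseGrainedSchedulerBackend,
      executorId: String): Unit = {
    val host = backend.getHostByExecutor(executorId)
    if (host.nonEmpty) {
      scheduler.addExecutorToBlackListManually(executorId, host)
    } else {
      // getHostByExecutor returns "" (after logging a warning) for unknown ids.
      println(s"Unknown executor $executorId; known hosts: ${backend.getHosts.mkString(", ")}")
    }
  }
}
```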
