From 0c0e409d49afa954703462b338af04481b74f563 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 2 Mar 2014 23:22:09 -0500 Subject: [PATCH 1/5] simplify the implementation of CoarseGrainedSchedulerBackend --- .../apache/spark/scheduler/WorkerOffer.scala | 4 +++- .../CoarseGrainedSchedulerBackend.scala | 24 +++++++------------ 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index ba6bab3f91a65..51db82dc67e1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -21,4 +21,6 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -class WorkerOffer(val executorId: String, val host: String, val cores: Int) +class WorkerOffer(val executorId: String, val host: String, var cores: Int) { + @transient val totalcores = cores +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 379e02eb9a437..82dfe06a5f644 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -51,9 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] - private val executorAddress = new HashMap[String, Address] - private val executorHost = new HashMap[String, String] - private val freeCores = new HashMap[String, Int] + private val workerOffers = new HashMap[String, WorkerOffer] private val addressToExecutorId = new HashMap[Address, String] override def preStart() { @@ -75,9 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) executorActor(executorId) = sender - executorHost(executorId) = Utils.parseHostPort(hostPort)._1 - freeCores(executorId) = cores - executorAddress(executorId) = sender.path.address + workerOffers += (executorId -> new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores)) addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() @@ -87,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { if (executorActor.contains(executorId)) { - freeCores(executorId) += 1 + workerOffers(executorId).cores += 1 makeOffers(executorId) } else { // Ignoring the update since we don't know about the executor. @@ -125,20 +121,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { - launchTasks(scheduler.resourceOffers( - executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) + launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq)) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { - launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) + launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId)))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - freeCores(task.executorId) -= 1 + workerOffers(task.executorId).cores -= 1 executorActor(task.executorId) ! LaunchTask(task) } } @@ -147,11 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def removeExecutor(executorId: String, reason: String) { if (executorActor.contains(executorId)) { logInfo("Executor " + executorId + " disconnected, so removing it") - val numCores = freeCores(executorId) - addressToExecutorId -= executorAddress(executorId) + val numCores = workerOffers(executorId).totalcores executorActor -= executorId - executorHost -= executorId - freeCores -= executorId + workerOffers -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) } From af470d375f224a2d5e95d3b78ddf62b536c36554 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 3 Mar 2014 09:52:45 -0500 Subject: [PATCH 2/5] style fix --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 82dfe06a5f644..9ca04f57f3616 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -73,7 +73,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) executorActor(executorId) = sender - workerOffers += (executorId -> new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores)) + workerOffers += (executorId -> + new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores)) addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() From 43c13e996508a0a04626c0d1aecf6da039d8d713 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 3 Mar 2014 22:47:26 -0500 Subject: [PATCH 3/5] keep WorkerOffer immutable --- .../apache/spark/scheduler/WorkerOffer.scala | 4 +--- .../cluster/CoarseGrainedSchedulerBackend.scala | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index 51db82dc67e1a..d8412ae66be89 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -21,6 +21,4 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -class WorkerOffer(val executorId: String, val host: String, var cores: Int) { - @transient val totalcores = cores -} +case class WorkerOffer(executorId: String, host: String, cores: Int); diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9ca04f57f3616..58bb6b420847e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -52,6 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] private val workerOffers = new HashMap[String, WorkerOffer] + private val freeCores = new HashMap[String, Int] + private val totalCores = new HashMap[String, Int] private val addressToExecutorId = new HashMap[Address, String] override def preStart() { @@ -75,6 +77,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorActor(executorId) = sender workerOffers += (executorId -> new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores)) + totalCores += (executorId -> cores) + freeCores += (executorId -> cores) addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() @@ -84,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { if (executorActor.contains(executorId)) { - workerOffers(executorId).cores += 1 + freeCores(executorId) += 1 makeOffers(executorId) } else { // Ignoring the update since we don't know about the executor. @@ -122,18 +126,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { + // reconstruct workerOffers + workerOffers.foreach(o => workerOffers(o._1) = + new WorkerOffer(o._1, o._2.host, freeCores(o._1))) launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq)) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { + val oldOffer = workerOffers(executorId) + workerOffers(executorId) = new WorkerOffer(executorId, oldOffer.host, freeCores(executorId)) launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId)))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - workerOffers(task.executorId).cores -= 1 + freeCores(task.executorId) -= 1 executorActor(task.executorId) ! LaunchTask(task) } } @@ -142,9 +151,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def removeExecutor(executorId: String, reason: String) { if (executorActor.contains(executorId)) { logInfo("Executor " + executorId + " disconnected, so removing it") - val numCores = workerOffers(executorId).totalcores + val numCores = totalCores(executorId) executorActor -= executorId workerOffers -= executorId + totalCores -= executorId + freeCores -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) } From 19c2bb4143bd964b3cae810817b9feef6be7a441 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 4 Mar 2014 08:26:45 -0500 Subject: [PATCH 4/5] use copy idiom to reconstruct the workerOffers --- .../cluster/CoarseGrainedSchedulerBackend.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 58bb6b420847e..6283003f8ea48 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -127,15 +127,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { // reconstruct workerOffers - workerOffers.foreach(o => workerOffers(o._1) = - new WorkerOffer(o._1, o._2.host, freeCores(o._1))) + workerOffers.keys.foreach { executorId => + workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId)) + } launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq)) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { - val oldOffer = workerOffers(executorId) - workerOffers(executorId) = new WorkerOffer(executorId, oldOffer.host, freeCores(executorId)) + // update the workerOffer + workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId)) launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId)))) } From f6bf93f425ce71f8d73805a6106f433fc1db15ef Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Tue, 4 Mar 2014 12:07:31 -0500 Subject: [PATCH 5/5] code clean --- .../apache/spark/scheduler/WorkerOffer.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 27 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index d8412ae66be89..810b36cddf835 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -21,4 +21,4 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -case class WorkerOffer(executorId: String, host: String, cores: Int); +case class WorkerOffer(executorId: String, host: String, cores: Int) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 6283003f8ea48..fad03731572e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -51,7 +51,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] - private val workerOffers = new HashMap[String, WorkerOffer] + private val executorAddress = new HashMap[String, Address] + private val executorHost = new HashMap[String, String] private val freeCores = new HashMap[String, Int] private val totalCores = new HashMap[String, Int] private val addressToExecutorId = new HashMap[Address, String] @@ -75,10 +76,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) executorActor(executorId) = sender - workerOffers += (executorId -> - new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores)) - totalCores += (executorId -> cores) - freeCores += (executorId -> cores) + executorHost(executorId) = Utils.parseHostPort(hostPort)._1 + totalCores(executorId) = cores + freeCores(executorId) = cores + executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() @@ -126,18 +127,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { - // reconstruct workerOffers - workerOffers.keys.foreach { executorId => - workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId)) - } - launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq)) + launchTasks(scheduler.resourceOffers( + executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { - // update the workerOffer - workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId)) - launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId)))) + launchTasks(scheduler.resourceOffers( + Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers @@ -154,7 +151,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A logInfo("Executor " + executorId + " disconnected, so removing it") val numCores = totalCores(executorId) executorActor -= executorId - workerOffers -= executorId + executorHost -= executorId + addressToExecutorId -= executorAddress(executorId) + executorAddress -= executorId totalCores -= executorId freeCores -= executorId totalCoreCount.addAndGet(-numCores)