Skip to content

Commit

Permalink
Change actor name to sparkDriver and sparkExecutor
Browse files — browse the repository at this point in the history
Apparently we also hard-code the actor names everywhere, so the
scope of this change is a little larger than expected.
  • Loading branch information
andrewor14 committed Aug 14, 2014
1 parent 921363e commit 3a92843
Show file tree
Hide file tree
Showing 10 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ object SparkEnv extends Logging {
}

val securityManager = new SecurityManager(conf)
val actorSystemName = if (isDriver) "driverActor" else "executorActor"
val actorSystemName = if (isDriver) "sparkDriver" else "sparkExecutor"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
actorSystemName, hostname, port, conf, securityManager)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()

// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private[spark] object AkkaUtils extends Logging {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[streaming] class ReceiverSupervisorImpl(
private val trackerActor = {
val ip = env.conf.get("spark.driver.host", "localhost")
val port = env.conf.getInt("spark.driver.port", 7077)
val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
val url = "akka.tcp://sparkDriver@%s:%s/user/ReceiverTracker".format(ip, port)
env.actorSystem.actorSelection(url)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)

actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private[yarn] class YarnAllocationHandler(
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)

actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private[yarn] class YarnAllocationHandler(
numExecutorsRunning.decrementAndGet()
} else {
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
Expand Down

0 comments on commit 3a92843

Please sign in to comment.