[SPARK-2886] Use more specific actor system name than "spark"
As of #1777 we log the name of the actor system when it binds to a port. The current name "spark" is too generic and does not convey any meaning. For instance, the following lines are taken from my driver log after setting `spark.driver.port` to 5001.
```
14/08/13 19:33:29 INFO Remoting: Remoting started; listening on addresses:
[akka.tcp://spark@andrews-mbp:5001]
14/08/13 19:33:29 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@andrews-mbp:5001]
14/08/06 13:40:05 INFO Utils: Successfully started service 'spark' on port 5001.
```
This commit renames the actor system to "sparkDriver" on the driver and "sparkExecutor" on the executors. The goal of this unambitious PR is simply to make the logged information more explicit without introducing any change in functionality.
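
For reference, here is a minimal sketch of how the renamed actor systems compose into the Akka URLs that executors use to reach the driver. The host, port, and actor name below are placeholders chosen for illustration, not values taken from this PR:
```
// Hypothetical values, for illustration only.
val driverActorSystemName = "sparkDriver" // was "spark" before this change
val driverHost = "andrews-mbp"
val driverPort = 5001
val actorName = "CoarseGrainedScheduler"  // assumed value of CoarseGrainedSchedulerBackend.ACTOR_NAME

// Executors look up the driver at a URL of this shape:
val driverUrl = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$actorName"
// => akka.tcp://sparkDriver@andrews-mbp:5001/user/CoarseGrainedScheduler
```
With the rename, the "Successfully started service" line above should presumably report 'sparkDriver' on the driver and 'sparkExecutor' on the executors instead of the generic 'spark'.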

Author: Andrew Or <[email protected]>

Closes #1810 from andrewor14/service-name and squashes the following commits:

8c459ed [Andrew Or] Use a common variable for driver/executor actor system names
3a92843 [Andrew Or] Change actor name to sparkDriver and sparkExecutor
921363e [Andrew Or] Merge branch 'master' of github.com:apache/spark into service-name
c8c6a62 [Andrew Or] Do not include hyphens in actor name
1c1b42e [Andrew Or] Avoid spaces in akka system name
f644b55 [Andrew Or] Use more specific service name
andrewor14 committed Aug 26, 2014
1 parent 52fbdc2 commit b21ae5b
Showing 10 changed files with 47 additions and 29 deletions.
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _

private[spark] val driverActorSystemName = "sparkDriver"
private[spark] val executorActorSystemName = "sparkExecutor"

def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
@@ -146,9 +149,9 @@
}

val securityManager = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
securityManager = securityManager)
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
actorSystemName, hostname, port, conf, securityManager)

// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// This is so that we tell the executors the correct port to connect to.
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.{Logging, SparkContext, SparkEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

val conf = new Configuration()
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()

// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
conf.get("spark.driver.host"), conf.get("spark.driver.port"),
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}

/**
* Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
}

def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
val driverActorSystemName = SparkEnv.driverActorSystemName
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
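
As an aside, a minimal usage sketch of the `makeDriverRef` helper shown above; the package, object name, and the "MapOutputTracker" actor name are illustrative assumptions, not part of this diff:
```
package org.apache.spark.example // AkkaUtils is private[spark], so callers live under org.apache.spark

import akka.actor.{ActorRef, ActorSystem}
import org.apache.spark.SparkConf
import org.apache.spark.util.AkkaUtils

object DriverRefExample {
  // Resolves an actor registered on the driver. With this change the lookup URL
  // has the form akka.tcp://sparkDriver@<spark.driver.host>:<spark.driver.port>/user/MapOutputTracker
  def lookup(conf: SparkConf, actorSystem: ActorSystem): ActorRef =
    AkkaUtils.makeDriverRef("MapOutputTracker", conf, actorSystem)
}
```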
@@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await

import akka.actor.{Actor, Props}
import akka.pattern.ask

import com.google.common.base.Throwables

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.DeregisterReceiver
import org.apache.spark.streaming.scheduler.AddBlock
import scala.Some
import org.apache.spark.streaming.scheduler.RegisterReceiver
import com.google.common.base.Throwables

/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl(
private val trackerActor = {
val ip = env.conf.get("spark.driver.host", "localhost")
val port = env.conf.getInt("spark.driver.port", 7077)
val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
SparkEnv.driverActorSystemName, ip, port)
env.actorSystem.actorSelection(url)
}

@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -210,8 +210,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
CoarseGrainedSchedulerBackend.ACTOR_NAME)

actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
}
@@ -26,7 +26,7 @@ import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler(
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

logInfo("launching container on " + containerId + " host " + executorHostname)
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import akka.actor._
import akka.remote._
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -174,8 +174,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
CoarseGrainedSchedulerBackend.ACTOR_NAME)

actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
}
@@ -26,7 +26,7 @@ import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler(
numExecutorsRunning.decrementAndGet()
} else {
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)