SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1 #5

Closed
wants to merge 1 commit
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -29,31 +29,41 @@ import scala.sys.process._

import net.liftweb.json.JsonParser

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
* In order to mimic a real distributed cluster more closely, Docker is used.
* Execute using
* ./spark-class org.apache.spark.deploy.FaultToleranceTest
* ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
*
* Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
* Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
* *and* SPARK_JAVA_OPTS:
* - spark.deploy.recoveryMode=ZOOKEEPER
* - spark.deploy.zookeeper.url=172.17.42.1:2181
* Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
*
* In case of failure, make sure to kill off prior docker containers before restarting:
* docker kill $(docker ps -q)
*
* Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
* working installation of Docker. In addition to having Docker, the following are assumed:
* - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
* - The docker images tagged spark-test-master and spark-test-worker are built from the
* docker/ directory. Run 'docker/spark-test/build' to generate these.
*/
private[spark] object FaultToleranceTest extends App with Logging {

val conf = new SparkConf()
val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")

val masters = ListBuffer[TestMasterInfo]()
val workers = ListBuffer[TestWorkerInfo]()
var sc: SparkContext = _

val zk = SparkCuratorUtil.newClient(conf)

var numPassed = 0
var numFailed = 0

@@ -71,6 +81,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
sc = null
}
terminateCluster()

// Clear ZK directories in between tests (for speed purposes)
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
}

test("sanity-basic") {
@@ -167,26 +181,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
try {
fn
numPassed += 1
logInfo("==============================================")
logInfo("Passed: " + name)
logInfo("==============================================")
} catch {
case e: Exception =>
numFailed += 1
logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logError("FAILED: " + name, e)
logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
sys.exit(1)
}
afterEach()
}

def addMasters(num: Int) {
logInfo(s">>>>> ADD MASTERS $num <<<<<")
(1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
}

def addWorkers(num: Int) {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}

/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
logInfo(">>>>> CREATE CLIENT <<<<<")
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
@@ -205,6 +227,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}

def killLeader(): Unit = {
logInfo(">>>>> KILL LEADER <<<<<")
masters.foreach(_.readState())
val leader = getLeader
masters -= leader
@@ -214,6 +237,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

def terminateCluster() {
logInfo(">>>>> TERMINATE CLUSTER <<<<<")
masters.foreach(_.kill())
workers.foreach(_.kill())
masters.clear()
@@ -244,6 +268,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
* are all alive in a proper configuration (e.g., only one leader).
*/
def assertValidClusterState() = {
logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
assertUsable()
var numAlive = 0
var numStandby = 0
@@ -325,7 +350,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val

val workers = json \ "workers"
val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
// Extract the worker IP from "webuiaddress" (rather than "host") because the host name
// on containers is a weird hash instead of the actual IP address.
liveWorkerIPs = liveWorkers.map {
w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
}

numLiveApps = (json \ "activeapps").children.size

@@ -402,7 +431,7 @@ private[spark] object Docker extends Logging {
def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""

val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
logDebug("Run command: " + cmd)
cmd
}
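
As an aside on the TestMasterInfo change above: the worker IP is now recovered from the webuiaddress field rather than host, because inside the containers the reported host name is the container ID. A minimal sketch of that string handling, not part of the patch, using hypothetical webuiaddress values and the default worker web UI port 8081:

// Sketch only: mirrors the stripPrefix/stripSuffix logic added to TestMasterInfo.readState.
object WorkerIpSketch {
  def ipFromWebUiAddress(webUiAddress: String): String =
    webUiAddress.stripPrefix("http://").stripSuffix(":8081")

  def main(args: Array[String]): Unit = {
    // Hypothetical values; in the test they come from the master's JSON endpoint.
    val addresses = Seq("http://172.17.0.2:8081", "http://172.17.0.3:8081")
    println(addresses.map(ipFromWebUiAddress)) // List(172.17.0.2, 172.17.0.3)
  }
}
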
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -529,8 +529,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}

workers += worker
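
The registerWorker change above accepts a duplicate worker address only when the previously registered worker is in the UNKNOWN state, meaning it never responded after the master entered recovery, so the worker process must have been restarted and the old record is stale. A standalone sketch of that decision with simplified stand-in types (WorkerState and WorkerRef below are illustrative placeholders, not the Master's real data structures):

// Sketch only: the accept/reject rule added to Master.registerWorker.
object ReRegistrationSketch {
  object WorkerState extends Enumeration { val ALIVE, DEAD, UNKNOWN = Value }
  case class WorkerRef(id: String, state: WorkerState.Value)

  // A worker re-registering from UNKNOWN was restarted during recovery, so the
  // old record is dead and should be replaced; any other state is a true duplicate.
  def acceptReRegistration(old: WorkerRef): Boolean =
    old.state == WorkerState.UNKNOWN

  def main(args: Array[String]): Unit = {
    println(acceptReRegistration(WorkerRef("w1", WorkerState.UNKNOWN))) // true: replace old entry
    println(acceptReRegistration(WorkerRef("w2", WorkerState.ALIVE)))   // false: reject duplicate
  }
}
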
core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
@@ -17,11 +17,13 @@

package org.apache.spark.deploy.master

import org.apache.spark.{SparkConf, Logging}
import scala.collection.JavaConversions._

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.KeeperException

import org.apache.spark.{Logging, SparkConf}

object SparkCuratorUtil extends Logging {

@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
}
}
}

def deleteRecursive(zk: CuratorFramework, path: String) {
if (zk.checkExists().forPath(path) != null) {
for (child <- zk.getChildren.forPath(path)) {
zk.delete().forPath(path + "/" + child)
}
zk.delete().forPath(path)
}
}
}
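
For context, the new deleteRecursive helper is what the test suite's afterEach() uses to clear the /spark_leader and /master_status znodes between tests. A minimal sketch of equivalent standalone usage, assuming a reachable ZooKeeper ensemble; the connection string below is just the default Docker host address mentioned in the test's scaladoc:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.SparkCuratorUtil

// Sketch only: clears the same ZooKeeper paths FaultToleranceTest clears between runs.
object ZkCleanupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.deploy.zookeeper.url", "172.17.42.1:2181") // assumed reachable ensemble
    val zkDir = conf.get("spark.deploy.zookeeper.dir", "/spark")
    val zk = SparkCuratorUtil.newClient(conf)
    try {
      SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
      SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")
    } finally {
      zk.close()
    }
  }
}
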
4 changes: 3 additions & 1 deletion docker/README.md
@@ -2,4 +2,6 @@ Spark docker files
===========

Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).

Tested with Docker version 0.8.1.
8 changes: 7 additions & 1 deletion docker/spark-test/master/default_cmd
@@ -19,4 +19,10 @@

IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
export SPARK_LOCAL_IP=$IP
export SPARK_PUBLIC_DNS=$IP

# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
umount /etc/hosts

Contributor

So SPARK_PUBLIC_DNS is used to set the worker's hostname; do we still need this umount /etc/hosts? In my environment it doesn't seem to help with the hostname: the hostname is still the container's ID.

Contributor Author

Right, SPARK_PUBLIC_DNS does not actually change the Akka hostname; it only changes the name we tell the user to use to access the master/worker, which is better than nothing.

The actual hostname used by the Worker's Akka is specified in org.apache.spark.util.Utils.localIpAddressHostname, which unfortunately calls InetAddress.getByName(address), transmuting the IP address into the container's ID as long as the /etc/hosts file exists. There is currently no way to circumvent this behavior (there is a Utils.setCustomHostname, but it is only for testing purposes and cannot be set via a configuration variable).

/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
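
To make the reviewer's point concrete: the hostname problem comes from a reverse lookup, not from the environment variables, so exporting SPARK_PUBLIC_DNS alone cannot fix it. A minimal sketch of the lookup behavior described above (the IP literal is a placeholder; what getHostName prints depends entirely on the /etc/hosts and DNS of the machine it runs on):

import java.net.InetAddress

// Sketch only: getByName on an IP literal just validates the format, but
// getHostName then performs a reverse lookup, which inside these containers
// resolves via /etc/hosts to the container ID instead of the IP (per the discussion above).
object HostnameLookupSketch {
  def main(args: Array[String]): Unit = {
    val addr = InetAddress.getByName("172.17.0.2") // placeholder container IP
    println(addr.getHostAddress) // the IP itself
    println(addr.getHostName)    // whatever the reverse lookup returns
  }
}
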
8 changes: 7 additions & 1 deletion docker/spark-test/worker/default_cmd
@@ -19,4 +19,10 @@

IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
export SPARK_LOCAL_IP=$IP
export SPARK_PUBLIC_DNS=$IP

# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
umount /etc/hosts

/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1