Support simple rsync schema in the executor uri (apache#12)
* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
(cherry picked from commit 6d59ab1)
WenboZhao authored and Curtis Howard committed Feb 15, 2018
1 parent 32e4d65 commit 8a7a336
Showing 1 changed file with 25 additions and 20 deletions.
@@ -43,21 +43,26 @@ import org.apache.spark.rpc.RpcAddress
 import scala.collection.mutable
 
 object CoarseCookSchedulerBackend {
-  def fetchUri(uri: String): String =
-    Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
-      case Some("http") => s"curl -O $uri"
-      case Some("spark-rsync") =>
-        val regex = "^spark-rsync://".r
-        val cleanURI = regex.replaceFirstIn(uri, "")
-        "RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
-          s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
-      case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
-      case Some("rover") =>
-        val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
-        s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
-      case None | Some("file") => s"cp $uri ."
-      case Some(x) => sys.error(s"$x not supported yet")
-    }
+  // A collection of regexes for extracting information from an URI
+  private val HTTP_URI_REGEX = """http://(.*)""".r
+  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
+  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
+  private val HDFS_URI_REGEX = """hdfs://(.*)""".r
+
+  private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
+    case HTTP_URI_REGEX(httpURI) =>
+      s"curl -O http://$httpURI"
+    case RSYNC_URI_REGEX(file) =>
+      s"rsync $file ./"
+    case SPARK_RSYNC_URI_REGEX(file) =>
+      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
+        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+    case HDFS_URI_REGEX(file) =>
+      s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
+    case _ =>
+      sys.error(s"$uri not supported yet")
+  }
 
   def apply(scheduler: TaskSchedulerImpl, sc: SparkContext, cookHost: String,
     cookPort: Int): CoarseGrainedSchedulerBackend = {
@@ -176,7 +181,7 @@ class CoarseCookSchedulerBackend(
   override def applicationAttemptId(): Option[String] = Some(applicationId())
 
   def createJob(numCores: Double): Job = {
-    import CoarseCookSchedulerBackend.fetchUri
+    import CoarseCookSchedulerBackend.fetchURI
 
     val jobId = UUID.randomUUID()
     executorUUIDWriter(jobId)
@@ -215,20 +220,20 @@ class CoarseCookSchedulerBackend(
 
     val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
     val keystorePull = keystoreUri.map { uri =>
-      s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
+      s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
     }
 
     val urisCommand =
       uriValues.map { uri =>
-        s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
+        s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
           " || (echo \"ERROR FETCHING\" && exit 1)"
       }
 
     val shippedTarballs: Seq[String] = conf.getOption("spark.cook.shippedTarballs")
       .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }
 
     val shippedTarballsCommand = shippedTarballs.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
     }
 
     logDebug(s"command: $commandString")
@@ -244,7 +249,7 @@ class CoarseCookSchedulerBackend(
     val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
       val name = Paths.get(remoteHdfsConf).getFileName
       Seq(
-        fetchUri(remoteHdfsConf),
+        fetchURI(remoteHdfsConf),
         "mkdir HADOOP_CONF_DIR",
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory
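For quick reference, here is a minimal, self-contained sketch of the scheme dispatch this commit adds. It mirrors the fetchURI method in the first hunk; the object name FetchURISketch and the example URIs (example.com, app/executor.tgz) are hypothetical, added only to show the shell command each scheme produces.

object FetchURISketch {
  // Same regexes as in the commit: each must match the whole URI and captures the remainder.
  private val HttpUriRegex = """http://(.*)""".r
  private val RsyncUriRegex = """rsync://(.*)""".r
  private val SparkRsyncUriRegex = """spark-rsync://(.*)""".r
  private val HdfsUriRegex = """hdfs://(.*)""".r

  // Public here (the commit marks it private[spark]) so the next sketch can reuse it.
  def fetchURI(uri: String): String = uri.toLowerCase match {
    case HttpUriRegex(rest)  => s"curl -O http://$rest"
    case RsyncUriRegex(rest) => s"rsync $rest ./" // the newly supported plain rsync:// form
    case SparkRsyncUriRegex(rest) =>
      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$rest ./"
    case HdfsUriRegex(rest) => s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$rest ."
    case _                  => sys.error(s"$uri not supported yet")
  }

  def main(args: Array[String]): Unit = {
    // Prints: rsync example.com/spark/executor.tgz ./
    println(fetchURI("rsync://example.com/spark/executor.tgz"))
    // Prints: RSYNC_CONNECT_PROG="knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" rsync $SPARK_DRIVER_PULL_HOST::spark/app/executor.tgz ./
    println(fetchURI("spark-rsync://app/executor.tgz"))
  }
}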

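In the same spirit, a sketch of the guard/fetch/untar step that createJob builds for each URI (mirroring urisCommand in the third hunk); deps.tgz is again a made-up name, and the sketch reuses FetchURISketch.fetchURI from above.

object UriStepSketch {
  // One shell step per URI: it succeeds only when the file is absent and both the
  // fetch and the untar succeed; any failure falls through to the error branch.
  def uriStep(uri: String): String =
    s"[ ! -e $$(basename $uri) ] && ${FetchURISketch.fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
      " || (echo \"ERROR FETCHING\" && exit 1)"

  def main(args: Array[String]): Unit = {
    // Prints (wrapped here for readability):
    //   [ ! -e $(basename rsync://example.com/deps.tgz) ] && rsync example.com/deps.tgz ./
    //   && tar -xvzf $(basename rsync://example.com/deps.tgz) || (echo "ERROR FETCHING" && exit 1)
    println(uriStep("rsync://example.com/deps.tgz"))
  }
}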