Skip to content

Commit

Permalink
Support simple rsync schema in the executor uri (apache#12)
Browse files Browse the repository at this point in the history
* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdb)
(cherry picked from commit 6d59ab1)
(cherry picked from commit 8a7a336)
(cherry picked from commit e978875)
(cherry picked from commit cdbef9b)
  • Loading branch information
WenboZhao authored and Curtis Howard committed Oct 8, 2018
1 parent e31784d commit 2ca7286
Showing 1 changed file with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,26 @@ import org.apache.spark.rpc.RpcAddress
import scala.collection.mutable

object CoarseCookSchedulerBackend {
def fetchUri(uri: String): String =
Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
case Some("http") => s"curl -O $uri"
case Some("spark-rsync") =>
val regex = "^spark-rsync://".r
val cleanURI = regex.replaceFirstIn(uri, "")
"RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
case Some("rover") =>
val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
case None | Some("file") => s"cp $uri ."
case Some(x) => sys.error(s"$x not supported yet")
}

// A collection of regexes for extracting information from an URI
private val HTTP_URI_REGEX = """http://(.*)""".r
private val RSYNC_URI_REGEX = """rsync://(.*)""".r
private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
private val HDFS_URI_REGEX = """hdfs://(.*)""".r

private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
case HTTP_URI_REGEX(httpURI) =>
s"curl -O http://$httpURI"
case RSYNC_URI_REGEX(file) =>
s"rsync $file ./"
case SPARK_RSYNC_URI_REGEX(file) =>
"RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
case HDFS_URI_REGEX(file) =>
s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
case _ =>
sys.error(s"$uri not supported yet")
}

def apply(scheduler: TaskSchedulerImpl, sc: SparkContext, cookHost: String,
cookPort: Int): CoarseGrainedSchedulerBackend = {
Expand Down Expand Up @@ -176,7 +181,7 @@ class CoarseCookSchedulerBackend(
override def applicationAttemptId(): Option[String] = Some(applicationId())

def createJob(numCores: Double): Job = {
import CoarseCookSchedulerBackend.fetchUri
import CoarseCookSchedulerBackend.fetchURI

val jobId = UUID.randomUUID()
executorUUIDWriter(jobId)
Expand Down Expand Up @@ -215,20 +220,20 @@ class CoarseCookSchedulerBackend(

val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
val keystorePull = keystoreUri.map { uri =>
s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
}

val urisCommand =
uriValues.map { uri =>
s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
" || (echo \"ERROR FETCHING\" && exit 1)"
}

val shippedTarballs: Seq[String] = conf.getOption("spark.cook.shippedTarballs")
.fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }

val shippedTarballsCommand = shippedTarballs.map { uri =>
s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
}

logDebug(s"command: $commandString")
Expand All @@ -244,7 +249,7 @@ class CoarseCookSchedulerBackend(
val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
val name = Paths.get(remoteHdfsConf).getFileName
Seq(
fetchUri(remoteHdfsConf),
fetchURI(remoteHdfsConf),
"mkdir HADOOP_CONF_DIR",
s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
// This must be absolute because we cd into the spark directory
Expand Down

0 comments on commit 2ca7286

Please sign in to comment.