From 2ca728655717ffb60e28cc1d73da725116663d8d Mon Sep 17 00:00:00 2001
From: WenboZhao
Date: Thu, 11 May 2017 14:05:42 -0400
Subject: [PATCH] Support simple rsync schema in the executor uri (#12)

* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdbd4d709a8c5ddd3d7f4647da32c2a51a6f)
(cherry picked from commit 6d59ab153e4fd75e6e70252e917894c6139f00b4)
(cherry picked from commit 8a7a336a58b46598c04db9cf810a58a278bbf8d3)
(cherry picked from commit e9788758953e6d845d33866a0c556b1458ae4b13)
(cherry picked from commit cdbef9b12a39d9fba5767a43090c2b71ad714a3b)
---
 .../cook/CoarseCookSchedulerBackend.scala     | 45 ++++++++++---------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
index 641c1b0b05eb5..a7bf06e413375 100644
--- a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
+++ b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
@@ -43,21 +43,26 @@ import org.apache.spark.rpc.RpcAddress
 import scala.collection.mutable
 
 object CoarseCookSchedulerBackend {
-  def fetchUri(uri: String): String =
-    Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
-      case Some("http") => s"curl -O $uri"
-      case Some("spark-rsync") =>
-        val regex = "^spark-rsync://".r
-        val cleanURI = regex.replaceFirstIn(uri, "")
-        "RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
-          s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
-      case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
-      case Some("rover") =>
-        val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
-        s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
-      case None | Some("file") => s"cp $uri ."
-      case Some(x) => sys.error(s"$x not supported yet")
-    }
+
+  // A collection of regexes for extracting information from a URI
+  private val HTTP_URI_REGEX = """http://(.*)""".r
+  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
+  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
+  private val HDFS_URI_REGEX = """hdfs://(.*)""".r
+
+  private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
+    case HTTP_URI_REGEX(httpURI) =>
+      s"curl -O http://$httpURI"
+    case RSYNC_URI_REGEX(file) =>
+      s"rsync $file ./"
+    case SPARK_RSYNC_URI_REGEX(file) =>
+      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
+        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+    case HDFS_URI_REGEX(file) =>
+      s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
+    case _ =>
+      sys.error(s"$uri not supported yet")
+  }
 
   def apply(scheduler: TaskSchedulerImpl, sc: SparkContext,
     cookHost: String, cookPort: Int): CoarseGrainedSchedulerBackend = {
@@ -176,7 +181,7 @@ class CoarseCookSchedulerBackend(
   override def applicationAttemptId(): Option[String] = Some(applicationId())
 
   def createJob(numCores: Double): Job = {
-    import CoarseCookSchedulerBackend.fetchUri
+    import CoarseCookSchedulerBackend.fetchURI
 
     val jobId = UUID.randomUUID()
     executorUUIDWriter(jobId)
@@ -215,12 +220,12 @@ class CoarseCookSchedulerBackend(
 
     val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
     val keystorePull = keystoreUri.map { uri =>
-      s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
+      s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
     }
 
     val urisCommand =
       uriValues.map { uri =>
-        s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
+        s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
           " || (echo \"ERROR FETCHING\" && exit 1)"
       }
 
@@ -228,7 +233,7 @@ class CoarseCookSchedulerBackend(
       .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }
 
     val shippedTarballsCommand = shippedTarballs.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
     }
 
     logDebug(s"command: $commandString")
@@ -244,7 +249,7 @@ class CoarseCookSchedulerBackend(
     val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
       val name = Paths.get(remoteHdfsConf).getFileName
       Seq(
-        fetchUri(remoteHdfsConf),
+        fetchURI(remoteHdfsConf),
         "mkdir HADOOP_CONF_DIR",
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory
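
For reference, a minimal standalone sketch of the scheme dispatch this patch introduces. The regexes and emitted shell fragments mirror the diff above; the object name, main driver, and sample URIs are illustrative assumptions, not part of the commit.

// Sketch of the fetchURI scheme dispatch added by the patch above.
// Note: as in the patch, the whole URI is lowercased before matching,
// so the captured remainder (and the emitted command) is lowercased too.
object FetchURISketch {
  private val HTTP_URI_REGEX = """http://(.*)""".r
  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
  private val HDFS_URI_REGEX = """hdfs://(.*)""".r

  def fetchURI(uri: String): String = uri.toLowerCase match {
    case HTTP_URI_REGEX(httpURI) => s"curl -O http://$httpURI"
    case RSYNC_URI_REGEX(file) => s"rsync $file ./"
    case SPARK_RSYNC_URI_REGEX(file) =>
      // rsync from the driver through a knc (Kerberized netcat) tunnel
      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
    case HDFS_URI_REGEX(file) => s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
    case _ => sys.error(s"$uri not supported yet")
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical URIs and the shell fragments the dispatch maps them to.
    Seq(
      "http://repo.example.com/spark/app.tgz",
      "rsync://mirror.example.com/spark/app.tgz",
      "spark-rsync://conf/app.tgz",
      "hdfs://namenode:8020/user/spark/app.tgz"
    ).foreach(uri => println(s"$uri -> ${fetchURI(uri)}"))
  }
}

Compared with the old Option(URI.create(uri).getScheme) dispatch, matching regexes against the raw string lets each case capture the scheme-less remainder directly; note the new code no longer handles the file, rover, and scheme-less cases that the old fetchUri covered.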