From cdbef9b12a39d9fba5767a43090c2b71ad714a3b Mon Sep 17 00:00:00 2001
From: WenboZhao
Date: Thu, 11 May 2017 14:05:42 -0400
Subject: [PATCH] Support simple rsync schema in the executor uri (#12)

* Support simple rsync schema in the executor uri

* Support simple rsync schema in the executor uri

(cherry picked from commit b043fdbd4d709a8c5ddd3d7f4647da32c2a51a6f)
(cherry picked from commit 6d59ab153e4fd75e6e70252e917894c6139f00b4)
(cherry picked from commit 8a7a336a58b46598c04db9cf810a58a278bbf8d3)
(cherry picked from commit e9788758953e6d845d33866a0c556b1458ae4b13)
---
 .../cook/CoarseCookSchedulerBackend.scala          | 45 ++++++++++---------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
index 641c1b0b05eb5..a7bf06e413375 100644
--- a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
+++ b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
@@ -43,21 +43,26 @@ import org.apache.spark.rpc.RpcAddress
 import scala.collection.mutable
 
 object CoarseCookSchedulerBackend {
-  def fetchUri(uri: String): String =
-    Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
-      case Some("http") => s"curl -O $uri"
-      case Some("spark-rsync") =>
-        val regex = "^spark-rsync://".r
-        val cleanURI = regex.replaceFirstIn(uri, "")
-        "RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
-          s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
-      case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
-      case Some("rover") =>
-        val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
-        s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
-      case None | Some("file") => s"cp $uri ."
-      case Some(x) => sys.error(s"$x not supported yet")
-    }
+
+  // A collection of regexes for extracting information from an URI
+  private val HTTP_URI_REGEX = """http://(.*)""".r
+  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
+  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
+  private val HDFS_URI_REGEX = """hdfs://(.*)""".r
+
+  private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
+    case HTTP_URI_REGEX(httpURI) =>
+      s"curl -O http://$httpURI"
+    case RSYNC_URI_REGEX(file) =>
+      s"rsync $file ./"
+    case SPARK_RSYNC_URI_REGEX(file) =>
+      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
+        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+    case HDFS_URI_REGEX(file) =>
+      s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
+    case _ =>
+      sys.error(s"$uri not supported yet")
+  }
 
   def apply(scheduler: TaskSchedulerImpl, sc: SparkContext,
     cookHost: String, cookPort: Int): CoarseGrainedSchedulerBackend = {
@@ -176,7 +181,7 @@ class CoarseCookSchedulerBackend(
   override def applicationAttemptId(): Option[String] = Some(applicationId())
 
   def createJob(numCores: Double): Job = {
-    import CoarseCookSchedulerBackend.fetchUri
+    import CoarseCookSchedulerBackend.fetchURI
 
     val jobId = UUID.randomUUID()
     executorUUIDWriter(jobId)
@@ -215,12 +220,12 @@
 
     val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
 
     val keystorePull = keystoreUri.map { uri =>
-      s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
+      s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
     }
 
     val urisCommand = uriValues.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
         " || (echo \"ERROR FETCHING\" && exit 1)"
     }
 
@@ -228,7 +233,7 @@
       .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }
 
     val shippedTarballsCommand = shippedTarballs.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
     }
 
     logDebug(s"command: $commandString")
@@ -244,7 +249,7 @@
     val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
       val name = Paths.get(remoteHdfsConf).getFileName
       Seq(
-        fetchUri(remoteHdfsConf),
+        fetchURI(remoteHdfsConf),
         "mkdir HADOOP_CONF_DIR",
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory
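
For reviewers, a quick smoke test of the new fetchURI is sketched below. It is not part of the patch: the FetchURIDemo object name and the example URIs are hypothetical, and the object is placed under the org.apache.spark package tree only because fetchURI is private[spark]. The last call also illustrates a side effect of matching on uri.toLowerCase: any mixed-case path comes back lowercased in the generated command.

    package org.apache.spark.scheduler.cluster.cook

    // Hypothetical demo, not shipped with the patch.
    object FetchURIDemo {
      import CoarseCookSchedulerBackend.fetchURI

      def main(args: Array[String]): Unit = {
        // http:// URIs turn into curl commands:
        println(fetchURI("http://repo.example.com/spark-executor.tgz"))
        // curl -O http://repo.example.com/spark-executor.tgz

        // The plain rsync:// scheme this patch adds:
        println(fetchURI("rsync://mirror.example.com/spark/spark-executor.tgz"))
        // rsync mirror.example.com/spark/spark-executor.tgz ./

        // spark-rsync:// still tunnels rsync through knc to the driver;
        // the $-variables are left for the executor shell to expand:
        println(fetchURI("spark-rsync://app.tgz"))
        // RSYNC_CONNECT_PROG="knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" rsync $SPARK_DRIVER_PULL_HOST::spark/app.tgz ./

        // Caveat: the match runs on uri.toLowerCase, so the emitted
        // command lowercases the whole URI, path included:
        println(fetchURI("hdfs://NameNode/Apps/Spark.tgz"))
        // $HADOOP_COMMAND fs -copyToLocal hdfs://namenode/apps/spark.tgz .
      }
    }

Note that because Scala's Regex patterns anchor the full string, a spark-rsync:// URI does not accidentally match RSYNC_URI_REGEX even though "rsync://" occurs inside it; the cases are disjoint regardless of order.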