diff --git a/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala
index 07a321764ecd6..98c8b60b26347 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/CoarseCookSchedulerBackend.scala
@@ -42,21 +42,26 @@ import org.apache.spark.rpc.RpcAddress
 import scala.collection.mutable
 
 object CoarseCookSchedulerBackend {
-  def fetchUri(uri: String): String =
-    Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
-      case Some("http") => s"curl -O $uri"
-      case Some("spark-rsync") =>
-        val regex = "^spark-rsync://".r
-        val cleanURI = regex.replaceFirstIn(uri, "")
-        "RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
-          s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
-      case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
-      case Some("rover") =>
-        val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
-        s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
-      case None | Some("file") => s"cp $uri ."
-      case Some(x) => sys.error(s"$x not supported yet")
-    }
+
+  // A collection of regexes for extracting information from an URI
+  private val HTTP_URI_REGEX = """http://(.*)""".r
+  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
+  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
+  private val HDFS_URI_REGEX = """hdfs://(.*)""".r
+
+  private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
+    case HTTP_URI_REGEX(httpURI) =>
+      s"curl -O http://$httpURI"
+    case RSYNC_URI_REGEX(file) =>
+      s"rsync $file ./"
+    case SPARK_RSYNC_URI_REGEX(file) =>
+      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
+        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+    case HDFS_URI_REGEX(file) =>
+      s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
+    case _ =>
+      sys.error(s"$uri not supported yet")
+  }
 
   def apply(scheduler: TaskSchedulerImpl, sc: SparkContext, cookHost: String,
     cookPort: Int): CoarseGrainedSchedulerBackend = {
@@ -175,7 +180,7 @@ class CoarseCookSchedulerBackend(
   override def applicationAttemptId(): Option[String] = Some(applicationId())
 
   def createJob(numCores: Double): Job = {
-    import CoarseCookSchedulerBackend.fetchUri
+    import CoarseCookSchedulerBackend.fetchURI
 
     val jobId = UUID.randomUUID()
     executorUUIDWriter(jobId)
@@ -214,12 +219,12 @@ class CoarseCookSchedulerBackend(
 
     val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
     val keystorePull = keystoreUri.map { uri =>
-      s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
+      s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
    }
 
     val urisCommand = uriValues.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
        " || (echo \"ERROR FETCHING\" && exit 1)"
     }
@@ -227,7 +232,7 @@ class CoarseCookSchedulerBackend(
      .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }
 
     val shippedTarballsCommand = shippedTarballs.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
     }
 
     logDebug(s"command: $commandString")
@@ -243,7 +248,7 @@ class CoarseCookSchedulerBackend(
     val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
       val name = Paths.get(remoteHdfsConf).getFileName
       Seq(
-        fetchUri(remoteHdfsConf),
+        fetchURI(remoteHdfsConf),
         "mkdir HADOOP_CONF_DIR",
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory
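
For reference, a minimal sketch (not part of the patch) of what the new fetchURI is expected to produce for each supported scheme, based on the regexes added above. The sample URIs are hypothetical, and the snippet assumes it is compiled in the org.apache.spark.scheduler package so the private[spark] method is visible.

package org.apache.spark.scheduler

object FetchURIExamples {
  def main(args: Array[String]): Unit = {
    import CoarseCookSchedulerBackend.fetchURI

    // http://...        -> curl -O http://example.com/spark-app.tgz
    println(fetchURI("http://example.com/spark-app.tgz"))

    // rsync://...       -> rsync example.com/spark-app.tgz ./
    println(fetchURI("rsync://example.com/spark-app.tgz"))

    // spark-rsync://... -> RSYNC_CONNECT_PROG="knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT"
    //                        rsync $SPARK_DRIVER_PULL_HOST::spark/spark-app.tgz ./
    println(fetchURI("spark-rsync://spark-app.tgz"))

    // hdfs://...        -> $HADOOP_COMMAND fs -copyToLocal hdfs://namenode/spark-app.tgz .
    println(fetchURI("hdfs://namenode/spark-app.tgz"))

    // Note: the URI is lowercased before matching, and schemes the old fetchUri
    // accepted (file, rover, or no scheme at all) now fall through to sys.error.
  }
}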