Merge pull request alteryx#2 from markhamstra/master-csd
Catching up with Apache and reverting the Zookeeper version
jhartlaub committed Nov 7, 2013
2 parents db11c21 + 2f95938 commit d800a46
Showing 52 changed files with 2,897 additions and 740 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -69,7 +69,7 @@ described below.

When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
using Hadoop 1.0.1 and build your application using SBT, add this entry to
using Hadoop 1.2.1 and build your application using SBT, add this entry to
`libraryDependencies`:

"org.apache.hadoop" % "hadoop-client" % "1.2.1"
28 changes: 17 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -248,7 +248,7 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
val conf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -258,8 +258,10 @@
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
Utils.getSystemProperties.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
conf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
@@ -382,7 +384,7 @@ class SparkContext(
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkEnv.get.hadoop.addCredentials(conf)
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}

@@ -592,7 +594,8 @@ class SparkContext(
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
case _ => path
case "local" => "file:" + uri.getPath
case _ => path
}
addedFiles(key) = System.currentTimeMillis

@@ -686,7 +689,7 @@ class SparkContext(
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI.
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
@@ -699,8 +702,9 @@
} else {
val uri = new URI(path)
key = uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
if (env.hadoop.isYarnMode()) {
if (SparkHadoopUtil.get.isYarnMode()) {
// In order for this to work on yarn the user must specify the --addjars option to
// the client to upload the file into the distributed cache to make it show up in the
// current working directory.
@@ -716,6 +720,9 @@
} else {
env.httpFileServer.addJar(new File(uri.getPath))
}
// A JAR file which exists locally on every worker node
case "local" =>
"file:" + uri.getPath
case _ =>
path
}
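A brief usage sketch of the local: scheme handled above; the jar paths are hypothetical.

    // Shipped to executors via the driver's HTTP file server:
    sc.addJar("/opt/jobs/etl-helpers.jar")
    // Assumed to already exist at this path on every worker node; nothing is shipped:
    sc.addJar("local:/opt/preinstalled/udfs.jar")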
@@ -935,9 +942,8 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val env = SparkEnv.get
val path = new Path(dir)
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
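For context, a hedged sketch of how the checkpoint-directory API touched above is typically used; the HDFS path is hypothetical.

    sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")
    val doubled = sc.parallelize(1 to 1000000).map(_ * 2)
    doubled.checkpoint()   // data will be written under the directory set above
    doubled.count()        // materializes the RDD and therefore the checkpoint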
17 changes: 4 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider

import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
import org.apache.spark.network.ConnectionManager
import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.api.python.PythonWorkerFactory

import com.google.common.collect.MapMaker

/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if(yarnMode) {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
} else {
new SparkHadoopUtil
}
}
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
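The map moved into SparkEnv above is a Guava soft-reference cache; below is a self-contained sketch of the same pattern, assuming a Guava version that still exposes MapMaker.softValues() (as the Spark build of this era did). The key format is illustrative.

    import com.google.common.collect.MapMaker
    import java.util.concurrent.ConcurrentMap

    // Values are only softly reachable, so the GC may drop them under memory pressure.
    val metadataCache: ConcurrentMap[String, Any] =
      new MapMaker().softValues().makeMap[String, Any]()

    metadataCache.put("hdfs://nn:8020/data/input#jobconf", new Object)
    // Treat every lookup as optional, since the entry may have been collected:
    val cached: Option[Any] = Option(metadataCache.get("hdfs://nn:8020/data/input#jobconf"))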
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -622,4 +622,15 @@ object JavaPairRDD {
new JavaPairRDD[K, V](rdd)

implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd


/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
new JavaPairRDD[K, V](rdd.rdd)
}

}
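A hedged sketch of calling the new fromJavaRDD helper from Scala; the context setup and data are illustrative.

    import java.util.Arrays
    import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}

    val jsc = new JavaSparkContext("local", "fromJavaRDD-demo")
    val tuples: JavaRDD[(String, Int)] =
      jsc.parallelize(Arrays.asList(("a", 1), ("b", 2), ("a", 3)))
    // Lift the tuple RDD into a JavaPairRDD to reach the pair-specific API
    // (reduceByKey, join, saveAsHadoopFile, ...):
    val pairs: JavaPairRDD[String, Int] = JavaPairRDD.fromJavaRDD(tuples)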
@@ -19,7 +19,6 @@

import scala.reflect.ClassManifest;
import scala.reflect.ClassManifest$;
import scala.runtime.AbstractFunction3;

import java.io.Serializable;

@@ -35,4 +34,3 @@ public ClassManifest<R> returnType() {
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
}
}

@@ -24,7 +24,8 @@ import scala.runtime.AbstractFunction3
* apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
* isn't marked to allow that).
*/
private[spark] abstract class WrappedFunction3[T1, T2, T3, R] extends AbstractFunction3[T1, T2, T3, R] {
private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
extends AbstractFunction3[T1, T2, T3, R] {
@throws(classOf[Exception])
def call(t1: T1, t2: T2, t3: T3): R

26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,17 +20,13 @@ package org.apache.spark.deploy
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

import com.google.common.collect.MapMaker

import org.apache.spark.SparkException

/**
* Contains util methods to interact with Hadoop from Spark.
*/
private[spark]
class SparkHadoopUtil {
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }

}

object SparkHadoopUtil {
private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
} else {
new SparkHadoopUtil
}
}

def get: SparkHadoopUtil = {
hadoop
}
}
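The call sites elsewhere in this commit resolve the helper through the new singleton; a short sketch of the resulting usage follows. Whether the YARN subclass is loaded depends on SPARK_YARN_MODE at first access.

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
    val jobConf = new JobConf(hadoopConf)
    SparkHadoopUtil.get.addCredentials(jobConf)   // no-op unless the YARN subclass was loaded
    val onYarn = SparkHadoopUtil.get.isYarnMode()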
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())

val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {

def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}

@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
*/
def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)

def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)

def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
SparkEnv.get.hadoopJobMetadata.put(key, value)
}
@@ -420,15 +420,14 @@ class DAGScheduler(
case ExecutorLost(execId) =>
handleExecutorLost(execId)

case begin: BeginEvent =>
listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo))
case BeginEvent(task, taskInfo) =>
listenerBus.post(SparkListenerTaskStart(task, taskInfo))

case gettingResult: GettingResultEvent =>
listenerBus.post(SparkListenerTaskGettingResult(gettingResult.task, gettingResult.taskInfo))
case GettingResultEvent(task, taskInfo) =>
listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))

case completion: CompletionEvent =>
listenerBus.post(SparkListenerTaskEnd(
completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
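The rewritten cases above switch from matching on the event's type to destructuring its fields, using an @ binder where the whole event is still needed. A self-contained illustration of that Scala idiom follows; the event type here is hypothetical, not Spark's.

    case class TaskFinished(taskId: Long, reason: String, durationMs: Long)

    def describe(event: Any): String = event match {
      case TaskFinished(id, "success", _) =>
        "task " + id + " succeeded"
      case finished @ TaskFinished(id, _, ms) =>
        // `finished` still refers to the whole event while id and ms are extracted.
        "task " + id + " ended after " + ms + " ms: " + finished
      case _ =>
        "unknown event"
    }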
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl

// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
val env = SparkEnv.get
val conf = new JobConf(configuration)
env.hadoop.addCredentials(conf)
SparkHadoopUtil.get.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)

val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl

// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
val env = SparkEnv.get
val jobConf = new JobConf(configuration)
env.hadoop.addCredentials(jobConf)
SparkHadoopUtil.get.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)

val instance: org.apache.hadoop.mapred.InputFormat[_, _] =