Commit

Merge remote-tracking branch 'upstream/master'
Augustin Borsu committed Feb 18, 2015
2 parents 2e89719 + e79a7a6 commit 77ff9ca
Showing 115 changed files with 1,840 additions and 960 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -19,6 +19,7 @@ fairscheduler.xml.template
spark-defaults.conf.template
log4j.properties
log4j.properties.template
metrics.properties
metrics.properties.template
slaves
slaves.template
5 changes: 4 additions & 1 deletion build/mvn
@@ -21,6 +21,8 @@
_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Preserve the calling directory
_CALLING_DIR="$(pwd)"
# Options used during compilation
_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

# Installs any application tarball given a URL, the expected tarball name,
# and, optionally, a checkable binary path to determine if the binary has
@@ -136,14 +138,15 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if it's
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
  ${ZINC_BIN} -shutdown
  ${ZINC_BIN} -start -port ${ZINC_PORT} \
    -scala-compiler "${SCALA_COMPILER}" \
    -scala-library "${SCALA_LIBRARY}" &>/dev/null
fi

# Set any `mvn` options if not already present
export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"}
export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

# Last, call the `mvn` command as usual
${MVN_BIN} "$@"
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
  val partitioners = rdds.flatMap(_.partitioner).toSet
  if (partitioners.size == 1) {
    new PartitionerAwareUnionRDD(this, rdds)
  } else {
    new UnionRDD(this, rdds)
  }
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
union(Seq(first) ++ rest)

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)
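
The new overload inspects the partitioners of all inputs: when every RDD reports the same partitioner, the union is built as a PartitionerAwareUnionRDD, which keeps that partitioner instead of discarding it. A minimal sketch of the effect, assuming a local SparkContext and hypothetical RDD names; with this change the printed partitioner is expected to be defined, so the following reduceByKey can reuse the existing partitioning rather than shuffling again.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object UnionPartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-sketch").setMaster("local[2]"))
    val p = new HashPartitioner(4)
    val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(p)
    val b = sc.parallelize(Seq(1 -> "c", 3 -> "d")).partitionBy(p)

    val u = sc.union(Seq(a, b))
    println(u.partitioner)  // expected Some(HashPartitioner) rather than None
    u.reduceByKey(_ + _).collect().foreach(println)  // no extra shuffle when the partitioner is kept
    sc.stop()
  }
}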
31 changes: 14 additions & 17 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,24 +144,11 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}

// Check whether the worker is ready to be re-used.
if (reuse_worker) {
// It has a high possibility that the ending mark is already available,
// And current task should not be blocked by checking it

if (stream.available() >= 4) {
val ending = stream.readInt()
if (ending == SpecialLengths.END_OF_STREAM) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
} else {
logInfo(s"Communication with worker did not end cleanly " +
s"(ending with $ending), close it: $worker")
}
} else {
logInfo(s"The ending mark from worker is not available, close it: $worker")
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
  if (reuse_worker) {
    env.releasePythonWorker(pythonExec, envVars.toMap, worker)
    released = true
  }
}
null
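
The rewritten block drops the old available() probe: the task now blocks on readInt() until the end-of-stream marker arrives, and only then decides whether the worker may be reused. A standalone sketch of the difference, using a hypothetical marker value (the real constant lives in SpecialLengths): available() only reports bytes already buffered, while readInt() waits for the four bytes to arrive.

import java.io.{ByteArrayInputStream, DataInputStream}
import java.nio.ByteBuffer

object EndMarkerSketch {
  val END_OF_STREAM = -4  // hypothetical placeholder for SpecialLengths.END_OF_STREAM

  def main(args: Array[String]): Unit = {
    val bytes = ByteBuffer.allocate(4).putInt(END_OF_STREAM).array()
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    // Old approach: only trust the marker if it happens to be buffered already.
    if (in.available() >= 4) println("marker already buffered")
    // New approach: read it unconditionally; on a real socket this blocks until it arrives.
    assert(in.readInt() == END_OF_STREAM)
  }
}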
@@ -316,6 +303,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Long, Array[Byte])](prev) {
override def getPartitions = prev.partitions
override val partitioner = prev.partitioner
override def compute(split: Partition, context: TaskContext) =
prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (Utils.deserializeLongValue(a), b)
@@ -342,6 +330,15 @@ private[spark] object PythonRDD extends Logging {
}
}

/**
* Return an RDD of values from an RDD of (Long, Array[Byte]), with preservesPartitioning=true
*
* This is useful for PySpark to have the partitioner after partitionBy()
*/
def valueOfPair(pair: JavaPairRDD[Long, Array[Byte]]): JavaRDD[Array[Byte]] = {
pair.rdd.mapPartitions(it => it.map(_._2), true)
}
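
valueOfPair drops the Long keys without moving records between partitions, and together with the new `override val partitioner = prev.partitioner` in PairwiseRDD this is what lets PySpark see a partitioner after partitionBy(). A minimal sketch of the same idiom on an ordinary RDD, assuming a local SparkContext: mapPartitions with preservesPartitioning = true keeps the parent's partitioner.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ValueOfPairSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("value-of-pair").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(1L -> Array[Byte](1), 2L -> Array[Byte](2)))
      .partitionBy(new HashPartitioner(2))
    // Same shape as valueOfPair: strip the key, keep the partitioning metadata.
    val values = pairs.mapPartitions(it => it.map(_._2), preservesPartitioning = true)
    println(values.partitioner)  // expected Some(HashPartitioner); with false this would be None
    sc.stop()
  }
}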

/**
* Adapter for calling SparkContext#runJob from Python.
*
52 changes: 37 additions & 15 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -252,6 +252,26 @@ object SparkSubmit {

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val resolvedMavenCoordinates =
  SparkSubmitUtils.resolveMavenCoordinates(
    args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
  if (args.jars == null || args.jars.trim.isEmpty) {
    args.jars = resolvedMavenCoordinates
  } else {
    args.jars += s",$resolvedMavenCoordinates"
  }
  if (args.isPython) {
    if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
      args.pyFiles = resolvedMavenCoordinates
    } else {
      args.pyFiles += s",$resolvedMavenCoordinates"
    }
  }
}
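
Moving the resolution ahead of the Python-specific handling lets the resolved jars be appended to both --jars and, for packages that ship Python code, --py-files. The append rule itself is just "extend a comma-delimited list, creating it when empty"; a small standalone sketch of that rule, using a hypothetical helper name not present in SparkSubmit:

// Hypothetical helper; mirrors the merge rule applied to args.jars and args.pyFiles above.
def mergeCommaSeparated(existing: String, extra: String): String = {
  if (extra == null || extra.trim.isEmpty) existing
  else if (existing == null || existing.trim.isEmpty) extra
  else s"$existing,$extra"
}

// e.g. mergeCommaSeparated(null, "dep.jar")                 == "dep.jar"
//      mergeCommaSeparated("app.jar", "dep1.jar,dep2.jar")  == "app.jar,dep1.jar,dep2.jar"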

// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
@@ -307,18 +327,6 @@ object SparkSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// Resolve maven dependencies if there are any and add classpath to jars
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
} else {
args.jars += s",$resolvedMavenCoordinates"
}
}

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
@@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils {
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
* Extracts maven coordinates from a comma-delimited string
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
* simplicity for Spark Package users.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.split(":")
val splits = p.replace("/", ":").split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
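
Coordinates may now be written either as groupId:artifactId:version or as groupId/artifactId:version; the replace("/", ":") normalizes the second form before splitting. A standalone sketch of that normalization, with hypothetical coordinates:

val coordinates = "com.example/demo-pkg:0.1,org.example:demo:1.0.0"  // hypothetical values
coordinates.split(",").foreach { p =>
  val splits = p.replace("/", ":").split(":")
  require(splits.length == 3, s"Bad coordinate: $p")
  println((splits(0), splits(1), splits(2)))
}
// prints (com.example,demo-pkg,0.1) and (org.example,demo,1.0.0)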
@@ -682,6 +692,13 @@ private[spark] object SparkSubmitUtils {
br.setName("central")
cr.add(br)

val sp: IBiblioResolver = new IBiblioResolver
sp.setM2compatible(true)
sp.setUsepoms(true)
sp.setRoot("http://dl.bintray.com/spark-packages/maven")
sp.setName("spark-packages")
cr.add(sp)

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
@@ -794,14 +811,19 @@ private[spark] object SparkSubmitUtils {
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add an exclusion rule for Spark
// Add an exclusion rule for Spark and Scala Library
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)
val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
val scalaDependencyExcludeRule =
new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
scalaDependencyExcludeRule.addConfiguration(ivyConfName)

// Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
md.addExcludeRule(sparkDependencyExcludeRule)
md.addExcludeRule(scalaDependencyExcludeRule)
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
32 changes: 17 additions & 15 deletions core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -28,12 +28,12 @@ import org.apache.spark.util.Utils

private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {

val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
val METRICS_CONF = "metrics.properties"
private val DEFAULT_PREFIX = "*"
private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"

val properties = new Properties()
var propertyCategories: mutable.HashMap[String, Properties] = null
private[metrics] val properties = new Properties()
private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null

private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
@@ -47,20 +47,22 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
setDefaultProperties(properties)

// If spark.metrics.conf is not set, try to get file in class path
var is: InputStream = null
try {
is = configFile match {
case Some(f) => new FileInputStream(f)
case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
try {
Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
} catch {
case e: Exception =>
logError("Error loading default configuration file", e)
None
}
}

if (is != null) {
isOpt.foreach { is =>
try {
properties.load(is)
} finally {
is.close()
}
} catch {
case e: Exception => logError("Error loading configure file", e)
} finally {
if (is != null) is.close()
}

propertyCategories = subProperties(properties, INSTANCE_REGEX)
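
The properties file mixes a default section keyed by "*" with per-instance sections such as master.* or executor.*; INSTANCE_REGEX splits each key into its instance prefix and the remaining metric key, and subProperties groups the keys per instance. A minimal standalone sketch of that grouping, assuming two example properties:

import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.mutable

val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
val props = new Properties()
props.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
props.setProperty("master.sink.console.period", "10")

val byInstance = new mutable.HashMap[String, Properties]
props.foreach { case (k, v) =>
  k match {
    case INSTANCE_REGEX(instance, rest) =>
      byInstance.getOrElseUpdate(instance, new Properties).setProperty(rest, v)
    case _ => // keys without a recognizable prefix are skipped
  }
}
println(byInstance.keys)  // expected to contain "*" and "master"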
core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -191,7 +191,10 @@ private[spark] class MetricsSystem private (
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
case e: Exception => {
logError("Sink class " + classPath + " cannot be instantialized")
throw e
}
}
}
}
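
The catch block now rethrows after logging, so a sink that cannot be constructed fails metrics startup loudly instead of being dropped silently. A minimal sketch of the log-and-rethrow pattern, with a hypothetical logError and a simplified no-arg instantiation (the real sinks take constructor arguments):

def logError(msg: String): Unit = Console.err.println(s"ERROR: $msg")

def instantiateSink(classPath: String): AnyRef = {
  try {
    Class.forName(classPath).newInstance().asInstanceOf[AnyRef]
  } catch {
    case e: Exception =>
      logError(s"Sink class $classPath cannot be instantiated")
      throw e  // propagate so the caller fails fast instead of running without the sink
  }
}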
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -462,7 +462,13 @@ abstract class RDD[T: ClassTag](
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
def union(other: RDD[T]): RDD[T] = {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    new UnionRDD(sc, Array(this, other))
  }
}

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
28 changes: 22 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -98,7 +98,13 @@ class DAGScheduler(

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
/**
* Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
* and its values are arrays indexed by partition numbers. Each array value is the set of
* locations where that RDD partition is cached.
*
* All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
*/
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

// For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
@@ -183,7 +189,8 @@ class DAGScheduler(
eventProcessLoop.post(TaskSetFailed(taskSet, reason))
}

private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
if (!cacheLocs.contains(rdd.id)) {
val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
@@ -194,7 +201,7 @@ class DAGScheduler(
cacheLocs(rdd.id)
}

private def clearCacheLocs() {
private def clearCacheLocs(): Unit = cacheLocs.synchronized {
cacheLocs.clear()
}

@@ -1276,17 +1283,26 @@ class DAGScheduler(
}

/**
* Synchronized method that might be called from other threads.
* Gets the locality information associated with a partition of a particular RDD.
*
* This method is thread-safe and is called from both DAGScheduler and SparkContext.
*
* @param rdd whose partitions are to be looked at
* @param partition to lookup locality information for
* @return list of machines that are preferred by the partition
*/
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
getPreferredLocsInternal(rdd, partition, new HashSet)
}

/** Recursive implementation for getPreferredLocs. */
/**
* Recursive implementation for getPreferredLocs.
*
* This method is thread-safe because it only accesses DAGScheduler state through thread-safe
* methods (getCacheLocs()); please be careful when modifying this method, because any new
* DAGScheduler state accessed by it may require additional synchronization.
*/
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
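
The new comments spell out the locking discipline: cacheLocs is only read or written inside cacheLocs.synchronized, which is what lets getPreferredLocs drop its method-level synchronized while remaining safe to call from SparkContext threads. A minimal sketch of that pattern on a hypothetical cache (not DAGScheduler itself):

import scala.collection.mutable.HashMap

class LocationCache {
  // Guarded by `cacheLocs.synchronized`, mirroring the discipline described above.
  private val cacheLocs = new HashMap[Int, Array[Seq[String]]]

  def get(rddId: Int, numPartitions: Int): Array[Seq[String]] = cacheLocs.synchronized {
    if (!cacheLocs.contains(rddId)) {
      cacheLocs(rddId) = Array.fill(numPartitions)(Seq.empty[String])
    }
    cacheLocs(rddId)
  }

  def clear(): Unit = cacheLocs.synchronized {
    cacheLocs.clear()
  }
}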