Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark into spark-6352
Browse files Browse the repository at this point in the history
Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
  • Loading branch information
Pei-Lun Lee committed Mar 23, 2015
2 parents 9ae7545 + 9f3273b commit e17bf47
Show file tree
Hide file tree
Showing 345 changed files with 4,536 additions and 1,797 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
1 change: 0 additions & 1 deletion bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
export PYSPARK_SUBMIT_ARGS=pyspark-shell
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
Expand Down
6 changes: 3 additions & 3 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -275,7 +275,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
<version>0.5.0</version>
<version>0.6.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
Expand Down Expand Up @@ -414,7 +414,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
</includeArtifactIds>
<silent>true</silent>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => "file:" + uri.getPath
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}

Expand Down Expand Up @@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

listenerBus.start()
listenerBus.start(this)
}

/** Post the application start event */
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
attemptID: Int)
extends TaskFailedReason {
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ private[spark] object TaskState extends Enumeration {

type TaskState = Value

def isFailed(state: TaskState) = (LOST == state) || (FAILED == state)

def isFinished(state: TaskState) = FINISHED_STATES.contains(state)

def toMesos(state: TaskState): MesosTaskState = state match {
Expand All @@ -46,5 +48,6 @@ private[spark] object TaskState extends Enumeration {
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.StatCounter
import org.apache.spark.util.Utils

class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
class JavaDoubleRDD(val srdd: RDD[scala.Double])
extends AbstractJavaRDDLike[JDouble, JavaDoubleRDD] {

override val classTag: ClassTag[JDouble] = implicitly[ClassTag[JDouble]]

Expand Down
48 changes: 38 additions & 10 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
(implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)

Expand Down Expand Up @@ -227,24 +228,51 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
* In addition, users can control the partitioning of the output RDD, the serializer that is use
* for the shuffle, and whether to perform map-side aggregation (if a mapper can produce multiple
* items with the same key).
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner,
mapSideCombine: Boolean,
serializer: Serializer): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
mergeCombiners,
partitioner
partitioner,
mapSideCombine,
serializer
))
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD.
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
* "combined type" C * Note that V and C can be different -- for example, one might group an
* RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD. This method automatically
* uses map-side aggregation in shuffling the RDD.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, true, null)
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD and uses map-side
* aggregation.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
Expand Down Expand Up @@ -488,7 +516,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
* partitioner/parallelism level and using map-side aggregation.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
extends JavaRDDLike[T, JavaRDD[T]] {
extends AbstractJavaRDDLike[T, JavaRDD[T]] {

override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)

Expand Down Expand Up @@ -101,12 +101,23 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
*/
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
* of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
* from the trait. See SPARK-3266 for additional details.
*/
private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
extends JavaRDDLike[T, This]

/**
* Defines operations common to several Java RDD implementations.
* Note that this trait is not intended to be implemented by user code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
*/
private[spark] class ClientArguments(args: Array[String]) {
private[deploy] class ClientArguments(args: Array[String]) {
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
Expand Down Expand Up @@ -96,7 +96,7 @@ private[spark] class ClientArguments(args: Array[String]) {
/**
* Print usage and exit JVM with the given exit code.
*/
def printUsageAndExit(exitCode: Int) {
private def printUsageAndExit(exitCode: Int) {
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
val usage =
Expand All @@ -116,10 +116,10 @@ private[spark] class ClientArguments(args: Array[String]) {
}
}

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_SUPERVISE = false
private[deploy] object ClientArguments {
val DEFAULT_CORES = 1
val DEFAULT_MEMORY = 512 // MB
val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy

private[spark] class DriverDescription(
private[deploy] class DriverDescription(
val jarUrl: String,
val mem: Int,
val cores: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package org.apache.spark.deploy
* This state is sufficient for the Master to reconstruct its internal data structures during
* failover.
*/
private[spark] class ExecutorDescription(
private[deploy] class ExecutorDescription(
val appId: String,
val execId: Int,
val cores: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {
private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

Expand Down
Loading

0 comments on commit e17bf47

Please sign in to comment.