Initial pass

pwendell committed Mar 30, 2014
1 parent d666053 commit 9d48cbf

Showing 48 changed files with 166 additions and 35 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -20,6 +20,8 @@ package org.apache.spark
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
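To make the three constructor parameters concrete, here is a minimal sketch (not part of this diff) of an Aggregator whose combiner is a per-key (sum, count) pair; the value name and types are purely illustrative.

import org.apache.spark.Aggregator

// Sketch only: combine Int values into a (sum, count) pair per key.
val agg = new Aggregator[String, Int, (Int, Int)](
  createCombiner = v => (v, 1),                                // first value seen for a key
  mergeValue = (c, v) => (c._1 + v, c._2 + 1),                 // fold another value into a combiner
  mergeCombiners = (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)  // merge combiners across partitions
)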
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -21,12 +21,16 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Base class for dependencies.
*/
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Base class for dependencies where each partition of the parent RDD is used by at most one
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@@ -41,6 +45,8 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
@@ -59,6 +65,8 @@ class ShuffleDependency[K, V](


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
@@ -67,6 +75,8 @@ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
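For orientation (not part of this diff), a sketch of how the narrow and shuffle dependency types above show up on RDD.dependencies; it assumes an existing SparkContext named `sc`.

import org.apache.spark.SparkContext._   // pair-RDD functions via implicit conversion
import org.apache.spark.{OneToOneDependency, ShuffleDependency}

// Assumes an existing SparkContext `sc`.
val pairs = sc.parallelize(1 to 100).map(x => (x % 10, x))
val mapped = pairs.mapValues(_ + 1)       // narrow: each parent partition feeds one child partition
val reduced = mapped.reduceByKey(_ + _)   // wide: requires a shuffle

(mapped.dependencies ++ reduced.dependencies).foreach {
  case _: OneToOneDependency[_]   => println("one-to-one (narrow) dependency")
  case _: ShuffleDependency[_, _] => println("shuffle dependency")
  case other                      => println("other dependency: " + other)
}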
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -25,6 +25,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
@@ -148,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
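As a hedged sketch (not part of this diff) of the cancellation support that motivates FutureAction, assuming an existing SparkContext `sc` and the asynchronous count action brought in by SparkContext._:

import org.apache.spark.FutureAction
import org.apache.spark.SparkContext._   // enables countAsync via the async actions

// Assumes an existing SparkContext `sc`.
val future: FutureAction[Long] = sc.parallelize(1 to 1000000).countAsync()

// ... do other work on the driver ...

if (!future.isCompleted) {
  future.cancel()   // cancellation is the capability FutureAction adds over scala.concurrent.Future
}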
core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
private[spark] class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv private[spark] (
private[spark] class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,6 +21,11 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.executor.TaskMetrics

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Contextual information about a task which can be read or mutated during execution.
*/
class TaskContext(
val stageId: Int,
val partitionId: Int,
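For illustration (not part of this diff), a sketch of reading these fields from inside a task; it assumes an existing SparkContext `sc` and that mapPartitionsWithContext, which passes the per-task TaskContext to the closure, is available in this version.

// Assumes an existing SparkContext `sc`.
val rdd = sc.parallelize(1 to 10, 2)
val tagged = rdd.mapPartitionsWithContext { (context, iter) =>
  // stageId and partitionId are the fields declared on TaskContext above
  iter.map(x => (context.stageId, context.partitionId, x))
}
tagged.collect().foreach(println)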
core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java

import com.google.common.base.Optional

object JavaUtils {
private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
option match {
case Some(value) => Optional.of(value)
core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.FileSystem

import org.apache.spark.metrics.source.Source

class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -19,6 +19,11 @@ package org.apache.spark.executor

import org.apache.spark.storage.{BlockId, BlockStatus}

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Metrics tracked during the execution of a task.
*/
class TaskMetrics extends Serializable {
/**
* Host's name the task runs on
@@ -82,6 +87,11 @@ object TaskMetrics {
}


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Metrics pertaining to shuffle data read in a given task.
*/
class ShuffleReadMetrics extends Serializable {
/**
* Absolute time when this task finished reading shuffle data
@@ -116,6 +126,11 @@ class ShuffleReadMetrics extends Serializable {
var remoteBytesRead: Long = _
}

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Metrics pertaining to shuffle data written in a given task.
*/
class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
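One way these metrics reach user code is through the listener API; the sketch below (not part of this diff) assumes an existing SparkContext `sc` and that TaskMetrics exposes shuffleReadMetrics as an Option in this version.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch: log remote shuffle bytes read for each finished task.
class ShuffleReadLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    for (metrics <- Option(taskEnd.taskMetrics); shuffle <- metrics.shuffleReadMetrics) {
      println("task read " + shuffle.remoteBytesRead + " remote shuffle bytes")
    }
  }
}

sc.addSparkListener(new ShuffleReadLogger)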
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -25,8 +25,14 @@ import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* CompressionCodec allows the customization of choosing different compression implementations
* to be used in block storage.
*
* Note: The wire protocol for a codec is not guaranteed compatible across versions of Spark.
* This is intended for use as an internal compression utility within a single
* Spark application.
*/
trait CompressionCodec {

@@ -52,7 +58,13 @@ private[spark] object CompressionCodec {


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
* application.
*/
class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {

@@ -65,8 +77,14 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {


/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
* application.
*/
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

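Since the codecs are meant as internal utilities, they are normally selected through configuration rather than instantiated directly. A sketch (not part of this diff): the snappy block-size key comes from the Scaladoc above, and the app name and master are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

// Choose the codec class and tune the Snappy block size mentioned above.
val conf = new SparkConf()
  .setAppName("compression-example")
  .setMaster("local[2]")
  .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
  .set("spark.io.compression.snappy.block.size", "32768")

val sc = new SparkContext(conf)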
core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -25,7 +25,7 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

class ConsoleSink(val property: Properties, val registry: MetricRegistry,
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -26,7 +26,7 @@ import com.codahale.metrics.{CsvReporter, MetricRegistry}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

class CsvSink(val property: Properties, val registry: MetricRegistry,
private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CSV_KEY_PERIOD = "period"
val CSV_KEY_UNIT = "unit"
core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -27,7 +27,7 @@ import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

class GraphiteSink(val property: Properties, val registry: MetricRegistry,
private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
val GRAPHITE_DEFAULT_UNIT = "SECONDS"
core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import com.codahale.metrics.{JmxReporter, MetricRegistry}
import org.apache.spark.SecurityManager

class JmxSink(val property: Properties, val registry: MetricRegistry,
private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {

val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -30,7 +30,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.SecurityManager
import org.apache.spark.ui.JettyUtils._

class MetricsServlet(val property: Properties, val registry: MetricRegistry,
private[spark] class MetricsServlet(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
@@ -17,7 +17,7 @@

package org.apache.spark.metrics.sink

trait Sink {
private[spark] trait Sink {
def start: Unit
def stop: Unit
}
core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.metrics.source
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}

class JvmSource extends Source {
private[spark] class JvmSource extends Source {
val sourceName = "jvm"
val metricRegistry = new MetricRegistry()

core/src/main/scala/org/apache/spark/metrics/source/Source.scala
@@ -19,7 +19,7 @@ package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

trait Source {
private[spark] trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}
core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
@@ -18,7 +18,9 @@
package org.apache.spark.partial

/**
* A Double with error bars on it.
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*
* A Double value with error bars and associated confidence.
*/
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -17,6 +17,9 @@

package org.apache.spark.partial

/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*/
class PartialResult[R](initialVal: R, isFinal: Boolean) {
private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
private var failure: Option[Exception] = None
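For orientation (not part of this diff), approximate actions such as countApprox return a PartialResult whose value is the BoundedDouble documented above; the sketch assumes an existing SparkContext `sc`.

// Assumes an existing SparkContext `sc`.
val rdd = sc.parallelize(1 to 1000000)

// Ask for an answer within roughly 500 ms at 95% confidence.
val partial = rdd.countApprox(timeout = 500, confidence = 0.95)

val approx = partial.initialValue   // blocks until the approximate value is ready (at most the timeout)
println("approx mean=" + approx.mean + " range=[" + approx.low + ", " + approx.high + "]")

val exact = partial.getFinalValue() // blocks until the job has fully finished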
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -26,6 +26,8 @@ import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}

/**
* <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
*
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
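Following the Scaladoc's advice to import SparkContext._, a brief sketch (not part of this diff) that registers a completion callback on collectAsync; it assumes an existing SparkContext `sc`.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import org.apache.spark.SparkContext._   // the implicit conversion mentioned above

// Assumes an existing SparkContext `sc`.
val future = sc.parallelize(1 to 100).collectAsync()
future.onComplete {
  case Success(values) => println("collected " + values.length + " elements")
  case Failure(e)      => println("job failed: " + e)
}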
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -57,6 +57,7 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
*/
private[spark]
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark._
* @param parentsIndices list of indices in the parent that have been coalesced into this partition
* @param preferredLocation the preferred location for this partition
*/
case class CoalescedRDDPartition(
private[spark] case class CoalescedRDDPartition(
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@@ -70,7 +70,7 @@ case class CoalescedRDDPartition(
* @param maxPartitions number of desired partitions in the coalesced RDD
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
class CoalescedRDD[T: ClassTag](
private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
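With CoalescedRDD made private[spark], the public route to the same behavior is RDD.coalesce; a short sketch (not part of this diff) assuming an existing SparkContext `sc`.

// Assumes an existing SparkContext `sc`.
val rdd = sc.parallelize(1 to 1000, 100)

val fewer      = rdd.coalesce(10)                  // narrow coalesce, no shuffle
val rebalanced = rdd.coalesce(10, shuffle = true)  // shuffles to balance partition sizes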
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
/**
* An RDD that is empty, i.e. has no element in it.
*/
class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {

override def getPartitions: Array[Partition] = Array.empty

5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -70,9 +70,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* Note: Instantiating this class directly is not recommended, please use
* [[org.apache.spark.SparkContext.hadoopRDD()]]
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
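A hedged sketch (not part of this diff) of the SparkContext.hadoopRDD entry point recommended in the note above; it assumes an existing SparkContext `sc`, and the input path is a placeholder.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

// Assumes an existing SparkContext `sc`; the path is a placeholder.
val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs:///path/to/input")

val lines = sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
println(lines.count())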
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.NextIterator
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index = idx
}

// TODO: Expose a jdbcRDD function in SparkContext and mark this as semi-private
/**
* An RDD that executes an SQL query on a JDBC connection and reads results.
* For usage example, see test case JdbcRDDSuite.
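A hedged sketch (not part of this diff) of constructing a JdbcRDD; it assumes an existing SparkContext `sc` and a reachable database, and the JDBC URL, table, and column names are placeholders. The query must contain the two '?' placeholders that JdbcRDD fills with per-partition bounds.

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

// Assumes an existing SparkContext `sc`; URL, table, and columns are placeholders.
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:h2:mem:testdb"),
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  1,      // lower bound substituted for the first '?'
  1000,   // upper bound substituted for the second '?'
  3,      // number of partitions
  rs => (rs.getLong(1), rs.getString(2)))

println(rows.count())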
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -36,9 +36,14 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
}

/**
* <span class="badge badge-red" style="float: right;">SEMI-PRIVATE</span>
*
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
*
* Note: Instantiating this class directly is not recommended, please use
* [[org.apache.spark.SparkContext.newAPIHadoopRDD()]]
*
* @param sc The SparkContext to associate the RDD with.
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
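A hedged sketch (not part of this diff) of the new-API entry points referenced in the note; newAPIHadoopFile is shown here because it accepts the input path directly. It assumes an existing SparkContext `sc`, and the path is a placeholder.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Assumes an existing SparkContext `sc`; the path is a placeholder.
val lines = sc.newAPIHadoopFile(
  "hdfs:///path/to/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  new Configuration())

println(lines.count())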