Merge remote-tracking branch 'upstream/master'
Augustin Borsu committed Mar 2, 2015
2 parents 7f930bb + 582e5a2 commit f6a5002
Showing 43 changed files with 2,725 additions and 67 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -188,10 +188,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/** Perform broadcast cleanup. */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
try {
logDebug("Cleaning broadcast " + broadcastId)
logDebug(s"Cleaning broadcast $broadcastId")
broadcastManager.unbroadcast(broadcastId, true, blocking)
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
@@ -1074,7 +1074,7 @@ private[spark] class BlockManager(
* Remove all blocks belonging to the given broadcast.
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logInfo(s"Removing broadcast $broadcastId")
logDebug(s"Removing broadcast $broadcastId")
val blocksToRemove = blockInfo.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
@@ -1086,7 +1086,7 @@ private[spark] class BlockManager(
* Remove a block from both memory and disk.
*/
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logInfo(s"Removing block $blockId")
logDebug(s"Removing block $blockId")
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
@@ -61,7 +61,7 @@ class BlockManagerMaster(
tachyonSize: Long): Boolean = {
val res = askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
logInfo("Updated info of block " + blockId)
logDebug(s"Updated info of block $blockId")
res
}

@@ -184,7 +184,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = entries.remove(blockId)
if (entry != null) {
currentMemory -= entry.size
logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
logDebug(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
true
} else {
false
4 changes: 2 additions & 2 deletions docs/graphx-programming-guide.md
@@ -663,7 +663,7 @@ val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: Int, b: Int): Int = a + b
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
{% endhighlight %}

@@ -674,7 +674,7 @@ val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: Int, b: Int): Int = a + b
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
{% endhighlight %}

10 changes: 2 additions & 8 deletions docs/index.md
@@ -115,6 +115,8 @@ options for deployment:

* [Spark Homepage](http://spark.apache.org)
* [Spark Wiki](https://cwiki.apache.org/confluence/display/SPARK)
* [Spark Community](http://spark.apache.org/community.html) resources, including local meetups
* [StackOverflow tag `apache-spark`](http://stackoverflow.com/questions/tagged/apache-spark)
* [Mailing Lists](http://spark.apache.org/mailing-lists.html): ask questions about Spark here
* [AMP Camps](http://ampcamp.berkeley.edu/): a series of training camps at UC Berkeley that featured talks and
exercises about Spark, Spark Streaming, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/3/),
@@ -123,11 +125,3 @@ options for deployment:
* [Code Examples](http://spark.apache.org/examples.html): more are also available in the `examples` subfolder of Spark ([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples),
[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples),
[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python))

# Community

To get help using Spark or keep up with Spark development, sign up for the [user mailing list](http://spark.apache.org/mailing-lists.html).

If you're in the San Francisco Bay Area, there's a regular [Spark meetup](http://www.meetup.com/spark-users/) every few weeks. Come by to meet the developers and other users.

Finally, if you'd like to contribute code to Spark, read [how to contribute](contributing-to-spark.html).
8 changes: 5 additions & 3 deletions docs/mllib-collaborative-filtering.md
@@ -200,10 +200,8 @@ In the following example we load rating data. Each row consists of a user, a pro
We use the default ALS.train() method which assumes ratings are explicit. We evaluate the
recommendation by measuring the Mean Squared Error of rating prediction.

Note that the Python API does not yet support model save/load but will in the future.

{% highlight python %}
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
@@ -220,6 +218,10 @@ predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "myModelPath")
sameModel = MatrixFactorizationModel.load(sc, "myModelPath")
{% endhighlight %}

If the rating matrix is derived from other source of information (i.e., it is inferred from other
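For orientation, the full Python flow that the hunk above abbreviates looks roughly like the sketch below; the rank and iteration count are illustrative assumptions rather than values taken from the guide, and `sc` is assumed to be an existing SparkContext, as elsewhere in these docs.

{% highlight python %}
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data into Rating(user, product, rating) objects
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(',')) \
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Train an explicit-feedback ALS model (rank and iterations are illustrative)
model = ALS.train(ratings, rank=10, iterations=10)

# Evaluate the model by Mean Squared Error on the training ratings
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))

# Save the trained model and load it back, as added in this commit
model.save(sc, "myModelPath")
sameModel = MatrixFactorizationModel.load(sc, "myModelPath")
{% endhighlight %}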
26 changes: 16 additions & 10 deletions docs/mllib-naive-bayes.md
@@ -115,22 +115,28 @@ used for evaluation and prediction.

Note that the Python API does not yet support model save/load but will in the future.

<!-- TODO: Make Python's example consistent with Scala's and Java's. -->
{% highlight python %}
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

def parseLine(line):
parts = line.split(',')
label = float(parts[0])
features = Vectors.dense([float(x) for x in parts[1].split(' ')])
return LabeledPoint(label, features)

data = sc.textFile('data/mllib/sample_naive_bayes_data.txt').map(parseLine)

# an RDD of LabeledPoint
data = sc.parallelize([
LabeledPoint(0.0, [0.0, 0.0])
... # more labeled points
])
# Split data approximately into training (60%) and test (40%)
training, test = data.randomSplit([0.6, 0.4], seed = 0)

# Train a naive Bayes model.
model = NaiveBayes.train(data, 1.0)
model = NaiveBayes.train(training, 1.0)

# Make prediction.
prediction = model.predict([0.0, 0.0])
# Make prediction and test accuracy.
predictionAndLabel = test.map(lambda p : (model.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda (x, v): x == v).count() / test.count()
{% endhighlight %}

</div>
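One caveat about the new Python example: `lambda (x, v): x == v` relies on Python 2-only tuple unpacking. A drop-in equivalent for the last two lines, assuming the same `test` split and `model` as above, is sketched here.

{% highlight python %}
# Equivalent accuracy computation without Python 2-only tuple unpacking
predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
{% endhighlight %}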
@@ -30,7 +30,8 @@ import org.apache.spark.SparkConf
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
* $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
* $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
* topic1,topic2
*/
object DirectKafkaWordCount {
def main(args: Array[String]) {
4 changes: 2 additions & 2 deletions examples/src/main/python/streaming/kafka_wordcount.py
@@ -17,13 +17,13 @@

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: network_wordcount.py <zk> <topic>
Usage: kafka_wordcount.py <zk> <topic>
To run this on your local machine, you need to setup Kafka and create a producer first, see
http://kafka.apache.org/documentation.html#quickstart
and then run the example
`$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/\
`$ bin/spark-submit --jars external/kafka-assembly/target/scala-*/\
spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
"""
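For readers who have not opened the script, its body amounts to roughly the sketch below; this is a paraphrase rather than the file's actual contents, which this hunk does not show, and the topic and ZooKeeper address are the example values from the docstring.

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batches

# Consume one partition of topic "test" via the ZooKeeper quorum at localhost:2181
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"test": 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}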
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
import org.apache.spark.{OneToOneDependency, HashPartitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

@@ -48,7 +48,7 @@ trait Saveable {
*
* @param sc Spark context used to save model data.
* @param path Path specifying the directory in which to save this model.
* This directory and any intermediate directory will be created if needed.
* If the directory already exists, this method throws an exception.
*/
def save(sc: SparkContext, path: String): Unit

2 changes: 1 addition & 1 deletion python/pyspark/mllib/classification.py
@@ -207,7 +207,7 @@ def train(cls, data, iterations=100, initialWeights=None, regParam=0.01, regType
"""
def train(rdd, i):
return callMLlibFunc("trainLogisticRegressionModelWithLBFGS", rdd, int(iterations), i,
float(regParam), str(regType), bool(intercept), int(corrections),
float(regParam), regType, bool(intercept), int(corrections),
float(tolerance))

return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
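Dropping the `str(...)` coercion presumably lets callers pass `regType=None` through to the JVM side as a real null rather than the string "None". A usage sketch under that assumption, with `sc` taken to be an existing SparkContext and the 1.3 `train` signature shown above:

{% highlight python %}
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

points = sc.parallelize([
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
])
# regType=None previously became the string "None" via str(); passing it
# through unchanged lets the caller select no regularization.
model = LogisticRegressionWithLBFGS.train(points, iterations=10, regType=None)
{% endhighlight %}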
20 changes: 18 additions & 2 deletions python/pyspark/mllib/recommendation.py
@@ -19,7 +19,8 @@

from pyspark import SparkContext
from pyspark.rdd import RDD
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
from pyspark.mllib.util import Saveable, JavaLoader

__all__ = ['MatrixFactorizationModel', 'ALS', 'Rating']

@@ -39,7 +40,8 @@ def __reduce__(self):
return Rating, (int(self.user), int(self.product), float(self.rating))


class MatrixFactorizationModel(JavaModelWrapper):
@inherit_doc
class MatrixFactorizationModel(JavaModelWrapper, Saveable, JavaLoader):

"""A matrix factorisation model trained by regularized alternating
least-squares.
@@ -81,6 +83,17 @@ class MatrixFactorizationModel(JavaModelWrapper):
>>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2,2)
0.43...
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = MatrixFactorizationModel.load(sc, path)
>>> sameModel.predict(2,2)
0.43...
>>> try:
... os.removedirs(path)
... except:
... pass
"""
def predict(self, user, product):
return self._java_model.predict(int(user), int(product))
@@ -98,6 +111,9 @@ def userFeatures(self):
def productFeatures(self):
return self.call("getProductFeatures")

def save(self, sc, path):
self.call("save", sc._jsc.sc(), path)


class ALS(object):

58 changes: 58 additions & 0 deletions python/pyspark/mllib/util.py
@@ -168,6 +168,64 @@ def loadLabeledPoints(sc, path, minPartitions=None):
return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)


class Saveable(object):
"""
Mixin for models and transformers which may be saved as files.
"""

def save(self, sc, path):
"""
Save this model to the given path.
This saves:
* human-readable (JSON) model metadata to path/metadata/
* Parquet formatted data to path/data/
The model may be loaded using py:meth:`Loader.load`.
:param sc: Spark context used to save model data.
:param path: Path specifying the directory in which to save
this model. If the directory already exists,
this method throws an exception.
"""
raise NotImplementedError


class Loader(object):
"""
Mixin for classes which can load saved models from files.
"""

@classmethod
def load(cls, sc, path):
"""
Load a model from the given path. The model should have been
saved using py:meth:`Saveable.save`.
:param sc: Spark context used for loading model files.
:param path: Path specifying the directory to which the model
was saved.
:return: model instance
"""
raise NotImplemented


class JavaLoader(Loader):
"""
Mixin for classes which can load saved models using its Scala
implementation.
"""

@classmethod
def load(cls, sc, path):
java_package = cls.__module__.replace("pyspark", "org.apache.spark")
java_class = ".".join([java_package, cls.__name__])
java_obj = sc._jvm
for name in java_class.split("."):
java_obj = getattr(java_obj, name)
return cls(java_obj.load(sc._jsc.sc(), path))


def _test():
import doctest
from pyspark.context import SparkContext
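To make the name mapping in `JavaLoader.load` concrete, here is an illustration-only sketch; the module and class names stand in for `cls.__module__` and `cls.__name__` and are just the example from this commit, not additional API.

{% highlight python %}
# Illustration of how JavaLoader.load finds the Scala counterpart of a class.
cls_module = "pyspark.mllib.recommendation"   # stands in for cls.__module__
cls_name = "MatrixFactorizationModel"         # stands in for cls.__name__

java_package = cls_module.replace("pyspark", "org.apache.spark")
java_class = ".".join([java_package, cls_name])
print(java_class)  # org.apache.spark.mllib.recommendation.MatrixFactorizationModel

# JavaLoader.load then walks this dotted path on sc._jvm with getattr and
# calls load(sc._jsc.sc(), path) on the resulting Scala object.
{% endhighlight %}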
2 changes: 1 addition & 1 deletion python/pyspark/streaming/kafka.py
@@ -82,7 +82,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
Then, innclude the jar in the spark-submit command as
Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
@@ -121,7 +121,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
* @param nullTerm A term that holds a boolean value representing whether the expression evaluated
* to null.
* @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not
* valid if `nullTerm` is set to `false`.
* valid if `nullTerm` is set to `true`.
* @param objectTerm A possibly boxed version of the result of evaluating this expression.
*/
protected case class EvaluatedExpression(
8 changes: 8 additions & 0 deletions sql/core/pom.xml
@@ -109,5 +109,13 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>../../python</directory>
<includes>
<include>pyspark/sql/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -337,11 +337,11 @@ class DataFrame protected[sql](
* {{{
* // Scala:
* import org.apache.spark.sql.functions._
* df1.join(df2, "outer", $"df1Key" === $"df2Key")
* df1.join(df2, $"df1Key" === $"df2Key", "outer")
*
* // Java:
* import static org.apache.spark.sql.functions.*;
* df1.join(df2, "outer", col("df1Key") === col("df2Key"));
* df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
* }}}
*
* @param right Right side of the join.
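For comparison, the analogous call from Python, not part of this commit and sketched under the assumption of the 1.3-era `pyspark.sql` API, also puts the join condition second and the join type last:

{% highlight python %}
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # sc: an existing SparkContext
df1 = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["df1Key", "val1"])
df2 = sqlContext.createDataFrame([(2, "x"), (3, "y")], ["df2Key", "val2"])

# Condition second, join type last, mirroring the corrected Scala/Java examples
joined = df1.join(df2, df1.df1Key == df2.df2Key, "outer")
{% endhighlight %}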