Commit

Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh committed Mar 2, 2015
2 parents aa365d1 + 3f00bb3 commit 20328d1
Showing 7 changed files with 129 additions and 33 deletions.
40 changes: 23 additions & 17 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -280,15 +280,24 @@ object AccumulatorParam {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0

def newId(): Long = synchronized {
lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {

def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
} else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
}
}

@@ -320,12 +329,7 @@ private[spark] object Accumulators {
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
}
return ret
}
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
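For context on the comments added above: the pattern is a driver-side registry that holds only weak references to the original accumulators, plus a thread-local map of per-task copies that is collected and cleared when a task finishes. The toy Python sketch below is not Spark's code; names such as `ToyAccumulatorRegistry` are invented for illustration, and it only shows that ownership split.

```python
import threading
import weakref


class ToyAccumulator(object):
    """A minimal counter standing in for Spark's Accumulable."""
    _next_id = 0

    def __init__(self):
        self.id = ToyAccumulator._next_id
        ToyAccumulator._next_id += 1
        self.value = 0

    def add(self, term):
        self.value += term


class ToyAccumulatorRegistry(object):
    """Driver side keeps only weak references to originals; each worker thread
    keeps strong references to its per-task copies and clears them after the task."""

    def __init__(self):
        self.originals = weakref.WeakValueDictionary()  # id -> original (driver side)
        self._local = threading.local()                 # per-thread task copies

    def _local_map(self):
        if not hasattr(self._local, "accums"):
            self._local.accums = {}
        return self._local.accums

    def register(self, acc, original):
        if original:
            self.originals[acc.id] = acc      # weakly referenced, can be garbage-collected
        else:
            self._local_map()[acc.id] = acc   # strongly referenced for the running task only

    def values(self):
        """Snapshot of the current task's accumulator updates (what gets sent to the driver)."""
        return dict((acc_id, acc.value) for acc_id, acc in self._local_map().items())

    def clear(self):
        """Called after a task completes, mirroring Accumulators.clear()."""
        self._local_map().clear()

    def add_to_originals(self, updates):
        """Driver side: merge task updates into originals that are still alive."""
        for acc_id, value in updates.items():
            original = self.originals.get(acc_id)
            if original is not None:
                original.add(value)
            else:
                print("Ignoring update for unknown accumulator id %d" % acc_id)
```

The real object additionally synchronizes every operation and has hooks for removing entries that are no longer needed; the sketch only illustrates the division of responsibilities the new comments describe.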
8 changes: 5 additions & 3 deletions docs/mllib-collaborative-filtering.md
@@ -200,10 +200,8 @@ In the following example we load rating data. Each row consists of a user, a pro
We use the default ALS.train() method which assumes ratings are explicit. We evaluate the
recommendation by measuring the Mean Squared Error of rating prediction.

-Note that the Python API does not yet support model save/load but will in the future.
-
{% highlight python %}
-from pyspark.mllib.recommendation import ALS, Rating
+from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
@@ -220,6 +218,10 @@ predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

+# Save and load model
+model.save(sc, "myModelPath")
+sameModel = MatrixFactorizationModel.load(sc, "myModelPath")
{% endhighlight %}

If the rating matrix is derived from other source of information (i.e., it is inferred from other
26 changes: 16 additions & 10 deletions docs/mllib-naive-bayes.md
@@ -115,22 +115,28 @@ used for evaluation and prediction.

Note that the Python API does not yet support model save/load but will in the future.

-<!-- TODO: Make Python's example consistent with Scala's and Java's. -->
{% highlight python %}
-from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint

+def parseLine(line):
+    parts = line.split(',')
+    label = float(parts[0])
+    features = Vectors.dense([float(x) for x in parts[1].split(' ')])
+    return LabeledPoint(label, features)
+
+data = sc.textFile('data/mllib/sample_naive_bayes_data.txt').map(parseLine)
+
-# an RDD of LabeledPoint
-data = sc.parallelize([
-  LabeledPoint(0.0, [0.0, 0.0])
-  ... # more labeled points
-])
+# Split data aproximately into training (60%) and test (40%)
+training, test = data.randomSplit([0.6, 0.4], seed = 0)

# Train a naive Bayes model.
-model = NaiveBayes.train(data, 1.0)
+model = NaiveBayes.train(training, 1.0)

-# Make prediction.
-prediction = model.predict([0.0, 0.0])
+# Make prediction and test accuracy.
+predictionAndLabel = test.map(lambda p : (model.predict(p.features), p.label))
+accuracy = 1.0 * predictionAndLabel.filter(lambda (x, v): x == v).count() / test.count()
{% endhighlight %}

</div>
@@ -48,7 +48,7 @@ trait Saveable {
*
* @param sc Spark context used to save model data.
* @param path Path specifying the directory in which to save this model.
- *              This directory and any intermediate directory will be created if needed.
+ *              If the directory already exists, this method throws an exception.
*/
def save(sc: SparkContext, path: String): Unit

20 changes: 18 additions & 2 deletions python/pyspark/mllib/recommendation.py
@@ -19,7 +19,8 @@

from pyspark import SparkContext
from pyspark.rdd import RDD
-from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
+from pyspark.mllib.util import Saveable, JavaLoader

__all__ = ['MatrixFactorizationModel', 'ALS', 'Rating']

@@ -39,7 +40,8 @@ def __reduce__(self):
return Rating, (int(self.user), int(self.product), float(self.rating))


-class MatrixFactorizationModel(JavaModelWrapper):
+@inherit_doc
+class MatrixFactorizationModel(JavaModelWrapper, Saveable, JavaLoader):

"""A matrix factorisation model trained by regularized alternating
least-squares.
@@ -81,6 +83,17 @@ class MatrixFactorizationModel(JavaModelWrapper):
>>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2,2)
0.43...
+    >>> import os, tempfile
+    >>> path = tempfile.mkdtemp()
+    >>> model.save(sc, path)
+    >>> sameModel = MatrixFactorizationModel.load(sc, path)
+    >>> sameModel.predict(2,2)
+    0.43...
+    >>> try:
+    ...     os.removedirs(path)
+    ... except:
+    ...     pass
"""
def predict(self, user, product):
return self._java_model.predict(int(user), int(product))
@@ -98,6 +111,9 @@ def userFeatures(self):
def productFeatures(self):
return self.call("getProductFeatures")

+    def save(self, sc, path):
+        self.call("save", sc._jsc.sc(), path)
+

class ALS(object):

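A usage note on the new `save`/`load` path: `os.removedirs` only removes empty directories, which is why the doctest wraps it in try/except (the saved model directory is not empty). The sketch below is a standalone variant of that doctest, under stated assumptions: it creates its own SparkContext (in the PySpark shell `sc` already exists), uses a local filesystem path and toy ratings, and cleans up with `shutil.rmtree` instead.

```python
import shutil
import tempfile

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

sc = SparkContext(appName="als-save-load-sketch")

# Tiny illustrative ratings RDD: Rating(user, product, rating)
ratings = sc.parallelize([Rating(1, 1, 5.0), Rating(1, 2, 1.0),
                          Rating(2, 1, 2.0), Rating(2, 2, 5.0)])
model = ALS.train(ratings, rank=4, iterations=5)

path = tempfile.mkdtemp()
shutil.rmtree(path)       # save() expects the target directory not to exist yet
model.save(sc, path)      # writes path/metadata/ (JSON) and path/data/ (Parquet)

sameModel = MatrixFactorizationModel.load(sc, path)
print(sameModel.predict(2, 2))

shutil.rmtree(path)       # rmtree, unlike os.removedirs, handles non-empty directories
sc.stop()
```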
58 changes: 58 additions & 0 deletions python/pyspark/mllib/util.py
@@ -168,6 +168,64 @@ def loadLabeledPoints(sc, path, minPartitions=None):
return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)


+class Saveable(object):
+    """
+    Mixin for models and transformers which may be saved as files.
+    """
+
+    def save(self, sc, path):
+        """
+        Save this model to the given path.
+
+        This saves:
+         * human-readable (JSON) model metadata to path/metadata/
+         * Parquet formatted data to path/data/
+
+        The model may be loaded using py:meth:`Loader.load`.
+
+        :param sc: Spark context used to save model data.
+        :param path: Path specifying the directory in which to save
+                     this model. If the directory already exists,
+                     this method throws an exception.
+        """
+        raise NotImplementedError
+
+
+class Loader(object):
+    """
+    Mixin for classes which can load saved models from files.
+    """
+
+    @classmethod
+    def load(cls, sc, path):
+        """
+        Load a model from the given path. The model should have been
+        saved using py:meth:`Saveable.save`.
+
+        :param sc: Spark context used for loading model files.
+        :param path: Path specifying the directory to which the model
+                     was saved.
+        :return: model instance
+        """
+        raise NotImplemented
+
+
+class JavaLoader(Loader):
+    """
+    Mixin for classes which can load saved models using its Scala
+    implementation.
+    """
+
+    @classmethod
+    def load(cls, sc, path):
+        java_package = cls.__module__.replace("pyspark", "org.apache.spark")
+        java_class = ".".join([java_package, cls.__name__])
+        java_obj = sc._jvm
+        for name in java_class.split("."):
+            java_obj = getattr(java_obj, name)
+        return cls(java_obj.load(sc._jsc.sc(), path))
+
+
def _test():
import doctest
from pyspark.context import SparkContext
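The `JavaLoader.load` added above is purely convention-driven: it rewrites the Python module path into the corresponding Scala package name and then walks that dotted name attribute-by-attribute on `sc._jvm` before calling the JVM class's `load`. A minimal sketch of just the name mapping (no JVM involved; the `java_class_for` helper and the stand-in class are invented for illustration):

```python
def java_class_for(py_cls):
    """Map a PySpark class to its Scala counterpart by name, as JavaLoader.load does."""
    java_package = py_cls.__module__.replace("pyspark", "org.apache.spark")
    return ".".join([java_package, py_cls.__name__])


class MatrixFactorizationModel(object):
    """Stand-in with the same module/name as pyspark.mllib.recommendation's class."""
    __module__ = "pyspark.mllib.recommendation"


print(java_class_for(MatrixFactorizationModel))
# org.apache.spark.mllib.recommendation.MatrixFactorizationModel
#
# JavaLoader then resolves this dotted name one attribute at a time on sc._jvm
# (getattr(sc._jvm, "org"), then "apache", ...) and finally calls
# <JavaClass>.load(sc._jsc.sc(), path), wrapping the result in the Python class.
```

Because the mapping is name-based, the Python wrapper and its Scala implementation must live in mirrored packages for `load` to find the right JVM class.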
8 changes: 8 additions & 0 deletions sql/core/pom.xml
@@ -109,5 +109,13 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <resources>
+      <resource>
+        <directory>../../python</directory>
+        <includes>
+          <include>pyspark/sql/*.py</include>
+        </includes>
+      </resource>
+    </resources>
</build>
</project>
