Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
Andrew Or committed May 15, 2015
2 parents e4a93ac + 9476148 commit d25a324
Showing 24 changed files with 335 additions and 91 deletions.
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -324,9 +324,6 @@ private[worker] class Worker(
           map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
         sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
 
-    case Heartbeat =>
-      logInfo(s"Received heartbeat from driver ${sender.path}")
-
     case RegisterWorkerFailed(message) =>
       if (!registered) {
         logError("Worker registration failed: " + message)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -94,7 +94,7 @@ private[spark] abstract class WebUI(
   }
 
   /** Detach a handler from this UI. */
-  protected def detachHandler(handler: ServletContextHandler) {
+  def detachHandler(handler: ServletContextHandler) {
     handlers -= handler
     serverInfo.foreach { info =>
       info.rootHandler.removeHandler(handler)
core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,6 +35,7 @@
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
@@ -96,6 +97,13 @@ public OutputStream apply(OutputStream stream) {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
+    // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
+    // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
+    // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
+    // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
+    synchronized (CachedBufferAllocator.class) {
+      CachedBufferAllocator.queueTable.clear();
+    }
     final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
     if (leakedMemory != 0) {
       fail("Test leaked " + leakedMemory + " bytes of managed memory");
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable {
    * Returns a list containing weights, mean and covariance of each mixture component.
    */
   def trainGaussianMixture(
-      data: JavaRDD[Vector],
-      k: Int,
-      convergenceTol: Double,
+      data: JavaRDD[Vector],
+      k: Int,
+      convergenceTol: Double,
       maxIterations: Int,
-      seed: java.lang.Long): JList[Object] = {
+      seed: java.lang.Long,
+      initialModelWeights: java.util.ArrayList[Double],
+      initialModelMu: java.util.ArrayList[Vector],
+      initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
     val gmmAlg = new GaussianMixture()
       .setK(k)
       .setConvergenceTol(convergenceTol)
       .setMaxIterations(maxIterations)
 
+    if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
+      val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
+        case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
+      }
+      val initialModel = new GaussianMixtureModel(
+        initialModelWeights.asScala.toArray, gaussians.toArray)
+      gmmAlg.setInitialModel(initialModel)
+    }
+
     if (seed != null) gmmAlg.setSeed(seed)
 
     try {
       val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
       var wt = ArrayBuffer.empty[Double]
-      var mu = ArrayBuffer.empty[Vector]
+      var mu = ArrayBuffer.empty[Vector]
       var sigma = ArrayBuffer.empty[Matrix]
       for (i <- 0 until model.k) {
         wt += model.weights(i)
         mu += model.gaussians(i).mu
         sigma += model.gaussians(i).sigma
-      }
+      }
       List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
     } finally {
       data.rdd.unpersist(blocking = false)
mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
  * are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are
  * the respective mean and covariance for each Gaussian distribution i=1..k.
  *
- * @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is
- *               the weight for Gaussian i, and weight.sum == 1
- * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i
- * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the
- *              covariance matrix for Gaussian i
+ * @param weights Weights for each Gaussian distribution in the mixture, where weights(i) is
+ *                the weight for Gaussian i, and weights.sum == 1
+ * @param gaussians Array of MultivariateGaussian where gaussians(i) represents
+ *                  the Multivariate Gaussian (Normal) Distribution for Gaussian i
  */
 @Experimental
 class GaussianMixtureModel(
67 changes: 53 additions & 14 deletions python/pyspark/mllib/clustering.py
@@ -142,6 +142,7 @@ class GaussianMixtureModel(object):

"""A clustering model derived from the Gaussian Mixture Model method.
>>> from pyspark.mllib.linalg import Vectors, DenseMatrix
>>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
... 0.9,0.8,0.75,0.935,
... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2))
@@ -154,24 +155,51 @@ class GaussianMixtureModel(object):
     True
     >>> labels[4]==labels[5]
     True
-    >>> clusterdata_2 = sc.parallelize(array([-5.1971, -2.5359, -3.8220,
-    ...                                       -5.2211, -5.0602, 4.7118,
-    ...                                        6.8989, 3.4592, 4.6322,
-    ...                                        5.7048, 4.6567, 5.5026,
-    ...                                        4.5605, 5.2043, 6.2734]).reshape(5, 3))
+    >>> data = array([-5.1971, -2.5359, -3.8220,
+    ...               -5.2211, -5.0602, 4.7118,
+    ...                6.8989, 3.4592, 4.6322,
+    ...                5.7048, 4.6567, 5.5026,
+    ...                4.5605, 5.2043, 6.2734])
+    >>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
     >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
     ...                               maxIterations=150, seed=10)
     >>> labels = model.predict(clusterdata_2).collect()
     >>> labels[0]==labels[1]==labels[2]
     True
     >>> labels[3]==labels[4]
     True
+    >>> clusterdata_3 = sc.parallelize(data.reshape(15, 1))
+    >>> im = GaussianMixtureModel([0.5, 0.5],
+    ...                           [MultivariateGaussian(Vectors.dense([-1.0]), DenseMatrix(1, 1, [1.0])),
+    ...                            MultivariateGaussian(Vectors.dense([1.0]), DenseMatrix(1, 1, [1.0]))])
+    >>> model = GaussianMixture.train(clusterdata_3, 2, initialModel=im)
     """
 
     def __init__(self, weights, gaussians):
-        self.weights = weights
-        self.gaussians = gaussians
-        self.k = len(self.weights)
+        self._weights = weights
+        self._gaussians = gaussians
+        self._k = len(self._weights)
+
+    @property
+    def weights(self):
+        """
+        Weights for each Gaussian distribution in the mixture, where weights[i] is
+        the weight for Gaussian i, and weights.sum == 1.
+        """
+        return self._weights
+
+    @property
+    def gaussians(self):
+        """
+        Array of MultivariateGaussian where gaussians[i] represents
+        the Multivariate Gaussian (Normal) Distribution for Gaussian i.
+        """
+        return self._gaussians
+
+    @property
+    def k(self):
+        """Number of gaussians in mixture."""
+        return self._k
 
     def predict(self, x):
         """
@@ -193,9 +221,9 @@ def predictSoft(self, x):
         :return: membership_matrix. RDD of array of double values.
         """
         if isinstance(x, RDD):
-            means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
+            means, sigmas = zip(*[(g.mu, g.sigma) for g in self._gaussians])
             membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
-                                              _convert_to_vector(self.weights), means, sigmas)
+                                              _convert_to_vector(self._weights), means, sigmas)
             return membership_matrix.map(lambda x: pyarray.array('d', x))
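
predictSoft() returns per-point membership degrees rather than hard labels. A quick sketch of its contract, assuming a running SparkContext sc and a trained one-dimensional GaussianMixtureModel named model (e.g., the one from the doctests above):

    rdd = sc.parallelize([[-5.1], [5.0]])
    soft = model.predictSoft(rdd).collect()
    # Each element is an array('d') of length model.k; a point's k
    # membership degrees are posterior probabilities and sum to ~1.0.
    assert all(abs(sum(m) - 1.0) < 1e-6 for m in soft)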


@@ -208,13 +236,24 @@ class GaussianMixture(object):
     :param convergenceTol: Threshold value to check the convergence criteria. Defaults to 1e-3
     :param maxIterations: Number of iterations. Default to 100
     :param seed: Random Seed
+    :param initialModel: GaussianMixtureModel for initializing learning
     """
     @classmethod
-    def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
+    def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
         """Train a Gaussian Mixture clustering model."""
-        weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
-                                          rdd.map(_convert_to_vector), k,
-                                          convergenceTol, maxIterations, seed)
+        initialModelWeights = None
+        initialModelMu = None
+        initialModelSigma = None
+        if initialModel is not None:
+            if initialModel.k != k:
+                raise Exception("Mismatched cluster count, initialModel.k = %s, however k = %s"
+                                % (initialModel.k, k))
+            initialModelWeights = initialModel.weights
+            initialModelMu = [initialModel.gaussians[i].mu for i in range(initialModel.k)]
+            initialModelSigma = [initialModel.gaussians[i].sigma for i in range(initialModel.k)]
+        weight, mu, sigma = callMLlibFunc("trainGaussianMixture", rdd.map(_convert_to_vector), k,
+                                          convergenceTol, maxIterations, seed, initialModelWeights,
+                                          initialModelMu, initialModelSigma)
         mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]
         return GaussianMixtureModel(weight, mvg_obj)
 
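The new read-only properties and the initialModel plumbing work together: any GaussianMixtureModel, whether hand-built or previously trained, can seed a later run. A minimal sketch, assuming a running SparkContext sc and the import paths this module's own doctests rely on:

    from numpy import array
    from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
    from pyspark.mllib.linalg import Vectors, DenseMatrix
    from pyspark.mllib.stat.distribution import MultivariateGaussian

    data = sc.parallelize(array([-5.2, -5.1, -5.0, 4.9, 5.0, 5.1]).reshape(6, 1))

    # Hand-built starting point, exercising the new read-only properties.
    im = GaussianMixtureModel(
        [0.5, 0.5],
        [MultivariateGaussian(Vectors.dense([-1.0]), DenseMatrix(1, 1, [1.0])),
         MultivariateGaussian(Vectors.dense([1.0]), DenseMatrix(1, 1, [1.0]))])
    assert im.k == 2  # k is derived from len(weights)

    # Warm-start training; train() raises if initialModel.k does not equal k.
    model = GaussianMixture.train(data, 2, maxIterations=50, initialModel=im)
    print(model.weights)
    print([g.mu for g in model.gaussians])

Because weights, gaussians, and k are now properties without setters, assigning to them raises AttributeError, so a model can no longer be mutated after construction.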
12 changes: 9 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -1511,13 +1511,19 @@ def inSet(self, *cols):
     isNull = _unary_op("isNull", "True if the current expression is null.")
     isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")
 
-    def alias(self, alias):
-        """Return a alias for this column
+    def alias(self, *alias):
+        """Returns this column aliased with a new name or names (in the case of expressions that
+        return more than one column, such as explode).
 
         >>> df.select(df.age.alias("age2")).collect()
         [Row(age2=2), Row(age2=5)]
         """
-        return Column(getattr(self._jc, "as")(alias))
+        if len(alias) == 1:
+            return Column(getattr(self._jc, "as")(alias[0]))
+        else:
+            sc = SparkContext._active_spark_context
+            return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
 
     @ignore_unicode_prefix
     def cast(self, dataType):
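A short sketch of the widened API, assuming a SQLContext named sqlContext as in the doctests: the single-name path behaves exactly as before, while the variadic form names every column a generator expression produces.

    from pyspark.sql import Row
    from pyspark.sql.functions import explode

    df = sqlContext.createDataFrame([Row(age=2, m={"a": "b"})])
    df.select(df.age.alias("age2")).collect()  # [Row(age2=2)]
    # explode() over a map yields a (key, value) pair of columns;
    # the new *alias names both at once.
    df.select(explode(df.m).alias("key", "value")).collect()
    # [Row(key=u'a', value=u'b')]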
20 changes: 20 additions & 0 deletions python/pyspark/sql/functions.py
@@ -169,6 +169,26 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+def explode(col):
+    """Returns a new row for each element in the given array or map.
+
+    >>> from pyspark.sql import Row
+    >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
+    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
+    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
+
+    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+    +---+-----+
+    |key|value|
+    +---+-----+
+    |  a|    b|
+    +---+-----+
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.explode(_to_java_column(col))
+    return Column(jc)
+
+
 def coalesce(*cols):
     """Returns the first column that is not null.
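One behavior worth calling out, sketched here under the same sqlContext assumption: explode() emits one output row per array element, and any sibling columns selected alongside it are repeated for each produced row.

    from pyspark.sql import Row
    from pyspark.sql.functions import explode

    df = sqlContext.createDataFrame(
        [Row(name="x", vals=[1, 2]), Row(name="y", vals=[3])])
    # "name" is duplicated once per exploded element of "vals".
    df.select(df.name, explode(df.vals).alias("v")).collect()
    # [Row(name=u'x', v=1), Row(name=u'x', v=2), Row(name=u'y', v=3)]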
15 changes: 15 additions & 0 deletions python/pyspark/sql/tests.py
@@ -117,6 +117,21 @@ def tearDownClass(cls):
         ReusedPySparkTestCase.tearDownClass()
         shutil.rmtree(cls.tempdir.name, ignore_errors=True)
 
+    def test_explode(self):
+        from pyspark.sql.functions import explode
+        d = [Row(a=1, intlist=[1, 2, 3], mapfield={"a": "b"})]
+        rdd = self.sc.parallelize(d)
+        data = self.sqlCtx.createDataFrame(rdd)
+
+        result = data.select(explode(data.intlist).alias("a")).select("a").collect()
+        self.assertEqual(result[0][0], 1)
+        self.assertEqual(result[1][0], 2)
+        self.assertEqual(result[2][0], 3)
+
+        result = data.select(explode(data.mapfield).alias("a", "b")).select("a", "b").collect()
+        self.assertEqual(result[0][0], "a")
+        self.assertEqual(result[0][1], "b")
+
     def test_udf_with_callable(self):
         d = [Row(number=i, squared=i**2) for i in range(10)]
         rdd = self.sc.parallelize(d)