diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 99e104feb2207..f81a8b83cafb8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -36,15 +36,16 @@ object EigenValueDecomposition { * The caller needs to ensure that the input matrix is real symmetric. This function requires * memory for `n*(4*k+4)` doubles. * - * @param mul a function that multiplies the symmetric matrix with a Vector. + * @param mul a function that multiplies the symmetric matrix with a DenseVector. * @param n dimension of the square matrix (maximum Int.MaxValue). * @param k number of leading eigenvalues required. * @param tol tolerance of the eigs computation. * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors * (columns of the matrix). The number of computed eigenvalues might be smaller than k. */ - private[mllib] def symmetricEigs(mul: Vector => Vector, n: Int, k: Int, tol: Double) + private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double) : (BDV[Double], BDM[Double]) = { + // TODO: remove this function and use eigs in breeze when switching breeze version require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") val arpack = ARPACK.getInstance() @@ -84,7 +85,7 @@ object EigenValueDecomposition { val outputOffset = ipntr(1) - 1 val x = w(inputOffset until inputOffset + n) val y = w(outputOffset until outputOffset + n) - y := BDV(mul(Vectors.fromBreeze(x)).toArray) + y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray) // call ARPACK arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cc64266e8b88f..c24142f44ddcf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -19,7 +19,8 @@ package org.apache.spark.mllib.linalg.distributed import java.util -import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} +import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} import breeze.numerics.{sqrt => brzSqrt} import com.github.fommil.netlib.BLAS.{getInstance => blas} @@ -201,16 +202,28 @@ class RowMatrix( } /** - * Multiply the Gramian matrix `A^T A` by a Vector on the right. + * Multiply the Gramian matrix `A^T A` by a DenseVector on the right. * - * @param v a local vector whose length must match the number of columns of this matrix - * @return a local DenseVector representing the product + * @param v a local DenseVector whose length must match the number of columns of this matrix. + * @return a local DenseVector representing the product. */ - private[mllib] def multiplyGramianMatrix(v: Vector): Vector = { - val bv = rows.map{ - row => row.toBreeze * row.toBreeze.dot(v.toBreeze) - }.reduce( (x: BV[Double], y: BV[Double]) => x + y ) - Vectors.fromBreeze(bv) + private[mllib] def multiplyGramianMatrix(v: DenseVector): DenseVector = { + val n = numCols().toInt + + val bv = rows.aggregate(BDV.zeros[Double](n))( + seqOp = (U, r) => { + val rBrz = r.toBreeze + val a = rBrz.dot(v.toBreeze) + rBrz match { + case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) + case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) + } + U + }, + combOp = (U1, U2) => U1 += U2 + ) + + new DenseVector(bv.data) } /** @@ -243,7 +256,7 @@ class RowMatrix( * * The decomposition is computed by providing a function that multiples a vector with A'A to * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. - * Then we compute U via easy matrix multiplication as U = A * (V * S-1). + * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). * Note that this approach requires `O(nnz(A))` time. * * When the requested eigenvalues k = n, a non-sparse implementation will be used, which requires