diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala index 09825a4d1e923..710dbbdae3659 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala @@ -17,10 +17,20 @@ package org.apache.spark.mllib.linalg.distributed +import breeze.linalg.{DenseMatrix => BDM} + import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors +/** + * Represents an entry in an distributed matrix. + * @param i row index + * @param j column index + * @param value value of the entry + */ +case class MatrixEntry(i: Long, j: Long, value: Double) + /** * Represents a matrix in coordinate format. * @@ -31,12 +41,12 @@ import org.apache.spark.mllib.linalg.Vectors * columns will be determined by the max column index plus one. */ class CoordinateMatrix( - val entries: RDD[DistributedMatrixEntry], + val entries: RDD[MatrixEntry], private var nRows: Long, private var nCols: Long) extends DistributedMatrix { /** Alternative constructor leaving matrix dimensions to be determined automatically. */ - def this(entries: RDD[DistributedMatrixEntry]) = this(entries, 0L, 0L) + def this(entries: RDD[MatrixEntry]) = this(entries, 0L, 0L) /** Gets or computes the number of columns. */ override def numCols(): Long = { @@ -54,16 +64,7 @@ class CoordinateMatrix( nRows } - private def computeSize() { - // Reduce will throw an exception if `entries` is empty. - val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) => - (math.max(i1, i2), math.max(j1, j2)) - } - // There may be empty columns at the very right and empty rows at the very bottom. - nRows = math.max(nRows, m1 + 1L) - nCols = math.max(nCols, n1 + 1L) - } - + /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */ def toIndexedRowMatrix(): IndexedRowMatrix = { val nl = numCols() if (nl > Int.MaxValue) { @@ -74,8 +75,38 @@ class CoordinateMatrix( val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value))) .groupByKey() .map { case (i, vectorEntries) => - IndexedMatrixRow(i, Vectors.sparse(n, vectorEntries)) + IndexedRow(i, Vectors.sparse(n, vectorEntries)) } new IndexedRowMatrix(indexedRows, numRows(), n) } + + /** + * Converts to RowMatrix, dropping row indices after grouping by row index. + * The number of columns must be within the integer range. + */ + def toRowMatrix(): RowMatrix = { + toIndexedRowMatrix().toRowMatrix() + } + + /** Determines the size by computing the max row/column index. */ + private def computeSize() { + // Reduce will throw an exception if `entries` is empty. + val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) => + (math.max(i1, i2), math.max(j1, j2)) + } + // There may be empty columns at the very right and empty rows at the very bottom. + nRows = math.max(nRows, m1 + 1L) + nCols = math.max(nCols, n1 + 1L) + } + + /** Collects data and assembles a local matrix. */ + private[mllib] override def toBreeze(): BDM[Double] = { + val m = numRows().toInt + val n = numCols().toInt + val mat = BDM.zeros[Double](m, n) + entries.collect().foreach { case MatrixEntry(i, j, value) => + mat(i.toInt, j.toInt) = value + } + mat + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala index 03ce8b55c5d53..13f72a3c724ef 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala @@ -17,6 +17,10 @@ package org.apache.spark.mllib.linalg.distributed +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.mllib.linalg.Matrix + /** * Represents a distributively stored matrix backed by one or more RDDs. */ @@ -27,4 +31,7 @@ trait DistributedMatrix extends Serializable { /** Gets or computes the number of columns. */ def numCols(): Long + + /** Collects data and assembles a local dense breeze matrix (for test only). */ + private[mllib] def toBreeze(): BDM[Double] } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrixEntry.scala deleted file mode 100644 index ab568b45151bf..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrixEntry.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg.distributed - -/** - * Represents an entry in an RDDMatrix. - * @param i row index - * @param j column index - * @param value value of the entry - */ -case class DistributedMatrixEntry(i: Long, j: Long, value: Double) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index 01ca00a4429ea..e110f070bd7c1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -17,14 +17,18 @@ package org.apache.spark.mllib.linalg.distributed +import breeze.linalg.{DenseMatrix => BDM} + import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.SingularValueDecomposition -/** Represents a row of RowRDDMatrix. */ -case class IndexedMatrixRow(index: Long, vector: Vector) +/** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */ +case class IndexedRow(index: Long, vector: Vector) /** - * Represents a row-oriented RDDMatrix with indexed rows. + * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with + * indexed rows. * * @param rows indexed rows of this matrix * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will @@ -33,14 +37,13 @@ case class IndexedMatrixRow(index: Long, vector: Vector) * columns will be determined by the size of the first row. */ class IndexedRowMatrix( - val rows: RDD[IndexedMatrixRow], + val rows: RDD[IndexedRow], private var nRows: Long, private var nCols: Int) extends DistributedMatrix { /** Alternative constructor leaving matrix dimensions to be determined automatically. */ - def this(rows: RDD[IndexedMatrixRow]) = this(rows, 0L, 0) + def this(rows: RDD[IndexedRow]) = this(rows, 0L, 0) - /** Gets or computes the number of columns. */ override def numCols(): Long = { if (nCols <= 0) { // Calling `first` will throw an exception if `rows` is empty. @@ -57,8 +60,89 @@ class IndexedRowMatrix( nRows } - /** Drops row indices and converts this matrix to a RowRDDMatrix. */ + /** + * Drops row indices and converts this matrix to a + * [[org.apache.spark.mllib.linalg.distributed.RowMatrix]]. + */ def toRowMatrix(): RowMatrix = { new RowMatrix(rows.map(_.vector), 0L, nCols) } + + /** + * Computes the singular value decomposition of this matrix. + * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'. + * + * There is no restriction on m, but we require `n^2` doubles to fit in memory. + * Further, n should be less than m. + + * The decomposition is computed by first computing A'A = V S^2 V', + * computing svd locally on that (since n x n is small), from which we recover S and V. + * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). + * Note that this approach requires `O(n^3)` time on the master node. + * + * At most k largest non-zero singular values and associated vectors are returned. + * If there are k such values, then the dimensions of the return will be: + * + * U is an [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]] of size m x k that + * satisfies U'U = eye(k), + * s is a Vector of size k, holding the singular values in descending order, + * and V is a local Matrix of size n x k that satisfies V'V = eye(k). + * + * @param k number of singular values to keep. We might return less than k if there are + * numerically zero singular values. See rCond. + * @param computeU whether to compute U + * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) + * are treated as zero, where sigma(0) is the largest singular value. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSVD( + k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[IndexedRowMatrix, Matrix] = { + val indices = rows.map(_.index) + val svd = toRowMatrix().computeSVD(k, computeU, rCond) + val U = if (computeU) { + val indexedRows = indices.zip(svd.U.rows).map { case (i, v) => + IndexedRow(i, v) + } + new IndexedRowMatrix(indexedRows, nRows, nCols) + } else { + null + } + SingularValueDecomposition(U, svd.s, svd.V) + } + + /** + * Multiply this matrix by a local matrix on the right. + * + * @param B a local matrix whose number of rows must match the number of columns of this matrix + * @return an IndexedRowMatrix representing the product, which preserves partitioning + */ + def multiply(B: Matrix): IndexedRowMatrix = { + val mat = toRowMatrix().multiply(B) + val indexedRows = rows.map(_.index).zip(mat.rows).map { case (i, v) => + IndexedRow(i, v) + } + new IndexedRowMatrix(indexedRows, nRows, nCols) + } + + /** + * Computes the Gramian matrix `A^T A`. + */ + def computeGramianMatrix(): Matrix = { + toRowMatrix().computeGramianMatrix() + } + + private[mllib] override def toBreeze(): BDM[Double] = { + val m = numRows().toInt + val n = numCols().toInt + val mat = BDM.zeros[Double](m, n) + rows.collect().foreach { case IndexedRow(rowIndex, vector) => + val i = rowIndex.toInt + vector.toBreeze.activeIterator.foreach { case (j, v) => + mat(i, j) = v + } + } + mat + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 2c13feba8bc28..24f6b72632b2d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.Logging /** - * Represents a row-oriented RDDMatrix with no meaningful row indices. + * Represents a row-oriented distributed Matrix with no meaningful row indices. * * @param rows rows stored as an RDD[Vector] * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will @@ -99,7 +99,7 @@ class RowMatrix( * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: * - * U is a RowRDDMatrix of size m x k that satisfies U'U = eye(k), + * U is a RowMatrix of size m x k that satisfies U'U = eye(k), * s is a Vector of size k, holding the singular values in descending order, * and V is a Matrix of size n x k that satisfies V'V = eye(k). * @@ -237,7 +237,8 @@ class RowMatrix( * Multiply this matrix by a local matrix on the right. * * @param B a local matrix whose number of rows must match the number of columns of this matrix - * @return a RowRDDMatrix representing the product, which preserves partitioning + * @return a [[org.apache.spark.mllib.linalg.distributed.RowMatrix]] representing the product, + * which preserves partitioning */ def multiply(B: Matrix): RowMatrix = { val n = numCols().toInt @@ -254,6 +255,20 @@ class RowMatrix( new RowMatrix(AB, nRows, B.numCols) } + + private[mllib] override def toBreeze(): BDM[Double] = { + val m = numRows().toInt + val n = numCols().toInt + val mat = BDM.zeros[Double](m, n) + var i = 0 + rows.collect().foreach { v => + v.toBreeze.activeIterator.foreach { case (j, v) => + mat(i, j) = v + } + i += 1 + } + mat + } } object RowMatrix { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala index 5d07840d6d273..cd45438fb628f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg.distributed import org.scalatest.FunSuite +import breeze.linalg.{DenseMatrix => BDM} + import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.linalg.Vectors @@ -40,7 +42,7 @@ class CoordinateMatrixSuite extends FunSuite with LocalSparkContext { (3, 0, 7.0), (3, 3, 8.0), (4, 1, 9.0)), 3).map { case (i, j, value) => - DistributedMatrixEntry(i, j, value) + MatrixEntry(i, j, value) } mat = new CoordinateMatrix(entries) } @@ -51,7 +53,7 @@ class CoordinateMatrixSuite extends FunSuite with LocalSparkContext { } test("empty entries") { - val entries = sc.parallelize(Seq[DistributedMatrixEntry](), 1) + val entries = sc.parallelize(Seq[MatrixEntry](), 1) val emptyMat = new CoordinateMatrix(entries) intercept[RuntimeException] { emptyMat.numCols() @@ -61,20 +63,36 @@ class CoordinateMatrixSuite extends FunSuite with LocalSparkContext { } } + test("toBreeze") { + val expected = BDM( + (1.0, 2.0, 0.0, 0.0), + (0.0, 3.0, 4.0, 0.0), + (0.0, 0.0, 5.0, 6.0), + (7.0, 0.0, 0.0, 8.0), + (0.0, 9.0, 0.0, 0.0)) + assert(mat.toBreeze() === expected) + } + test("toIndexedRowMatrix") { - val indexedRows = mat - .toIndexedRowMatrix() - .rows - .map(row => (row.index, row.vector)) - .collect() - .sortBy(_._1) - .toSeq - assert(indexedRows === Seq( - (0, Vectors.dense(1.0, 2.0, 0.0, 0.0)), - (1, Vectors.dense(0.0, 3.0, 4.0, 0.0)), - (2, Vectors.dense(0.0, 0.0, 5.0, 6.0)), - (3, Vectors.dense(7.0, 0.0, 0.0, 8.0)), - (4, Vectors.dense(0.0, 9.0, 0.0, 0.0)) - )) + val indexedRowMatrix = mat.toIndexedRowMatrix() + val expected = BDM( + (1.0, 2.0, 0.0, 0.0), + (0.0, 3.0, 4.0, 0.0), + (0.0, 0.0, 5.0, 6.0), + (7.0, 0.0, 0.0, 8.0), + (0.0, 9.0, 0.0, 0.0)) + assert(indexedRowMatrix.toBreeze() === expected) + } + + test("toRowMatrix") { + val rowMatrix = mat.toRowMatrix() + val rows = rowMatrix.rows.collect().toSet + val expected = Set( + Vectors.dense(1.0, 2.0, 0.0, 0.0), + Vectors.dense(0.0, 3.0, 4.0, 0.0), + Vectors.dense(0.0, 0.0, 5.0, 6.0), + Vectors.dense(7.0, 0.0, 0.0, 8.0), + Vectors.dense(0.0, 9.0, 0.0, 0.0)) + assert(rows === expected) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala new file mode 100644 index 0000000000000..f7c46f23b746d --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import org.scalatest.FunSuite + +import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV} + +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{Matrices, Vectors} + +class IndexedRowMatrixSuite extends FunSuite with LocalSparkContext { + + val m = 4 + val n = 3 + val data = Seq( + (0L, Vectors.dense(0.0, 1.0, 2.0)), + (1L, Vectors.dense(3.0, 4.0, 5.0)), + (3L, Vectors.dense(9.0, 0.0, 1.0)) + ).map(x => IndexedRow(x._1, x._2)) + var indexedRows: RDD[IndexedRow] = _ + + override def beforeAll() { + super.beforeAll() + indexedRows = sc.parallelize(data, 2) + } + + test("size") { + val mat1 = new IndexedRowMatrix(indexedRows) + assert(mat1.numRows() === m) + assert(mat1.numCols() === n) + + val mat2 = new IndexedRowMatrix(indexedRows, 5, 0) + assert(mat2.numRows() === 5) + assert(mat2.numCols() === n) + } + + test("empty rows") { + val rows = sc.parallelize(Seq[IndexedRow](), 1) + val mat = new IndexedRowMatrix(rows) + intercept[RuntimeException] { + mat.numRows() + } + intercept[RuntimeException] { + mat.numCols() + } + } + + test("toBreeze") { + val mat = new IndexedRowMatrix(indexedRows) + val expected = BDM( + (0.0, 1.0, 2.0), + (3.0, 4.0, 5.0), + (0.0, 0.0, 0.0), + (9.0, 0.0, 1.0)) + assert(mat.toBreeze() === expected) + } + + test("toRowMatrix") { + val idxRowMat = new IndexedRowMatrix(indexedRows) + val rowMat = idxRowMat.toRowMatrix() + assert(rowMat.numCols() === n) + assert(rowMat.numRows() === 3, "should drop empty rows") + assert(rowMat.rows.collect().toSeq === data.map(_.vector).toSeq) + } + + test("multiply a local matrix") { + val A = new IndexedRowMatrix(indexedRows) + val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) + val C = A.multiply(B) + val localA = A.toBreeze() + val localC = C.toBreeze() + val expected = localA * B.toBreeze.asInstanceOf[BDM[Double]] + assert(localC === expected) + } + + test("gram") { + val A = new IndexedRowMatrix(indexedRows) + val G = A.computeGramianMatrix() + val expected = BDM( + (90.0, 12.0, 24.0), + (12.0, 17.0, 22.0), + (24.0, 22.0, 30.0)) + assert(G.toBreeze === expected) + } + + test("svd") { + val A = new IndexedRowMatrix(indexedRows) + val svd = A.computeSVD(n, computeU = true) + assert(svd.U.isInstanceOf[IndexedRowMatrix]) + val localA = A.toBreeze() + val U = svd.U.toBreeze() + val s = svd.s.toBreeze.asInstanceOf[BDV[Double]] + val V = svd.V.toBreeze.asInstanceOf[BDM[Double]] + assert(closeToZero(U.t * U - BDM.eye[Double](n))) + assert(closeToZero(V.t * V - BDM.eye[Double](n))) + assert(closeToZero(U * brzDiag(s) * V.t - localA)) + } + + def closeToZero(G: BDM[Double]): Boolean = { + G.valuesIterator.map(math.abs).sum < 1e-6 + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 6422d4e15565d..410413621c44c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -73,6 +73,17 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } } + test("toBreeze") { + val expected = BDM( + (0.0, 1.0, 2.0), + (3.0, 4.0, 5.0), + (6.0, 7.0, 8.0), + (9.0, 0.0, 1.0)) + for (mat <- Seq(denseMat, sparseMat)) { + assert(mat.toBreeze() === expected) + } + } + test("gram") { val expected = Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))