diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala index a0945761b5e15..4d5560c5350b2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.linalg.rdd.RowRDDMatrix +import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.Vectors /** @@ -52,7 +52,7 @@ object TallSkinnyPCA { val values = line.split(' ').map(_.toDouble) Vectors.dense(values) } - val mat = new RowRDDMatrix(rows) + val mat = new RowMatrix(rows) // Compute principal components. val pc = mat.computePrincipalComponents(mat.numCols().toInt) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala index 0f7825e10defa..0b92d7c934207 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.linalg.rdd.RowRDDMatrix +import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.Vectors /** @@ -52,7 +52,7 @@ object TallSkinnySVD { val values = line.split(' ').map(_.toDouble) Vectors.dense(values) } - val mat = new RowRDDMatrix(rows) + val mat = new RowMatrix(rows) // Compute SVD. 
val svd = mat.computeSVD(mat.numCols().toInt) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index a0365338da0c9..51b2b0bcfd3c6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg import breeze.linalg.{Matrix => BM, DenseMatrix => BDM} /** - * Trait for matrix. + * Trait for a local matrix. */ trait Matrix extends Serializable { @@ -56,10 +56,13 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values) } +/** + * Factory methods for [[org.apache.spark.mllib.linalg.Matrix]]. + */ object Matrices { /** - * Creates a dense matrix. + * Creates a column-major dense matrix. * * @param numRows number of rows * @param numCols number of columns diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/CoordinateRDDMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala similarity index 55% rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/CoordinateRDDMatrix.scala rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala index c6af5fb833b05..09825a4d1e923 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/CoordinateRDDMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala @@ -15,59 +15,67 @@ * limitations under the License. */ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors /** - * Represents a matrix in coordinate list format. + * Represents a matrix in coordinate format.
* * @param entries matrix entries - * @param m number of rows (default: -1L, which means unknown) - * @param n number of column (default: -1L, which means unknown) + * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will + * be determined by the max row index plus one. + * @param nCols number of columns. A non-positive value means unknown, and then the number of + * columns will be determined by the max column index plus one. */ -class CoordinateRDDMatrix( - val entries: RDD[RDDMatrixEntry], - m: Long = -1L, - n: Long = -1L) extends RDDMatrix { +class CoordinateMatrix( + val entries: RDD[DistributedMatrixEntry], + private var nRows: Long, + private var nCols: Long) extends DistributedMatrix { - private var _m = m - private var _n = n + /** Alternative constructor leaving matrix dimensions to be determined automatically. */ + def this(entries: RDD[DistributedMatrixEntry]) = this(entries, 0L, 0L) /** Gets or computes the number of columns. */ override def numCols(): Long = { - if (_n < 0) { + if (nCols <= 0L) { computeSize() } - _n + nCols } /** Gets or computes the number of rows. */ override def numRows(): Long = { - if (_m < 0) { + if (nRows <= 0L) { computeSize() } - _m + nRows } private def computeSize() { + // Reduce will throw an exception if `entries` is empty. val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) => (math.max(i1, i2), math.max(j1, j2)) } // There may be empty columns at the very right and empty rows at the very bottom. 
- _m = math.max(_m, m1 + 1L) - _n = math.max(_n, n1 + 1L) + nRows = math.max(nRows, m1 + 1L) + nCols = math.max(nCols, n1 + 1L) } - def toIndexedRowRDDMatrix(): IndexedRowRDDMatrix = { - val n = numCols().toInt + def toIndexedRowMatrix(): IndexedRowMatrix = { + val nl = numCols() + if (nl > Int.MaxValue) { + sys.error(s"Cannot convert to a row-oriented format because the number of columns $nl is " + + "too large.") + } + val n = nl.toInt val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value))) .groupByKey() .map { case (i, vectorEntries) => - IndexedRDDMatrixRow(i, Vectors.sparse(n, vectorEntries)) - } - new IndexedRowRDDMatrix(indexedRows, numRows(), numCols()) + IndexedMatrixRow(i, Vectors.sparse(n, vectorEntries)) + } + new IndexedRowMatrix(indexedRows, numRows(), n) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RDDMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala similarity index 84% rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RDDMatrix.scala rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala index 9452593ed091f..03ce8b55c5d53 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RDDMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed /** - * Represents a matrix backed by one or more RDDs. + * Represents a distributively stored matrix backed by one or more RDDs. */ -trait RDDMatrix extends Serializable { +trait DistributedMatrix extends Serializable { /** Gets or computes the number of rows. 
*/ def numRows(): Long diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RDDMatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrixEntry.scala similarity index 88% rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RDDMatrixEntry.scala rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrixEntry.scala index 79dadf1fbb1fe..ab568b45151bf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RDDMatrixEntry.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrixEntry.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed /** * Represents an entry in an RDDMatrix. @@ -23,4 +23,4 @@ package org.apache.spark.mllib.linalg.rdd * @param j column index * @param value value of the entry */ -case class RDDMatrixEntry(i: Long, j: Long, value: Double) +case class DistributedMatrixEntry(i: Long, j: Long, value: Double) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/IndexedRowRDDMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala similarity index 53% rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/IndexedRowRDDMatrix.scala rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index c3e50f1160175..01ca00a4429ea 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/IndexedRowRDDMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -15,46 +15,50 @@ * limitations under the License. */ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.Vector /** Represents a row of RowRDDMatrix. 
*/ -case class IndexedRDDMatrixRow(index: Long, vector: Vector) +case class IndexedMatrixRow(index: Long, vector: Vector) /** * Represents a row-oriented RDDMatrix with indexed rows. * * @param rows indexed rows of this matrix - * @param m number of rows, where a negative number means unknown - * @param n number of cols, where a negative number means unknown + * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will + * be determined by the max row index plus one. + * @param nCols number of columns. A non-positive value means unknown, and then the number of + * columns will be determined by the size of the first row. */ -class IndexedRowRDDMatrix( - val rows: RDD[IndexedRDDMatrixRow], - m: Long = -1L, - n: Long = -1L) extends RDDMatrix { +class IndexedRowMatrix( + val rows: RDD[IndexedMatrixRow], + private var nRows: Long, + private var nCols: Int) extends DistributedMatrix { - private var _m = m - private var _n = n + /** Alternative constructor leaving matrix dimensions to be determined automatically. */ + def this(rows: RDD[IndexedMatrixRow]) = this(rows, 0L, 0) /** Gets or computes the number of columns. */ override def numCols(): Long = { - if (_n < 0) { - _n = rows.first().vector.size + if (nCols <= 0) { + // Calling `first` will throw an exception if `rows` is empty. + nCols = rows.first().vector.size } - _n + nCols } override def numRows(): Long = { - if (_m < 0) { - _m = rows.map(_.index).reduce(math.max) + 1 + if (nRows <= 0L) { + // Reduce will throw an exception if `rows` is empty. + nRows = rows.map(_.index).reduce(math.max) + 1L } - _m + nRows } /** Drops row indices and converts this matrix to a RowRDDMatrix. 
*/ - def toRowRDDMatrix(): RowRDDMatrix = { - new RowRDDMatrix(rows.map(_.vector), -1, _n) + def toRowMatrix(): RowMatrix = { + new RowMatrix(rows.map(_.vector), 0L, nCols) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RowRDDMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala similarity index 86% rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RowRDDMatrix.scala rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 9d64fbea68e89..2c13feba8bc28 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/rdd/RowRDDMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -15,12 +15,10 @@ * limitations under the License. */ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed import java.util -import scala.util.control.Breaks._ - import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} import breeze.numerics.{sqrt => brzSqrt} import com.github.fommil.netlib.BLAS.{getInstance => blas} @@ -33,31 +31,38 @@ import org.apache.spark.Logging * Represents a row-oriented RDDMatrix with no meaningful row indices. * * @param rows rows stored as an RDD[Vector] - * @param m number of rows - * @param n number of columns + * @param nRows number of rows. A non-positive value means unknown, and then the number of rows will + * be determined by the number of records in the RDD `rows`. + * @param nCols number of columns. A non-positive value means unknown, and then the number of + * columns will be determined by the size of the first row. 
*/ -class RowRDDMatrix( +class RowMatrix( val rows: RDD[Vector], - m: Long = -1L, - n: Long = -1) extends RDDMatrix with Logging { + private var nRows: Long, + private var nCols: Int) extends DistributedMatrix with Logging { - private var _m = m - private var _n = n + /** Alternative constructor leaving matrix dimensions to be determined automatically. */ + def this(rows: RDD[Vector]) = this(rows, 0L, 0) /** Gets or computes the number of columns. */ override def numCols(): Long = { - if (_n < 0) { - _n = rows.first().size + if (nCols <= 0) { + // Calling `first` will throw an exception if `rows` is empty. + nCols = rows.first().size } - _n + nCols } /** Gets or computes the number of rows. */ override def numRows(): Long = { - if (_m < 0) { - _m = rows.count() + if (nRows <= 0L) { + nRows = rows.count() + if (nRows == 0L) { + sys.error("Cannot determine the number of rows because it is not specified in the " + + "constructor and the rows RDD is empty.") + } } - _m + nRows } /** @@ -70,13 +75,13 @@ class RowRDDMatrix( // Compute the upper triangular part of the gram matrix. 
val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))( seqOp = (U, v) => { - RowRDDMatrix.dspr(1.0, v, U.data) + RowMatrix.dspr(1.0, v, U.data) U }, combOp = (U1, U2) => U1 += U2 ) - RowRDDMatrix.triuToFull(n, GU.data) + RowMatrix.triuToFull(n, GU.data) } /** @@ -108,10 +113,8 @@ class RowRDDMatrix( def computeSVD( k: Int, computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowRDDMatrix, Matrix] = { - + rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") val G = computeGramianMatrix() @@ -125,13 +128,8 @@ class RowRDDMatrix( val sigma0 = sigmas(0) val threshold = rCond * sigma0 var i = 0 - breakable { - while (i < k) { - if (sigmas(i) < threshold) { - break() - } - i += 1 - } + while (i < k && sigmas(i) >= threshold) { + i += 1 } val sk = i @@ -178,11 +176,11 @@ class RowRDDMatrix( ) // Update _m if it is not set, or verify its value. 
- if (_m < 0L) { - _m = m + if (nRows <= 0L) { + nRows = m } else { - require(_m == m, - s"The number of rows $m is different from what specified or previously computed: ${_m}.") + require(nRows == m, + s"The number of rows $m is different from what specified or previously computed: ${nRows}.") } mean :/= m.toDouble @@ -241,7 +239,7 @@ class RowRDDMatrix( * @param B a local matrix whose number of rows must match the number of columns of this matrix * @return a RowRDDMatrix representing the product, which preserves partitioning */ - def multiply(B: Matrix): RowRDDMatrix = { + def multiply(B: Matrix): RowMatrix = { val n = numCols().toInt require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}") @@ -254,11 +252,11 @@ class RowRDDMatrix( iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze)) }, preservesPartitioning = true) - new RowRDDMatrix(AB, _m, B.numCols) + new RowMatrix(AB, nRows, B.numCols) } } -object RowRDDMatrix { +object RowMatrix { /** * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/rdd/CoordinateRDDMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala similarity index 75% rename from mllib/src/test/scala/org/apache/spark/mllib/linalg/rdd/CoordinateRDDMatrixSuite.scala rename to mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala index 5a5af5106b510..5d07840d6d273 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/rdd/CoordinateRDDMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrixSuite.scala @@ -15,18 +15,18 @@ * limitations under the License. 
*/ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.linalg.Vectors -class CoordinateRDDMatrixSuite extends FunSuite with LocalSparkContext { +class CoordinateMatrixSuite extends FunSuite with LocalSparkContext { val m = 5 val n = 4 - var mat: CoordinateRDDMatrix = _ + var mat: CoordinateMatrix = _ override def beforeAll() { super.beforeAll() @@ -40,9 +40,9 @@ class CoordinateRDDMatrixSuite extends FunSuite with LocalSparkContext { (3, 0, 7.0), (3, 3, 8.0), (4, 1, 9.0)), 3).map { case (i, j, value) => - RDDMatrixEntry(i, j, value) + DistributedMatrixEntry(i, j, value) } - mat = new CoordinateRDDMatrix(entries) + mat = new CoordinateMatrix(entries) } test("size") { @@ -50,9 +50,20 @@ class CoordinateRDDMatrixSuite extends FunSuite with LocalSparkContext { assert(mat.numCols() === n) } - test("toIndexedRowRDDMatrix") { + test("empty entries") { + val entries = sc.parallelize(Seq[DistributedMatrixEntry](), 1) + val emptyMat = new CoordinateMatrix(entries) + intercept[RuntimeException] { + emptyMat.numCols() + } + intercept[RuntimeException] { + emptyMat.numRows() + } + } + + test("toIndexedRowMatrix") { val indexedRows = mat - .toIndexedRowRDDMatrix() + .toIndexedRowMatrix() .rows .map(row => (row.index, row.vector)) .collect() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/rdd/RowRDDMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala similarity index 87% rename from mllib/src/test/scala/org/apache/spark/mllib/linalg/rdd/RowRDDMatrixSuite.scala rename to mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 17ea149a0862e..6422d4e15565d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/rdd/RowRDDMatrixSuite.scala +++ 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.spark.mllib.linalg.rdd +package org.apache.spark.mllib.linalg.distributed import org.scalatest.FunSuite import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, diag => brzDiag, norm => brzNorm} import org.apache.spark.mllib.util.LocalSparkContext -import org.apache.spark.mllib.linalg.{Matrices, Vectors, Matrix} +import org.apache.spark.mllib.linalg.{Matrices, Vectors, Vector, Matrix} -class RowRDDMatrixSuite extends FunSuite with LocalSparkContext { +class RowMatrixSuite extends FunSuite with LocalSparkContext { val m = 4 val n = 3 @@ -46,13 +46,13 @@ class RowRDDMatrixSuite extends FunSuite with LocalSparkContext { Array(0.0, math.sqrt(2.0) / 2.0, math.sqrt(2.0) / 2.0, 1.0, 0.0, 0.0, 0.0, math.sqrt(2.0) / 2.0, - math.sqrt(2.0) / 2.0)) - var denseMat: RowRDDMatrix = _ - var sparseMat: RowRDDMatrix = _ + var denseMat: RowMatrix = _ + var sparseMat: RowMatrix = _ override def beforeAll() { super.beforeAll() - denseMat = new RowRDDMatrix(sc.parallelize(denseData, 2)) - sparseMat = new RowRDDMatrix(sc.parallelize(sparseData, 2)) + denseMat = new RowMatrix(sc.parallelize(denseData, 2)) + sparseMat = new RowMatrix(sc.parallelize(sparseData, 2)) } test("size") { @@ -62,6 +62,17 @@ class RowRDDMatrixSuite extends FunSuite with LocalSparkContext { assert(sparseMat.numCols() === n) } + test("empty rows") { + val rows = sc.parallelize(Seq[Vector](), 1) + val emptyMat = new RowMatrix(rows) + intercept[RuntimeException] { + emptyMat.numCols() + } + intercept[RuntimeException] { + emptyMat.numRows() + } + } + test("gram") { val expected = Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))