
Commit

add pca/gram to IndexedRowMatrix
add toBreeze to DistributedMatrix for test
simplify tests
mengxr committed Apr 8, 2014
1 parent b177ff1 commit 03cd7e1
Showing 8 changed files with 326 additions and 66 deletions.
@@ -17,10 +17,20 @@

package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors

/**
* Represents an entry in a distributed matrix.
* @param i row index
* @param j column index
* @param value value of the entry
*/
case class MatrixEntry(i: Long, j: Long, value: Double)

/**
* Represents a matrix in coordinate format.
*
@@ -31,12 +41,12 @@ import org.apache.spark.mllib.linalg.Vectors
* columns will be determined by the max column index plus one.
*/
class CoordinateMatrix(
- val entries: RDD[DistributedMatrixEntry],
+ val entries: RDD[MatrixEntry],
private var nRows: Long,
private var nCols: Long) extends DistributedMatrix {

/** Alternative constructor leaving matrix dimensions to be determined automatically. */
- def this(entries: RDD[DistributedMatrixEntry]) = this(entries, 0L, 0L)
+ def this(entries: RDD[MatrixEntry]) = this(entries, 0L, 0L)

/** Gets or computes the number of columns. */
override def numCols(): Long = {
@@ -54,16 +64,7 @@ class CoordinateMatrix(
nRows
}

- private def computeSize() {
-   // Reduce will throw an exception if `entries` is empty.
-   val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) =>
-     (math.max(i1, i2), math.max(j1, j2))
-   }
-   // There may be empty columns at the very right and empty rows at the very bottom.
-   nRows = math.max(nRows, m1 + 1L)
-   nCols = math.max(nCols, n1 + 1L)
- }

+ /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
def toIndexedRowMatrix(): IndexedRowMatrix = {
val nl = numCols()
if (nl > Int.MaxValue) {
@@ -74,8 +75,38 @@
val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value)))
.groupByKey()
.map { case (i, vectorEntries) =>
- IndexedMatrixRow(i, Vectors.sparse(n, vectorEntries))
+ IndexedRow(i, Vectors.sparse(n, vectorEntries))
}
new IndexedRowMatrix(indexedRows, numRows(), n)
}

/**
* Converts to RowMatrix, dropping row indices after grouping by row index.
* The number of columns must be within the integer range.
*/
def toRowMatrix(): RowMatrix = {
toIndexedRowMatrix().toRowMatrix()
}

/** Determines the size by computing the max row/column index. */
private def computeSize() {
// Reduce will throw an exception if `entries` is empty.
val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) =>
(math.max(i1, i2), math.max(j1, j2))
}
// There may be empty columns at the very right and empty rows at the very bottom.
nRows = math.max(nRows, m1 + 1L)
nCols = math.max(nCols, n1 + 1L)
}

/** Collects data and assembles a local matrix. */
private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
val mat = BDM.zeros[Double](m, n)
entries.collect().foreach { case MatrixEntry(i, j, value) =>
mat(i.toInt, j.toInt) = value
}
mat
}
}
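To make the new conversion paths concrete, here is a minimal driver-side sketch. It is not part of the commit and assumes a live SparkContext `sc`; all variable names are illustrative.

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Three non-zero entries; the largest indices determine the inferred size.
val entries = sc.parallelize(Seq(
  MatrixEntry(0, 0, 1.0),
  MatrixEntry(1, 2, 4.0),
  MatrixEntry(4, 1, 9.0)))
val coordMat = new CoordinateMatrix(entries)
assert(coordMat.numRows() == 5 && coordMat.numCols() == 3) // max index + 1
val indexedRowMat = coordMat.toIndexedRowMatrix() // rows grouped by index, stored sparse
val rowMat = coordMat.toRowMatrix()               // same rows with indices dropped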
@@ -17,6 +17,10 @@

package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.mllib.linalg.Matrix

/**
* Represents a distributively stored matrix backed by one or more RDDs.
*/
@@ -27,4 +31,7 @@ trait DistributedMatrix extends Serializable {

/** Gets or computes the number of columns. */
def numCols(): Long

/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double]
}
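Because `toBreeze()` collects the entire matrix to the driver, it is only practical for the tiny matrices used in tests. One hypothetical test idiom it enables, reusing `coordMat` from the sketch above: check that two distributed representations describe the same local matrix.

assert(coordMat.toBreeze() == coordMat.toIndexedRowMatrix().toBreeze())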

This file was deleted.

@@ -17,14 +17,18 @@

package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.rdd.RDD
- import org.apache.spark.mllib.linalg.Vector
+ import org.apache.spark.mllib.linalg._
+ import org.apache.spark.mllib.linalg.SingularValueDecomposition

- /** Represents a row of RowRDDMatrix. */
- case class IndexedMatrixRow(index: Long, vector: Vector)
+ /** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */
+ case class IndexedRow(index: Long, vector: Vector)

/**
- * Represents a row-oriented RDDMatrix with indexed rows.
+ * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
+ * indexed rows.
*
* @param rows indexed rows of this matrix
* @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
@@ -33,14 +37,13 @@ case class IndexedMatrixRow(index: Long, vector: Vector)
* columns will be determined by the size of the first row.
*/
class IndexedRowMatrix(
- val rows: RDD[IndexedMatrixRow],
+ val rows: RDD[IndexedRow],
private var nRows: Long,
private var nCols: Int) extends DistributedMatrix {

/** Alternative constructor leaving matrix dimensions to be determined automatically. */
- def this(rows: RDD[IndexedMatrixRow]) = this(rows, 0L, 0)
+ def this(rows: RDD[IndexedRow]) = this(rows, 0L, 0)

- /** Gets or computes the number of columns. */
override def numCols(): Long = {
if (nCols <= 0) {
// Calling `first` will throw an exception if `rows` is empty.
@@ -57,8 +60,89 @@ class IndexedRowMatrix(
nRows
}

- /** Drops row indices and converts this matrix to a RowRDDMatrix. */
+ /**
+ * Drops row indices and converts this matrix to a
+ * [[org.apache.spark.mllib.linalg.distributed.RowMatrix]].
+ */
def toRowMatrix(): RowMatrix = {
new RowMatrix(rows.map(_.vector), 0L, nCols)
}

/**
* Computes the singular value decomposition of this matrix.
* Denoting this matrix by A (m x n), this computes matrices U, S, and V such that A = U * S * V'.
*
* There is no restriction on m, but we require `n^2` doubles to fit in memory.
* Further, n should be less than m.
* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small), from which we recover S and V.
* Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
* Note that this approach requires `O(n^3)` time on the master node.
*
* At most the k largest non-zero singular values and the associated singular vectors are returned.
* If there are k such values, the dimensions of the returned matrices will be:
*
* U is an [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]] of size m x k that
* satisfies U'U = eye(k),
* s is a Vector of size k, holding the singular values in descending order,
* and V is a local Matrix of size n x k that satisfies V'V = eye(k).
*
* @param k number of singular values to keep. We might return less than k if there are
* numerically zero singular values. See rCond.
* @param computeU whether to compute U
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
* are treated as zero, where sigma(0) is the largest singular value.
* @return SingularValueDecomposition(U, s, V)
*/
def computeSVD(
k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[IndexedRowMatrix, Matrix] = {
val indices = rows.map(_.index)
val svd = toRowMatrix().computeSVD(k, computeU, rCond)
val U = if (computeU) {
val indexedRows = indices.zip(svd.U.rows).map { case (i, v) =>
IndexedRow(i, v)
}
new IndexedRowMatrix(indexedRows, nRows, nCols)
} else {
null
}
SingularValueDecomposition(U, svd.s, svd.V)
}
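As a hypothetical usage sketch (not from the commit), reusing `indexedRowMat` from the CoordinateMatrix example above:

val svd = indexedRowMat.computeSVD(k = 2, computeU = true)
val u = svd.U // IndexedRowMatrix of size m x 2, original row indices preserved
val s = svd.s // Vector holding the 2 singular values in descending order
val v = svd.V // local Matrix of size n x 2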

/**
* Multiply this matrix by a local matrix on the right.
*
* @param B a local matrix whose number of rows must match the number of columns of this matrix
* @return an IndexedRowMatrix representing the product, which preserves partitioning
*/
def multiply(B: Matrix): IndexedRowMatrix = {
val mat = toRowMatrix().multiply(B)
val indexedRows = rows.map(_.index).zip(mat.rows).map { case (i, v) =>
IndexedRow(i, v)
}
new IndexedRowMatrix(indexedRows, nRows, nCols)
}
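For instance, multiplying by a local column of ones sums each row while keeping the row indices. A hypothetical sketch, assuming the `Matrices.dense` factory from org.apache.spark.mllib.linalg:

val n = indexedRowMat.numCols().toInt
val ones = Matrices.dense(n, 1, Array.fill(n)(1.0)) // local n x 1 matrix of ones
val rowSums = indexedRowMat.multiply(ones)          // distributed m x 1 result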

/**
* Computes the Gramian matrix `A^T A`.
*/
def computeGramianMatrix(): Matrix = {
toRowMatrix().computeGramianMatrix()
}
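Since the Gramian comes back as a local n x n matrix on the driver, this is only feasible for a modest number of columns, e.g. (hypothetical):

val gram = indexedRowMat.computeGramianMatrix() // local n x n Matrix equal to A^T A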

private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
val mat = BDM.zeros[Double](m, n)
rows.collect().foreach { case IndexedRow(rowIndex, vector) =>
val i = rowIndex.toInt
vector.toBreeze.activeIterator.foreach { case (j, v) =>
mat(i, j) = v
}
}
mat
}
}
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.Logging

/**
- * Represents a row-oriented RDDMatrix with no meaningful row indices.
+ * Represents a row-oriented distributed Matrix with no meaningful row indices.
*
* @param rows rows stored as an RDD[Vector]
* @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
@@ -99,7 +99,7 @@ class RowMatrix(
* At most the k largest non-zero singular values and the associated singular vectors are returned.
* If there are k such values, the dimensions of the returned matrices will be:
*
- * U is a RowRDDMatrix of size m x k that satisfies U'U = eye(k),
+ * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
* s is a Vector of size k, holding the singular values in descending order,
* and V is a Matrix of size n x k that satisfies V'V = eye(k).
*
@@ -237,7 +237,8 @@ class RowMatrix(
* Multiply this matrix by a local matrix on the right.
*
* @param B a local matrix whose number of rows must match the number of columns of this matrix
- * @return a RowRDDMatrix representing the product, which preserves partitioning
+ * @return a [[org.apache.spark.mllib.linalg.distributed.RowMatrix]] representing the product,
+ *         which preserves partitioning
*/
def multiply(B: Matrix): RowMatrix = {
val n = numCols().toInt
@@ -254,6 +255,20 @@

new RowMatrix(AB, nRows, B.numCols)
}

private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
val mat = BDM.zeros[Double](m, n)
var i = 0
rows.collect().foreach { vector =>
vector.toBreeze.activeIterator.foreach { case (j, value) =>
mat(i, j) = value
}
i += 1
}
mat
}
}
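As a construction sketch (hypothetical, again assuming a live SparkContext `sc`): a RowMatrix simply wraps an RDD[Vector], which is why `toBreeze()` above falls back on a collect-order counter in place of real row indices.

import org.apache.spark.mllib.linalg.Vectors

val rowMat2 = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0),
  Vectors.dense(0.0, 1.0))))
assert(rowMat2.numRows() == 2 && rowMat2.numCols() == 2)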

object RowMatrix {
…
@@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg.distributed

import org.scalatest.FunSuite

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.linalg.Vectors

@@ -40,7 +42,7 @@ class CoordinateMatrixSuite extends FunSuite with LocalSparkContext {
(3, 0, 7.0),
(3, 3, 8.0),
(4, 1, 9.0)), 3).map { case (i, j, value) =>
- DistributedMatrixEntry(i, j, value)
+ MatrixEntry(i, j, value)
}
mat = new CoordinateMatrix(entries)
}
@@ -51,7 +53,7 @@ class CoordinateMatrixSuite extends FunSuite with LocalSparkContext {
}

test("empty entries") {
- val entries = sc.parallelize(Seq[DistributedMatrixEntry](), 1)
+ val entries = sc.parallelize(Seq[MatrixEntry](), 1)
val emptyMat = new CoordinateMatrix(entries)
intercept[RuntimeException] {
emptyMat.numCols()
@@ -61,20 +63,36 @@ class CoordinateMatrixSuite extends FunSuite with LocalSparkContext {
}
}

test("toBreeze") {
val expected = BDM(
(1.0, 2.0, 0.0, 0.0),
(0.0, 3.0, 4.0, 0.0),
(0.0, 0.0, 5.0, 6.0),
(7.0, 0.0, 0.0, 8.0),
(0.0, 9.0, 0.0, 0.0))
assert(mat.toBreeze() === expected)
}

test("toIndexedRowMatrix") {
- val indexedRows = mat
-   .toIndexedRowMatrix()
-   .rows
-   .map(row => (row.index, row.vector))
-   .collect()
-   .sortBy(_._1)
-   .toSeq
- assert(indexedRows === Seq(
-   (0, Vectors.dense(1.0, 2.0, 0.0, 0.0)),
-   (1, Vectors.dense(0.0, 3.0, 4.0, 0.0)),
-   (2, Vectors.dense(0.0, 0.0, 5.0, 6.0)),
-   (3, Vectors.dense(7.0, 0.0, 0.0, 8.0)),
-   (4, Vectors.dense(0.0, 9.0, 0.0, 0.0))
- ))
+ val indexedRowMatrix = mat.toIndexedRowMatrix()
+ val expected = BDM(
+   (1.0, 2.0, 0.0, 0.0),
+   (0.0, 3.0, 4.0, 0.0),
+   (0.0, 0.0, 5.0, 6.0),
+   (7.0, 0.0, 0.0, 8.0),
+   (0.0, 9.0, 0.0, 0.0))
+ assert(indexedRowMatrix.toBreeze() === expected)
}

test("toRowMatrix") {
val rowMatrix = mat.toRowMatrix()
val rows = rowMatrix.rows.collect().toSet
val expected = Set(
Vectors.dense(1.0, 2.0, 0.0, 0.0),
Vectors.dense(0.0, 3.0, 4.0, 0.0),
Vectors.dense(0.0, 0.0, 5.0, 6.0),
Vectors.dense(7.0, 0.0, 0.0, 8.0),
Vectors.dense(0.0, 9.0, 0.0, 0.0))
assert(rows === expected)
}
}
(2 more changed files are not shown.)
