Skip to content

Commit

Permalink
address Matei's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Apr 7, 2014
1 parent be119fe commit b177ff1
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.rdd.RowRDDMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.Vectors

/**
Expand Down Expand Up @@ -52,7 +52,7 @@ object TallSkinnyPCA {
val values = line.split(' ').map(_.toDouble)
Vectors.dense(values)
}
val mat = new RowRDDMatrix(rows)
val mat = new RowMatrix(rows)

// Compute principal components.
val pc = mat.computePrincipalComponents(mat.numCols().toInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.rdd.RowRDDMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.Vectors

/**
Expand Down Expand Up @@ -52,7 +52,7 @@ object TallSkinnySVD {
val values = line.split(' ').map(_.toDouble)
Vectors.dense(values)
}
val mat = new RowRDDMatrix(rows)
val mat = new RowMatrix(rows)

// Compute SVD.
val svd = mat.computeSVD(mat.numCols().toInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg
import breeze.linalg.{Matrix => BM, DenseMatrix => BDM}

/**
* Trait for matrix.
* Trait for a local matrix.
*/
trait Matrix extends Serializable {

Expand Down Expand Up @@ -56,10 +56,13 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
}

/**
* Factory methods for [[org.apache.spark.mllib.linalg.Matrix]].
*/
object Matrices {

/**
* Creates a dense matrix.
* Creates a column-majored dense matrix.
*
* @param numRows number of rows
* @param numCols number of columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,67 @@
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.rdd
package org.apache.spark.mllib.linalg.distributed

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors

/**
* Represents a matrix in coordinate list format.
* Represents a matrix in coordinate format.
*
* @param entries matrix entries
* @param m number of rows (default: -1L, which means unknown)
* @param n number of column (default: -1L, which means unknown)
* @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
* be determined by the max row index plus one.
* @param nCols number of columns. A non-positive value means unknown, and then the number of
* columns will be determined by the max column index plus one.
*/
class CoordinateRDDMatrix(
val entries: RDD[RDDMatrixEntry],
m: Long = -1L,
n: Long = -1L) extends RDDMatrix {
class CoordinateMatrix(
val entries: RDD[DistributedMatrixEntry],
private var nRows: Long,
private var nCols: Long) extends DistributedMatrix {

private var _m = m
private var _n = n
/** Alternative constructor leaving matrix dimensions to be determined automatically. */
def this(entries: RDD[DistributedMatrixEntry]) = this(entries, 0L, 0L)

/** Gets or computes the number of columns. */
override def numCols(): Long = {
if (_n < 0) {
if (nCols <= 0L) {
computeSize()
}
_n
nCols
}

/** Gets or computes the number of rows. */
override def numRows(): Long = {
if (_m < 0) {
if (nRows <= 0L) {
computeSize()
}
_m
nRows
}

private def computeSize() {
// Reduce will throw an exception if `entries` is empty.
val (m1, n1) = entries.map(entry => (entry.i, entry.j)).reduce { case ((i1, j1), (i2, j2)) =>
(math.max(i1, i2), math.max(j1, j2))
}
// There may be empty columns at the very right and empty rows at the very bottom.
_m = math.max(_m, m1 + 1L)
_n = math.max(_n, n1 + 1L)
nRows = math.max(nRows, m1 + 1L)
nCols = math.max(nCols, n1 + 1L)
}

def toIndexedRowRDDMatrix(): IndexedRowRDDMatrix = {
val n = numCols().toInt
def toIndexedRowMatrix(): IndexedRowMatrix = {
val nl = numCols()
if (nl > Int.MaxValue) {
sys.error(s"Cannot convert to a row-oriented format because the number of columns $nl is " +
"too large.")
}
val n = nl.toInt
val indexedRows = entries.map(entry => (entry.i, (entry.j.toInt, entry.value)))
.groupByKey()
.map { case (i, vectorEntries) =>
IndexedRDDMatrixRow(i, Vectors.sparse(n, vectorEntries))
}
new IndexedRowRDDMatrix(indexedRows, numRows(), numCols())
IndexedMatrixRow(i, Vectors.sparse(n, vectorEntries))
}
new IndexedRowMatrix(indexedRows, numRows(), n)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.rdd
package org.apache.spark.mllib.linalg.distributed

/**
* Represents a matrix backed by one or more RDDs.
* Represents a distributively stored matrix backed by one or more RDDs.
*/
trait RDDMatrix extends Serializable {
trait DistributedMatrix extends Serializable {

/** Gets or computes the number of rows. */
def numRows(): Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.rdd
package org.apache.spark.mllib.linalg.distributed

/**
* Represents an entry in an RDDMatrix.
* @param i row index
* @param j column index
* @param value value of the entry
*/
case class RDDMatrixEntry(i: Long, j: Long, value: Double)
case class DistributedMatrixEntry(i: Long, j: Long, value: Double)
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,50 @@
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.rdd
package org.apache.spark.mllib.linalg.distributed

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vector

/** Represents a row of RowRDDMatrix. */
case class IndexedRDDMatrixRow(index: Long, vector: Vector)
case class IndexedMatrixRow(index: Long, vector: Vector)

/**
* Represents a row-oriented RDDMatrix with indexed rows.
*
* @param rows indexed rows of this matrix
* @param m number of rows, where a negative number means unknown
* @param n number of cols, where a negative number means unknown
* @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
* be determined by the max row index plus one.
* @param nCols number of columns. A non-positive value means unknown, and then the number of
* columns will be determined by the size of the first row.
*/
class IndexedRowRDDMatrix(
val rows: RDD[IndexedRDDMatrixRow],
m: Long = -1L,
n: Long = -1L) extends RDDMatrix {
class IndexedRowMatrix(
val rows: RDD[IndexedMatrixRow],
private var nRows: Long,
private var nCols: Int) extends DistributedMatrix {

private var _m = m
private var _n = n
/** Alternative constructor leaving matrix dimensions to be determined automatically. */
def this(rows: RDD[IndexedMatrixRow]) = this(rows, 0L, 0)

/** Gets or computes the number of columns. */
override def numCols(): Long = {
if (_n < 0) {
_n = rows.first().vector.size
if (nCols <= 0) {
// Calling `first` will throw an exception if `rows` is empty.
nCols = rows.first().vector.size
}
_n
nCols
}

override def numRows(): Long = {
if (_m < 0) {
_m = rows.map(_.index).reduce(math.max) + 1
if (nRows <= 0L) {
// Reduce will throw an exception if `rows` is empty.
nRows = rows.map(_.index).reduce(math.max) + 1L
}
_m
nRows
}

/** Drops row indices and converts this matrix to a RowRDDMatrix. */
def toRowRDDMatrix(): RowRDDMatrix = {
new RowRDDMatrix(rows.map(_.vector), -1, _n)
def toRowMatrix(): RowMatrix = {
new RowMatrix(rows.map(_.vector), 0L, nCols)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.rdd
package org.apache.spark.mllib.linalg.distributed

import java.util

import scala.util.control.Breaks._

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
Expand All @@ -33,31 +31,38 @@ import org.apache.spark.Logging
* Represents a row-oriented RDDMatrix with no meaningful row indices.
*
* @param rows rows stored as an RDD[Vector]
* @param m number of rows
* @param n number of columns
* @param nRows number of rows. A non-positive value means unknown, and then the number of rows will
* be determined by the number of records in the RDD `rows`.
* @param nCols number of columns. A non-positive value means unknown, and then the number of
* columns will be determined by the size of the first row.
*/
class RowRDDMatrix(
class RowMatrix(
val rows: RDD[Vector],
m: Long = -1L,
n: Long = -1) extends RDDMatrix with Logging {
private var nRows: Long,
private var nCols: Int) extends DistributedMatrix with Logging {

private var _m = m
private var _n = n
/** Alternative constructor leaving matrix dimensions to be determined automatically. */
def this(rows: RDD[Vector]) = this(rows, 0L, 0)

/** Gets or computes the number of columns. */
override def numCols(): Long = {
if (_n < 0) {
_n = rows.first().size
if (nCols <= 0) {
// Calling `first` will throw an exception if `rows` is empty.
nCols = rows.first().size
}
_n
nCols
}

/** Gets or computes the number of rows. */
override def numRows(): Long = {
if (_m < 0) {
_m = rows.count()
if (nRows <= 0L) {
nRows = rows.count()
if (nRows == 0L) {
sys.error("Cannot determine the number of rows because it is not specified in the " +
"constructor and the rows RDD is empty.")
}
}
_m
nRows
}

/**
Expand All @@ -70,13 +75,13 @@ class RowRDDMatrix(
// Compute the upper triangular part of the gram matrix.
val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))(
seqOp = (U, v) => {
RowRDDMatrix.dspr(1.0, v, U.data)
RowMatrix.dspr(1.0, v, U.data)
U
},
combOp = (U1, U2) => U1 += U2
)

RowRDDMatrix.triuToFull(n, GU.data)
RowMatrix.triuToFull(n, GU.data)
}

/**
Expand Down Expand Up @@ -108,10 +113,8 @@ class RowRDDMatrix(
def computeSVD(
k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[RowRDDMatrix, Matrix] = {

rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
val n = numCols().toInt

require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")

val G = computeGramianMatrix()
Expand All @@ -125,13 +128,8 @@ class RowRDDMatrix(
val sigma0 = sigmas(0)
val threshold = rCond * sigma0
var i = 0
breakable {
while (i < k) {
if (sigmas(i) < threshold) {
break()
}
i += 1
}
while (i < k && sigmas(i) >= threshold) {
i += 1
}
val sk = i

Expand Down Expand Up @@ -178,11 +176,11 @@ class RowRDDMatrix(
)

// Update _m if it is not set, or verify its value.
if (_m < 0L) {
_m = m
if (nRows <= 0L) {
nRows = m
} else {
require(_m == m,
s"The number of rows $m is different from what specified or previously computed: ${_m}.")
require(nRows == m,
s"The number of rows $m is different from what specified or previously computed: ${nRows}.")
}

mean :/= m.toDouble
Expand Down Expand Up @@ -241,7 +239,7 @@ class RowRDDMatrix(
* @param B a local matrix whose number of rows must match the number of columns of this matrix
* @return a RowRDDMatrix representing the product, which preserves partitioning
*/
def multiply(B: Matrix): RowRDDMatrix = {
def multiply(B: Matrix): RowMatrix = {
val n = numCols().toInt
require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")

Expand All @@ -254,11 +252,11 @@ class RowRDDMatrix(
iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
}, preservesPartitioning = true)

new RowRDDMatrix(AB, _m, B.numCols)
new RowMatrix(AB, nRows, B.numCols)
}
}

object RowRDDMatrix {
object RowMatrix {

/**
* Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR.
Expand Down
Loading

0 comments on commit b177ff1

Please sign in to comment.