[SPARK-3974] Addressed @jkbradley's comments
brkyvz committed Jan 19, 2015
1 parent: ba414d2 · commit: 239ab4b
Showing 2 changed files with 63 additions and 47 deletions.
File 1 of 2: the BlockMatrix and GridPartitioner sources.
@@ -31,14 +31,14 @@ import org.apache.spark.util.Utils
  *
  * @param numRowBlocks Number of blocks that form the rows of the matrix.
  * @param numColBlocks Number of blocks that form the columns of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
+ * @param rowsPerBlock Number of rows that make up each block.
+ * @param colsPerBlock Number of columns that make up each block.
  */
 private[mllib] class GridPartitioner(
     val numRowBlocks: Int,
     val numColBlocks: Int,
-    val rowPerBlock: Int,
-    val colPerBlock: Int,
+    val rowsPerBlock: Int,
+    val colsPerBlock: Int,
     override val numPartitions: Int) extends Partitioner {
 
   /**
@@ -52,10 +52,10 @@ private[mllib] class GridPartitioner(
    */
   override def getPartition(key: Any): Int = {
     key match {
-      case ind: (Int, Int) =>
-        Utils.nonNegativeMod(ind._1 + ind._2 * numRowBlocks, numPartitions)
-      case indices: (Int, Int, Int) =>
-        Utils.nonNegativeMod(indices._1 + indices._3 * numRowBlocks, numPartitions)
+      case (rowIndex: Int, colIndex: Int) =>
+        Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
+      case (rowIndex: Int, innerIndex: Int, colIndex: Int) =>
+        Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
       case _ =>
         throw new IllegalArgumentException("Unrecognized key")
     }
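The rewritten cases bind the tuple fields by name, and the typed patterns inside the tuples are checked at runtime, unlike the old `ind: (Int, Int)` match, whose element types were erased. For intuition, the partitioner linearizes block indices in column-major order before taking the modulo. A standalone sketch of that mapping (plain Scala, no Spark dependencies; `nonNegativeMod` here mirrors `Utils.nonNegativeMod`):

```scala
object GridPartitionSketch {
  // Mirrors org.apache.spark.util.Utils.nonNegativeMod.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    val numRowBlocks = 3 // a 3 x 2 grid of blocks, as in the test suite below
    val numPartitions = 4
    for (col <- 0 until 2; row <- 0 until numRowBlocks) {
      // Column-major linearization keeps the blocks of one column adjacent.
      val part = nonNegativeMod(row + col * numRowBlocks, numPartitions)
      println(s"block ($row, $col) -> partition $part")
    }
  }
}
```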
@@ -65,8 +65,8 @@ private[mllib] class GridPartitioner(
   override def equals(obj: Any): Boolean = {
     obj match {
       case r: GridPartitioner =>
-        (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
-          (this.colPerBlock == r.colPerBlock)
+        (this.numPartitions == r.numPartitions) && (this.rowsPerBlock == r.rowsPerBlock) &&
+          (this.colsPerBlock == r.colsPerBlock)
       case _ =>
         false
     }
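Since `equals` is overridden here, the usual contract also calls for a matching `hashCode`, so that two partitioners that compare equal hash alike. One consistent definition, sketched as an assumption rather than code from this patch:

```scala
// Hypothetical companion to the equals above (not part of this commit):
// combines exactly the three fields that equals compares.
override def hashCode(): Int =
  41 * (41 * (41 + numPartitions) + rowsPerBlock) + colsPerBlock
```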
@@ -94,18 +94,18 @@ class BlockMatrix(
  * @param numRowBlocks Number of blocks that form the rows of this matrix
  * @param numColBlocks Number of blocks that form the columns of this matrix
  * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
+ * @param rowsPerBlock Number of rows that make up each block.
+ * @param colsPerBlock Number of columns that make up each block.
  */
   def this(
       numRowBlocks: Int,
       numColBlocks: Int,
       rdd: RDD[((Int, Int), Matrix)],
-      rowPerBlock: Int,
-      colPerBlock: Int) = {
+      rowsPerBlock: Int,
+      colsPerBlock: Int) = {
     this(numRowBlocks, numColBlocks, rdd)
-    val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock,
-      colPerBlock, rdd.partitions.length)
+    val part = new GridPartitioner(numRowBlocks, numColBlocks, rowsPerBlock,
+      colsPerBlock, rdd.partitions.length)
     setPartitioner(part)
   }
 
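As a usage illustration of the renamed parameters, the sketch below builds the 5 x 4 matrix used by the test suite further down as a 3 x 2 grid of 2 x 2 blocks, with smaller edge blocks in the last block-row. The `sc` handle is an assumed `SparkContext`; the constructor signature is the one introduced above:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.rdd.RDD

// Assumed in scope: val sc: SparkContext
// Blocks are keyed by (rowBlockIndex, colBlockIndex); values are column-major.
val blocks: Seq[((Int, Int), Matrix)] = Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
  ((1, 0), Matrices.dense(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
  ((2, 0), Matrices.dense(1, 2, Array(0.0, 0.0))), // edge block: 1 row
  ((0, 1), Matrices.dense(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
  ((1, 1), Matrices.dense(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
  ((2, 1), Matrices.dense(1, 2, Array(1.0, 5.0)))) // edge block: 1 row

val rdd: RDD[((Int, Int), Matrix)] = sc.parallelize(blocks, numSlices = 2)
// numRowBlocks = 3, numColBlocks = 2, rowsPerBlock = 2, colsPerBlock = 2
val matrix = new BlockMatrix(3, 2, rdd, 2, 2)
```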
@@ -129,33 +129,35 @@ class BlockMatrix(
   override def numCols(): Long = dims._2
 
   /** Returns the dimensions of the matrix. */
-  def getDim: (Long, Long) = {
+  private def getDim: (Long, Long) = {
+    case class MatrixMetaData(var rowIndex: Int, var colIndex: Int,
+      var numRows: Int, var numCols: Int)
     // picks the sizes of the matrix with the maximum indices
-    def pickSizeByGreaterIndex(
-        example: (Int, Int, Int, Int),
-        base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
-      if (example._1 > base._1 && example._2 > base._2) {
-        (example._1, example._2, example._3, example._4)
-      } else if (example._1 > base._1) {
-        (example._1, base._2, example._3, base._4)
-      } else if (example._2 > base._2) {
-        (base._1, example._2, base._3, example._4)
-      } else {
-        (base._1, base._2, base._3, base._4)
+    def pickSizeByGreaterIndex(example: MatrixMetaData, base: MatrixMetaData): MatrixMetaData = {
+      if (example.rowIndex > base.rowIndex) {
+        base.rowIndex = example.rowIndex
+        base.numRows = example.numRows
       }
+      if (example.colIndex > base.colIndex) {
+        base.colIndex = example.colIndex
+        base.numCols = example.numCols
+      }
+      base
     }
 
-    val lastRowCol = rdd.treeAggregate((0, 0, 0, 0))(
+    val lastRowCol = rdd.treeAggregate(new MatrixMetaData(0, 0, 0, 0))(
       seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
-        pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
+        pickSizeByGreaterIndex(
+          new MatrixMetaData(blockXInd, blockYInd, mat.numRows, mat.numCols), base)
       },
       combOp = (c1, c2) => (c1, c2) match {
        case (res1, res2) =>
          pickSizeByGreaterIndex(res1, res2)
       })
 
-    (lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,
-      lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
+    // We add the size of the edge matrices, because they can be less than the specified
+    // rowsPerBlock or colsPerBlock.
+    (lastRowCol.rowIndex.toLong * partitioner.rowsPerBlock + lastRowCol.numRows,
+      lastRowCol.colIndex.toLong * partitioner.colsPerBlock + lastRowCol.numCols)
   }
 
   /** Returns the Frobenius Norm of the matrix */
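The returned dimensions follow from the block with the greatest indices: every earlier block in a direction is full-sized, and the edge block contributes its own, possibly smaller, size. A quick arithmetic check against the 5 x 4 layout sketched above (illustration only):

```scala
// Max-index block found by the treeAggregate: block (2, 1) holds 1 row, 2 cols.
val (rowsPerBlock, colsPerBlock) = (2, 2)
val (rowIndex, colIndex, blockRows, blockCols) = (2, 1, 1, 2)
val dims = (rowIndex.toLong * rowsPerBlock + blockRows, // 2 * 2 + 1 = 5
  colIndex.toLong * colsPerBlock + blockCols)           // 1 * 2 + 2 = 4
assert(dims == (5L, 4L))
```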
@@ -165,7 +167,7 @@ class BlockMatrix(
         sparse.values.map(x => math.pow(x, 2)).sum
       case dense: DenseMatrix =>
         dense.values.map(x => math.pow(x, 2)).sum
-    }
+      }
     }.reduce(_ + _))
   }
 
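`normFro()` computes the Frobenius norm, the square root of the sum of the squared entries, by mapping each block to its partial sum of squares and reducing. A local sketch of the same computation (illustrative; sparse blocks contribute only their stored values, since implicit zeros add nothing):

```scala
// Frobenius norm of a local matrix from its (column-major) entries.
def frobeniusNorm(entries: Array[Double]): Double =
  math.sqrt(entries.map(x => x * x).sum)

frobeniusNorm(Array(3.0, 4.0)) // == 5.0
```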
@@ -181,25 +183,29 @@ class BlockMatrix(
     this
   }
 
-  /** Collect the distributed matrix on the driver as a local matrix. */
+  /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
   def toLocalMatrix(): Matrix = {
-    val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
+    require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
+      s"Int.MaxValue. Currently numRows: ${numRows()}")
+    require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
+      s"Int.MaxValue. Currently numCols: ${numCols()}")
     val nRows = numRows().toInt
     val nCols = numCols().toInt
-    val values = new Array[Double](nRows * nCols)
+    val mem = nRows * nCols * 8 / 1000000
+    if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
 
-    parts.foreach { part =>
-      val rowOffset = part._1._1 * partitioner.rowPerBlock
-      val colOffset = part._1._2 * partitioner.colPerBlock
-      val block = part._2
+    val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
+    val values = new Array[Double](nRows * nCols)
+    parts.foreach { case ((rowIndex, colIndex), block) =>
+      val rowOffset = rowIndex * partitioner.rowsPerBlock
+      val colOffset = colIndex * partitioner.colsPerBlock
       var j = 0
+      val mat = block.toArray
       while (j < block.numCols) {
         var i = 0
         val indStart = (j + colOffset) * nRows + rowOffset
-        val indEnd = block.numRows
         val matStart = j * block.numRows
-        val mat = block.toArray
-        while (i < indEnd) {
+        while (i < block.numRows) {
           values(indStart + i) = mat(matStart + i)
           i += 1
         }
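The rewritten loop destructures each block's key directly and copies one column at a time; it relies on both the block and the destination array being column-major. A standalone sketch of the index arithmetic (no Spark dependencies; all names here are illustrative):

```scala
// Copies one column-major block into the full matrix's value array.
// Entry (i, j) of a block at grid position (rowIndex, colIndex) lands at
// global row rowOffset + i and global column colOffset + j, i.e. flat
// index (j + colOffset) * nRows + rowOffset + i.
def copyBlock(
    values: Array[Double], // destination, length nRows * nCols
    nRows: Int,
    block: Array[Double],  // column-major block values
    blockRows: Int,
    blockCols: Int,
    rowOffset: Int,
    colOffset: Int): Unit = {
  var j = 0
  while (j < blockCols) {
    var i = 0
    while (i < blockRows) {
      values((j + colOffset) * nRows + rowOffset + i) = block(j * blockRows + i)
      i += 1
    }
    j += 1
  }
}
```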
File 2 of 2: the BlockMatrixSuite tests.
@@ -24,14 +24,24 @@ import breeze.linalg.{DenseMatrix => BDM}
 import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
-class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
-
+// Input values for the tests
+private object BlockMatrixSuite {
   val m = 5
   val n = 4
   val rowPerPart = 2
   val colPerPart = 2
   val numRowBlocks = 3
   val numColBlocks = 2
+}
+
+class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
+
+  val m = BlockMatrixSuite.m
+  val n = BlockMatrixSuite.n
+  val rowPerPart = BlockMatrixSuite.rowPerPart
+  val colPerPart = BlockMatrixSuite.colPerPart
+  val numRowBlocks = BlockMatrixSuite.numRowBlocks
+  val numColBlocks = BlockMatrixSuite.numColBlocks
   var gridBasedMat: BlockMatrix = _
   type SubMatrix = ((Int, Int), Matrix)
 
@@ -63,7 +73,7 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
       (0.0, 0.0, 1.0, 5.0))
 
     val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
-    assert(gridBasedMat.toBreeze() === expected)
     assert(gridBasedMat.toLocalMatrix() === dense)
+    assert(gridBasedMat.toBreeze() === expected)
   }
 }
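The two assertions exercise the same data through different paths: `toBreeze()` yields the Breeze `BDM` directly, while `toLocalMatrix()` yields the equivalent MLlib `DenseMatrix`. A minimal illustration of the conversion they pivot on (note that `Matrices.fromBreeze` is `private[mllib]`, so this only compiles inside that package, as the suite itself does):

```scala
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices}

// A 2 x 2 Breeze matrix and its MLlib counterpart share column-major storage.
val breezeMat = BDM((1.0, 2.0), (3.0, 4.0))
val mllibMat = Matrices.fromBreeze(breezeMat).asInstanceOf[DenseMatrix]
assert(mllibMat.values.sameElements(breezeMat.data))
```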