Skip to content

Commit

Permalink
updated API and modified partitioning scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
brkyvz committed Jan 21, 2015
1 parent eebbdf7 commit f9d664b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,24 @@ package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* A grid partitioner, which places each block, or each group of neighboring blocks, in a partition.
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param rowsPerBlock Number of rows that make up each block.
* @param colsPerBlock Number of columns that make up each block.
*/
private[mllib] class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
val rowsPerBlock: Int,
val colsPerBlock: Int,
override val numPartitions: Int) extends Partitioner {
val numParts: Int) extends Partitioner {
// Having the number of partitions greater than the number of sub matrices does not help
override val numPartitions = math.min(numParts, numRowBlocks * numColBlocks)

/**
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
Expand All @@ -52,25 +49,38 @@ private[mllib] class GridPartitioner(
* @return The index of the partition, which the SubMatrix belongs to.
*/
override def getPartition(key: Any): Int = key match {
  // Two-part key: (blockRowIndex, blockColIndex).
  case (blockRowIndex: Int, blockColIndex: Int) =>
    getBlockId(blockRowIndex, blockColIndex)
  // Three-part key: the middle component does not influence the partition, only the
  // block row and block column indices do.
  case (blockRowIndex: Int, _: Int, blockColIndex: Int) =>
    getBlockId(blockRowIndex, blockColIndex)
  // Any other key shape is a caller error.
  case _ =>
    throw new IllegalArgumentException(s"Unrecognized key. key: $key")
}

/**
 * Partitions sub-matrices as blocks with neighboring sub-matrices.
 *
 * @param blockRowIndex Row index of the sub-matrix in the grid of blocks.
 * @param blockColIndex Column index of the sub-matrix in the grid of blocks.
 * @return The index of the partition; always within [0, numPartitions) when numPartitions > 0.
 */
private def getBlockId(blockRowIndex: Int, blockColIndex: Int): Int = {
  val totalBlocks = numRowBlocks * numColBlocks
  // Gives the number of blocks that need to be in each partition
  val partitionRatio = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
  // Number of neighboring blocks to take in each row
  val subBlocksPerRow = math.ceil(numRowBlocks * 1.0 / partitionRatio).toInt
  // Number of neighboring blocks to take in each column
  val subBlocksPerCol = math.ceil(numColBlocks * 1.0 / partitionRatio).toInt
  // Coordinates of the tile (group of neighboring blocks) that contains this block
  val i = blockRowIndex / subBlocksPerRow
  val j = blockColIndex / subBlocksPerCol
  val blocksPerRow = math.ceil(numRowBlocks * 1.0 / subBlocksPerRow).toInt
  val tileId = j * blocksPerRow + i
  // The tiling can produce more tiles than partitions (e.g. a 4 x 4 grid with 4 partitions
  // yields 16 single-block tiles), so wrap the tile id into the valid partition range.
  // Ids that were already in range are returned unchanged.
  val wrapped = tileId % numPartitions
  if (wrapped < 0) wrapped + numPartitions else wrapped
}

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
(this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks)
(this.rowsPerBlock == r.rowsPerBlock) && (this.colsPerBlock == r.colsPerBlock)
(this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks) &&
(this.numPartitions == r.numPartitions)
case _ =>
false
}
Expand All @@ -80,49 +90,61 @@ private[mllib] class GridPartitioner(
/**
* Represents a distributed matrix in blocks of local matrices.
*
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param nRows Number of rows of this matrix
* @param nCols Number of columns of this matrix
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
* columns are not required to have the given number of columns
*/
class BlockMatrix(
val rdd: RDD[((Int, Int), Matrix)],
private var nRows: Long,
private var nCols: Long,
val numRowBlocks: Int,
val numColBlocks: Int,
val rdd: RDD[((Int, Int), Matrix)]) extends DistributedMatrix with Logging {
val rowsPerBlock: Int,
val colsPerBlock: Int) extends DistributedMatrix with Logging {

private type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)

/**
* Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
* Partitioner by default.
* Alternate constructor for BlockMatrix without the input of the number of rows and columns.
*
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param rowsPerBlock Number of rows that make up each block.
* @param colsPerBlock Number of columns that make up each block.
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
* columns are not required to have the given number of columns
*/
def this(
rdd: RDD[((Int, Int), Matrix)],
numRowBlocks: Int,
numColBlocks: Int,
rdd: RDD[((Int, Int), Matrix)],
rowsPerBlock: Int,
colsPerBlock: Int) = {
this(numRowBlocks, numColBlocks, rdd)
val part = new GridPartitioner(numRowBlocks, numColBlocks, rowsPerBlock,
colsPerBlock, rdd.partitions.length)
partitioner = part
this(rdd, 0L, 0L, numRowBlocks, numColBlocks, rowsPerBlock, colsPerBlock)
}

private[mllib] var partitioner: GridPartitioner = {
val firstSubMatrix = rdd.first()._2
new GridPartitioner(numRowBlocks, numColBlocks,
firstSubMatrix.numRows, firstSubMatrix.numCols, rdd.partitions.length)
}
// Partitioner associated with this matrix. It is initialised to a GridPartitioner built from
// the numRowBlocks x numColBlocks grid and the current number of partitions of `rdd`.
// Declared as a var, so it can be reassigned later.
private[mllib] var partitioner: GridPartitioner =
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)

private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = dims._1
override def numCols(): Long = dims._2
/** Number of rows of the matrix; taken from `dims` the first time it is needed. */
override def numRows(): Long = {
  // A non-positive stored value means the row count is not known yet, so fall back to the
  // computed dimensions and remember the result.
  nRows = if (nRows > 0L) nRows else dims._1
  nRows
}

/** Number of columns of the matrix; taken from `dims` the first time it is needed. */
override def numCols(): Long = {
  // A non-positive stored value means the column count is not known yet, so fall back to the
  // computed dimensions and remember the result.
  nCols = if (nCols > 0L) nCols else dims._2
  nCols
}

/** Returns the dimensions of the matrix. */
private def getDim: (Long, Long) = {
Expand All @@ -141,6 +163,7 @@ class BlockMatrix(
base
}

// Aggregate will return an error if the rdd is empty
val lastRowCol = rdd.treeAggregate(new MatrixMetaData(0, 0, 0, 0))(
seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
pickSizeByGreaterIndex(
Expand All @@ -152,8 +175,8 @@ class BlockMatrix(
})
// We add the size of the edge matrices, because they can be less than the specified
// rowsPerBlock or colsPerBlock.
(lastRowCol.rowIndex.toLong * partitioner.rowsPerBlock + lastRowCol.numRows,
lastRowCol.colIndex.toLong * partitioner.colsPerBlock + lastRowCol.numCols)
(lastRowCol.rowIndex.toLong * rowsPerBlock + lastRowCol.numRows,
lastRowCol.colIndex.toLong * colsPerBlock + lastRowCol.numCols)
}

/** Returns the Frobenius Norm of the matrix */
Expand Down Expand Up @@ -193,8 +216,8 @@ class BlockMatrix(
val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
val values = new Array[Double](nRows * nCols)
parts.foreach { case ((rowIndex, colIndex), block) =>
val rowOffset = rowIndex * partitioner.rowsPerBlock
val colOffset = colIndex * partitioner.colsPerBlock
val rowOffset = rowIndex * rowsPerBlock
val colOffset = colIndex * colsPerBlock
var j = 0
val mat = block.toArray
while (j < block.numCols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
new SubMatrix((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
new SubMatrix((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))

gridBasedMat = new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, 2))
gridBasedMat = new BlockMatrix(sc.parallelize(entries, 2), numRowBlocks, numColBlocks,
rowPerPart, colPerPart)
}

test("size and frobenius norm") {
Expand Down

0 comments on commit f9d664b

Please sign in to comment.