Skip to content

Commit

Permalink
[SPARK-3974] Modifications cleaning code up, making size calculation …
Browse files Browse the repository at this point in the history
…more robust
  • Loading branch information
brkyvz committed Jan 14, 2015
1 parent 9ae85aa commit ab6cde0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,97 +20,51 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* Represents a local matrix that makes up one block of a distributed BlockMatrix
*
* @param blockRowIndex The row index of this block. Must be zero based.
* @param blockColIndex The column index of this block. Must be zero based.
* @param mat The underlying local matrix
*/
case class SubMatrix(blockRowIndex: Int, blockColIndex: Int, mat: DenseMatrix) extends Serializable

/**
* A partitioner that decides how the matrix is distributed in the cluster
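* A grid partitioner, which maps each block to a partition by its column-major grid index modulo
* numPartitions. Blocks only end up in separate partitions when numPartitions covers the whole grid.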
*
* @param numPartitions Number of partitions
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
private[mllib] abstract class BlockMatrixPartitioner(
override val numPartitions: Int,
private[mllib] class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
val rowPerBlock: Int,
val colPerBlock: Int) extends Partitioner {
val name: String
val colPerBlock: Int,
override val numPartitions: Int) extends Partitioner {

/**
* Returns the index of the partition the SubMatrix belongs to.
*
* @param key The key for the SubMatrix. Can be its row index, column index or position in the
* grid.
* @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
* or a tuple of three integers that are the final row index after the multiplication,
* the index of the block to multiply with, and the final column index after the
* multiplication.
* @return The index of the partition, which the SubMatrix belongs to.
*/
override def getPartition(key: Any): Int = {
Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
}
}

/**
* A grid partitioner, which stores every block in a separate partition.
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
override val rowPerBlock: Int,
override val colPerBlock: Int)
extends BlockMatrixPartitioner(numRowBlocks * numColBlocks, rowPerBlock, colPerBlock) {

override val name = "grid"

override val numPartitions = numRowBlocks * numColBlocks

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
(this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
(this.colPerBlock == r.colPerBlock)
key match {
case ind: (Int, Int) =>
Utils.nonNegativeMod(ind._1 + ind._2 * numRowBlocks, numPartitions)
case indices: (Int, Int, Int) =>
Utils.nonNegativeMod(indices._1 + indices._3 * numRowBlocks, numPartitions)
case _ =>
false
throw new IllegalArgumentException("Unrecognized key")
}
}
}

/**
* A specialized partitioner that stores all blocks in the same row in just one partition.
*
* @param numPartitions Number of partitions. Should be set as the number of blocks that form
* the rows of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class RowBasedPartitioner(
override val numPartitions: Int,
override val rowPerBlock: Int,
override val colPerBlock: Int)
extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {

override val name = "row"

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: RowBasedPartitioner =>
case r: GridPartitioner =>
(this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
(this.colPerBlock == r.colPerBlock)
case _ =>
Expand All @@ -119,36 +73,6 @@ class RowBasedPartitioner(
}
}

/**
* A specialized partitioner that stores all blocks in the same column in just one partition.
*
* @param numPartitions Number of partitions. Should be set as the number of blocks that form
* the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class ColumnBasedPartitioner(
override val numPartitions: Int,
override val rowPerBlock: Int,
override val colPerBlock: Int)
extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {

override val name = "column"

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case p: ColumnBasedPartitioner =>
(this.numPartitions == p.numPartitions) && (this.rowPerBlock == p.rowPerBlock) &&
(this.colPerBlock == p.colPerBlock)
case r: RowBasedPartitioner =>
(this.numPartitions == r.numPartitions) && (this.colPerBlock == r.rowPerBlock)
case _ =>
false
}
}
}

/**
* Represents a distributed matrix in blocks of local matrices.
*
Expand All @@ -159,7 +83,9 @@ class ColumnBasedPartitioner(
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
val rdd: RDD[SubMatrix]) extends DistributedMatrix with Logging {
val rdd: RDD[((Int, Int), Matrix)]) extends DistributedMatrix with Logging {

type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)

/**
* Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
Expand All @@ -168,125 +94,92 @@ class BlockMatrix(
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
def this(
numRowBlocks: Int,
numColBlocks: Int,
rdd: RDD[SubMatrix],
partitioner: BlockMatrixPartitioner) = {
rdd: RDD[((Int, Int), Matrix)],
rowPerBlock: Int,
colPerBlock: Int) = {
this(numRowBlocks, numColBlocks, rdd)
setPartitioner(partitioner)
val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock, rdd.partitions.length)
setPartitioner(part)
}

private[mllib] var partitioner: BlockMatrixPartitioner = {
val firstSubMatrix = rdd.first().mat
private[mllib] var partitioner: GridPartitioner = {
val firstSubMatrix = rdd.first()._2
new GridPartitioner(numRowBlocks, numColBlocks,
firstSubMatrix.numRows, firstSubMatrix.numCols)
firstSubMatrix.numRows, firstSubMatrix.numCols, rdd.partitions.length)
}

/**
* Set the partitioner for the matrix. For internal use only. Users should use `repartition`.
* @param part A partitioner that specifies how SubMatrices are stored in the cluster
*/
private def setPartitioner(part: BlockMatrixPartitioner): Unit = {
private def setPartitioner(part: GridPartitioner): Unit = {
partitioner = part
}

// A key-value pair RDD is required to partition properly
private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()

private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = dims._1
override def numCols(): Long = dims._2

if (partitioner.name.equals("column")) {
require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
s" the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
s"partitioner.numPartitions: ${partitioner.numPartitions}")
} else if (partitioner.name.equals("row")) {
require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
s" the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
s"partitioner.numPartitions: ${partitioner.numPartitions}")
} else if (partitioner.name.equals("grid")) {
require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
s"should match the number of partitions of the grid partitioner. numRowBlocks * " +
s"numColBlocks: ${numRowBlocks * numColBlocks}, " +
s"partitioner.numPartitions: ${partitioner.numPartitions}")
} else {
throw new IllegalArgumentException("Unrecognized partitioner.")
}

/** Returns the dimensions of the matrix. */
def getDim: (Long, Long) = {

val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0).
map { block =>
((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
// Merges two (blockRowIndex, blockColIndex, numRows, numCols) records. The row pair (_1, _3)
// and the column pair (_2, _4) are resolved independently: on each axis the size that belongs
// to the greater block index is kept.
//
// When both records carry the same index on an axis, the larger size wins. Without this
// tie-break the all-zero start value of the aggregation would hide the size of every block
// whose index is 0, so a matrix with a single row (or column) of blocks would report a
// dimension of 0.
def pickSizeByGreaterIndex(
    example: (Int, Int, Int, Int),
    base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
  // Resolves a single axis and returns the winning (index, size) pair.
  def pickAxis(exIndex: Int, exSize: Int, baseIndex: Int, baseSize: Int): (Int, Int) = {
    if (exIndex > baseIndex) {
      (exIndex, exSize)
    } else if (exIndex < baseIndex) {
      (baseIndex, baseSize)
    } else {
      (baseIndex, math.max(exSize, baseSize))
    }
  }

  val (rowIndex, rowSize) = pickAxis(example._1, example._3, base._1, base._3)
  val (colIndex, colSize) = pickAxis(example._2, example._4, base._2, base._4)
  (rowIndex, colIndex, rowSize, colSize)
}

firstRowColumn.treeAggregate((0L, 0L))(
seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
if (indX == 0 && indY == 0) {
(x_dim + nRow, y_dim + nCol)
} else if (indX == 0) {
(x_dim, y_dim + nCol)
} else {
(x_dim + nRow, y_dim)
}
val lastRowCol = rdd.treeAggregate((0, 0, 0, 0))(
seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
},
combOp = (c1, c2) => (c1, c2) match {
case ((x_dim1, y_dim1), (x_dim2, y_dim2)) =>
(x_dim1 + x_dim2, y_dim1 + y_dim2)
case (res1, res2) =>
pickSizeByGreaterIndex(res1, res2)
})

(lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,
lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
}

/**
 * Returns the Frobenius norm of the matrix: the square root of the sum of the squares of the
 * values of every block.
 */
def normFro(): Double = {
  val sumOfSquaresPerBlock = rdd.map { case (_, block) =>
    // Match on the block itself. A type pattern on the whole tuple, such as
    // `case x: ((Int, Int), SparseMatrix)`, is erased to a plain Tuple2 check, so its first
    // case would capture every block and fail with a ClassCastException on dense blocks.
    val entries = block match {
      case sparse: SparseMatrix => sparse.values
      case dense: DenseMatrix => dense.values
      // Fallback for any other Matrix implementation.
      case other => other.toArray
    }
    entries.map(x => math.pow(x, 2)).sum
  }
  math.sqrt(sumOfSquaresPerBlock.reduce(_ + _))
}

/** Cache the underlying RDD. */
def cache(): DistributedMatrix = {
matrixRDD.cache()
rdd.cache()
this
}

/** Set the storage level for the underlying RDD. */
def persist(storageLevel: StorageLevel): DistributedMatrix = {
matrixRDD.persist(storageLevel)
this
}

/** Add a key to the underlying rdd for partitioning and joins. */
private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, SubMatrix)] = {
rdd.map { block =>
part match {
case r: RowBasedPartitioner => (block.blockRowIndex, block)
case c: ColumnBasedPartitioner => (block.blockColIndex, block)
case g: GridPartitioner => (block.blockRowIndex + numRowBlocks * block.blockColIndex, block)
case _ => throw new IllegalArgumentException("Unrecognized partitioner")
}
}
}

/**
* Repartition the BlockMatrix using a different partitioner.
*
* @param part The partitioner to partition by
* @return The repartitioned BlockMatrix
*/
def repartition(part: BlockMatrixPartitioner): DistributedMatrix = {
matrixRDD = keyBy(part)
setPartitioner(part)
rdd.persist(storageLevel)
this
}

/** Collect the distributed matrix on the driver. */
def collect(): DenseMatrix = {
val parts = rdd.map(x => ((x.blockRowIndex, x.blockColIndex), x.mat)).
collect().sortBy(x => (x._1._2, x._1._1))
def toLocalMatrix(): Matrix = {
val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
val nRows = numRows().toInt
val nCols = numCols().toInt
val values = new Array[Double](nRows * nCols)
Expand All @@ -301,7 +194,7 @@ class BlockMatrix(
val indStart = (j + colStart) * nRows + rowStart
val indEnd = block.numRows
val matStart = j * block.numRows
val mat = block.values
val mat = block.toArray
while (i < indEnd) {
values(indStart + i) = mat(matStart + i)
i += 1
Expand All @@ -316,7 +209,7 @@ class BlockMatrix(

/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double] = {
val localMat = collect()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
val localMat = toLocalMatrix()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
}
}
Loading

0 comments on commit ab6cde0

Please sign in to comment.