diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 19e896ec4158c..0b78630f85c15 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -31,14 +31,14 @@ import org.apache.spark.util.Utils
  *
  * @param numRowBlocks Number of blocks that form the rows of the matrix.
  * @param numColBlocks Number of blocks that form the columns of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
+ * @param rowsPerBlock Number of rows that make up each block.
+ * @param colsPerBlock Number of columns that make up each block.
  */
 private[mllib] class GridPartitioner(
     val numRowBlocks: Int,
     val numColBlocks: Int,
-    val rowPerBlock: Int,
-    val colPerBlock: Int,
+    val rowsPerBlock: Int,
+    val colsPerBlock: Int,
     override val numPartitions: Int) extends Partitioner {
 
   /**
@@ -52,10 +52,10 @@ private[mllib] class GridPartitioner(
    */
   override def getPartition(key: Any): Int = {
     key match {
-      case ind: (Int, Int) =>
-        Utils.nonNegativeMod(ind._1 + ind._2 * numRowBlocks, numPartitions)
-      case indices: (Int, Int, Int) =>
-        Utils.nonNegativeMod(indices._1 + indices._3 * numRowBlocks, numPartitions)
+      case (rowIndex: Int, colIndex: Int) =>
+        Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
+      case (rowIndex: Int, innerIndex: Int, colIndex: Int) =>
+        Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
       case _ =>
         throw new IllegalArgumentException("Unrecognized key")
     }
@@ -65,8 +65,8 @@ private[mllib] class GridPartitioner(
   override def equals(obj: Any): Boolean = {
     obj match {
       case r: GridPartitioner =>
-        (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
-          (this.colPerBlock == r.colPerBlock)
+        (this.numPartitions == r.numPartitions) && (this.rowsPerBlock == r.rowsPerBlock) &&
+          (this.colsPerBlock == r.colsPerBlock)
       case _ =>
         false
     }
@@ -94,18 +94,18 @@ class BlockMatrix(
    * @param numRowBlocks Number of blocks that form the rows of this matrix
    * @param numColBlocks Number of blocks that form the columns of this matrix
    * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
-   * @param rowPerBlock Number of rows that make up each block.
-   * @param colPerBlock Number of columns that make up each block.
+   * @param rowsPerBlock Number of rows that make up each block.
+   * @param colsPerBlock Number of columns that make up each block.
    */
   def this(
       numRowBlocks: Int,
       numColBlocks: Int,
       rdd: RDD[((Int, Int), Matrix)],
-      rowPerBlock: Int,
-      colPerBlock: Int) = {
+      rowsPerBlock: Int,
+      colsPerBlock: Int) = {
     this(numRowBlocks, numColBlocks, rdd)
-    val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock,
-      colPerBlock, rdd.partitions.length)
+    val part = new GridPartitioner(numRowBlocks, numColBlocks, rowsPerBlock,
+      colsPerBlock, rdd.partitions.length)
     setPartitioner(part)
   }
 
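For context on the scheme above: getPartition linearizes a block's (rowIndex, colIndex) in column-major order via rowIndex + colIndex * numRowBlocks and wraps the result into the available partitions; the three-tuple case ignores the middle innerIndex, so intermediate keys used during multiplication map by row and column alone. Below is a minimal standalone sketch of that arithmetic, not Spark code: the object name is illustrative, and a local nonNegativeMod stands in for Utils.nonNegativeMod.

object GridPartitionerSketch {
  // Stand-in for org.apache.spark.util.Utils.nonNegativeMod.
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    if (rawMod < 0) rawMod + mod else rawMod
  }

  // The same mapping as GridPartitioner.getPartition: column-major
  // linearization of the block index, wrapped into numPartitions.
  def partitionFor(rowIndex: Int, colIndex: Int, numRowBlocks: Int, numPartitions: Int): Int =
    nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)

  def main(args: Array[String]): Unit = {
    // A 3 x 2 grid of blocks over 4 partitions, matching the sizes in the test suite below.
    for (colIndex <- 0 until 2; rowIndex <- 0 until 3) {
      println(s"block ($rowIndex, $colIndex) -> partition " +
        partitionFor(rowIndex, colIndex, numRowBlocks = 3, numPartitions = 4))
    }
  }
}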
@@ -129,33 +129,35 @@ class BlockMatrix(
   override def numCols(): Long = dims._2
 
   /** Returns the dimensions of the matrix. */
-  def getDim: (Long, Long) = {
+  private def getDim: (Long, Long) = {
+    case class MatrixMetaData(var rowIndex: Int, var colIndex: Int,
+      var numRows: Int, var numCols: Int)
     // picks the sizes of the matrix with the maximum indices
-    def pickSizeByGreaterIndex(
-        example: (Int, Int, Int, Int),
-        base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
-      if (example._1 > base._1 && example._2 > base._2) {
-        (example._1, example._2, example._3, example._4)
-      } else if (example._1 > base._1) {
-        (example._1, base._2, example._3, base._4)
-      } else if (example._2 > base._2) {
-        (base._1, example._2, base._3, example._4)
-      } else {
-        (base._1, base._2, base._3, base._4)
+    def pickSizeByGreaterIndex(example: MatrixMetaData, base: MatrixMetaData): MatrixMetaData = {
+      if (example.rowIndex > base.rowIndex) {
+        base.rowIndex = example.rowIndex
+        base.numRows = example.numRows
       }
+      if (example.colIndex > base.colIndex) {
+        base.colIndex = example.colIndex
+        base.numCols = example.numCols
+      }
+      base
     }
-    val lastRowCol = rdd.treeAggregate((0, 0, 0, 0))(
+    val lastRowCol = rdd.treeAggregate(new MatrixMetaData(-1, -1, 0, 0))(
       seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
-        pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
+        pickSizeByGreaterIndex(
+          new MatrixMetaData(blockXInd, blockYInd, mat.numRows, mat.numCols), base)
       },
       combOp = (c1, c2) => (c1, c2) match { case (res1, res2) =>
         pickSizeByGreaterIndex(res1, res2)
       })
-
-    (lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,
-      lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
+    // We add the size of the edge matrices, because they can be smaller than the specified
+    // rowsPerBlock or colsPerBlock.
+    (lastRowCol.rowIndex.toLong * partitioner.rowsPerBlock + lastRowCol.numRows,
+      lastRowCol.colIndex.toLong * partitioner.colsPerBlock + lastRowCol.numCols)
   }
 
   /** Returns the Frobenius Norm of the matrix */
@@ -165,7 +167,7 @@ class BlockMatrix(
         sparse.values.map(x => math.pow(x, 2)).sum
       case dense: DenseMatrix =>
         dense.values.map(x => math.pow(x, 2)).sum
-      } 
+      }
     }.reduce(_ + _))
   }
 
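For context on getDim above: the aggregation keeps, for each direction, the greatest block index seen together with the dimensions of the block at that index, and the final size is blockIndex * blockSize + edgeBlockSize, because the last block in a row or column may be smaller than rowsPerBlock or colsPerBlock. Seeding the aggregation with indices of -1 lets the strict comparisons also pick up the sizes of the block at (0, 0). Below is a local sketch of the same fold, with a plain Seq and foldLeft standing in for the RDD and treeAggregate, and illustrative metadata for a 5 x 4 matrix in 2 x 2 blocks:

object GetDimSketch {
  case class MatrixMetaData(var rowIndex: Int, var colIndex: Int,
    var numRows: Int, var numCols: Int)

  // Same logic as pickSizeByGreaterIndex in the patch above.
  def pickSizeByGreaterIndex(example: MatrixMetaData, base: MatrixMetaData): MatrixMetaData = {
    if (example.rowIndex > base.rowIndex) {
      base.rowIndex = example.rowIndex
      base.numRows = example.numRows
    }
    if (example.colIndex > base.colIndex) {
      base.colIndex = example.colIndex
      base.numCols = example.numCols
    }
    base
  }

  def main(args: Array[String]): Unit = {
    val (rowsPerBlock, colsPerBlock) = (2, 2)
    // Blocks of a 5 x 4 matrix: the last block-row is only 1 row tall.
    val blocks = Seq(
      MatrixMetaData(0, 0, 2, 2), MatrixMetaData(1, 0, 2, 2), MatrixMetaData(2, 0, 1, 2),
      MatrixMetaData(0, 1, 2, 2), MatrixMetaData(1, 1, 2, 2), MatrixMetaData(2, 1, 1, 2))
    val last = blocks.foldLeft(MatrixMetaData(-1, -1, 0, 0)) { (base, example) =>
      pickSizeByGreaterIndex(example, base)
    }
    // (2 * 2 + 1, 1 * 2 + 2) = (5, 4)
    println((last.rowIndex.toLong * rowsPerBlock + last.numRows,
      last.colIndex.toLong * colsPerBlock + last.numCols))
  }
}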
@@ -181,25 +183,29 @@ class BlockMatrix(
     this
   }
 
-  /** Collect the distributed matrix on the driver as a local matrix. */
+  /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
   def toLocalMatrix(): Matrix = {
-    val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
+    require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
+      s"Int.MaxValue. Currently numRows: ${numRows()}")
+    require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
+      s"Int.MaxValue. Currently numCols: ${numCols()}")
     val nRows = numRows().toInt
     val nCols = numCols().toInt
-    val values = new Array[Double](nRows * nCols)
+    val mem = nRows.toLong * nCols * 8 / 1000000
+    if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
 
-    parts.foreach { part =>
-      val rowOffset = part._1._1 * partitioner.rowPerBlock
-      val colOffset = part._1._2 * partitioner.colPerBlock
-      val block = part._2
+    val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
+    val values = new Array[Double](nRows * nCols)
+    parts.foreach { case ((rowIndex, colIndex), block) =>
+      val rowOffset = rowIndex * partitioner.rowsPerBlock
+      val colOffset = colIndex * partitioner.colsPerBlock
       var j = 0
+      val mat = block.toArray
       while (j < block.numCols) {
         var i = 0
         val indStart = (j + colOffset) * nRows + rowOffset
-        val indEnd = block.numRows
         val matStart = j * block.numRows
-        val mat = block.toArray
-        while (i < indEnd) {
+        while (i < block.numRows) {
           values(indStart + i) = mat(matStart + i)
           i += 1
         }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index f299021268ddb..918a48578b959 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -24,14 +24,24 @@ import breeze.linalg.{DenseMatrix => BDM}
 import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
-class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
-
+// Input values for the tests
+private object BlockMatrixSuite {
   val m = 5
   val n = 4
   val rowPerPart = 2
   val colPerPart = 2
   val numRowBlocks = 3
   val numColBlocks = 2
+}
+
+class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
+
+  val m = BlockMatrixSuite.m
+  val n = BlockMatrixSuite.n
+  val rowPerPart = BlockMatrixSuite.rowPerPart
+  val colPerPart = BlockMatrixSuite.colPerPart
+  val numRowBlocks = BlockMatrixSuite.numRowBlocks
+  val numColBlocks = BlockMatrixSuite.numColBlocks
 
   var gridBasedMat: BlockMatrix = _
   type SubMatrix = ((Int, Int), Matrix)
@@ -63,7 +73,7 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
       (0.0, 0.0, 1.0, 5.0))
 
     val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
-    assert(gridBasedMat.toBreeze() === expected)
     assert(gridBasedMat.toLocalMatrix() === dense)
+    assert(gridBasedMat.toBreeze() === expected)
   }
 }
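For context on the copy loop in toLocalMatrix above: both the blocks and the target values array are column-major, so column j of a block at block coordinates (rowIndex, colIndex) lands at destination offset (j + colOffset) * nRows + rowOffset and is copied element by element. Below is a self-contained sketch of that copy, using (numRows, numCols, values) tuples as illustrative stand-ins for the local Matrix blocks:

object ToLocalMatrixSketch {
  def main(args: Array[String]): Unit = {
    val (nRows, nCols) = (3, 2)
    val (rowsPerBlock, colsPerBlock) = (2, 2)
    // Two column-major blocks of a 3 x 2 matrix: a 2 x 2 block at (0, 0)
    // and a 1 x 2 edge block at (1, 0).
    val parts = Seq(
      ((0, 0), (2, 2, Array(1.0, 2.0, 3.0, 4.0))),
      ((1, 0), (1, 2, Array(5.0, 6.0))))
    val values = new Array[Double](nRows * nCols)
    parts.foreach { case ((rowIndex, colIndex), (numRows, numCols, mat)) =>
      val rowOffset = rowIndex * rowsPerBlock
      val colOffset = colIndex * colsPerBlock
      var j = 0
      while (j < numCols) {
        var i = 0
        val indStart = (j + colOffset) * nRows + rowOffset
        val matStart = j * numRows
        while (i < numRows) {
          values(indStart + i) = mat(matStart + i)
          i += 1
        }
        j += 1
      }
    }
    println(values.mkString(", ")) // 1.0, 2.0, 5.0, 3.0, 4.0, 6.0
  }
}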