diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index a4b2bf42390f2..19e896ec4158c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -104,7 +104,8 @@ class BlockMatrix( rowPerBlock: Int, colPerBlock: Int) = { this(numRowBlocks, numColBlocks, rdd) - val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock, rdd.partitions.length) + val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, + colPerBlock, rdd.partitions.length) setPartitioner(part) } @@ -130,7 +131,9 @@ class BlockMatrix( /** Returns the dimensions of the matrix. */ def getDim: (Long, Long) = { // picks the sizes of the matrix with the maximum indices - def pickSizeByGreaterIndex(example: (Int, Int, Int, Int), base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = { + def pickSizeByGreaterIndex( + example: (Int, Int, Int, Int), + base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = { if (example._1 > base._1 && example._2 > base._2) { (example._1, example._2, example._3, example._4) } else if (example._1 > base._1) { @@ -157,11 +160,12 @@ class BlockMatrix( /** Returns the Frobenius Norm of the matrix */ def normFro(): Double = { - math.sqrt(rdd.map { - case sparse: ((Int, Int), SparseMatrix) => - sparse._2.values.map(x => math.pow(x, 2)).sum - case dense: ((Int, Int), DenseMatrix) => - dense._2.values.map(x => math.pow(x, 2)).sum + math.sqrt(rdd.map { mat => mat._2 match { + case sparse: SparseMatrix => + sparse.values.map(x => math.pow(x, 2)).sum + case dense: DenseMatrix => + dense.values.map(x => math.pow(x, 2)).sum + } }.reduce(_ + _)) } @@ -177,21 +181,21 @@ class BlockMatrix( this } - /** Collect the distributed matrix on the driver. */ + /** Collect the distributed matrix on the driver as a local matrix. */ def toLocalMatrix(): Matrix = { val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1)) val nRows = numRows().toInt val nCols = numCols().toInt val values = new Array[Double](nRows * nCols) - var rowStart = 0 - var colStart = 0 + parts.foreach { part => - if (part._1._1 == 0) rowStart = 0 + val rowOffset = part._1._1 * partitioner.rowPerBlock + val colOffset = part._1._2 * partitioner.colPerBlock val block = part._2 var j = 0 while (j < block.numCols) { var i = 0 - val indStart = (j + colStart) * nRows + rowStart + val indStart = (j + colOffset) * nRows + rowOffset val indEnd = block.numRows val matStart = j * block.numRows val mat = block.toArray @@ -201,8 +205,6 @@ class BlockMatrix( } j += 1 } - rowStart += block.numRows - if (part._1._1 == numRowBlocks - 1) colStart += block.numCols } new DenseMatrix(nRows, nCols, values) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 45152f473c16d..f299021268ddb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -41,26 +41,26 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { val entries: Seq[SubMatrix] = Seq( new SubMatrix((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))), new SubMatrix((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))), - new SubMatrix((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.5, 0.0))), - new SubMatrix((1, 1), new DenseMatrix(2, 2, Array(1.0, 4.0, 0.0, 1.0))), - new SubMatrix((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), + new SubMatrix((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))), + new SubMatrix((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))), new SubMatrix((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0)))) gridBasedMat = new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, 2)) } - test("size") { + test("size and frobenius norm") { assert(gridBasedMat.numRows() === m) assert(gridBasedMat.numCols() === n) + assert(gridBasedMat.normFro() === 7.0) } test("toBreeze and toLocalMatrix") { val expected = BDM( (1.0, 0.0, 0.0, 0.0), (0.0, 2.0, 1.0, 0.0), - (3.0, 1.5, 1.0, 0.0), - (0.0, 0.0, 4.0, 1.0), - (1.0, 0.0, 1.0, 5.0)) + (3.0, 1.0, 1.0, 0.0), + (0.0, 1.0, 2.0, 1.0), + (0.0, 0.0, 1.0, 5.0)) val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix] assert(gridBasedMat.toBreeze() === expected)