From ab6cde0d90b917e89f97c86ef3c84dcdc64a9b57 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 13 Jan 2015 19:54:55 -0800
Subject: [PATCH] [SPARK-3974] Clean up code and make the size calculation more robust

---
 .../linalg/distributed/BlockMatrix.scala      | 239 +++++-------
 .../linalg/distributed/BlockMatrixSuite.scala |  54 +---
 2 files changed, 78 insertions(+), 215 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 7b4e61b534454..a4b2bf42390f2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -20,97 +20,51 @@ package org.apache.spark.mllib.linalg.distributed
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark._
-import org.apache.spark.mllib.linalg.DenseMatrix
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 /**
- * Represents a local matrix that makes up one block of a distributed BlockMatrix
- *
- * @param blockRowIndex The row index of this block. Must be zero based.
- * @param blockColIndex The column index of this block. Must be zero based.
- * @param mat The underlying local matrix
- */
-case class SubMatrix(blockRowIndex: Int, blockColIndex: Int, mat: DenseMatrix) extends Serializable
-
-/**
- * A partitioner that decides how the matrix is distributed in the cluster
+ * A grid partitioner, which stores every block in a separate partition.
  *
- * @param numPartitions Number of partitions
+ * @param numRowBlocks Number of blocks that form the rows of the matrix.
+ * @param numColBlocks Number of blocks that form the columns of the matrix.
  * @param rowPerBlock Number of rows that make up each block.
  * @param colPerBlock Number of columns that make up each block.
  */
-private[mllib] abstract class BlockMatrixPartitioner(
-    override val numPartitions: Int,
+private[mllib] class GridPartitioner(
+    val numRowBlocks: Int,
+    val numColBlocks: Int,
     val rowPerBlock: Int,
-    val colPerBlock: Int) extends Partitioner {
-  val name: String
+    val colPerBlock: Int,
+    override val numPartitions: Int) extends Partitioner {
 
   /**
    * Returns the index of the partition the SubMatrix belongs to.
    *
-   * @param key The key for the SubMatrix. Can be its row index, column index or position in the
-   *            grid.
+   * @param key The key for the SubMatrix. Can be its position in the grid (its column-major
+   *            index), or a tuple of three integers: the final row index after the multiplication,
+   *            the index of the block to multiply with, and the final column index after the
+   *            multiplication.
    * @return The index of the partition, which the SubMatrix belongs to.
    */
   override def getPartition(key: Any): Int = {
-    Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
-  }
-}
-
-/**
- * A grid partitioner, which stores every block in a separate partition.
- *
- * @param numRowBlocks Number of blocks that form the rows of the matrix.
- * @param numColBlocks Number of blocks that form the columns of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
- */
-class GridPartitioner(
-    val numRowBlocks: Int,
-    val numColBlocks: Int,
-    override val rowPerBlock: Int,
-    override val colPerBlock: Int)
-  extends BlockMatrixPartitioner(numRowBlocks * numColBlocks, rowPerBlock, colPerBlock) {
-
-  override val name = "grid"
-
-  override val numPartitions = numRowBlocks * numColBlocks
-
-  /** Checks whether the partitioners have the same characteristics */
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case r: GridPartitioner =>
-        (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
-          (this.colPerBlock == r.colPerBlock)
+    key match {
+      case (blockRowIndex: Int, blockColIndex: Int) =>
+        Utils.nonNegativeMod(blockRowIndex + blockColIndex * numRowBlocks, numPartitions)
+      case (rowIndex: Int, _: Int, colIndex: Int) =>
+        Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
       case _ =>
-        false
+        throw new IllegalArgumentException(s"Unrecognized key: $key")
     }
   }
-}
-
-/**
- * A specialized partitioner that stores all blocks in the same row in just one partition.
- *
- * @param numPartitions Number of partitions. Should be set as the number of blocks that form
- *                      the rows of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
- */
-class RowBasedPartitioner(
-    override val numPartitions: Int,
-    override val rowPerBlock: Int,
-    override val colPerBlock: Int)
-  extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {
-
-  override val name = "row"
 
   /** Checks whether the partitioners have the same characteristics */
   override def equals(obj: Any): Boolean = {
     obj match {
-      case r: RowBasedPartitioner =>
+      case r: GridPartitioner =>
         (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
           (this.colPerBlock == r.colPerBlock)
       case _ =>
@@ -119,36 +73,6 @@ class RowBasedPartitioner(
   }
 }
 
-/**
- * A specialized partitioner that stores all blocks in the same column in just one partition.
- *
- * @param numPartitions Number of partitions. Should be set as the number of blocks that form
- *                      the columns of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
- */
-class ColumnBasedPartitioner(
-    override val numPartitions: Int,
-    override val rowPerBlock: Int,
-    override val colPerBlock: Int)
-  extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {
-
-  override val name = "column"
-
-  /** Checks whether the partitioners have the same characteristics */
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case p: ColumnBasedPartitioner =>
-        (this.numPartitions == p.numPartitions) && (this.rowPerBlock == p.rowPerBlock) &&
-          (this.colPerBlock == p.colPerBlock)
-      case r: RowBasedPartitioner =>
-        (this.numPartitions == r.numPartitions) && (this.colPerBlock == r.rowPerBlock)
-      case _ =>
-        false
-    }
-  }
-}
-
 /**
  * Represents a distributed matrix in blocks of local matrices.
  *
@@ -159,7 +83,9 @@ class ColumnBasedPartitioner(
 class BlockMatrix(
     val numRowBlocks: Int,
     val numColBlocks: Int,
-    val rdd: RDD[SubMatrix]) extends DistributedMatrix with Logging {
+    val rdd: RDD[((Int, Int), Matrix)]) extends DistributedMatrix with Logging {
+
+  type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)
 
   /**
    * Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
@@ -168,125 +94,92 @@ class BlockMatrix(
    *
    * @param numRowBlocks Number of blocks that form the rows of this matrix
    * @param numColBlocks Number of blocks that form the columns of this matrix
    * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
-   * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
+   * @param rowPerBlock Number of rows that make up each block.
+   * @param colPerBlock Number of columns that make up each block.
    */
   def this(
      numRowBlocks: Int,
      numColBlocks: Int,
-      rdd: RDD[SubMatrix],
-      partitioner: BlockMatrixPartitioner) = {
+      rdd: RDD[((Int, Int), Matrix)],
+      rowPerBlock: Int,
+      colPerBlock: Int) = {
     this(numRowBlocks, numColBlocks, rdd)
-    setPartitioner(partitioner)
+    val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock, rdd.partitions.length)
+    setPartitioner(part)
   }
 
-  private[mllib] var partitioner: BlockMatrixPartitioner = {
-    val firstSubMatrix = rdd.first().mat
+  private[mllib] var partitioner: GridPartitioner = {
+    val firstSubMatrix = rdd.first()._2
     new GridPartitioner(numRowBlocks, numColBlocks,
-      firstSubMatrix.numRows, firstSubMatrix.numCols)
+      firstSubMatrix.numRows, firstSubMatrix.numCols, rdd.partitions.length)
   }
 
   /**
-   * Set the partitioner for the matrix. For internal use only. Users should use `repartition`.
+   * Set the partitioner for the matrix. For internal use only.
    * @param part A partitioner that specifies how SubMatrices are stored in the cluster
    */
-  private def setPartitioner(part: BlockMatrixPartitioner): Unit = {
+  private def setPartitioner(part: GridPartitioner): Unit = {
     partitioner = part
   }
 
-  // A key-value pair RDD is required to partition properly
-  private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()
-
   private lazy val dims: (Long, Long) = getDim
 
   override def numRows(): Long = dims._1
   override def numCols(): Long = dims._2
 
-  if (partitioner.name.equals("column")) {
-    require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
-      s" the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
-      s"partitioner.numPartitions: ${partitioner.numPartitions}")
-  } else if (partitioner.name.equals("row")) {
-    require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
-      s" the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
-      s"partitioner.numPartitions: ${partitioner.numPartitions}")
-  } else if (partitioner.name.equals("grid")) {
-    require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
-      s"should match the number of partitions of the grid partitioner. numRowBlocks * " +
-      s"numColBlocks: ${numRowBlocks * numColBlocks}, " +
-      s"partitioner.numPartitions: ${partitioner.numPartitions}")
-  } else {
-    throw new IllegalArgumentException("Unrecognized partitioner.")
-  }
-
   /** Returns the dimensions of the matrix. */
   def getDim: (Long, Long) = {
-
-    val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0).
-      map { block =>
-        ((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
+    // Keeps the block dimensions belonging to the largest row and column block indices
+    def pickSizeByGreaterIndex(example: (Int, Int, Int, Int), base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
+      if (example._1 > base._1 && example._2 > base._2) {
+        (example._1, example._2, example._3, example._4)
+      } else if (example._1 > base._1) {
+        (example._1, base._2, example._3, base._4)
+      } else if (example._2 > base._2) {
+        (base._1, example._2, base._3, example._4)
+      } else {
+        base
      }
+    }
 
-    firstRowColumn.treeAggregate((0L, 0L))(
-      seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
-        if (indX == 0 && indY == 0) {
-          (x_dim + nRow, y_dim + nCol)
-        } else if (indX == 0) {
-          (x_dim, y_dim + nCol)
-        } else {
-          (x_dim + nRow, y_dim)
-        }
+    val lastRowCol = rdd.treeAggregate((0, 0, 0, 0))(
+      seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
+        pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
      },
      combOp = (c1, c2) => (c1, c2) match {
-        case ((x_dim1, y_dim1), (x_dim2, y_dim2)) =>
-          (x_dim1 + x_dim2, y_dim1 + y_dim2)
+        case (res1, res2) =>
+          pickSizeByGreaterIndex(res1, res2)
      })
+
+    (lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,
+      lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
   }
 
   /** Returns the Frobenius Norm of the matrix */
   def normFro(): Double = {
-    math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
+    math.sqrt(rdd.map {
+      case ((_, _), sparse: SparseMatrix) =>
+        sparse.values.map(x => math.pow(x, 2)).sum
+      case ((_, _), dense: DenseMatrix) =>
+        dense.values.map(x => math.pow(x, 2)).sum
+    }.reduce(_ + _))
   }
 
   /** Cache the underlying RDD. */
   def cache(): DistributedMatrix = {
-    matrixRDD.cache()
+    rdd.cache()
     this
   }
 
   /** Set the storage level for the underlying RDD. */
   def persist(storageLevel: StorageLevel): DistributedMatrix = {
-    matrixRDD.persist(storageLevel)
-    this
-  }
-
-  /** Add a key to the underlying rdd for partitioning and joins. */
-  private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, SubMatrix)] = {
-    rdd.map { block =>
-      part match {
-        case r: RowBasedPartitioner => (block.blockRowIndex, block)
-        case c: ColumnBasedPartitioner => (block.blockColIndex, block)
-        case g: GridPartitioner => (block.blockRowIndex + numRowBlocks * block.blockColIndex, block)
-        case _ => throw new IllegalArgumentException("Unrecognized partitioner")
-      }
-    }
-  }
-
-  /**
-   * Repartition the BlockMatrix using a different partitioner.
-   *
-   * @param part The partitioner to partition by
-   * @return The repartitioned BlockMatrix
-   */
-  def repartition(part: BlockMatrixPartitioner): DistributedMatrix = {
-    matrixRDD = keyBy(part)
-    setPartitioner(part)
+    rdd.persist(storageLevel)
     this
   }
 
   /** Collect the distributed matrix on the driver. */
-  def collect(): DenseMatrix = {
-    val parts = rdd.map(x => ((x.blockRowIndex, x.blockColIndex), x.mat)).
-      collect().sortBy(x => (x._1._2, x._1._1))
+  def toLocalMatrix(): Matrix = {
+    val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
     val nRows = numRows().toInt
     val nCols = numCols().toInt
     val values = new Array[Double](nRows * nCols)
@@ -301,7 +194,7 @@ class BlockMatrix(
         val indStart = (j + colStart) * nRows + rowStart
         val indEnd = block.numRows
         val matStart = j * block.numRows
-        val mat = block.values
+        val mat = block.toArray
         while (i < indEnd) {
           values(indStart + i) = mat(matStart + i)
           i += 1
@@ -316,7 +209,7 @@ class BlockMatrix(
 
   /** Collects data and assembles a local dense breeze matrix (for test only). */
   private[mllib] def toBreeze(): BDM[Double] = {
-    val localMat = collect()
-    new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
+    val localMat = toLocalMatrix()
+    new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 1d9ff1112ddb4..45152f473c16d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
 
 import breeze.linalg.{DenseMatrix => BDM}
 
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
 class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
@@ -32,55 +32,29 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
   val colPerPart = 2
   val numRowBlocks = 3
   val numColBlocks = 2
-  var rowBasedMat: BlockMatrix = _
-  var colBasedMat: BlockMatrix = _
   var gridBasedMat: BlockMatrix = _
+  type SubMatrix = ((Int, Int), Matrix)
 
   override def beforeAll() {
     super.beforeAll()
-    val entries: Seq[SubMatrix] = Seq(
-      new SubMatrix(0, 0, new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
-      new SubMatrix(0, 1, new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
-      new SubMatrix(1, 0, new DenseMatrix(2, 2, Array(3.0, 0.0, 1.5, 0.0))),
-      new SubMatrix(1, 1, new DenseMatrix(2, 2, Array(1.0, 4.0, 0.0, 1.0))),
-      new SubMatrix(2, 0, new DenseMatrix(1, 2, Array(1.0, 0.0))),
-      new SubMatrix(2, 1, new DenseMatrix(1, 2, Array(1.0, 5.0))))
-    val colPart = new ColumnBasedPartitioner(numColBlocks, rowPerPart, colPerPart)
-    val rowPart = new RowBasedPartitioner(numRowBlocks, rowPerPart, colPerPart)
+    val entries: Seq[SubMatrix] = Seq(
+      new SubMatrix((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      new SubMatrix((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      new SubMatrix((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.5, 0.0))),
+      new SubMatrix((1, 1), new DenseMatrix(2, 2, Array(1.0, 4.0, 0.0, 1.0))),
+      new SubMatrix((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))),
+      new SubMatrix((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
 
-    colBasedMat =
-      new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, numColBlocks), colPart)
-    rowBasedMat =
-      new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, numRowBlocks), rowPart)
-    gridBasedMat = new BlockMatrix(numRowBlocks, numColBlocks,
-      sc.parallelize(entries, numRowBlocks * numColBlocks))
+    gridBasedMat = new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, 2))
   }
 
   test("size") {
-    assert(colBasedMat.numRows() === m)
-    assert(colBasedMat.numCols() === n)
-    assert(rowBasedMat.numRows() === m)
-    assert(rowBasedMat.numCols() === n)
     assert(gridBasedMat.numRows() === m)
     assert(gridBasedMat.numCols() === n)
   }
 
-  test("partitioner and repartition") {
-    assert(colBasedMat.partitioner.name === "column")
-    assert(rowBasedMat.partitioner.name === "row")
-    assert(gridBasedMat.partitioner.name === "grid")
-
-    val colPart = new ColumnBasedPartitioner(numColBlocks, rowPerPart, colPerPart)
-    val rowPart = new RowBasedPartitioner(numRowBlocks, rowPerPart, colPerPart)
-    gridBasedMat.repartition(rowPart).asInstanceOf[BlockMatrix]
-    assert(gridBasedMat.partitioner.name === "row")
-
-    gridBasedMat.repartition(colPart).asInstanceOf[BlockMatrix]
-    assert(gridBasedMat.partitioner.name === "column")
-  }
-
-  test("toBreeze and collect") {
+  test("toBreeze and toLocalMatrix") {
     val expected = BDM(
       (1.0, 0.0, 0.0, 0.0),
       (0.0, 2.0, 1.0, 0.0),
@@ -89,11 +63,7 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
       (1.0, 0.0, 1.0, 5.0))
 
     val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
-    assert(colBasedMat.toBreeze() === expected)
-    assert(rowBasedMat.toBreeze() === expected)
     assert(gridBasedMat.toBreeze() === expected)
-    assert(colBasedMat.collect() === dense)
-    assert(rowBasedMat.collect() === dense)
-    assert(gridBasedMat.collect() === dense)
+    assert(gridBasedMat.toLocalMatrix() === dense)
   }
 }
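
Note for reviewers: GridPartitioner.getPartition lays blocks out in column-major order. A minimal standalone sketch of the same arithmetic (the helper name gridPartitionIndex is illustrative and not part of the patch):

    // Block (i, j) of a grid with numRowBlocks row-blocks gets the column-major
    // linear index i + j * numRowBlocks, folded into the available partitions
    // with a non-negative modulo (the same reduction Utils.nonNegativeMod applies).
    def gridPartitionIndex(i: Int, j: Int, numRowBlocks: Int, numPartitions: Int): Int = {
      val linear = i + j * numRowBlocks
      ((linear % numPartitions) + numPartitions) % numPartitions
    }

    // With the 3 x 2 grid from the test suite and one partition per block,
    // block (2, 1) lands in partition 2 + 1 * 3 = 5.
    assert(gridPartitionIndex(2, 1, numRowBlocks = 3, numPartitions = 6) == 5)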
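An end-to-end sketch of the new public surface, mirroring the fixture in BlockMatrixSuite (assumes a live SparkContext named sc; the value names are illustrative):

    import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    // A 5 x 4 matrix stored as a 3 x 2 grid of blocks, keyed by
    // (blockRowIndex, blockColIndex). The last block row is 1 x 2, which
    // exercises the max-index based size calculation in getDim.
    val blocks: Seq[((Int, Int), Matrix)] = Seq(
      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.5, 0.0))),
      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 4.0, 0.0, 1.0))),
      ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))),
      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))

    val mat = new BlockMatrix(3, 2, sc.parallelize(blocks, 2))
    assert(mat.numRows() == 5 && mat.numCols() == 4) // sizes recovered by getDim
    val local: Matrix = mat.toLocalMatrix()          // assembled on the driver
    val norm: Double = mat.normFro()                 // Frobenius norm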