diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala new file mode 100644 index 0000000000000..0ab74ba294535 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import breeze.linalg.{DenseMatrix => BDM} + +import org.apache.spark.{Logging, Partitioner} +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + +/** + * A grid partitioner, which uses a regular grid to partition coordinates. + * + * @param rows Number of rows. + * @param cols Number of columns. + * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge. + * @param colsPerPart Number of columns per partition, which may be less at the right edge. + */ +private[mllib] class GridPartitioner( + val rows: Int, + val cols: Int, + val rowsPerPart: Int, + val colsPerPart: Int) extends Partitioner { + + require(rows > 0) + require(cols > 0) + require(rowsPerPart > 0) + require(colsPerPart > 0) + + private val rowPartitions = math.ceil(rows / rowsPerPart).toInt + private val colPartitions = math.ceil(cols / colsPerPart).toInt + + override val numPartitions = rowPartitions * colPartitions + + /** + * Returns the index of the partition the input coordinate belongs to. + * + * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in + * multiplication. k is ignored in computing partitions. + * @return The index of the partition, which the coordinate belongs to. + */ + override def getPartition(key: Any): Int = { + key match { + case (i: Int, j: Int) => + getPartitionId(i, j) + case (i: Int, j: Int, _: Int) => + getPartitionId(i, j) + case _ => + throw new IllegalArgumentException(s"Unrecognized key: $key.") + } + } + + /** Partitions sub-matrices as blocks with neighboring sub-matrices. */ + private def getPartitionId(i: Int, j: Int): Int = { + require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).") + require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).") + i / rowsPerPart + j / colsPerPart * rowPartitions + } + + override def equals(obj: Any): Boolean = { + obj match { + case r: GridPartitioner => + (this.rows == r.rows) && (this.cols == r.cols) && + (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart) + case _ => + false + } + } +} + +private[mllib] object GridPartitioner { + + /** Creates a new [[GridPartitioner]] instance. */ + def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = { + new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) + } + + /** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */ + def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = { + require(suggestedNumPartitions > 0) + val scale = 1.0 / math.sqrt(suggestedNumPartitions) + val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt + val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt + new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) + } +} + +/** + * Represents a distributed matrix in blocks of local matrices. + * + * @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form + * this distributed matrix. + * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final + * rows are not required to have the given number of rows + * @param colsPerBlock Number of columns that make up each block. The blocks forming the final + * columns are not required to have the given number of columns + * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero, + * the number of rows will be calculated when `numRows` is invoked. + * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to + * zero, the number of columns will be calculated when `numCols` is invoked. + */ +class BlockMatrix( + val blocks: RDD[((Int, Int), Matrix)], + val rowsPerBlock: Int, + val colsPerBlock: Int, + private var nRows: Long, + private var nCols: Long) extends DistributedMatrix with Logging { + + private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix) + + /** + * Alternate constructor for BlockMatrix without the input of the number of rows and columns. + * + * @param rdd The RDD of SubMatrices (local matrices) that form this matrix + * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final + * rows are not required to have the given number of rows + * @param colsPerBlock Number of columns that make up each block. The blocks forming the final + * columns are not required to have the given number of columns + */ + def this( + rdd: RDD[((Int, Int), Matrix)], + rowsPerBlock: Int, + colsPerBlock: Int) = { + this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L) + } + + override def numRows(): Long = { + if (nRows <= 0L) estimateDim() + nRows + } + + override def numCols(): Long = { + if (nCols <= 0L) estimateDim() + nCols + } + + val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt + val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt + + private[mllib] var partitioner: GridPartitioner = + GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size) + + /** Estimates the dimensions of the matrix. */ + private def estimateDim(): Unit = { + val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) => + (blockRowIndex.toLong * rowsPerBlock + mat.numRows, + blockColIndex.toLong * colsPerBlock + mat.numCols) + }.reduce { (x0, x1) => + (math.max(x0._1, x1._1), math.max(x0._2, x1._2)) + } + if (nRows <= 0L) nRows = rows + assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.") + if (nCols <= 0L) nCols = cols + assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.") + } + + /** Caches the underlying RDD. */ + def cache(): this.type = { + blocks.cache() + this + } + + /** Persists the underlying RDD with the specified storage level. */ + def persist(storageLevel: StorageLevel): this.type = { + blocks.persist(storageLevel) + this + } + + /** Collect the distributed matrix on the driver as a `DenseMatrix`. */ + def toLocalMatrix(): Matrix = { + require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " + + s"Int.MaxValue. Currently numRows: ${numRows()}") + require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " + + s"Int.MaxValue. Currently numCols: ${numCols()}") + require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " + + s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}") + val m = numRows().toInt + val n = numCols().toInt + val mem = m * n / 125000 + if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!") + + val localBlocks = blocks.collect() + val values = new Array[Double](m * n) + localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) => + val rowOffset = blockRowIndex * rowsPerBlock + val colOffset = blockColIndex * colsPerBlock + submat.foreachActive { (i, j, v) => + val indexOffset = (j + colOffset) * m + rowOffset + i + values(indexOffset) = v + } + } + new DenseMatrix(m, n, values) + } + + /** Collects data and assembles a local dense breeze matrix (for test only). */ + private[mllib] def toBreeze(): BDM[Double] = { + val localMat = toLocalMatrix() + new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala new file mode 100644 index 0000000000000..05efbc8e8d0b8 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg.distributed + +import scala.util.Random + +import breeze.linalg.{DenseMatrix => BDM} +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix} +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { + + val m = 5 + val n = 4 + val rowPerPart = 2 + val colPerPart = 2 + val numPartitions = 3 + var gridBasedMat: BlockMatrix = _ + + override def beforeAll() { + super.beforeAll() + + val blocks: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))), + ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))), + ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))), + ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0)))) + + gridBasedMat = new BlockMatrix(sc.parallelize(blocks, numPartitions), rowPerPart, colPerPart) + } + + test("size") { + assert(gridBasedMat.numRows() === m) + assert(gridBasedMat.numCols() === n) + } + + test("grid partitioner") { + val random = new Random() + // This should generate a 4x4 grid of 1x2 blocks. + val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12) + val expected0 = Array( + Array(0, 0, 4, 4, 8, 8, 12), + Array(1, 1, 5, 5, 9, 9, 13), + Array(2, 2, 6, 6, 10, 10, 14), + Array(3, 3, 7, 7, 11, 11, 15)) + for (i <- 0 until 4; j <- 0 until 7) { + assert(part0.getPartition((i, j)) === expected0(i)(j)) + assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((-1, 0)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((4, 0)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((0, -1)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((0, 7)) + } + + val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5) + val expected1 = Array( + Array(0, 2), + Array(1, 3)) + for (i <- 0 until 2; j <- 0 until 2) { + assert(part1.getPartition((i, j)) === expected1(i)(j)) + assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j)) + } + + val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5) + assert(part0 !== part2) + assert(part1 === part2) + + val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2) + val expected3 = Array( + Array(0, 0, 2), + Array(1, 1, 3)) + for (i <- 0 until 2; j <- 0 until 3) { + assert(part3.getPartition((i, j)) === expected3(i)(j)) + assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j)) + } + + val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2) + assert(part3 === part4) + + intercept[IllegalArgumentException] { + new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1) + } + + intercept[IllegalArgumentException] { + GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0) + } + + intercept[IllegalArgumentException] { + GridPartitioner(2, 2, suggestedNumPartitions = 0) + } + } + + test("toBreeze and toLocalMatrix") { + val expected = BDM( + (1.0, 0.0, 0.0, 0.0), + (0.0, 2.0, 1.0, 0.0), + (3.0, 1.0, 1.0, 0.0), + (0.0, 1.0, 2.0, 1.0), + (0.0, 0.0, 1.0, 5.0)) + + val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix] + assert(gridBasedMat.toLocalMatrix() === dense) + assert(gridBasedMat.toBreeze() === expected) + } +}