[SPARK-3974][MLlib] Distributed Block Matrix Abstractions
This pull request includes the abstractions for the distributed BlockMatrix representation.
`BlockMatrix` allows users to store very large matrices as small blocks of local matrices. A grid-based partitioner, `GridPartitioner`, is implemented to optimize the addition and multiplication operations that will be added in a follow-up PR.
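
A minimal usage sketch of the API added in this diff (assuming a live SparkContext `sc`; the block values are illustrative):

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Two 2x2 blocks side by side form a 2x4 matrix; entries are column-major.
val blocks: Seq[((Int, Int), Matrix)] = Seq(
  ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
  ((0, 1), new DenseMatrix(2, 2, Array(0.0, 3.0, 4.0, 0.0))))
val mat = new BlockMatrix(sc.parallelize(blocks, 2), 2, 2) // rowsPerBlock = colsPerBlock = 2
assert(mat.numRows() == 2 && mat.numCols() == 4)
val local = mat.toLocalMatrix() // collects all blocks on the driver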

This work is based on the ml-matrix repo developed at the AMPLab at UC Berkeley, CA.
https://github.com/amplab/ml-matrix

Additional thanks to rezazadeh, shivaram, and mengxr for guidance on the design.

Author: Burak Yavuz <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes apache#3200 from brkyvz/SPARK-3974 and squashes the following commits:

a8eace2 [Burak Yavuz] Merge pull request #2 from mengxr/brkyvz-SPARK-3974
feb32a7 [Xiangrui Meng] update tests
e1d3ee8 [Xiangrui Meng] minor updates
24ec7b8 [Xiangrui Meng] update grid partitioner
5eecd48 [Burak Yavuz] fixed gridPartitioner and added tests
140f20e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3974
1694c9e [Burak Yavuz] almost finished addressing comments
f9d664b [Burak Yavuz] updated API and modified partitioning scheme
eebbdf7 [Burak Yavuz] preliminary changes addressing code review
1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. Isn't required
1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist
239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments
ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm
ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust
9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable
d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner
49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master
645afbe [Burak Yavuz] [SPARK-3974] Pull latest master
b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes
19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol
589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed
aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added
f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready
b693209 [Burak Yavuz] Ready for Pull request
brkyvz authored and mengxr committed Jan 28, 2015
1 parent 622ff09 commit eeb53bf
Showing 2 changed files with 351 additions and 0 deletions.
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
* A grid partitioner, which uses a regular grid to partition coordinates.
*
* @param rows Number of rows.
* @param cols Number of columns.
* @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
* @param colsPerPart Number of columns per partition, which may be less at the right edge.
*/
private[mllib] class GridPartitioner(
val rows: Int,
val cols: Int,
val rowsPerPart: Int,
val colsPerPart: Int) extends Partitioner {

require(rows > 0)
require(cols > 0)
require(rowsPerPart > 0)
require(colsPerPart > 0)

// Use fractional division so a trailing partial row/column of blocks still
// gets its own partition (integer division would drop it before the ceil).
private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt

override val numPartitions = rowPartitions * colPartitions

/**
* Returns the index of the partition the input coordinate belongs to.
*
* @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
* multiplication. k is ignored in computing partitions.
* @return The index of the partition that the coordinate belongs to.
*/
override def getPartition(key: Any): Int = {
key match {
case (i: Int, j: Int) =>
getPartitionId(i, j)
case (i: Int, j: Int, _: Int) =>
getPartitionId(i, j)
case _ =>
throw new IllegalArgumentException(s"Unrecognized key: $key.")
}
}

/** Maps a block coordinate (i, j) to a partition index, placing neighboring sub-matrices in the same partition. */
private def getPartitionId(i: Int, j: Int): Int = {
require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
i / rowsPerPart + j / colsPerPart * rowPartitions
}
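
// Worked example: with rowsPerPart = 1, colsPerPart = 2, and rowPartitions = 4
// (the 4 x 7 partitioner in the test suite), coordinate (2, 5) maps to
// 2 / 1 + 5 / 2 * 4 = 10, so partition indices run down each column of the
// partition grid (column-major order).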

override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
(this.rows == r.rows) && (this.cols == r.cols) &&
(this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
case _ =>
false
}
}

/** Needed alongside `equals` so that equal partitioners hash to the same bucket. */
override def hashCode: Int =
41 * (41 * (41 * (41 + rows) + cols) + rowsPerPart) + colsPerPart
}

private[mllib] object GridPartitioner {

/** Creates a new [[GridPartitioner]] instance. */
def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
}

/** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */
def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
require(suggestedNumPartitions > 0)
// Scale each dimension by 1 / sqrt(p) so the grid yields about p partitions.
val scale = 1.0 / math.sqrt(suggestedNumPartitions)
val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
}
}
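
// Worked example for the partition sizing above: GridPartitioner(4, 7,
// suggestedNumPartitions = 12) gives scale = 1 / sqrt(12) ~= 0.289, so
// rowsPerPart = round(max(0.289 * 4, 1)) = 1 and
// colsPerPart = round(max(0.289 * 7, 1)) = 2, i.e. a 4 x 4 grid of 16
// partitions (see the "grid partitioner" test below).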

/**
* Represents a distributed matrix in blocks of local matrices.
*
* @param blocks The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that
* form this distributed matrix.
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows.
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
* columns are not required to have the given number of columns.
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
* the number of rows will be calculated when `numRows` is invoked.
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
* zero, the number of columns will be calculated when `numCols` is invoked.
*/
class BlockMatrix(
val blocks: RDD[((Int, Int), Matrix)],
val rowsPerBlock: Int,
val colsPerBlock: Int,
private var nRows: Long,
private var nCols: Long) extends DistributedMatrix with Logging {

private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)

/**
* Alternate constructor for BlockMatrix that does not require the number of rows and columns;
* the dimensions are computed from the blocks when first needed.
*
* @param rdd The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that
* form this matrix.
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows.
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
*/
def this(
rdd: RDD[((Int, Int), Matrix)],
rowsPerBlock: Int,
colsPerBlock: Int) = {
this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
}

override def numRows(): Long = {
if (nRows <= 0L) estimateDim()
nRows
}

override def numCols(): Long = {
if (nCols <= 0L) estimateDim()
nCols
}

val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
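
// For example, the 5 x 4 matrix in the test suite with rowsPerBlock = 2 has
// numRowBlocks = ceil(5 / 2.0) = 3 and numColBlocks = ceil(4 / 2.0) = 2.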

private[mllib] var partitioner: GridPartitioner =
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)

/** Estimates the dimensions of the matrix. */
private def estimateDim(): Unit = {
val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
(blockRowIndex.toLong * rowsPerBlock + mat.numRows,
blockColIndex.toLong * colsPerBlock + mat.numCols)
}.reduce { (x0, x1) =>
(math.max(x0._1, x1._1), math.max(x0._2, x1._2))
}
if (nRows <= 0L) nRows = rows
assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.")
if (nCols <= 0L) nCols = cols
assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
}
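
// Example: with rowsPerBlock = 2 and a bottom block at blockRowIndex = 2
// holding a single row (the 5 x 4 matrix in the test suite), the estimate
// resolves to 2 * 2 + 1 = 5 rows.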

/** Caches the underlying RDD. */
def cache(): this.type = {
blocks.cache()
this
}

/** Persists the underlying RDD with the specified storage level. */
def persist(storageLevel: StorageLevel): this.type = {
blocks.persist(storageLevel)
this
}

/** Collects the distributed matrix on the driver as a `DenseMatrix`. */
def toLocalMatrix(): Matrix = {
require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
s"Int.MaxValue. Currently numRows: ${numRows()}")
require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
s"Int.MaxValue. Currently numCols: ${numCols()}")
require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
val m = numRows().toInt
val n = numCols().toInt
val mem = m * n / 125000 // approximate size in MB: m * n doubles at 8 bytes each
if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")

val localBlocks = blocks.collect()
val values = new Array[Double](m * n)
localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
val rowOffset = blockRowIndex * rowsPerBlock
val colOffset = blockColIndex * colsPerBlock
submat.foreachActive { (i, j, v) =>
val indexOffset = (j + colOffset) * m + rowOffset + i
values(indexOffset) = v
}
}
new DenseMatrix(m, n, values)
}
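
// Example of the column-major offset above: with m = 5 and
// rowsPerBlock = colsPerBlock = 2, entry (i, j) = (0, 1) of block (1, 0) has
// rowOffset = 2, colOffset = 0, and lands at (1 + 0) * 5 + 2 + 0 = 7 in
// `values`, i.e. row 2, column 1 of the assembled matrix.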

/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double] = {
val localMat = toLocalMatrix()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
}
}
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg.distributed

import scala.util.Random

import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext

class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {

val m = 5
val n = 4
val rowPerPart = 2
val colPerPart = 2
val numPartitions = 3
var gridBasedMat: BlockMatrix = _

override def beforeAll() {
super.beforeAll()

val blocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))

gridBasedMat = new BlockMatrix(sc.parallelize(blocks, numPartitions), rowPerPart, colPerPart)
}

test("size") {
assert(gridBasedMat.numRows() === m)
assert(gridBasedMat.numCols() === n)
}

test("grid partitioner") {
val random = new Random()
// This should generate a 4x4 grid of 1x2 blocks.
val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
val expected0 = Array(
Array(0, 0, 4, 4, 8, 8, 12),
Array(1, 1, 5, 5, 9, 9, 13),
Array(2, 2, 6, 6, 10, 10, 14),
Array(3, 3, 7, 7, 11, 11, 15))
for (i <- 0 until 4; j <- 0 until 7) {
assert(part0.getPartition((i, j)) === expected0(i)(j))
assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j))
}

intercept[IllegalArgumentException] {
part0.getPartition((-1, 0))
}

intercept[IllegalArgumentException] {
part0.getPartition((4, 0))
}

intercept[IllegalArgumentException] {
part0.getPartition((0, -1))
}

intercept[IllegalArgumentException] {
part0.getPartition((0, 7))
}

val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
val expected1 = Array(
Array(0, 2),
Array(1, 3))
for (i <- 0 until 2; j <- 0 until 2) {
assert(part1.getPartition((i, j)) === expected1(i)(j))
assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j))
}

val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
assert(part0 !== part2)
assert(part1 === part2)

val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
val expected3 = Array(
Array(0, 0, 2),
Array(1, 1, 3))
for (i <- 0 until 2; j <- 0 until 3) {
assert(part3.getPartition((i, j)) === expected3(i)(j))
assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j))
}

val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
assert(part3 === part4)

intercept[IllegalArgumentException] {
new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1)
}

intercept[IllegalArgumentException] {
GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0)
}

intercept[IllegalArgumentException] {
GridPartitioner(2, 2, suggestedNumPartitions = 0)
}
}

test("toBreeze and toLocalMatrix") {
val expected = BDM(
(1.0, 0.0, 0.0, 0.0),
(0.0, 2.0, 1.0, 0.0),
(3.0, 1.0, 1.0, 0.0),
(0.0, 1.0, 2.0, 1.0),
(0.0, 0.0, 1.0, 5.0))

val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
assert(gridBasedMat.toLocalMatrix() === dense)
assert(gridBasedMat.toBreeze() === expected)
}
}
