Skip to content

Commit

Permalink
update grid partitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jan 28, 2015
1 parent 5eecd48 commit 24ec7b8
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,125 +18,88 @@
package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.util.Utils

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
* A grid partitioner, which stores every block in a separate partition.
* A grid partitioner, which uses a regular grid to partition coordinates.
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number
* of partitions will be set to `min(suggestedNumPartitions,
* numRowBlocks * numColBlocks)`, because setting the number of
* partitions greater than the number of sub matrices is not useful.
* @param rows Number of rows.
* @param cols Number of columns.
* @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
* @param colsPerPart Number of columns per partition, which may be less at the right edge.
*/
private[mllib] class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
suggestedNumPartitions: Int) extends Partitioner {
private val totalBlocks = numRowBlocks.toLong * numColBlocks
// Having the number of partitions greater than the number of sub matrices does not help
override val numPartitions = math.min(suggestedNumPartitions, totalBlocks).toInt

private val blockLengthsPerPartition = findOptimalBlockLengths
// Number of neighboring blocks to take in each row
private val numRowBlocksPerPartition = blockLengthsPerPartition._1
// Number of neighboring blocks to take in each column
private val numColBlocksPerPartition = blockLengthsPerPartition._2
// Number of rows of partitions
private val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt
val rows: Int,
val cols: Int,
val rowsPerPart: Int,
val colsPerPart: Int) extends Partitioner {

require(rows > 0)
require(cols > 0)
require(rowsPerPart > 0)
require(colsPerPart > 0)

private val rowPartitions = math.ceil(rows / rowsPerPart).toInt
private val colPartitions = math.ceil(cols / colsPerPart).toInt

override val numPartitions = rowPartitions * colPartitions

/**
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
* partitioning.
* Returns the index of the partition the input coordinate belongs to.
*
* @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
* or a tuple of three integers that are the final row index after the multiplication,
* the index of the block to multiply with, and the final column index after the
* @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
* multiplication.
* @return The index of the partition, which the SubMatrix belongs to.
* @return The index of the partition, which the coordinate belongs to.
*/
override def getPartition(key: Any): Int = {
key match {
case (blockRowIndex: Int, blockColIndex: Int) =>
getPartitionId(blockRowIndex, blockColIndex)
case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) =>
getPartitionId(blockRowIndex, blockColIndex)
case (i: Int, j: Int) =>
getPartitionId(i, j)
case (i: Int, j: Int, _) =>
getPartitionId(i, j)
case _ =>
throw new IllegalArgumentException(s"Unrecognized key. key: $key")
throw new IllegalArgumentException(s"Unrecognized key: $key")
}
}

/** Partitions sub-matrices as blocks with neighboring sub-matrices. */
private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = {
require(0 <= blockRowIndex && blockRowIndex < numRowBlocks, "The blockRowIndex in the key " +
s"must be in the range 0 <= blockRowIndex < numRowBlocks. blockRowIndex: $blockRowIndex," +
s"numRowBlocks: $numRowBlocks")
require(0 <= blockRowIndex && blockColIndex < numColBlocks, "The blockColIndex in the key " +
s"must be in the range 0 <= blockRowIndex < numColBlocks. blockColIndex: $blockColIndex, " +
s"numColBlocks: $numColBlocks")
// Coordinates of the block
val i = blockRowIndex / numRowBlocksPerPartition
val j = blockColIndex / numColBlocksPerPartition
// The mod shouldn't be required but is added as a guarantee for possible corner cases
Utils.nonNegativeMod(j * blocksPerRow + i, numPartitions)
}

/** Tries to calculate the optimal number of blocks that should be in each partition. */
private def findOptimalBlockLengths: (Int, Int) = {
// Gives the optimal number of blocks that need to be in each partition
val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
// Number of neighboring blocks to take in each row
var m = math.ceil(math.sqrt(targetNumBlocksPerPartition)).toInt
// Number of neighboring blocks to take in each column
var n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt
// Try to make m and n close to each other while making sure that we don't exceed the number
// of partitions
var numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m)
var numBlocksForCols = math.ceil(numColBlocks * 1.0 / n)
while ((numBlocksForRows * numBlocksForCols > numPartitions) && (m * n != 0)) {
if (numRowBlocks <= numColBlocks) {
m += 1
n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt
} else {
n += 1
m = math.ceil(targetNumBlocksPerPartition * 1.0 / n).toInt
}
numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m)
numBlocksForCols = math.ceil(numColBlocks * 1.0 / n)
}
// If a good partitioning scheme couldn't be found, set the side with the smaller dimension to
// 1 and the other to the number of targetNumBlocksPerPartition
if (m * n == 0) {
if (numRowBlocks <= numColBlocks) {
m = 1
n = targetNumBlocksPerPartition
} else {
n = 1
m = targetNumBlocksPerPartition
}
}
(m, n)
private def getPartitionId(i: Int, j: Int): Int = {
require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
i / rowsPerPart + j / colsPerPart * rowPartitions
}

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
(this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks) &&
(this.numPartitions == r.numPartitions)
(this.rows == r.rows) && (this.cols == r.cols) &&
(this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
case _ =>
false
}
}
}

private[mllib] object GridPartitioner {

def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
}

def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
require(suggestedNumPartitions > 0)
val scale = 1.0 / math.sqrt(suggestedNumPartitions)
val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
}
}

/**
* Represents a distributed matrix in blocks of local matrices.
*
Expand Down Expand Up @@ -191,7 +154,7 @@ class BlockMatrix(
val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt

private[mllib] var partitioner: GridPartitioner =
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = rdd.partitions.size)

/** Returns the dimensions of the matrix. */
private def getDim: (Long, Long) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.spark.mllib.linalg.distributed

import org.scalatest.FunSuite
import scala.util.Random

import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext
Expand Down Expand Up @@ -51,50 +53,72 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.numCols() === n)
}

test("grid partitioner partitioning") {
val partitioner = gridBasedMat.partitioner
assert(partitioner.getPartition((0, 0)) === 0)
assert(partitioner.getPartition((0, 1)) === 0)
assert(partitioner.getPartition((1, 0)) === 1)
assert(partitioner.getPartition((1, 1)) === 1)
assert(partitioner.getPartition((2, 0)) === 2)
assert(partitioner.getPartition((2, 1)) === 2)
assert(partitioner.getPartition((1, 0, 1)) === 1)
assert(partitioner.getPartition((2, 0, 0)) === 2)

val part2 = new GridPartitioner(10, 20, 10)
assert(part2.getPartition((0, 0)) === 0)
assert(part2.getPartition((0, 1)) === 0)
assert(part2.getPartition((0, 6)) === 2)
assert(part2.getPartition((3, 7)) === 2)
assert(part2.getPartition((3, 8)) === 4)
assert(part2.getPartition((3, 13)) === 6)
assert(part2.getPartition((9, 14)) === 7)
assert(part2.getPartition((9, 15)) === 7)
assert(part2.getPartition((9, 19)) === 9)
test("grid partitioner") {
val random = new Random()
// This should generate a 4x4 grid of 1x2 blocks.
val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
val expected0 = Array(
Array(0, 0, 4, 4, 8, 8, 12),
Array(1, 1, 5, 5, 9, 9, 13),
Array(2, 2, 6, 6, 10, 10, 14),
Array(3, 3, 7, 7, 11, 11, 15))
for (i <- 0 until 4; j <- 0 until 7) {
assert(part0.getPartition((i, j)) === expected0(i)(j))
assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j))
}

intercept[IllegalArgumentException] {
part0.getPartition((-1, 0))
}

intercept[IllegalArgumentException] {
part0.getPartition((4, 0))
}

intercept[IllegalArgumentException] {
part0.getPartition((0, -1))
}

intercept[IllegalArgumentException] {
part2.getPartition((-1, 0))
part0.getPartition((0, 7))
}

val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
val expected1 = Array(
Array(0, 2),
Array(1, 3))
for (i <- 0 until 2; j <- 0 until 2) {
assert(part1.getPartition((i, j)) === expected1(i)(j))
assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j))
}

val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
assert(part0 !== part2)
assert(part1 === part2)

val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
val expected3 = Array(
Array(0, 0, 2),
Array(1, 1, 3))
for (i <- 0 until 2; j <- 0 until 3) {
assert(part3.getPartition((i, j)) === expected3(i)(j))
assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j))
}

val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
assert(part3 === part4)

intercept[IllegalArgumentException] {
part2.getPartition((10, 0))
new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1)
}

intercept[IllegalArgumentException] {
part2.getPartition((9, 20))
GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0)
}

val part3 = new GridPartitioner(20, 10, 10)
assert(part3.getPartition((0, 0)) === 0)
assert(part3.getPartition((1, 0)) === 0)
assert(part3.getPartition((6, 0)) === 1)
assert(part3.getPartition((7, 3)) === 1)
assert(part3.getPartition((8, 3)) === 2)
assert(part3.getPartition((13, 3)) === 3)
assert(part3.getPartition((14, 9)) === 8)
assert(part3.getPartition((15, 9)) === 8)
assert(part3.getPartition((19, 9)) === 9)
intercept[IllegalArgumentException] {
GridPartitioner(2, 2, suggestedNumPartitions = 0)
}
}

test("toBreeze and toLocalMatrix") {
Expand Down

0 comments on commit 24ec7b8

Please sign in to comment.