Skip to content

Commit

Permalink
compress pools and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jan 30, 2015
1 parent 35d044e commit 0b35c15
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.rdd.RDD

Expand Down Expand Up @@ -208,9 +210,13 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

if (input.isEmpty) {
return Array.empty
}

// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.view.slice(start, end + 1)
val poolSubArray = input.slice(start, end + 1)

val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum
Expand Down Expand Up @@ -246,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
}
}

input
// For points having the same prediction, we only keep two boundary points.
val compressed = ArrayBuffer.empty[(Double, Double, Double)]

var (curLabel, curFeature, curWeight) = input.head
var rightBound = curFeature
def merge(): Unit = {
compressed += ((curLabel, curFeature, curWeight))
if (rightBound > curFeature) {
compressed += ((curLabel, rightBound, 0.0))
}
}
i = 1
while (i < input.length) {
val (label, feature, weight) = input(i)
if (label == curLabel) {
curWeight += weight
rightBound = feature
} else {
merge()
curLabel = label
curFeature = feature
curWeight = weight
rightBound = curFeature
}
i += 1
}
merge()

compressed.toArray
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
}

/**
 * Builds unit-weight training points from a sequence of labels.
 *
 * @param labels label of each point, in feature order
 * @return one (label, feature, weight) triple per label, where the feature is the
 *         0-based position of the label and the weight is always 1
 */
private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
  labels.zipWithIndex.map { case (label, index) => (label, index.toDouble, 1d) }
}

/**
 * Builds weighted training points from parallel sequences of labels and weights.
 *
 * @param labels label of each point, in feature order
 * @param weights weight of each point; looked up by the position of the label, so it
 *                must contain at least as many elements as `labels`
 * @return one (label, feature, weight) triple per label, where the feature is the
 *         0-based position of the label
 */
private def generateIsotonicInput(
    labels: Seq[Double],
    weights: Seq[Double]): Seq[(Double, Double, Double)] = {
  labels.zipWithIndex.map { case (label, index) => (label, index.toDouble, weights(index)) }
}

private def runIsotonicRegression(
Expand All @@ -55,87 +53,123 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
}

test("increasing isotonic regression") {
  /*
   The following result can be reproduced with scikit-learn (Python 2 syntax):

   > from sklearn.isotonic import IsotonicRegression
   > x = range(9)
   > y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
   > ir = IsotonicRegression().fit(x, y)
   > print ir.predict(x)
   array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ])
   */
  val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)

  // Point-wise predictions at the nine training features 0..8.
  assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))

  // The model is compressed: a run of equal predictions keeps only its two end points,
  // so the interior of the pooled run at features 1..3 (feature 2) is dropped.
  assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
  assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
  assert(model.isotonic)
}

test("isotonic regression with size 0") {
  // Training on an empty input must produce a model without any predictions.
  val emptyModel = runIsotonicRegression(Seq(), true)
  assert(emptyModel.predictions === Array())
}

test("isotonic regression with size 1") {
  // A single training point is returned unchanged as the only prediction.
  val singlePointModel = runIsotonicRegression(Seq(1), true)
  assert(singlePointModel.predictions === Array(1.0))
}

test("isotonic regression strictly increasing sequence") {
  // Labels that are already strictly increasing need no pooling and come back as-is.
  val increasingModel = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true)
  assert(increasingModel.predictions === Array(1, 2, 3, 4, 5))
}

test("isotonic regression strictly decreasing sequence") {
  val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)

  // Every point is pooled to the mean 3; the compressed model keeps only the two
  // end points of that single run.
  assert(model.boundaries === Array(0, 4))
  assert(model.predictions === Array(3, 3))
}

test("isotonic regression with last element violating monotonicity") {
  val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)

  // The last three labels (3, 4, 2) are pooled to 3; the run over features 2..4
  // is stored by its end points only.
  assert(model.boundaries === Array(0, 1, 2, 4))
  assert(model.predictions === Array(1, 2, 3, 3))
}

test("isotonic regression with first element violating monotonicity") {
  val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)

  // The first three labels (4, 2, 3) are pooled to 3; the run over features 0..2
  // is stored by its end points only.
  assert(model.boundaries === Array(0, 2, 3, 4))
  assert(model.predictions === Array(3, 3, 4, 5))
}

test("isotonic regression with negative labels") {
  val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)

  // (-1, -2) pools to -1.5 and (0, 1, -1) pools to 0; each run keeps its end points.
  assert(model.boundaries === Array(0, 1, 2, 4))
  assert(model.predictions === Array(-1.5, -1.5, 0, 0))
}

test("isotonic regression with unordered input") {
  // The points are handed over in descending feature order, split across 2 partitions.
  val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2)

  val model = new IsotonicRegression().run(trainRDD)
  assert(model.predictions === Array(1, 2, 3, 4, 5))
}

test("weighted isotonic regression") {
  val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)

  // Labels (3, 4, 2) with weights (1, 1, 2) pool to (3 + 4 + 2 * 2) / 4 = 2.75;
  // the run over features 2..4 is stored by its end points only.
  assert(model.boundaries === Array(0, 1, 2, 4))
  assert(model.predictions === Array(1, 2, 2.75, 2.75))
}

test("weighted isotonic regression with weights lower than 1") {
  val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)

  // Labels (3, 2, 1) with weights (1, 0.1, 0.1) pool to 3.3 / 1.2; predictions are
  // passed through the suite's `round` helper before being compared.
  assert(model.boundaries === Array(0, 1, 2, 4))
  assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
}

test("weighted isotonic regression with negative weights") {
  val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)

  // The last four points end up in one run, stored by its end points (features 1 and 4).
  assert(model.boundaries === Array(0.0, 1.0, 4.0))
  assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
}

test("weighted isotonic regression with zero weights") {
  val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)

  // The last four points share the prediction 2 and are stored as one run by its
  // end points (features 1 and 4).
  assert(model.boundaries === Array(0.0, 1.0, 4.0))
  assert(model.predictions === Array(1, 2, 2))
}

test("isotonic regression prediction") {
  // Training features are the 0-based positions 0..4; labels (7, 1, 2) pool to 10/3.
  val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)

  // Queries to the left of the first training feature.
  assert(model.predict(-2) === 1)
  assert(model.predict(-1) === 1)
  // Queries between the training features 0 and 1.
  assert(model.predict(0.5) === 1.5)
  assert(model.predict(0.75) === 1.75)
  // Queries exactly on training features.
  assert(model.predict(1) === 2)
  assert(model.predict(2) === 10d/3)
  // Query to the right of the last training feature.
  assert(model.predict(9) === 10d/3)
}

test("isotonic regression prediction with duplicate features") {
val trainRDD = sc.parallelize(
Seq[(Double, Double, Double)](
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache()
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2)
val model = new IsotonicRegression().run(trainRDD)

assert(model.predict(0) === 1)
Expand All @@ -147,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
test("antitonic regression prediction with duplicate features") {
val trainRDD = sc.parallelize(
Seq[(Double, Double, Double)](
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache()
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2)
val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)

assert(model.predict(0) === 6)
Expand All @@ -158,21 +192,22 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M

test("isotonic regression RDD prediction") {
  val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)

  // Query features are spread over 2 partitions; each prediction is paired with its
  // query and the pairs are sorted by query before comparing.
  val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2)
  val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
  assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
}

test("antitonic regression prediction") {
  // Training features are the 0-based positions 0..4; labels (3, 5) pool to 4.
  val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)

  // Queries to the left of the first training feature.
  assert(model.predict(-2) === 7)
  assert(model.predict(-1) === 7)
  // Queries between the training features 0 and 1.
  assert(model.predict(0.5) === 6)
  assert(model.predict(0.75) === 5.5)
  // Queries exactly on training features.
  assert(model.predict(1) === 5)
  assert(model.predict(2) === 4)
  // Query to the right of the last training feature.
  assert(model.predict(9) === 1)
}

test("model construction") {
Expand Down

0 comments on commit 0b35c15

Please sign in to comment.