Skip to content

Commit

Permalink
SPARK-3278 isotonic regression refactoring and api changes
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Nov 26, 2014
1 parent 961aa05 commit 05d9048
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,82 @@ class IsotonicRegressionModel(

/**
 * Predict a label for the given feature vector.
 *
 * Takes the label of the prediction whose (single) feature is the largest one
 * that is <= the test feature; falls back to the first prediction when every
 * stored feature is larger than the test feature.
 * NOTE(review): assumes `predictions` is sorted by feature ascending — confirm at call site.
 *
 * @param testData feature vector (only the first element is used)
 * @return predicted label
 */
override def predict(testData: Vector): Double =
  (predictions.head +:
    predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
}

/**
 * Base trait for isotonic regression algorithms.
 */
trait IsotonicRegressionAlgorithm
  extends Serializable {

  /**
   * Creates a model from the fitted predictions.
   *
   * @param weights fitted (feature, label) points
   * @param monotonicityConstraint ascending (isotonic) or descending (antitonic)
   * @return isotonic regression model
   */
  protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  /**
   * Run the algorithm to obtain an isotonic regression model.
   *
   * @param input training data
   * @param monotonicityConstraint ascending or descending
   * @return fitted model
   */
  def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  /**
   * Run the algorithm to obtain an isotonic regression model,
   * starting from the given initial weights.
   *
   * @param input training data
   * @param initialWeights initial model weights
   * @param monotonicityConstraint ascending or descending
   * @return fitted model
   */
  def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
}

class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {

/**
 * Run the pool adjacent violators algorithm and wrap the result in a model.
 *
 * @param input training data
 * @param monotonicityConstraint ascending or descending
 * @return fitted model
 */
override def run(
    input: RDD[LabeledPoint],
    monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
  createModel(
    parallelPoolAdjacentViolators(input, monotonicityConstraint),
    monotonicityConstraint)
}

/**
 * Run the algorithm starting from the given initial weights.
 *
 * Not yet implemented for PAVA: always throws `NotImplementedError` (via `???`).
 * Kept as `???` deliberately so callers see the standard not-implemented error.
 *
 * @param input training data
 * @param initialWeights initial model weights (currently ignored)
 * @param monotonicityConstraint ascending or descending
 * @return never returns normally
 */
override def run(
    input: RDD[LabeledPoint],
    initialWeights: Vector,
    monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
  ???
}

/**
 * Creates an [[IsotonicRegressionModel]] from the fitted points.
 *
 * @param weights fitted (feature, label) points
 * @param monotonicityConstraint ascending or descending
 * @return isotonic regression model
 */
override protected def createModel(
    weights: Seq[LabeledPoint],
    monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
  new IsotonicRegressionModel(weights, monotonicityConstraint)
}


//Performs a pool adjacent violators algorithm (PAVA)
//Uses approach with single processing of data where violators in previously processed
//data created by pooling are fixed immediatelly.
//Uses optimization of discovering monotonicity violating sequences
//Method in situ mutates input array
private def poolAdjacentViolators(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {

/**
 * Performs the pool adjacent violators algorithm (PAVA).
 * Uses a single pass over the data in which violators in previously processed
 * data created by pooling are fixed immediately.
 * Uses an optimization that discovers monotonicity-violating sequences.
 * The method mutates the input array in place.
 *
 * @param in input data
 * @param monotonicityConstraint ascending or descending
 * @return result
 */
private def poolAdjacentViolators(
in: Array[LabeledPoint],
monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {

//Pools sub array within given bounds assigning weighted average value to all elements
def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
Expand Down Expand Up @@ -117,10 +166,22 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
in
}

/**
 * Performs the parallel pool adjacent violators algorithm:
 * runs PAVA independently on each partition and then once more on the
 * collected result to fix violations across partition boundaries.
 *
 * The data is first sorted by feature so partitions hold contiguous ranges.
 *
 * @param testData input data
 * @param monotonicityConstraint ascending or descending
 * @return fitted (feature, label) points
 */
private def parallelPoolAdjacentViolators(
    testData: RDD[LabeledPoint],
    monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = {

  poolAdjacentViolators(
    testData
      .sortBy(_.features.toArray.head)
      .cache()
      .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
      .collect(),
    monotonicityConstraint)
}
Expand Down Expand Up @@ -163,106 +224,3 @@ object IsotonicRegression {
new PoolAdjacentViolators().run(input, monotonicityConstraint)
}
}

/*def functionalOption(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
def pool2(in: Array[LabeledPoint]): Array[LabeledPoint] =
in.map(p => LabeledPoint(in.map(_.label).sum / in.length, p.features))
def iterate(checked: Array[LabeledPoint], remaining: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
if(remaining.size < 2) {
checked ++ remaining
} else {
val newRemaining = if(remaining.size == 2) Array[LabeledPoint]() else remaining.slice(2, remaining.length)
if(!monotonicityConstraint.holds(remaining.head, remaining.tail.head)) {
iterate(checked ++ pool2(remaining.slice(0, 2)), newRemaining, monotonicityConstraint)
} else {
iterate(checked ++ remaining.slice(0, 2), newRemaining, monotonicityConstraint)
}
}
}
iterate(Array(), in, monotonicityConstraint)
}
functionalOption(in, monotonicityConstraint)*/

/*def option1(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint) = {
def findMonotonicityViolators(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Unit = {
var j = start
while (j >= 1 && !monotonicityConstraint.holds(in(j - 1), in(j))) {
pool(in, j - 1, start + 1)
j = j - 1
}
}
for (i <- 0 to in.length - 1) {
findMonotonicityViolators(in, i, monotonicityConstraint)
}
in
}*/

/*
def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
val subArraySum = in.slice(start, end).map(_.label).sum
val subArrayLength = math.abs(start - end)
for(i <- start to end - 1) {
in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
}
}*/



/*
OPTION 2
def pool(in: Array[LabeledPoint], range: Range): Unit = {
val subArray = in.slice(range.start, range.end + 1)
val subArraySum = subArray.map(_.label).sum
val subArrayLength = subArray.length
for(i <- range.start to range.end) {
in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
}
}
def poolExtendedViolators(in: Array[LabeledPoint], range: Range, monotonicityConstraint: MonotonicityConstraint): Unit = {
var extendedRange = Range(range.start, range.end)
while (extendedRange.start >= 0 && !monotonicityConstraint.holds(in(extendedRange.start), in(extendedRange.start + 1))) {
pool(in, Range(extendedRange.start, extendedRange.end))
extendedRange = Range(extendedRange.start - 1, extendedRange.end)
}
}
def findViolatingSequence(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Option[Range] = {
var j = start
while(j < in.length - 1 && !monotonicityConstraint.holds(in(start), in(j + 1))) {
j = j + 1
}
if(j == start) {
None
} else {
Some(Range(start, j))
}
}
var i = 0;
while(i < in.length) {
findViolatingSequence(in, i, monotonicityConstraint).fold[Unit]({
i = i + 1
})(r => {
poolExtendedViolators(in, r, monotonicityConstraint)
i = r.end
})
}
in
*/
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class IsotonicRegressionSuite
/** Builds labeled points pairing the i-th label with the dense 1-based feature i + 1. */
def generateDataPoints(labels: Double*): Seq[LabeledPoint] =
  labels.zipWithIndex.map { case (label, index) =>
    LabeledPoint(label, Vectors.dense(index + 1))
  }


test("increasing isotonic regression") {
val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

Expand Down Expand Up @@ -77,6 +76,33 @@ class IsotonicRegressionSuite
model.predictions should be(generateDataPoints(3, 3, 3, 3, 3))
}

test("isotonic regression with last element violating monotonicity") {
  // Final point (label 2) breaks ascending order; PAVA pools the tail to restore it.
  val input = sc.parallelize(generateDataPoints(1, 2, 3, 4, 2)).cache()

  val model = (new PoolAdjacentViolators).run(input, Isotonic)

  model.predictions should be(generateDataPoints(1, 2, 3, 3, 3))
}

test("isotonic regression with first element violating monotonicity") {
  // Leading point (label 4) breaks ascending order; PAVA pools the head to restore it.
  val input = sc.parallelize(generateDataPoints(4, 2, 3, 4, 5)).cache()

  val model = (new PoolAdjacentViolators).run(input, Isotonic)

  model.predictions should be(generateDataPoints(3, 3, 3, 4, 5))
}

test("isotonic regression with empty input") {
  // NOTE(review): the original test name said "unordered input", but the fixture
  // is an empty list; renamed to describe what is actually exercised.
  val testRDD = sc.parallelize(List[LabeledPoint]()).cache()

  val alg = new PoolAdjacentViolators
  val model = alg.run(testRDD, Isotonic)

  model.predictions should be(List())
}

test("isotonic regression prediction") {
val testRDD = sc.parallelize(generateDataPoints(1, 2, 7, 1, 2)).cache()

Expand Down

0 comments on commit 05d9048

Please sign in to comment.