From 05d9048b1aa167d72998b9057cc2672107361866 Mon Sep 17 00:00:00 2001 From: martinzapletal Date: Wed, 26 Nov 2014 12:08:27 +0000 Subject: [PATCH] SPARK-3278 isotonic regression refactoring and api changes --- .../mllib/regression/IsotonicRegression.scala | 194 +++++++----------- .../regression/IsotonicRegressionSuite.scala | 28 ++- 2 files changed, 103 insertions(+), 119 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 09fe3c189cee6..cfe792c651b9c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -49,33 +49,82 @@ class IsotonicRegressionModel( //take the highest of elements smaller than our feature or weight with lowest feature override def predict(testData: Vector): Double = - (predictions.head +: predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label + (predictions.head +: + predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label } +/** + * Base trait representing an isotonic regression algorithm + */ trait IsotonicRegressionAlgorithm extends Serializable { - protected def createModel(weights: Seq[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel - def run(input: RDD[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel - def run(input: RDD[LabeledPoint], initialWeights: Vector, monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel + protected def createModel( + weights: Seq[LabeledPoint], + monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel + + /** + * Run algorithm to obtain isotonic regression model + * @param input data + * @param monotonicityConstraint ascending or descending + * @return model + */ + def run( + input: 
RDD[LabeledPoint], + monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel + + /** + * Run algorithm to obtain isotonic regression model + * @param input data + * @param initialWeights weights + * @param monotonicityConstraint asc or desc + * @return + */ + def run( + input: RDD[LabeledPoint], + initialWeights: Vector, + monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel } class PoolAdjacentViolators extends IsotonicRegressionAlgorithm { - override def run(input: RDD[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = - createModel(parallelPoolAdjacentViolators(input, monotonicityConstraint), monotonicityConstraint) + override def run( + input: RDD[LabeledPoint], + monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = { + createModel( + parallelPoolAdjacentViolators(input, monotonicityConstraint), + monotonicityConstraint) + } - override def run(input: RDD[LabeledPoint], initialWeights: Vector, monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = ??? + override def run( + input: RDD[LabeledPoint], + initialWeights: Vector, + monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = { + ??? + } - override protected def createModel(weights: Seq[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = + override protected def createModel( + weights: Seq[LabeledPoint], + monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = { new IsotonicRegressionModel(weights, monotonicityConstraint) + } + - //Performs a pool adjacent violators algorithm (PAVA) - //Uses approach with single processing of data where violators in previously processed - //data created by pooling are fixed immediatelly. 
- //Uses optimization of discovering monotonicity violating sequences - //Method in situ mutates input array - private def poolAdjacentViolators(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = { + + /** + * Performs a pool adjacent violators algorithm (PAVA) + * Uses approach with single processing of data where violators in previously processed + * data created by pooling are fixed immediately. + * Uses optimization of discovering monotonicity violating sequences + * Method in situ mutates input array + * + * @param in input data + * @param monotonicityConstraint ascending or descending + * @return result + */ + private def poolAdjacentViolators( + in: Array[LabeledPoint], + monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = { //Pools sub array within given bounds assigning weighted average value to all elements def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = { @@ -117,10 +166,22 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm { in } - private def parallelPoolAdjacentViolators(testData: RDD[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = { + /** + * Performs parallel pool adjacent violators algorithm + * Calls PAVA on each partition and then again on the result + * + * @param testData input + * @param monotonicityConstraint ascending or descending + * @return result + */ + private def parallelPoolAdjacentViolators( + testData: RDD[LabeledPoint], + monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = { + poolAdjacentViolators( testData .sortBy(_.features.toArray.head) + .cache() .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator) .collect(), monotonicityConstraint) } @@ -163,106 +224,3 @@ object IsotonicRegression { new PoolAdjacentViolators().run(input, monotonicityConstraint) } } - -/*def functionalOption(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): 
Array[LabeledPoint] = { - def pool2(in: Array[LabeledPoint]): Array[LabeledPoint] = - in.map(p => LabeledPoint(in.map(_.label).sum / in.length, p.features)) - - def iterate(checked: Array[LabeledPoint], remaining: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = { - if(remaining.size < 2) { - checked ++ remaining - } else { - val newRemaining = if(remaining.size == 2) Array[LabeledPoint]() else remaining.slice(2, remaining.length) - - if(!monotonicityConstraint.holds(remaining.head, remaining.tail.head)) { - iterate(checked ++ pool2(remaining.slice(0, 2)), newRemaining, monotonicityConstraint) - } else { - iterate(checked ++ remaining.slice(0, 2), newRemaining, monotonicityConstraint) - } - } - } - - iterate(Array(), in, monotonicityConstraint) - } - - - functionalOption(in, monotonicityConstraint)*/ - -/*def option1(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint) = { - def findMonotonicityViolators(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Unit = { - var j = start - - while (j >= 1 && !monotonicityConstraint.holds(in(j - 1), in(j))) { - pool(in, j - 1, start + 1) - j = j - 1 - } - } - - for (i <- 0 to in.length - 1) { - findMonotonicityViolators(in, i, monotonicityConstraint) - } - - in - }*/ - -/* -def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = { -val subArraySum = in.slice(start, end).map(_.label).sum -val subArrayLength = math.abs(start - end) - -for(i <- start to end - 1) { -in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features) -} -}*/ - - - -/* -OPTION 2 -def pool(in: Array[LabeledPoint], range: Range): Unit = { - val subArray = in.slice(range.start, range.end + 1) - - val subArraySum = subArray.map(_.label).sum - val subArrayLength = subArray.length - - for(i <- range.start to range.end) { - in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features) - } - } - - def poolExtendedViolators(in: Array[LabeledPoint], 
range: Range, monotonicityConstraint: MonotonicityConstraint): Unit = { - var extendedRange = Range(range.start, range.end) - - while (extendedRange.start >= 0 && !monotonicityConstraint.holds(in(extendedRange.start), in(extendedRange.start + 1))) { - pool(in, Range(extendedRange.start, extendedRange.end)) - extendedRange = Range(extendedRange.start - 1, extendedRange.end) - } - } - - def findViolatingSequence(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Option[Range] = { - var j = start - - while(j < in.length - 1 && !monotonicityConstraint.holds(in(start), in(j + 1))) { - j = j + 1 - } - - if(j == start) { - None - } else { - Some(Range(start, j)) - } - } - - var i = 0; - - while(i < in.length) { - findViolatingSequence(in, i, monotonicityConstraint).fold[Unit]({ - i = i + 1 - })(r => { - poolExtendedViolators(in, r, monotonicityConstraint) - i = r.end - }) - } - - in - */ \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 4197167602d05..39d93bfb16a0e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -31,7 +31,6 @@ class IsotonicRegressionSuite def generateDataPoints(labels: Double*): Seq[LabeledPoint] = labels.zip((1 to labels.size)).map(point => LabeledPoint(point._1, Vectors.dense(point._2))) - test("increasing isotonic regression") { val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache() @@ -77,6 +76,33 @@ class IsotonicRegressionSuite model.predictions should be(generateDataPoints(3, 3, 3, 3, 3)) } + test("isotonic regression with last element violating monotonicity") { + val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 4, 2)).cache() 
+ + val alg = new PoolAdjacentViolators + val model = alg.run(testRDD, Isotonic) + + model.predictions should be(generateDataPoints(1, 2, 3, 3, 3)) + } + + test("isotonic regression with first element violating monotonicity") { + val testRDD = sc.parallelize(generateDataPoints(4, 2, 3, 4, 5)).cache() + + val alg = new PoolAdjacentViolators + val model = alg.run(testRDD, Isotonic) + + model.predictions should be(generateDataPoints(3, 3, 3, 4, 5)) + } + + test("isotonic regression with unordered input") { + val testRDD = sc.parallelize(List[LabeledPoint]()).cache() + + val alg = new PoolAdjacentViolators + val model = alg.run(testRDD, Isotonic) + + model.predictions should be(List()) + } + test("isotonic regression prediction") { val testRDD = sc.parallelize(generateDataPoints(1, 2, 7, 1, 2)).cache()