From 089bf86f7b49ebb2ba35d2a72afc50edf9af926e Mon Sep 17 00:00:00 2001
From: martinzapletal
Date: Sat, 27 Dec 2014 16:19:48 +0100
Subject: [PATCH] Removed MonotonicityConstraint, Isotonic and Antitonic constraints. Replaced by a simple boolean

---
 .../mllib/regression/IsotonicRegression.scala | 89 ++++++-------------
 .../JavaIsotonicRegressionSuite.java          |  6 +-
 .../regression/IsotonicRegressionSuite.scala  | 35 ++++----
 3 files changed, 48 insertions(+), 82 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index ba33762fba754..aa4bfc6584e0c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -18,56 +18,17 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint._
 import org.apache.spark.rdd.RDD
 
-/**
- * Monotonicity constrains for monotone regression
- * Isotonic (increasing)
- * Antitonic (decreasing)
- */
-object MonotonicityConstraint {
-
-  object MonotonicityConstraint {
-
-    sealed trait MonotonicityConstraint {
-      private[regression] def holds(
-        current: WeightedLabeledPoint,
-        next: WeightedLabeledPoint): Boolean
-    }
-
-    /**
-     * Isotonic monotonicity constraint. Increasing sequence
-     */
-    case object Isotonic extends MonotonicityConstraint {
-      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
-        current.label <= next.label
-      }
-    }
-
-    /**
-     * Antitonic monotonicity constrain. Decreasing sequence
-     */
-    case object Antitonic extends MonotonicityConstraint {
-      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
-        current.label >= next.label
-      }
-    }
-  }
-
-  val Isotonic = MonotonicityConstraint.Isotonic
-  val Antitonic = MonotonicityConstraint.Antitonic
-}
-
 /**
  * Regression model for Isotonic regression
  *
  * @param predictions Weights computed for every feature.
- * @param monotonicityConstraint specifies if the sequence is increasing or decreasing
+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
  */
 class IsotonicRegressionModel(
     val predictions: Seq[WeightedLabeledPoint],
-    val monotonicityConstraint: MonotonicityConstraint)
+    val isotonic: Boolean)
   extends RegressionModel {
 
   override def predict(testData: RDD[Vector]): RDD[Double] =
@@ -91,23 +52,23 @@ trait IsotonicRegressionAlgorithm
    *
    * @param predictions labels estimated using isotonic regression algorithm.
    *                    Used for predictions on new data points.
-   * @param monotonicityConstraint isotonic or antitonic
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return isotonic regression model
    */
   protected def createModel(
       predictions: Seq[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      isotonic: Boolean): IsotonicRegressionModel
 
   /**
    * Run algorithm to obtain isotonic regression model
    *
    * @param input data
-   * @param monotonicityConstraint ascending or descenting
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return isotonic regression model
    */
   def run(
       input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      isotonic: Boolean): IsotonicRegressionModel
 }
 
 /**
@@ -118,16 +79,16 @@ class PoolAdjacentViolators private [mllib]
 
   override def run(
       input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
+      isotonic: Boolean): IsotonicRegressionModel = {
     createModel(
-      parallelPoolAdjacentViolators(input, monotonicityConstraint),
-      monotonicityConstraint)
+      parallelPoolAdjacentViolators(input, isotonic),
+      isotonic)
   }
 
   override protected def createModel(
       predictions: Seq[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
-    new IsotonicRegressionModel(predictions, monotonicityConstraint)
+      isotonic: Boolean): IsotonicRegressionModel = {
+    new IsotonicRegressionModel(predictions, isotonic)
   }
 
   /**
@@ -138,12 +99,12 @@ class PoolAdjacentViolators private [mllib]
    * Method in situ mutates input array
    *
    * @param in input data
-   * @param monotonicityConstraint asc or desc
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return result
    */
   private def poolAdjacentViolators(
       in: Array[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Array[WeightedLabeledPoint] = {
+      isotonic: Boolean): Array[WeightedLabeledPoint] = {
 
     // Pools sub array within given bounds assigning weighted average value to all elements
     def pool(in: Array[WeightedLabeledPoint], start: Int, end: Int): Unit = {
@@ -159,11 +120,17 @@ class PoolAdjacentViolators private [mllib]
 
     var i = 0
 
+    val monotonicityConstraintHolds: (Double, Double) => Boolean = (x, y) => if (isotonic) {
+      x <= y
+    } else {
+      x >= y
+    }
+
     while(i < in.length) {
       var j = i
 
       // Find monotonicity violating sequence, if any
-      while(j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
+      while(j < in.length - 1 && !monotonicityConstraintHolds(in(j).label, in(j + 1).label)) {
        j = j + 1
      }
 
@@ -173,7 +140,7 @@ class PoolAdjacentViolators private [mllib]
       } else {
         // Otherwise pool the violating sequence
         // And check if pooling caused monotonicity violation in previously processed points
-        while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
+        while (i >= 0 && !monotonicityConstraintHolds(in(i).label, in(i + 1).label)) {
           pool(in, i, j)
           i = i - 1
         }
@@ -190,19 +157,19 @@ class PoolAdjacentViolators private [mllib]
    * Calls Pool adjacent violators on each partition and then again on the result
    *
    * @param testData input
-   * @param monotonicityConstraint asc or desc
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    * @return result
    */
   private def parallelPoolAdjacentViolators(
       testData: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Seq[WeightedLabeledPoint] = {
+      isotonic: Boolean): Seq[WeightedLabeledPoint] = {
 
     poolAdjacentViolators(
       testData
        .sortBy(_.features.toArray.head)
        .cache()
-        .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
-        .collect(), monotonicityConstraint)
+        .mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator)
+        .collect(), isotonic)
   }
 }
 
@@ -221,11 +188,11 @@ object IsotonicRegression {
    * Each point describes a row of the data
    * matrix A as well as the corresponding right hand side label y
    * and weight as number of measurements
-   * @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
    */
   def train(
       input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint = Isotonic): IsotonicRegressionModel = {
-    new PoolAdjacentViolators().run(input, monotonicityConstraint)
+      isotonic: Boolean = true): IsotonicRegressionModel = {
+    new PoolAdjacentViolators().run(input, isotonic)
   }
 }
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
index b3285271fcbd1..15473883d4e43 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -62,7 +62,7 @@ public void runIsotonicRegressionUsingConstructor() {
         new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
 
     IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators();
-    IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), true);
 
     List expected = IsotonicDataGenerator
       .generateIsotonicInputAsList(
@@ -77,7 +77,7 @@ public void runIsotonicRegressionUsingStaticMethod() {
       .generateIsotonicInputAsList(
         new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
 
-    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);
 
     List expected = IsotonicDataGenerator
       .generateIsotonicInputAsList(
@@ -92,7 +92,7 @@ public void testPredictJavaRDD() {
       .generateIsotonicInputAsList(
         new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
 
-    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);
 
     JavaRDD vectors = testRDD.map(new Function() {
       @Override
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index c5ba42e912e28..a8c28f3028074 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Antitonic, Isotonic}
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
 import org.scalatest.{Matchers, FunSuite}
 import WeightedLabeledPointConversions._
@@ -37,7 +36,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
   }
@@ -46,7 +45,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(List[WeightedLabeledPoint]()).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(List())
   }
@@ -55,7 +54,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(1)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(1))
   }
@@ -64,7 +63,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
   }
@@ -73,7 +72,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
   }
@@ -82,7 +81,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
   }
@@ -91,7 +90,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
   }
@@ -100,7 +99,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
   }
@@ -109,7 +108,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
   }
@@ -118,7 +117,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
   }
@@ -127,7 +126,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions.map(p => p.copy(label = round(p.label))) should be
       (generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
@@ -137,7 +136,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions.map(p => p.copy(label = round(p.label))) should be
       (generateWeightedIsotonicInput(Seq(1, 10/6, 10/6, 10/6, 10/6), Seq(-1, 1, -3, 1, -5)))
@@ -147,7 +146,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
   }
@@ -156,7 +155,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predict(Vectors.dense(0)) should be(1)
     model.predict(Vectors.dense(2)) should be(2)
@@ -168,7 +167,7 @@ class IsotonicRegressionSuite
     val testRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Antitonic)
+    val model = alg.run(testRDD, false)
 
     model.predict(Vectors.dense(0)) should be(7)
     model.predict(Vectors.dense(2)) should be(5)
@@ -183,7 +182,7 @@ class IsotonicRegressionSuite
       LabeledPoint(1, Vectors.dense(2)))).cache()
 
     val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)
 
     model.predictions should be(generateIsotonicInput(1.5, 1.5))
   }
@@ -201,7 +200,7 @@ class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkCont
 
     // If we serialize data directly in the task closure, the size of the serialized task would be
     // greater than 1MB and hence Spark would throw an error.
-    val model = IsotonicRegression.train(points, Isotonic)
+    val model = IsotonicRegression.train(points, true)
     val predictions = model.predict(points.map(_.features))
   }
 }
\ No newline at end of file
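
Reviewer note (not part of the patch): a minimal usage sketch of the API after this change, under the following assumptions: a live SparkContext named `sc`, and the implicit LabeledPoint-to-WeightedLabeledPoint conversions from WeightedLabeledPointConversions (assumed to live in org.apache.spark.mllib.regression, as in the test suite above). The boolean flag replaces the former MonotonicityConstraint argument: `true` selects an isotonic (increasing) fit, `false` an antitonic (decreasing) one.

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.{IsotonicRegression, LabeledPoint}
  import org.apache.spark.mllib.regression.WeightedLabeledPointConversions._

  // Labels 1, 2, 7, 1, 2 over features 0..4, mirroring the prediction tests above.
  val input = sc.parallelize(Seq(
    LabeledPoint(1, Vectors.dense(0)),
    LabeledPoint(2, Vectors.dense(1)),
    LabeledPoint(7, Vectors.dense(2)),
    LabeledPoint(1, Vectors.dense(3)),
    LabeledPoint(2, Vectors.dense(4)))).cache()

  // Isotonic (increasing) fit; formerly MonotonicityConstraint.Isotonic.
  // The second argument defaults to true, so it could be omitted here.
  val increasingModel = IsotonicRegression.train(input, isotonic = true)

  // Antitonic (decreasing) fit; formerly MonotonicityConstraint.Antitonic.
  val decreasingModel = IsotonicRegression.train(input, isotonic = false)

  // Single-point predictions return values from the fitted monotone sequence.
  val low = increasingModel.predict(Vectors.dense(0))
  val high = increasingModel.predict(Vectors.dense(4))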