Skip to content

Commit

Permalink
SPARK-3278 added comments and cleaned up api to consistently handle weights
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Nov 30, 2014
1 parent 629a1ce commit 8f5daf9
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,51 @@

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.MonotonicityConstraint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Isotonic, MonotonicityConstraint}
import org.apache.spark.rdd.RDD

/**
* Monotonicity constraints for monotone regression
* Isotonic (increasing)
* Antitonic (decreasing)
*/
object MonotonicityConstraint {

object Enum {
object MonotonicityConstraint {

sealed trait MonotonicityConstraint {
private[regression] def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean
}

/**
* Isotonic monotonicity constraint. Increasing sequence
*/
case object Isotonic extends MonotonicityConstraint {
override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
current.label <= next.label
}
}

/**
* Antitonic monotonicity constraint. Decreasing sequence
*/
case object Antitonic extends MonotonicityConstraint {
override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
current.label >= next.label
}
}
}

val Isotonic = Enum.Isotonic
val Antitonic = Enum.Antitonic
val Isotonic = MonotonicityConstraint.Isotonic
val Antitonic = MonotonicityConstraint.Antitonic
}

/**
* Regression model for Isotonic regression
*
* @param predictions Weights computed for every feature.
* @param monotonicityConstraint specifies if the sequence is increasing or decreasing
*/
class IsotonicRegressionModel(
val predictions: Seq[WeightedLabeledPoint],
Expand All @@ -59,10 +71,11 @@ class IsotonicRegressionModel(
override def predict(testData: RDD[Vector]): RDD[Double] =
testData.map(predict)

//take the highest of elements smaller than our feature or weight with lowest feature
override def predict(testData: Vector): Double =
override def predict(testData: Vector): Double = {
//take the highest of data points smaller than our feature or data point with lowest feature
(predictions.head +:
predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
}
}

/**
Expand All @@ -71,49 +84,40 @@ class IsotonicRegressionModel(
trait IsotonicRegressionAlgorithm
extends Serializable {

/**
* Creates isotonic regression model with given parameters
*
* @param predictions labels estimated using isotonic regression algorithm. Used for predictions on new data points.
* @param monotonicityConstraint isotonic or antitonic
* @return isotonic regression model
*/
protected def createModel(
weights: Seq[WeightedLabeledPoint],
predictions: Seq[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

/**
* Run algorithm to obtain isotonic regression model
*
* @param input data
* @param monotonicityConstraint ascending or descending
* @return model
* @return isotonic regression model
*/
def run(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

/**
* Run algorithm to obtain isotonic regression model
* @param input data
* @param monotonicityConstraint ascending (isotonic) or descending (antitonic)
* @param weights weights
* @return isotonic regression model
*/
def run(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): IsotonicRegressionModel
}

class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
/**
* Parallel pool adjacent violators algorithm for monotone regression
*/
class PoolAdjacentViolators private [mllib]
extends IsotonicRegressionAlgorithm {

override def run(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, monotonicityConstraint, Vectors.dense(Array(0d))),
monotonicityConstraint)
}

override def run(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, monotonicityConstraint, weights),
parallelPoolAdjacentViolators(input, monotonicityConstraint),
monotonicityConstraint)
}

Expand Down Expand Up @@ -180,16 +184,15 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {

/**
* Performs parallel pool adjacent violators algorithm
* Calls PAVA on each partition and then again on the result
* Calls Pool adjacent violators on each partition and then again on the result
*
* @param testData input
* @param monotonicityConstraint ascending (isotonic) or descending (antitonic)
* @return result
*/
private def parallelPoolAdjacentViolators(
testData: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): Seq[WeightedLabeledPoint] = {
monotonicityConstraint: MonotonicityConstraint): Seq[WeightedLabeledPoint] = {

poolAdjacentViolators(
testData
Expand All @@ -201,39 +204,24 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
}

/**
* Top-level methods for calling IsotonicRegression.
* Top-level methods for monotone regression (either isotonic or antitonic).
*/
object IsotonicRegression {

/**
* Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
* in gradient descent are initialized using the initial weights provided.
* Train a monotone regression model given an RDD of (label, features, weight).
* Currently only a one-dimensional algorithm is supported (features.length must be one)
* Label is the dependent y value
* Weight of a data point is its number of measurements. Default is 1
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param weights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
* @param input RDD of (label, array of features, weight). Each point describes a row of the data
* matrix A as well as the corresponding right hand side label y
* and weight as number of measurements
* @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence
*/
def train(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, monotonicityConstraint, weights)
}

/**
* Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
*/
def train(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
monotonicityConstraint: MonotonicityConstraint = Isotonic): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, monotonicityConstraint)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@ import scala.beans.BeanInfo
object WeightedLabeledPointConversions {
implicit def labeledPointToWeightedLabeledPoint(
labeledPoint: LabeledPoint): WeightedLabeledPoint = {
WeightedLabeledPoint(labeledPoint.label, labeledPoint.features, 1)
WeightedLabeledPoint(labeledPoint.label, labeledPoint.features)
}

implicit def labeledPointRDDToWeightedLabeledPointRDD(
rdd: RDD[LabeledPoint]): RDD[WeightedLabeledPoint] = {
rdd.map(lp => WeightedLabeledPoint(lp.label, lp.features, 1))
rdd.map(lp => WeightedLabeledPoint(lp.label, lp.features))
}
}

/**
* Labeled point with weight
* Class that represents the features and labels of a data point with associated weight
*
* @param label Label for this data point.
* @param features List of features for this data point.
* @param weight Weight of the data point. Defaults to 1.
*/
@BeanInfo
case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double)
case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double = 1)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class JavaIsotonicRegressionSuite implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.{Antitonic, Isotonic}
import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Antitonic, Isotonic}
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.scalatest.{Matchers, FunSuite}
import WeightedLabeledPointConversions._
Expand Down

0 comments on commit 8f5daf9

Please sign in to comment.