Skip to content

Commit

Permalink
SPARK-3278 added isotonic regression for weighted data. Added tests for Java api
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Nov 29, 2014
1 parent 05d9048 commit 629a1ce
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,33 @@

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.regression.MonotonicityConstraint.Enum.MonotonicityConstraint
import org.apache.spark.rdd.RDD

sealed trait MonotonicityConstraint {
def holds(current: LabeledPoint, next: LabeledPoint): Boolean
}
object MonotonicityConstraint {

case object Isotonic extends MonotonicityConstraint {
override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
current.label <= next.label
}
}
case object Antitonic extends MonotonicityConstraint {
override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
current.label >= next.label
object Enum {

  /**
   * Ordering constraint between two consecutive data points in an
   * isotonic regression sequence.
   */
  sealed trait MonotonicityConstraint {
    // True when the pair (current, next) satisfies the required ordering.
    private[regression] def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean
  }

  /** Non-decreasing (ascending) constraint: each label is at least the previous one. */
  case object Isotonic extends MonotonicityConstraint {
    override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean =
      next.label >= current.label
  }

  /** Non-increasing (descending) constraint: each label is at most the previous one. */
  case object Antitonic extends MonotonicityConstraint {
    override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean =
      next.label <= current.label
  }
}

val Isotonic = Enum.Isotonic
val Antitonic = Enum.Antitonic
}

/**
Expand All @@ -41,9 +52,10 @@ case object Antitonic extends MonotonicityConstraint {
* @param predictions Weights computed for every feature.
*/
class IsotonicRegressionModel(
val predictions: Seq[LabeledPoint],
val predictions: Seq[WeightedLabeledPoint],
val monotonicityConstraint: MonotonicityConstraint)
extends RegressionModel {

override def predict(testData: RDD[Vector]): RDD[Double] =
testData.map(predict)

Expand All @@ -60,7 +72,7 @@ trait IsotonicRegressionAlgorithm
extends Serializable {

protected def createModel(
weights: Seq[LabeledPoint],
weights: Seq[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

/**
Expand All @@ -70,47 +82,47 @@ trait IsotonicRegressionAlgorithm
* @return model
*/
def run(
input: RDD[LabeledPoint],
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

/**
* Run algorithm to obtain isotonic regression model
* @param input data
* @param initialWeights weights
* @param monotonicityConstraint asc or desc
* @param weights weights
* @return
*/
def run(
input: RDD[LabeledPoint],
initialWeights: Vector,
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): IsotonicRegressionModel
}

class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {

override def run(
input: RDD[LabeledPoint],
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, monotonicityConstraint),
parallelPoolAdjacentViolators(input, monotonicityConstraint, Vectors.dense(Array(0d))),
monotonicityConstraint)
}

override def run(
input: RDD[LabeledPoint],
initialWeights: Vector,
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
???
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, monotonicityConstraint, weights),
monotonicityConstraint)
}

override protected def createModel(
weights: Seq[LabeledPoint],
predictions: Seq[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
new IsotonicRegressionModel(weights, monotonicityConstraint)
new IsotonicRegressionModel(predictions, monotonicityConstraint)
}



/**
* Performs a pool adjacent violators algorithm (PAVA)
* Uses approach with single processing of data where violators in previously processed
Expand All @@ -123,18 +135,18 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
* @return result
*/
private def poolAdjacentViolators(
in: Array[LabeledPoint],
monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
in: Array[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): Array[WeightedLabeledPoint] = {

//Pools sub array within given bounds assigning weighted average value to all elements
def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
def pool(in: Array[WeightedLabeledPoint], start: Int, end: Int): Unit = {
val poolSubArray = in.slice(start, end + 1)

val weightedSum = poolSubArray.map(_.label).sum
val weight = poolSubArray.length
val weightedSum = poolSubArray.map(lp => lp.label * lp.weight).sum
val weight = poolSubArray.map(_.weight).sum

for(i <- start to end) {
in(i) = LabeledPoint(weightedSum / weight, in(i).features)
in(i) = WeightedLabeledPoint(weightedSum / weight, in(i).features, in(i).weight)
}
}

Expand Down Expand Up @@ -175,8 +187,9 @@ class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {
* @return result
*/
private def parallelPoolAdjacentViolators(
testData: RDD[LabeledPoint],
monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = {
testData: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): Seq[WeightedLabeledPoint] = {

poolAdjacentViolators(
testData
Expand All @@ -200,14 +213,14 @@ object IsotonicRegression {
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* @param weights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
initialWeights: Vector,
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, initialWeights, monotonicityConstraint)
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint,
weights: Vector): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, monotonicityConstraint, weights)
}

/**
Expand All @@ -219,7 +232,7 @@ object IsotonicRegression {
* matrix A as well as the corresponding right hand side label y
*/
def train(
input: RDD[LabeledPoint],
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, monotonicityConstraint)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

import scala.beans.BeanInfo

object WeightedLabeledPointConversions {

  /** Lifts a plain [[LabeledPoint]] to a [[WeightedLabeledPoint]] with unit weight. */
  implicit def labeledPointToWeightedLabeledPoint(
      labeledPoint: LabeledPoint): WeightedLabeledPoint =
    WeightedLabeledPoint(labeledPoint.label, labeledPoint.features, 1)

  /** Lifts every element of an RDD of [[LabeledPoint]] to a weighted point with unit weight. */
  implicit def labeledPointRDDToWeightedLabeledPointRDD(
      rdd: RDD[LabeledPoint]): RDD[WeightedLabeledPoint] =
    rdd.map(labeledPointToWeightedLabeledPoint)
}

/**
 * Labeled point with weight
 *
 * @param label label of this data point
 * @param features feature vector of this data point
 * @param weight weight associated with this data point
 */
@BeanInfo
case class WeightedLabeledPoint(label: Double, features: Vector, weight: Double)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.util

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.WeightedLabeledPointConversions._
import org.apache.spark.mllib.regression.{LabeledPoint, WeightedLabeledPoint}

import scala.collection.JavaConversions._

object IsotonicDataGenerator {

  /**
   * Return a Java List of ordered labeled points
   * @param labels list of labels for the data points
   * @return Java List of input.
   */
  def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[WeightedLabeledPoint] = {
    // Splat the array straight into the varargs overload, then wrap for Java callers.
    seqAsJavaList(generateIsotonicInput(labels: _*))
  }

  /**
   * Return an ordered sequence of labeled data points with default weights
   * @param labels list of labels for the data points
   * @return sequence of data points
   */
  def generateIsotonicInput(labels: Double*): Seq[WeightedLabeledPoint] = {
    // Features are the 1-based position of each label in the sequence.
    labels.zipWithIndex.map { case (label, i) =>
      labeledPointToWeightedLabeledPoint(LabeledPoint(label, Vectors.dense(i + 1)))
    }
  }

  /**
   * Return an ordered sequence of labeled weighted data points
   * @param labels list of labels for the data points
   * @param weights list of weights for the data points
   * @return sequence of data points
   */
  def generateWeightedIsotonicInput(
      labels: Seq[Double],
      weights: Seq[Double]): Seq[WeightedLabeledPoint] = {
    // Pair each label with its 1-based position and the corresponding weight.
    labels.zipWithIndex.zip(weights).map { case ((label, i), weight) =>
      WeightedLabeledPoint(label, Vectors.dense(i + 1), weight)
    }
  }
}
Loading

0 comments on commit 629a1ce

Please sign in to comment.