Skip to content

Commit

Permalink
SPARK-3278 changed Java api to match Scala api's (Double, Double, Dou…
Browse files Browse the repository at this point in the history
…ble)
  • Loading branch information
zapletal-martin committed Jan 13, 2015
1 parent 3c2954b commit 0d14bd3
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class PoolAdjacentViolators private [mllib]

override def run(
input: RDD[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {
isotonic: Boolean = true): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, isotonic),
isotonic)
Expand Down Expand Up @@ -217,18 +217,20 @@ object IsotonicRegression {
}

/**
* Train a monotone regression model given an RDD of (label, feature).
* Train a monotone regression model given an RDD of (label, feature, weight).
* Label is the dependent y value
* Weight defaults to 1
* Weight of the data point is the number of measurements. Default is 1
*
* @param input RDD of (label, feature).
* @param input RDD of (label, feature, weight).
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return
*/
def train(
input: JavaPairRDD[java.lang.Double, java.lang.Double],
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new PoolAdjacentViolators()
.run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic)
.run(
input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())),
isotonic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ object IsotonicDataGenerator {
* @param labels list of labels for the data points
* @return Java List of input.
*/
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = {
def generateIsotonicInputAsList(
labels: Array[Double]):java.util.List[(JDouble, JDouble, JDouble)] = {
seqAsJavaList(
generateIsotonicInput(
wrapDoubleArray(labels):_*).map(x => (new JDouble(x._1), new JDouble(x._2))))
generateIsotonicInput(wrapDoubleArray(labels):_*)
.map(x => (new JDouble(x._1), new JDouble(x._2), new JDouble(1))))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.mllib.regression;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
Expand All @@ -26,7 +25,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import scala.Tuple3;

import java.io.Serializable;
import java.util.List;
Expand All @@ -45,11 +44,11 @@ public void tearDown() {
sc = null;
}

double difference(List<Tuple2<Double, Double>> expected, IsotonicRegressionModel model) {
double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
double diff = 0;

for(int i = 0; i < model.predictions().length(); i++) {
Tuple2<Double, Double> exp = expected.get(i);
Tuple3<Double, Double, Double> exp = expected.get(i);
diff += Math.abs(model.predict(exp._2()) - exp._1());
}

Expand All @@ -58,13 +57,13 @@ public void tearDown() {

@Test
public void runIsotonicRegressionUsingStaticMethod() {
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
JavaRDD<Tuple3<Double, Double, Double>> trainRDD = sc.parallelize(
IsotonicDataGenerator.generateIsotonicInputAsList(
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);

List<Tuple2<Double, Double>> expected = IsotonicDataGenerator
List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
.generateIsotonicInputAsList(
new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});

Expand All @@ -73,15 +72,15 @@ public void runIsotonicRegressionUsingStaticMethod() {

@Test
public void testPredictJavaRDD() {
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
JavaRDD<Tuple3<Double, Double, Double>> trainRDD = sc.parallelize(
IsotonicDataGenerator.generateIsotonicInputAsList(
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);

JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple2<Double, Double>, Double>() {
JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple3<Double, Double, Double>, Double>() {
@Override
public Double call(Tuple2<Double, Double> v) throws Exception {
public Double call(Tuple3<Double, Double, Double> v) throws Exception {
return v._2();
}
});
Expand All @@ -91,5 +90,4 @@ public Double call(Tuple2<Double, Double> v) throws Exception {
Assert.assertTrue(predictions.get(0) == 1d);
Assert.assertTrue(predictions.get(11) == 12d);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,15 @@ class IsotonicRegressionClusterSuite
extends FunSuite
with LocalClusterSparkContext {

//TODO: FIX
test("task size should be small in both training and prediction") {
val n = 135000
val n = 1000

val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d))
val points = sc.parallelize(trainData, 1)
val points = sc.parallelize(trainData, 2)

// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
val model = IsotonicRegression.train(points, true)
val model = IsotonicRegression.train(points)
val predictions = model.predict(points.map(_._2))
}
}
}

0 comments on commit 0d14bd3

Please sign in to comment.