
Commit

Made MLlib and examples compile
rxin committed Jan 27, 2015
1 parent 6d53134 commit 8c37f0a
Showing 28 changed files with 508 additions and 113 deletions.
@@ -33,7 +33,7 @@
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -71,7 +71,7 @@ public static void main(String[] args) {
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -112,11 +112,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test).registerAsTable("prediction");
SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
cvModel.transform(test).registerTempTable("prediction");
DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
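A quick illustration of the two renames this file exercises — SchemaRDD becomes DataFrame, and registerAsTable becomes registerTempTable — as a small self-contained Scala sketch (not part of this commit; the object, data, and table names are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}

    object TempTableSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("TempTableSketch"))
        val sqlContext = new SQLContext(sc)

        // Build a small DataFrame from a JSON RDD (jsonRDD is also used in JavaSparkSQL below).
        val docs = sc.parallelize(Seq(
          """{"id": 4, "text": "spark i j k"}""",
          """{"id": 5, "text": "l m n"}"""))
        val df: DataFrame = sqlContext.jsonRDD(docs)

        // registerAsTable is gone; temp tables now go through registerTempTable,
        // and sql() hands back a DataFrame instead of a SchemaRDD.
        df.registerTempTable("docs")
        val out: DataFrame = sqlContext.sql("SELECT id, text FROM docs")
        out.collect().foreach { case Row(id: Long, text: String) => println(s"($id, $text)") }

        sc.stop()
      }
    }
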
@@ -28,7 +28,7 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public static void main(String[] args) {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test).registerAsTable("results");
SchemaRDD results =
model2.transform(test).registerTempTable("results");
DataFrame results =
jsql.sql("SELECT features, label, probability, prediction FROM results");
for (Row r: results.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
@@ -29,7 +29,7 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -79,11 +79,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents.
model.transform(test).registerAsTable("prediction");
SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
model.transform(test).registerTempTable("prediction");
DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
@@ -26,9 +26,9 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class JavaSparkSQL {
public static class Person implements Serializable {
@@ -74,11 +74,11 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
DataFrame schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -99,11 +99,11 @@ public String call(Row row) {
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
SchemaRDD teenagers2 =
DataFrame teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -120,7 +120,7 @@ public String call(Row row) {
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed by path
SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -134,7 +134,7 @@ public String call(Row row) {
peopleFromJsonFile.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -151,7 +151,7 @@ public String call(Row row) {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());

// Take a look at the schema of this new JavaSchemaRDD.
peopleFromJsonRDD.printSchema();
Expand All @@ -164,7 +164,7 @@ public String call(Row row) {

peopleFromJsonRDD.registerTempTable("people2");

SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
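The loaders used in this file — jsonFile, jsonRDD, and parquetFile — all hand back the renamed type as well. A Scala sketch of the same round trip (illustrative only; assumes an existing SparkContext named sc and the people.json resource shipped with the examples):

    import org.apache.spark.sql.{DataFrame, SQLContext}

    val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext (assumed)

    // JSON in: the schema is inferred, and the result is a DataFrame.
    val people: DataFrame = sqlContext.jsonFile("examples/src/main/resources/people.json")
    people.printSchema()

    // Parquet out and back in: the schema is preserved, and the result is again a DataFrame.
    people.saveAsParquetFile("people.parquet")
    val peopleFromParquet: DataFrame = sqlContext.parquetFile("people.parquet")
    println(peopleFromParquet.schema.prettyJson)
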
@@ -18,7 +18,6 @@
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
@@ -101,7 +100,7 @@ object CrossValidatorExample {

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
.select('id, 'text, 'score, 'prediction)
.select("id", "text", "score", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
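The Scala examples also drop the Symbol-style column list ('id, 'text, …) in favor of plain string names. For instance, pulling just the id and prediction columns out of the result above would look like this (a fragment, assuming the cvModel and test values defined in this example):

    import org.apache.spark.sql.Row

    cvModel.transform(test)
      .select("id", "prediction")
      .collect()
      .foreach { case Row(id: Long, prediction: Double) => println(s"$id --> $prediction") }
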
@@ -18,7 +18,6 @@
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -92,7 +91,7 @@ object SimpleParamsExample {
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test)
.select('features, 'label, 'probability, 'prediction)
.select("features", "label", "probability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)
@@ -20,7 +20,6 @@ package org.apache.spark.examples.ml
import scala.beans.BeanInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
@@ -80,7 +79,7 @@ object SimpleTextClassificationPipeline {

// Make predictions on test documents.
model.transform(test)
.select('id, 'text, 'score, 'prediction)
.select("id", "text", "score", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
@@ -28,10 +28,10 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SchemaRDD}
import org.apache.spark.sql.{Row, SQLContext, DataFrame}

/**
* An example of how to use [[org.apache.spark.sql.SchemaRDD]] as a Dataset for ML. Run with
* An example of how to use [[org.apache.spark.sql.DataFrame]] as a Dataset for ML. Run with
* {{{
* ./bin/run-example org.apache.spark.examples.mllib.DatasetExample [options]
* }}}
@@ -81,18 +81,18 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")

// Convert input data to SchemaRDD explicitly.
val schemaRDD: SchemaRDD = origData
val schemaRDD: DataFrame = origData
println(s"Inferred schema:\n${schemaRDD.schema.prettyJson}")
println(s"Converted to SchemaRDD with ${schemaRDD.count()} records")

// Select columns, using implicit conversion to SchemaRDD.
val labelsSchemaRDD: SchemaRDD = origData.select('label)
val labelsSchemaRDD: DataFrame = origData.select("label")
val labels: RDD[Double] = labelsSchemaRDD.map { case Row(v: Double) => v }
val numLabels = labels.count()
val meanLabel = labels.fold(0.0)(_ + _) / numLabels
println(s"Selected label column with average value $meanLabel")

val featuresSchemaRDD: SchemaRDD = origData.select('features)
val featuresSchemaRDD: DataFrame = origData.select("features")
val features: RDD[Vector] = featuresSchemaRDD.map { case Row(v: Vector) => v }
val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
@@ -109,7 +109,7 @@ object DatasetExample {
val newDataset = sqlContext.parquetFile(outputDir)

println(s"Schema from Parquet: ${newDataset.schema.prettyJson}")
val newFeatures = newDataset.select('features).map { case Row(v: Vector) => v }
val newFeatures = newDataset.select("features").map { case Row(v: Vector) => v }
val newFeaturesSummary = newFeatures.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
(sum1, sum2) => sum1.merge(sum2))
@@ -19,6 +19,8 @@ package org.apache.spark.examples.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.dsl._
import org.apache.spark.sql.dsl.literals._

// One method for defining the schema of an RDD is to make a case class with the desired column
// names and types.
@@ -54,7 +56,7 @@ object RDDRelation {
rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)

// Queries can also be written using a LINQ-like Scala DSL.
rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

// Write out an RDD as a parquet file.
rdd.saveAsParquetFile("pair.parquet")
@@ -63,7 +65,7 @@ object RDDRelation {
val parquetFile = sqlContext.parquetFile("pair.parquet")

// Queries can be run using the DSL on parequet files just like the original RDD.
parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)
parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println)

// These files can also be registered as tables.
parquetFile.registerTempTable("parquetFile")
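The Symbol-based query DSL ('key === 1) is likewise replaced by the $-interpolator, backed by the two imports this commit adds at the top of RDDRelation. A fragment in the new style (illustrative; records stands for any DataFrame with an integer key column and a string value column):

    import org.apache.spark.sql.dsl._
    import org.apache.spark.sql.dsl.literals._

    // records: DataFrame with `key` and `value` columns (assumed)
    records.where($"key" === 1)
      .orderBy($"value".asc)
      .select($"value".as("a"))
      .collect()
      .foreach(println)
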
8 changes: 4 additions & 4 deletions mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
@@ -21,7 +21,7 @@ import scala.annotation.varargs

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param.{ParamMap, ParamPair, Params}
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.DataFrame

/**
* :: AlphaComponent ::
@@ -38,7 +38,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
* @return fitted model
*/
@varargs
def fit(dataset: SchemaRDD, paramPairs: ParamPair[_]*): M = {
def fit(dataset: DataFrame, paramPairs: ParamPair[_]*): M = {
val map = new ParamMap().put(paramPairs: _*)
fit(dataset, map)
}
@@ -50,7 +50,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
* @param paramMap parameter map
* @return fitted model
*/
def fit(dataset: SchemaRDD, paramMap: ParamMap): M
def fit(dataset: DataFrame, paramMap: ParamMap): M

/**
* Fits multiple models to the input data with multiple sets of parameters.
@@ -61,7 +61,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
* @param paramMaps an array of parameter maps
* @return fitted models, matching the input parameter maps
*/
def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[M] = {
def fit(dataset: DataFrame, paramMaps: Array[ParamMap]): Seq[M] = {
paramMaps.map(fit(dataset, _))
}
}
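Since fit is now keyed on DataFrame, callers pass the training DataFrame plus parameters in any of the three forms above. A sketch with LogisticRegression (illustrative; assumes training is a DataFrame of labeled points like the one built in JavaSimpleParamsExample):

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.param.ParamMap

    val lr = new LogisticRegression()

    // Varargs of ParamPair.
    val model1 = lr.fit(training, lr.maxIter -> 10, lr.regParam -> 0.01)

    // A single ParamMap.
    val paramMap = new ParamMap().put(lr.maxIter -> 20).put(lr.regParam -> 0.1)
    val model2 = lr.fit(training, paramMap)

    // An array of ParamMaps yields one fitted model per map.
    val models = lr.fit(training, Array(paramMap, new ParamMap().put(lr.maxIter -> 5)))
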
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/Evaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.DataFrame

/**
* :: AlphaComponent ::
@@ -35,5 +35,5 @@ abstract class Evaluator extends Identifiable {
* @param paramMap parameter map that specifies the input columns and output metrics
* @return metric
*/
def evaluate(dataset: SchemaRDD, paramMap: ParamMap): Double
def evaluate(dataset: DataFrame, paramMap: ParamMap): Double
}
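Concretely, an Evaluator such as the BinaryClassificationEvaluator imported by CrossValidatorExample is now scored against a DataFrame. A sketch (illustrative; predictions stands for the DataFrame returned by a fitted model's transform(), and an empty ParamMap leaves the evaluator on its default input columns):

    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.param.ParamMap

    val evaluator = new BinaryClassificationEvaluator()
    val metric: Double = evaluator.evaluate(predictions, new ParamMap())
    println(s"evaluation metric = $metric")
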
6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

/**
@@ -88,7 +88,7 @@ class Pipeline extends Estimator[PipelineModel] {
* @param paramMap parameter map
* @return fitted pipeline
*/
override def fit(dataset: SchemaRDD, paramMap: ParamMap): PipelineModel = {
override def fit(dataset: DataFrame, paramMap: ParamMap): PipelineModel = {
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
val theStages = map(stages)
@@ -162,7 +162,7 @@ class PipelineModel private[ml] (
}
}

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
// Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
val map = (fittingParamMap ++ this.paramMap) ++ paramMap
transformSchema(dataset.schema, map, logging = true)
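Put together, the new signatures mean a Pipeline is fitted on a DataFrame and its PipelineModel hands one back, which is exactly how the text-classification examples above consume them. A condensed sketch (illustrative; assumes training and test DataFrames shaped like the LabeledDocument/Document data in those examples):

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.sql.DataFrame

    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
    val lr = new LogisticRegression().setMaxIter(10)
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

    // fit takes a DataFrame; transform returns one.
    val model = pipeline.fit(training)
    val predictions: DataFrame = model.transform(test)
    predictions.select("id", "text", "score", "prediction").collect().foreach(println)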