Merge pull request apache#23 from bzhang02/add_htf_spark_hdfs_lr
[YSPARK-1595][YSPARK-1601] Move TestSparkHdfsLr to spark-starter
Sanket Chintapalli authored and GitHub Enterprise committed Jun 2, 2020
2 parents 846fb7d + 6b00df2 commit 5d7f716
Showing 6 changed files with 1,183 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -83,6 +83,11 @@
      <artifactId>shc-core</artifactId>
      <version>1.0</version>
    </dependency>
    <dependency>
      <groupId>org.scalanlp</groupId>
      <artifactId>breeze_2.11</artifactId>
      <version>0.11.2</version>
    </dependency>
  </dependencies>

  <build>
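The new dependency pulls in Breeze, the linear algebra library that `SparkHdfsLR` (added below) uses for its weight vector. A minimal sketch of the two operations the example relies on, dot products and in-place vector updates (illustrative only; assumes the breeze 0.11 API):

```scala
import breeze.linalg.DenseVector

object BreezeSketch {
  def main(args: Array[String]): Unit = {
    val w = DenseVector(0.5, -1.0, 2.0)
    val x = DenseVector(1.0, 1.0, 1.0)
    // Dot product, used in the gradient: 0.5 - 1.0 + 2.0 = 1.5
    println(w.dot(x))
    // In-place update, as in the training loop: w -= gradient
    w -= x * 0.1
    println(w) // DenseVector(0.4, -1.1, 1.9)
  }
}
```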
1,000 changes: 1,000 additions & 0 deletions src/main/resources/data/lr_data.txt

17 changes: 17 additions & 0 deletions src/main/resources/oozie/spark_hdfs_lr/README.md
@@ -0,0 +1,17 @@
Instructions for running this Oozie application:

- create a directory `spark_hdfs_lr/` in HDFS for the Oozie application.

- upload `workflow.xml` to `spark_hdfs_lr/apps/spark/`.

- use `mvn clean package` to build the spark-starter jar if you haven't done so.

- upload the jar `spark-starter/target/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar` to `spark_hdfs_lr/apps/lib/`.

- upload the resource file `spark-starter/src/main/resources/data/lr_data.txt` to `spark_hdfs_lr/data/`.

- update `nameNode` and `jobTracker` in `job.properties` if you are running on a cluster other than AR.

- export `OOZIE_URL`, for example `export OOZIE_URL=https://axonitered-oozie.red.ygrid.yahoo.com:4443/oozie/`.

- submit the Oozie job with `oozie job -run -config job.properties -auth KERBEROS` (an optional layout check follows below).
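Before submitting, a quick existence check of the layout above can save a failed run. A minimal sketch (hypothetical helper, not part of this PR), assuming the Hadoop client libraries are on the classpath and that relative paths resolve against the user's HDFS home directory:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical pre-flight check: verify that the Oozie application
// layout described in the README exists in HDFS before submitting.
object CheckHdfsLayout {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val required = Seq(
      "spark_hdfs_lr/apps/spark/workflow.xml",
      "spark_hdfs_lr/apps/lib/spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar",
      "spark_hdfs_lr/data/lr_data.txt")
    val missing = required.filterNot(p => fs.exists(new Path(p)))
    if (missing.isEmpty) println("Layout looks complete; safe to submit.")
    else missing.foreach(p => System.err.println(s"MISSING: $p"))
  }
}
```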
6 changes: 6 additions & 0 deletions src/main/resources/oozie/spark_hdfs_lr/job.properties
@@ -0,0 +1,6 @@
nameNode=hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020
jobTracker=axonitered-jt1.red.ygrid.yahoo.com:8032
wfRoot=spark_hdfs_lr
sparkTag=spark_latest
oozie.libpath=/user/${user.name}/${wfRoot}/apps/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${wfRoot}/apps/spark
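Note: the Oozie client expands the `${...}` references at submission time, so with the defaults above `oozie.wf.application.path` resolves to `hdfs://axonitered-nn1.red.ygrid.yahoo.com:8020/user/<your-user>/spark_hdfs_lr/apps/spark`, with `user.name` taken from the submitting JVM.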
78 changes: 78 additions & 0 deletions src/main/resources/oozie/spark_hdfs_lr/workflow.xml
@@ -0,0 +1,78 @@
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkHdfsLrOozieTest'>
    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
    </global>

    <start to='SparkHdfsLrClusterMode' />

    <action name='SparkHdfsLrClusterMode'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkHdfsLrClusterMode</name>
            <class>com.yahoo.spark.starter.SparkHdfsLR</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default --conf spark.serializer=org.apache.spark.serializer.JavaSerializer</spark-opts>
            <arg>${wfRoot}/data/lr_data.txt</arg>
            <arg>100</arg>
        </spark>
        <ok to="SparkHdfsLrClientMode" />
        <error to="fail" />
    </action>

    <action name='SparkHdfsLrClientMode'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>client</mode>
            <name>SparkHdfsLrClientMode</name>
            <class>com.yahoo.spark.starter.SparkHdfsLR</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default --conf spark.serializer=org.apache.spark.serializer.JavaSerializer</spark-opts>
            <arg>${wfRoot}/data/lr_data.txt</arg>
            <arg>100</arg>
        </spark>
        <ok to="SparkHdfsLRTestNonexistHdfsFile" />
        <error to="fail" />
    </action>

    <action name='SparkHdfsLRTestNonexistHdfsFile'>
        <spark xmlns="uri:oozie:spark-action:0.2">
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>${sparkTag}</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>cluster</mode>
            <name>SparkHdfsLRTestNonexistHdfsFile</name>
            <class>com.yahoo.spark.starter.SparkHdfsLR</class>
            <jar>spark-starter-2.0-SNAPSHOT-jar-with-dependencies.jar</jar>
            <spark-opts>--num-executors 3 --executor-memory 2g --executor-cores 1 --queue default --conf spark.serializer=org.apache.spark.serializer.JavaSerializer</spark-opts>
            <arg>${wfRoot}/data/bogusnonexistentfile.txt</arg>
            <arg>100</arg>
        </spark>
        <ok to="fail" />
        <error to="end" />
    </action>

    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>
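Note the wiring of the third action above: `SparkHdfsLRTestNonexistHdfsFile` sends `<ok>` to the `fail` node and `<error>` to `end`, so the workflow as a whole succeeds only if that Spark job fails on the bogus input path. A hedged sketch of the failure mode it relies on (`WhyTheNegativeTestFails` is a hypothetical illustration, not part of this PR):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: in Spark 2.x, spark.read.textFile resolves its input
// paths up front, so a nonexistent HDFS path typically fails the job with
// an AnalysisException ("Path does not exist") before any iteration runs.
object WhyTheNegativeTestFails {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("NegativeTestSketch").getOrCreate()
    try {
      spark.read.textFile("spark_hdfs_lr/data/bogusnonexistentfile.txt").rdd.count()
    } catch {
      case e: Exception =>
        System.err.println(s"Job fails, as the workflow expects: ${e.getMessage}")
        throw e // rethrow so YARN, and therefore Oozie, records a failed action
    } finally {
      spark.stop()
    }
  }
}
```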
77 changes: 77 additions & 0 deletions src/main/scala/com/yahoo/spark/starter/SparkHdfsLR.scala
@@ -0,0 +1,77 @@
package com.yahoo.spark.starter

import java.util.Random

import scala.math.exp

import breeze.linalg.{DenseVector, Vector}

import org.apache.spark.sql.SparkSession

/**
 * Logistic regression based classification.
 *
 * This is an example implementation for learning how to use Spark. For more conventional use,
 * please refer to org.apache.spark.ml.classification.LogisticRegression.
 */
object SparkHdfsLR {
  val D = 10 // Number of dimensions
  val rand = new Random(42)

  case class DataPoint(x: Vector[Double], y: Double)

  def parsePoint(line: String): DataPoint = {
    val tok = new java.util.StringTokenizer(line, " ")
    val y = tok.nextToken.toDouble
    val x = new Array[Double](D)
    var i = 0
    while (i < D) {
      x(i) = tok.nextToken.toDouble; i += 1
    }
    DataPoint(new DenseVector(x), y)
  }

  def showWarning(): Unit = {
    System.err.println(
      """WARN: This is a naive implementation of Logistic Regression and is given as an example!
        |Please use org.apache.spark.ml.classification.LogisticRegression
        |for more conventional use.
      """.stripMargin)
  }

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      System.err.println("Usage: SparkHdfsLR <file> <iters>")
      System.exit(1)
    }

    showWarning()

    val spark = SparkSession
      .builder
      .appName("SparkHdfsLR")
      .getOrCreate()

    val inputPath = args(0)
    val lines = spark.read.textFile(inputPath).rdd

    val points = lines.map(parsePoint).cache()
    val ITERATIONS = args(1).toInt

    // Initialize w to a random value
    val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
    println(s"Initial w: $w")

    for (i <- 1 to ITERATIONS) {
      println(s"On iteration $i")
      val gradient = points.map { p =>
        p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
      }.reduce(_ + _)
      w -= gradient
    }

    println(s"Final w: $w")
    spark.stop()
  }
}
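For reference, the training loop above is plain batch gradient descent on the logistic loss with labels y in {-1, +1} and an implicit step size of 1:

```latex
L(w) = \sum_{i=1}^{n} \log\left(1 + e^{-y_i \, w^{\top} x_i}\right), \qquad
\nabla L(w) = \sum_{i=1}^{n} \left( \frac{1}{1 + e^{-y_i \, w^{\top} x_i}} - 1 \right) y_i \, x_i, \qquad
w \leftarrow w - \nabla L(w)
```

The `points.map { ... }.reduce(_ + _)` step computes the gradient sum and `w -= gradient` applies the update in place; with no learning rate, convergence depends on the scale of the input data.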
