Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
sarutak committed Oct 2, 2014
2 parents 0fc1b09 + 1b9f0d6 commit a42300c
Showing 62 changed files with 408 additions and 155 deletions.
2 changes: 1 addition & 1 deletion bagel/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
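
A quick illustration (not part of the commit, thread name and message only for the sake of example): with %t added to the ConversionPattern, each line in target/unit-tests.log also carries the thread name between the timestamp and the log level, for example:

14/10/02 10:11:12.345 task-result-getter-0 ERROR TaskResultGetter: Exception while getting task result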
12 changes: 1 addition & 11 deletions bin/spark-sql
@@ -24,7 +24,6 @@
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
CLASS_NOT_FOUND_EXIT_STATUS=101

# Figure out where Spark is installed
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
@@ -53,13 +52,4 @@ source "$FWDIR"/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

"$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
exit_status=$?

if [[ exit_status -eq CLASS_NOT_FOUND_EXIT_STATUS ]]; then
echo
echo "Failed to load Spark SQL CLI main class $CLASS."
echo "You need to build Spark with -Phive."
fi

exit $exit_status
exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
11 changes: 11 additions & 0 deletions core/pom.xml
@@ -322,6 +322,17 @@
</tasks>
</configuration>
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>${basedir}/../python/build</directory>
</fileset>
</filesets>
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
34 changes: 21 additions & 13 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -339,26 +339,34 @@ private[spark] object PythonRDD extends Logging {
def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
JavaRDD[Array[Byte]] = {
val file = new DataInputStream(new FileInputStream(filename))
val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
try {
while (true) {
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
objs.append(obj)
val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
try {
while (true) {
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
objs.append(obj)
}
} catch {
case eof: EOFException => {}
}
} catch {
case eof: EOFException => {}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
} finally {
file.close()
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}

def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
val file = new DataInputStream(new FileInputStream(filename))
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
sc.broadcast(obj)
try {
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
sc.broadcast(obj)
} finally {
file.close()
}
}

def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -320,6 +320,10 @@ object SparkSubmit {
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
println(s"Failed to load main class $childMainClass.")
println("You need to build Spark with -Phive.")
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.serializer.SerializerInstance
@@ -32,7 +34,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul

private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
THREADS, "Result resolver thread")
THREADS, "task-result-getter")

protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
@@ -70,7 +72,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
case ex: Exception =>
// Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
case NonFatal(ex) =>
logError("Exception while getting task result", ex)
taskSetManager.abort("Exception while getting task result: %s".format(ex))
}
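To make the comment in the hunk above concrete ("Matching NonFatal so we don't catch the ControlThrowable from the 'return' above"), here is a minimal, self-contained Scala sketch — illustrative only, not part of the commit, with names of my own choosing — of how the NonFatal extractor behaves: it matches ordinary exceptions but deliberately does not match ControlThrowable (which Scala uses to implement non-local return) or fatal errors.

import scala.util.control.{ControlThrowable, NonFatal}

object NonFatalSketch {
  // Classify what a `catch { case NonFatal(e) => ... }` block would handle.
  def classify(body: => Unit): String =
    try { body; "no exception" }
    catch {
      case NonFatal(e)         => s"handled non-fatal: ${e.getClass.getSimpleName}"
      case _: ControlThrowable => "ControlThrowable passed through NonFatal"
    }

  def main(args: Array[String]): Unit = {
    println(classify(throw new RuntimeException("boom")))  // handled non-fatal: RuntimeException
    println(classify(throw new ControlThrowable {}))        // ControlThrowable passed through NonFatal
  }
}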
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}

import org.eclipse.jetty.util.MultiException

import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
@@ -1470,6 +1472,7 @@ private[spark] object Utils extends Logging {
return true
}
isBindCollision(e.getCause)
case e: MultiException => e.getThrowables.exists(isBindCollision)
case e: Exception => isBindCollision(e.getCause)
case _ => false
}
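As context for the new MultiException case above, a rough, self-contained sketch — an illustrative stand-in, not the actual Utils.isBindCollision — showing how a BindException buried inside Jetty's aggregate MultiException gets recognized (assumes jetty-util on the classpath; object and method names are mine):

import java.net.BindException
import scala.collection.JavaConverters._
import org.eclipse.jetty.util.MultiException

object BindCollisionSketch {
  // Simplified analogue of Utils.isBindCollision: walk nested causes and, for
  // Jetty's MultiException, every throwable it aggregates.
  def looksLikeBindCollision(e: Throwable): Boolean = e match {
    case _: BindException  => true
    case m: MultiException => m.getThrowables.asScala.exists(looksLikeBindCollision)
    case other: Exception  => Option(other.getCause).exists(looksLikeBindCollision)
    case _                 => false
  }

  def main(args: Array[String]): Unit = {
    val multi = new MultiException()
    multi.add(new BindException("Address already in use"))
    println(looksLikeBindCollision(multi))  // true
  }
}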
2 changes: 1 addition & 1 deletion core/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -31,7 +31,6 @@
* Usage: JavaSparkPi [slices]
*/
public final class JavaSparkPi {


public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
@@ -61,5 +60,7 @@ public Integer call(Integer integer, Integer integer2) {
});

System.out.println("Pi is roughly " + 4.0 * count / n);

jsc.stop();
}
}
examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -61,7 +61,8 @@ public static void main(String[] args) throws Exception {
// Load a text file and convert each line to a Java Bean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
@Override
public Person call(String line) {
String[] parts = line.split(",");

Person person = new Person();
@@ -82,6 +83,7 @@ public Person call(String line) throws Exception {
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
}
@@ -104,6 +106,7 @@ public String call(Row row) {
JavaSchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
}
@@ -136,6 +139,7 @@ public String call(Row row) {
// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.map(new Function<Row, String>() {
@Override
public String call(Row row) { return "Name: " + row.getString(0); }
}).collect();
for (String name: teenagerNames) {
@@ -162,12 +166,15 @@ public String call(Row row) {

JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0) + ", City: " + row.getString(1);
}
}).collect();
for (String name: nameAndCity) {
System.out.println(name);
}

ctx.stop();
}
}
2 changes: 2 additions & 0 deletions examples/src/main/python/avro_inputformat.py
@@ -78,3 +78,5 @@
output = avro_rdd.map(lambda x: x[0]).collect()
for k in output:
print k

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/parquet_inputformat.py
@@ -57,3 +57,5 @@
output = parquet_rdd.map(lambda x: x[1]).collect()
for k in output:
print k

sc.stop()
73 changes: 73 additions & 0 deletions examples/src/main/python/sql.py
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row, StructField, StructType, StringType, IntegerType


if __name__ == "__main__":
sc = SparkContext(appName="PythonSQL")
sqlContext = SQLContext(sc)

# RDD is created from a list of rows
some_rdd = sc.parallelize([Row(name="John", age=19),
Row(name="Smith", age=23),
Row(name="Sarah", age=18)])
# Infer schema from the first row, create a SchemaRDD and print the schema
some_schemardd = sqlContext.inferSchema(some_rdd)
some_schemardd.printSchema()

# Another RDD is created from a list of tuples
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
StructField("person_age", IntegerType(), False)])
# Create a SchemaRDD by applying the schema to the RDD and print the schema
another_schemardd = sqlContext.applySchema(another_rdd, schema)
another_schemardd.printSchema()
# root
# |-- age: integer (nullable = true)
# |-- name: string (nullable = true)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path = os.environ['SPARK_HOME'] + "examples/src/main/resources/people.json"
# Create a SchemaRDD from the file(s) pointed to by path
people = sqlContext.jsonFile(path)
# root
# |-- person_name: string (nullable = false)
# |-- person_age: integer (nullable = false)

# The inferred schema can be visualized using the printSchema() method.
people.printSchema()
# root
# |-- age: IntegerType
# |-- name: StringType

# Register this SchemaRDD as a table.
people.registerAsTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

for each in teenagers.collect():
print each[0]

sc.stop()
examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -136,5 +136,7 @@ object CassandraCQLTest {
classOf[CqlOutputFormat],
job.getConfiguration()
)

sc.stop()
}
}
examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -126,6 +126,8 @@ object CassandraTest {
}
}.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
classOf[ColumnFamilyOutputFormat], job.getConfiguration)

sc.stop()
}
}

examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -44,11 +44,11 @@ object GroupByTest {
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache
}.cache()
// Enforce that everything has been calculated and in cache
pairs1.count
pairs1.count()

println(pairs1.groupByKey(numReducers).count)
println(pairs1.groupByKey(numReducers).count())

sc.stop()
}
examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -79,5 +79,7 @@ object LogQuery {
.reduceByKey((a, b) => a.merge(b))
.collect().foreach{
case (user, query) => println("%s\t%s".format(user, query))}

sc.stop()
}
}
examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -21,7 +21,6 @@ import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

import scala.xml.{XML,NodeSeq}

@@ -78,9 +77,9 @@ object WikipediaPageRank {
(id, new PRVertex(1.0 / numVertices, outEdges))
})
if (usePartitioner) {
vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache()
} else {
vertices = vertices.cache
vertices = vertices.cache()
}
println("Done parsing input file.")

@@ -100,7 +99,9 @@ object WikipediaPageRank {
(result
.filter { case (id, vertex) => vertex.value >= threshold }
.map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
.collect.mkString)
.collect().mkString)
println(top)

sc.stop()
}
}
(The remaining changed files of the 62 listed above are not shown here.)
