
Commit

Merge pull request #7 from markhamstra/master-csd
Rebased to AMPLab master
vnivargi committed Jul 7, 2013
2 parents 928aa6f + 4e5860d commit 4cc918a
Showing 59 changed files with 1,389 additions and 940 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -36,4 +36,5 @@ streaming-tests.log
dependency-reduced-pom.xml
.ensime
.ensime_lucene
checkpoint
derby.log
52 changes: 52 additions & 0 deletions bin/compute-classpath.cmd
@@ -0,0 +1,52 @@
@echo off

rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
rem script and the ExecutorRunner in standalone cluster mode.

set SCALA_VERSION=2.9.3

rem Figure out where the Spark framework is installed
set FWDIR=%~dp0..\

rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

set CORE_DIR=%FWDIR%core
set REPL_DIR=%FWDIR%repl
set EXAMPLES_DIR=%FWDIR%examples
set BAGEL_DIR=%FWDIR%bagel
set STREAMING_DIR=%FWDIR%streaming
set PYSPARK_DIR=%FWDIR%python

rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes
set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\*
set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\*
set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes

rem Add hadoop conf dir - else FileSystem.*, etc fail
rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
rem the configuration files.
if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
:no_hadoop_conf_dir

if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
:no_yarn_conf_dir

rem Add Scala standard library
set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar

rem A bit of a hack to allow calling this script within run2.cmd without seeing output
if "%DONT_PRINT_CLASSPATH%"=="1" goto exit

echo %CLASSPATH%

:exit
89 changes: 89 additions & 0 deletions bin/compute-classpath.sh
@@ -0,0 +1,89 @@
#!/bin/bash

# This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
# script and the ExecutorRunner in standalone cluster mode.

SCALA_VERSION=2.9.3

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi

CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl"
REPL_BIN_DIR="$FWDIR/repl-bin"
EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
STREAMING_DIR="$FWDIR/streaming"
PYSPARK_DIR="$FWDIR/python"

# Build up classpath
CLASSPATH="$SPARK_CLASSPATH"
CLASSPATH="$CLASSPATH:$FWDIR/conf"
CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
if [ -n "$SPARK_TESTING" ] ; then
CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
fi
CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
fi
CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
# Add the shaded JAR for Maven builds
if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
# The shaded JAR doesn't contain examples, so include those separately
EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
CLASSPATH+=":$EXAMPLES_JAR"
fi
CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
# Use the JAR from the SBT build
export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
fi
if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
# Use the JAR from the Maven build
export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
fi

# Add hadoop conf dir - else FileSystem.*, etc fail!
# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
# the configuration files.
if [ "x" != "x$HADOOP_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
fi
if [ "x" != "x$YARN_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
fi

# Add Scala standard library
if [ -z "$SCALA_LIBRARY_PATH" ]; then
if [ -z "$SCALA_HOME" ]; then
echo "SCALA_HOME is not set" >&2
exit 1
fi
SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
fi
CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"

echo "$CLASSPATH"
18 changes: 6 additions & 12 deletions conf/spark-env.sh.template
@@ -3,23 +3,17 @@
# This file contains environment variables required to run Spark. Copy it as
# spark-env.sh and edit that to configure Spark for your site. At a minimum,
# the following two variables should be set:
# - MESOS_NATIVE_LIBRARY, to point to your Mesos native library (libmesos.so)
# - SCALA_HOME, to point to your Scala installation
# - SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to
# point to the directory for Scala library JARs (if you install Scala as a
# Debian or RPM package, these are in a separate path, often /usr/share/java)
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
#
# If using the standalone deploy mode, you can also set variables for it:
# - SPARK_MASTER_IP, to bind the master to a different IP address
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine
#
# Finally, Spark also relies on the following variables, but these can be set
# on just the *master* (i.e. in your driver program), and will automatically
# be propagated to workers:
# - SPARK_MEM, to change the amount of memory used per node (this should
# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g)
# - SPARK_CLASSPATH, to add elements to Spark's classpath
# - SPARK_JAVA_OPTS, to add JVM options
# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.
# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes
# to be spawned on every slave machine

28 changes: 9 additions & 19 deletions core/src/main/java/spark/network/netty/FileClient.java
@@ -8,15 +8,20 @@
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileClient {

private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private FileClientHandler handler = null;
private Channel channel = null;
private Bootstrap bootstrap = null;
private int connectTimeout = 60*1000; // 1 min

public FileClient(FileClientHandler handler) {
public FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
this.connectTimeout = connectTimeout;
}

public void init() {
@@ -25,25 +30,10 @@ public void init() {
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(new FileClientChannelInitializer(handler));
}

public static final class ChannelCloseListener implements ChannelFutureListener {
private FileClient fc = null;

public ChannelCloseListener(FileClient fc){
this.fc = fc;
}

@Override
public void operationComplete(ChannelFuture future) {
if (fc.bootstrap!=null){
fc.bootstrap.shutdown();
fc.bootstrap = null;
}
}
}

public void connect(String host, int port) {
try {
// Start the connection attempt.
@@ -58,8 +48,8 @@ public void connect(String host, int port) {
public void waitForClose() {
try {
channel.closeFuture().sync();
} catch (InterruptedException e){
e.printStackTrace();
} catch (InterruptedException e) {
LOG.warn("FileClient interrupted", e);
}
}

8 changes: 8 additions & 0 deletions core/src/main/java/spark/network/netty/FileClientHandler.java
@@ -9,7 +9,14 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {

private FileHeader currentHeader = null;

private volatile boolean handlerCalled = false;

public boolean isComplete() {
return handlerCalled;
}

public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
public abstract void handleError(String blockId);

@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
@@ -26,6 +33,7 @@ public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
// get file
if(in.readableBytes() >= currentHeader.fileLen()) {
handle(ctx, in, currentHeader);
handlerCalled = true;
currentHeader = null;
ctx.close();
}
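
Taken together, the two Netty-side changes give callers a bounded connect attempt and a way to tell whether the requested block was actually delivered. A hedged Scala sketch of how code living in the same spark.network.netty package might combine them; the handler, host, port, and blockId parameters are placeholders, not APIs introduced by this commit:

package spark.network.netty

// Illustrative sketch only: use the new connect timeout together with
// isComplete() to detect a fetch whose handle() callback never ran before
// the channel closed.
object FetchSketch {
  def fetch(handler: FileClientHandler, host: String, port: Int, blockId: String) {
    val client = new FileClient(handler, 60 * 1000)  // 1-minute connect timeout
    client.init()
    client.connect(host, port)
    client.waitForClose()             // returns once the channel closes
    if (!handler.isComplete()) {
      handler.handleError(blockId)    // the block was never fully received
    }
  }
}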
15 changes: 12 additions & 3 deletions core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,5 +1,6 @@
package spark

import java.nio.ByteBuffer
import java.util.{Date, HashMap => JHashMap}
import java.text.SimpleDateFormat

@@ -64,8 +65,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator =
new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(aggregator.combineValuesByKey(_), true)
} else if (mapSideCombine) {
@@ -97,7 +97,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]({v: V => func(zeroValue, v)}, func, func, partitioner)
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)

// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}

/**
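
The serialization dance above exists so that a mutable zero value (a collection used as an accumulator, say) is cloned for every key rather than shared and mutated across keys. A short usage sketch with the partitioner overload shown in this hunk; the object name and sample data are illustrative only:

import spark.SparkContext
import spark.SparkContext._   // implicit conversion to PairRDDFunctions
import spark.HashPartitioner

// Illustrative example: sum the values for each key, starting from a zero
// value that foldByKey deserializes fresh for every key.
object FoldByKeySketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "foldByKey sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
    val sums = pairs.foldByKey(0, new HashPartitioner(2))(_ + _)
    sums.collect().foreach(println)   // (a,4) and (b,6), in either order
    sc.stop()
  }
}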
24 changes: 16 additions & 8 deletions core/src/main/scala/spark/SparkContext.scala
@@ -115,13 +115,14 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
"SPARK_TESTING")) {
for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
val value = System.getenv(key)
if (value != null) {
executorEnvs(key) = value
}
}
// Since memory can be set with a system property too, use that
executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
if (environment != null) {
executorEnvs ++= environment
}
@@ -156,14 +157,12 @@
scheduler

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
val sparkMemEnv = System.getenv("SPARK_MEM")
val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
if (sparkMemEnvInt > memoryPerSlaveInt) {
if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
throw new SparkException(
"Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
memoryPerSlaveInt, sparkMemEnvInt))
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, SparkContext.executorMemoryRequested))
}

val scheduler = new ClusterScheduler(this)
@@ -881,6 +880,15 @@ object SparkContext {

/** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)

/** Get the amount of memory per executor requested through system properties or SPARK_MEM */
private[spark] val executorMemoryRequested = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Option(System.getProperty("spark.executor.memory"))
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
}
}


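
With executorMemoryRequested in place, per-executor memory is resolved from the spark.executor.memory system property first, then from SPARK_MEM, and finally falls back to 512 MB; the same value is what local-cluster mode checks against memoryPerSlave. A hedged sketch of driver-side usage; the sizes and the object name are illustrative, not defaults introduced here:

import spark.SparkContext

// Illustrative sketch: request 1 GB per executor through the system property
// that executorMemoryRequested consults first. The master string is
// local-cluster[numSlaves,coresPerSlave,memoryPerSlaveInMB]; memoryPerSlave
// must be at least the requested executor memory or SparkContext will refuse
// to start.
object ExecutorMemorySketch {
  def main(args: Array[String]) {
    System.setProperty("spark.executor.memory", "1g")
    val sc = new SparkContext("local-cluster[2,1,1024]", "memory sketch")
    // ... run jobs ...
    sc.stop()
  }
}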
3 changes: 2 additions & 1 deletion core/src/main/scala/spark/SparkEnv.scala
@@ -59,7 +59,8 @@ class SparkEnv (

def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create()
val key = (pythonExec, envVars)
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}
