Commit
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh committed Feb 10, 2015
2 parents 0d0f3ee + 31d435e commit 0dd3947
Showing 52 changed files with 3,903 additions and 2,893 deletions.
28 changes: 28 additions & 0 deletions build/sbt
@@ -125,4 +125,32 @@ loadConfigFile() {
[[ -f "$etc_sbt_opts_file" ]] && set -- $(loadConfigFile "$etc_sbt_opts_file") "$@"
[[ -f "$sbt_opts_file" ]] && set -- $(loadConfigFile "$sbt_opts_file") "$@"

exit_status=127
saved_stty=""

restoreSttySettings() {
stty $saved_stty
saved_stty=""
}

onExit() {
if [[ "$saved_stty" != "" ]]; then
restoreSttySettings
fi
exit $exit_status
}

saveSttySettings() {
saved_stty=$(stty -g 2>/dev/null)
if [[ ! $? ]]; then
saved_stty=""
fi
}

saveSttySettings
trap onExit INT

run "$@"

exit_status=$?
onExit
2 changes: 1 addition & 1 deletion build/sbt-launch-lib.bash
@@ -81,7 +81,7 @@ execRunner () {
echo ""
}

exec "$@"
"$@"
}

addJava () {
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1420,6 +1420,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
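
For context, a minimal driver sketch (not part of the commit) of how the new `spark.logLineage` flag might be exercised; the app name, master URL, and input path below are illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}

object LineageLoggingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical settings: any app name, master, and input path would do.
    val conf = new SparkConf()
      .setAppName("LineageLoggingSketch")
      .setMaster("local[2]")
      .set("spark.logLineage", "true") // the flag introduced in this change
    val sc = new SparkContext(conf)

    val counts = sc.textFile("data/mllib/sample_lda_data.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)

    // count() triggers runJob, which now also logs counts.toDebugString,
    // i.e. the RDD's recursive dependency lineage.
    counts.count()
    sc.stop()
  }
}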
@@ -671,7 +671,7 @@ private[spark] class Master(

def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.path.address
if (addressToWorker.contains(appAddress)) {
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
@@ -314,6 +314,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* Return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
if (numAdditionalExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of additional executor(s) " +
s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
}
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
logDebug(s"Number of pending executors is now $numPendingExecutors")
numPendingExecutors += numAdditionalExecutors
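
A caller-side sketch (an assumption, not part of the commit) of how this validation surfaces through `SparkContext.requestExecutors`, assuming a running `SparkContext` named `sc` on a coarse-grained backend such as standalone or YARN:

// Requesting a positive number of executors returns whether the request
// was acknowledged by the cluster manager.
val acknowledged: Boolean = sc.requestExecutors(2)

// With this change, a negative request fails fast on the driver instead of
// being forwarded to the cluster manager.
try {
  sc.requestExecutors(-1)
} catch {
  case e: IllegalArgumentException => println(s"Rejected: ${e.getMessage}")
}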
12 changes: 12 additions & 0 deletions data/mllib/sample_lda_data.txt
@@ -0,0 +1,12 @@
1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9
3 1 1 9 3 0 2 0 0 1 3
4 2 0 3 4 5 1 1 1 4 0
2 1 0 3 0 0 5 0 2 2 9
1 1 1 9 2 1 2 0 0 1 3
4 4 0 3 4 2 1 3 0 0 0
2 8 2 0 3 0 2 0 2 7 2
1 1 1 9 0 2 2 0 0 3 3
4 1 0 0 4 5 1 3 0 1 0
6 changes: 3 additions & 3 deletions dev/run-tests
@@ -36,7 +36,7 @@ function handle_error () {
}


# Build against the right verison of Hadoop.
# Build against the right version of Hadoop.
{
if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then
if [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop1.0" ]; then
@@ -77,7 +77,7 @@ export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
fi
}

# Only run Hive tests if there are sql changes.
# Only run Hive tests if there are SQL changes.
# Partial solution for SPARK-1455.
if [ -n "$AMPLAB_JENKINS" ]; then
git fetch origin master:master
@@ -183,7 +183,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
if [ -n "$_SQL_TESTS_ONLY" ]; then
# This must be an array of individual arguments. Otherwise, having one long string
# will be interpreted as a single test, which doesn't work.
SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "mllib/test")
SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test" "mllib/test")
else
SBT_MAVEN_TEST_ARGS=("test")
fi
129 changes: 128 additions & 1 deletion docs/mllib-clustering.md
@@ -55,7 +55,7 @@ has the following parameters:

Power iteration clustering is a scalable and efficient algorithm for clustering points given pointwise mutual affinity values. Internally the algorithm:

* accepts a [Graph](https://spark.apache.org/docs/0.9.2/api/graphx/index.html#org.apache.spark.graphx.Graph) that represents a normalized pairwise affinity between all input points.
* accepts a [Graph](api/graphx/index.html#org.apache.spark.graphx.Graph) that represents a normalized pairwise affinity between all input points.
* calculates the principal eigenvalue and eigenvector
* clusters each of the input points according to their principal eigenvector component value

@@ -71,6 +71,35 @@ Example outputs for a dataset inspired by the paper - but with five clusters ins
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>

### Latent Dirichlet Allocation (LDA)

[Latent Dirichlet Allocation (LDA)](http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation)
is a topic model which infers topics from a collection of text documents.
LDA can be thought of as a clustering algorithm as follows:

* Topics correspond to cluster centers, and documents correspond to examples (rows) in a dataset.
* Topics and documents both exist in a feature space, where feature vectors are vectors of word counts.
* Rather than estimating a clustering using a traditional distance, LDA uses a function based
on a statistical model of how text documents are generated.

LDA takes in a collection of documents as vectors of word counts.
It learns a clustering using [expectation-maximization](http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm)
on the likelihood function. After fitting on the documents, LDA provides:

* Topics: Inferred topics, each of which is a probability distribution over terms (words).
* Topic distributions for documents: For each document in the training set, LDA gives a probability distribution over topics.

LDA takes the following parameters:

* `k`: Number of topics (i.e., cluster centers)
* `maxIterations`: Limit on the number of iterations of EM used for learning
* `docConcentration`: Hyperparameter for prior over documents' distributions over topics. Currently must be > 1, where larger values encourage smoother inferred distributions.
* `topicConcentration`: Hyperparameter for prior over topics' distributions over terms (words). Currently must be > 1, where larger values encourage smoother inferred distributions.
* `checkpointInterval`: If using checkpointing (set in the Spark configuration), this parameter specifies the frequency with which checkpoints will be created. If `maxIterations` is large, using checkpointing can help reduce shuffle file sizes on disk and help with failure recovery.

*Note*: LDA is a new feature with some missing functionality. In particular, it does not yet
support prediction on new documents, and it does not have a Python API. These will be added in the future.
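
As a minimal sketch of how these parameters are wired up (the values below are illustrative, and `corpus` is assumed to be an `RDD[(Long, Vector)]` of document IDs and word-count vectors, prepared as in the example that follows):

{% highlight scala %}
import org.apache.spark.mllib.clustering.LDA

// Illustrative parameter values; tune them for a real corpus.
val lda = new LDA()
  .setK(3)                    // number of topics
  .setMaxIterations(20)       // limit on EM iterations
  .setDocConcentration(1.1)   // currently must be > 1
  .setTopicConcentration(1.1) // currently must be > 1
  .setCheckpointInterval(10)  // only relevant if checkpointing is enabled

val ldaModel = lda.run(corpus)
{% endhighlight %}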

### Examples

#### k-means
@@ -293,6 +322,104 @@ for i in range(2):

</div>

#### Latent Dirichlet Allocation (LDA) Example

In the following example, we load word count vectors representing a corpus of documents.
We then use [LDA](api/scala/index.html#org.apache.spark.mllib.clustering.LDA)
to infer three topics from the documents. The number of desired clusters is passed
to the algorithm. We then output the topics, represented as probability distributions over words.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print("Topic " + topic + ":")
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
println()
}
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

public class JavaLDAExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("LDA Example");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);
return Vectors.dense(values);
}
}
);
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
return doc_id.swap();
}
}
));
corpus.cache();

// Cluster the documents into three topics using LDA
DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);

// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
}
}
{% endhighlight %}
</div>

</div>


In order to run the above application, follow the instructions
provided in the [Self-Contained Applications](quick-start.html#self-contained-applications)
section of the Spark
73 changes: 56 additions & 17 deletions ec2/spark_ec2.py
@@ -34,6 +34,7 @@
import sys
import tarfile
import tempfile
import textwrap
import time
import urllib2
import warnings
@@ -61,10 +62,10 @@

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
MESOS_SPARK_EC2_BRANCH = "branch-1.3"

# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH)
# Default location to get the spark-ec2 scripts (and ami-list) from
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.3"


def setup_boto():
@@ -146,6 +147,14 @@ def parse_args():
"--spark-git-repo",
default=DEFAULT_SPARK_GITHUB_REPO,
help="Github repo from which to checkout supplied commit hash (default: %default)")
parser.add_option(
"--spark-ec2-git-repo",
default=DEFAULT_SPARK_EC2_GITHUB_REPO,
help="Github repo from which to checkout spark-ec2 (default: %default)")
parser.add_option(
"--spark-ec2-git-branch",
default=DEFAULT_SPARK_EC2_BRANCH,
help="Github repo branch of spark-ec2 to use (default: %default)")
parser.add_option(
"--hadoop-major-version", default="1",
help="Major version of Hadoop (default: %default)")
@@ -332,7 +341,12 @@ def get_spark_ami(opts):
print >> stderr,\
"Don't recognize %s, assuming type is pvm" % opts.instance_type

ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type)
# URL prefix from which to fetch AMI information
ami_prefix = "{r}/{b}/ami-list".format(
r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
b=opts.spark_ec2_git_branch)

ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
try:
ami = urllib2.urlopen(ami_path).read().strip()
print "Spark AMI: " + ami
@@ -649,12 +663,15 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):

# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)
ssh(
host=master,
opts=opts,
command="rm -rf spark-ec2"
+ " && "
+ "git clone https://github.com/mesos/spark-ec2.git -b {b}".format(b=MESOS_SPARK_EC2_BRANCH)
+ "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo,
b=opts.spark_ec2_git_branch)
)

print "Deploying files to master..."
@@ -681,21 +698,32 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master


def is_ssh_available(host, opts):
def is_ssh_available(host, opts, print_ssh_output=True):
"""
Check if SSH is available on a host.
"""
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
'%s@%s' % (opts.user, host), stringify_command('true')],
stdout=devnull,
stderr=devnull
)
return ret == 0
except subprocess.CalledProcessError as e:
return False
s = subprocess.Popen(
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
'%s@%s' % (opts.user, host), stringify_command('true')],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order
)
cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout

if s.returncode != 0 and print_ssh_output:
# extra leading newline is for spacing in wait_for_cluster_state()
print textwrap.dedent("""\n
Warning: SSH connection error. (This could be temporary.)
Host: {h}
SSH return code: {r}
SSH output: {o}
""").format(
h=host,
r=s.returncode,
o=cmd_output.strip()
)

return s.returncode == 0


def is_cluster_ssh_available(cluster_instances, opts):
Expand Down Expand Up @@ -1026,6 +1054,17 @@ def real_main():
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)

# Prevent breaking ami_prefix (/, .git and startswith checks)
# Prevent forks with non spark-ec2 names for now.
if opts.spark_ec2_git_repo.endswith("/") or \
opts.spark_ec2_git_repo.endswith(".git") or \
not opts.spark_ec2_git_repo.startswith("https://github.com") or \
not opts.spark_ec2_git_repo.endswith("spark-ec2"):
print >> stderr, "spark-ec2-git-repo must be a github repo and it must not have a " \
"trailing / or .git. " \
"Furthermore, we currently only support forks named spark-ec2."
sys.exit(1)

try:
conn = ec2.connect_to_region(opts.region)
except Exception as e: