
SKIPME merged Apache branch-1.6 #139

Merged: 17 commits merged into csd-1.6 from apache/spark branch-1.6 on Dec 29, 2015

Commits (changes from all commits):
4062cda
Preparing Spark release v1.6.0-rc4
pwendell Dec 22, 2015
5b19e7c
Preparing development version 1.6.0-SNAPSHOT
pwendell Dec 22, 2015
309ef35
[MINOR] Fix typos in JavaStreamingContext
zsxwing Dec 22, 2015
0f905d7
[SPARK-11823][SQL] Fix flaky JDBC cancellation test in HiveThriftBina…
JoshRosen Dec 22, 2015
94fb5e8
[SPARK-12487][STREAMING][DOCUMENT] Add docs for Kafka message handler
zsxwing Dec 22, 2015
942c057
[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example f…
zsxwing Dec 23, 2015
c6c9bf9
[SPARK-12477][SQL] - Tungsten projection fails for null values in arr…
Dec 23, 2015
5987b16
[SPARK-12499][BUILD] don't force MAVEN_OPTS
abridgett Dec 24, 2015
b49856a
[SPARK-12411][CORE] Decrease executor heartbeat timeout to match hear…
nongli Dec 19, 2015
4dd8712
[SPARK-12502][BUILD][PYTHON] Script /dev/run-tests fails when IBM Jav…
kiszk Dec 24, 2015
865dd8b
[SPARK-12010][SQL] Spark JDBC requires support for column-name-free I…
CK50 Dec 24, 2015
b8da77e
[SPARK-12520] [PYSPARK] Correct Descriptions and Add Use Cases in Equ…
gatorsmile Dec 28, 2015
1fbcb6e
[SPARK-12517] add default RDD name for one created via sc.textFile
wyaron Dec 28, 2015
7c7d76f
[SPARK-12424][ML] The implementation of ParamMap#filter is wrong.
sarutak Dec 28, 2015
a9c52d4
[SPARK-12222][CORE] Deserialize RoaringBitmap using Kryo serializer t…
adrian-wang Dec 28, 2015
fd20248
[SPARK-12489][CORE][SQL][MLIB] Fix minor issues found by FindBugs
zsxwing Dec 28, 2015
d545dfe
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 29, 2015
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -830,7 +830,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
minPartitions).map(pair => pair._2.toString).setName(path)
}

/**
@@ -879,7 +879,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[Text],
classOf[Text],
updateConf,
minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}

/**
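For reference, the effect of this SPARK-12517 change is that file-based RDDs now default to their input path as the RDD name. A minimal sketch of that behavior, assuming a local SparkContext (the path is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RddNameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-name-sketch").setMaster("local"))
    // setName(path) is now applied after the map, so the user-facing RDD keeps the
    // path as its name instead of only the intermediate Hadoop RDD being named.
    val rdd = sc.textFile("default/path/for/textFile")
    assert(rdd.name == "default/path/for/textFile")
    sc.stop()
  }
}
```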
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -445,7 +446,8 @@ private[spark] class Executor(

val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
try {
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
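The intent of this SPARK-12411 change is that the heartbeat RPC now times out after `spark.executor.heartbeatInterval` (falling back to 10s) rather than askWithRetry's much longer default, so a hung heartbeat is retried on the same cadence as the heartbeat itself. A small sketch of reading that interval from a SparkConf; `getTimeAsMs` is used here for illustration only and is not the code path in the diff above:

```scala
import org.apache.spark.SparkConf

object HeartbeatTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // The RPC timeout tracks the same property that drives the heartbeat cadence.
    val conf = new SparkConf().set("spark.executor.heartbeatInterval", "10s")
    val timeoutMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
    assert(timeoutMs == 10000L)
  }
}
```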
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Date, List => JList}

import scala.collection.JavaConverters._
@@ -126,7 +125,7 @@ private[spark] class MesosClusterScheduler(
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new ReentrantLock()
private val stateLock = new Object()
private val finishedDrivers =
new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
private var frameworkId: String = null
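The lock swap matters because this class guards its state with Scala's `synchronized`, which takes an object's intrinsic monitor; calling `synchronized` on a `ReentrantLock` never engages the lock it manages, so a plain `Object` expresses the intent correctly (this appears to be one of the FindBugs findings addressed in SPARK-12489). A minimal sketch of the pattern, with hypothetical state:

```scala
// Sketch: a plain Object used purely as a monitor for synchronized blocks.
class SchedulerStateSketch {
  private val stateLock = new Object()
  private var queuedDrivers = 0

  def enqueue(): Unit = stateLock.synchronized {
    queuedDrivers += 1
  }

  def queuedCount: Int = stateLock.synchronized(queuedDrivers)
}
```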
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -399,12 +399,7 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
override def readInt(): Int = input.readInt()
override def readUnsignedShort(): Int = input.readShortUnsigned()
override def skipBytes(n: Int): Int = {
var remaining: Long = n
while (remaining > 0) {
val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
input.skip(skip)
remaining -= skip
}
input.skip(n)
n
}
override def readFully(b: Array[Byte]): Unit = input.read(b)
25 changes: 25 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -274,6 +274,31 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("Default path for file based RDDs is properly set (SPARK-12517)") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

// Test textFile, wholeTextFiles, binaryFiles, hadoopFile and
// newAPIHadoopFile for setting the default path as the RDD name
val mockPath = "default/path/for/"

var targetPath = mockPath + "textFile"
assert(sc.textFile(targetPath).name === targetPath)

targetPath = mockPath + "wholeTextFiles"
assert(sc.wholeTextFiles(targetPath).name === targetPath)

targetPath = mockPath + "binaryFiles"
assert(sc.binaryFiles(targetPath).name === targetPath)

targetPath = mockPath + "hadoopFile"
assert(sc.hadoopFile(targetPath).name === targetPath)

targetPath = mockPath + "newAPIHadoopFile"
assert(sc.newAPIHadoopFile(targetPath).name === targetPath)

sc.stop()
}

test("calling multiple sc.stop() must not throw any exception") {
noException should be thrownBy {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
7 changes: 3 additions & 4 deletions dev/run-tests.py
@@ -148,7 +148,7 @@ def determine_java_executable():
return java_exe if java_exe else which("java")


JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch', 'update'])
JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch'])


def determine_java_version(java_exe):
@@ -164,14 +164,13 @@ def determine_java_version(java_exe):
# find raw version string, eg 'java version "1.8.0_25"'
raw_version_str = next(x for x in raw_output_lines if " version " in x)

match = re.search('(\d+)\.(\d+)\.(\d+)_(\d+)', raw_version_str)
match = re.search('(\d+)\.(\d+)\.(\d+)', raw_version_str)

major = int(match.group(1))
minor = int(match.group(2))
patch = int(match.group(3))
update = int(match.group(4))

return JavaVersion(major, minor, patch, update)
return JavaVersion(major, minor, patch)

# -------------------------------------------------------------------------------------------------
# Functions for running the other build and test scripts
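The motivation for SPARK-12502 is that not every JVM reports an `_<update>` suffix; IBM Java, for example, prints a version string without it, so the stricter regex found no match and the script failed. A small sketch of the relaxed pattern against both shapes of version string (the example strings are illustrative):

```scala
object JavaVersionRegexSketch {
  def main(args: Array[String]): Unit = {
    // Only major.minor.patch is required, so both Oracle- and IBM-style strings match.
    val version = """(\d+)\.(\d+)\.(\d+)""".r.unanchored
    for (line <- Seq("java version \"1.8.0_25\"", "java version \"1.8.0\"")) {
      line match {
        case version(major, minor, patch) =>
          println(s"$line -> major=$major minor=$minor patch=$patch")
        case _ =>
          println(s"$line -> no match")
      }
    }
  }
}
```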
6 changes: 3 additions & 3 deletions docs/programming-guide.md
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may

What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.

To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.

In general, closures - constructs like loops or locally defined methods - should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed; a sketch follows below.
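As a sketch of the recommended pattern (not part of the guide's own examples), the mutated `counter` becomes an accumulator that the driver reads after the action completes:

{% highlight scala %}
// Sketch: aggregate with an accumulator instead of mutating a captured variable.
val accum = sc.accumulator(0, "Element Counter")
sc.parallelize(1 to 100).foreach(x => accum += 1)
println(accum.value)  // 100, read on the driver after the action finishes
{% endhighlight %}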

@@ -1091,7 +1091,7 @@ for details.
</tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
<td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
<td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
<br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures </a> for more details.</td>
</tr>
</table>
@@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
value of the broadcast variable (e.g. if the variable is shipped to a new node later).
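For illustration, a short sketch of that usage (values are placeholders): the broadcast value is only read via `value` on the executors and never modified after creation.

{% highlight scala %}
// Sketch: ship a read-only lookup table to the executors once, reference it by .value.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val resolved = sc.parallelize(Seq("a", "b", "a")).map(word => lookup.value.getOrElse(word, 0))
{% endhighlight %}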

## Accumulators <a name="AccumLink"></a>
## Accumulators

Accumulators are variables that are only "added" to through an associative operation and can
therefore be efficiently supported in parallel. They can be used to implement counters (as in
3 changes: 3 additions & 0 deletions docs/streaming-kafka-integration.md
@@ -104,6 +104,7 @@ Next, we discuss how to use this approach in your streaming application.
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])

You can also pass a `messageHandler` to `createDirectStream` to access the `MessageAndMetadata`, which contains metadata about the current message, and transform it into any desired type.
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
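
For illustration, a hedged sketch of the `messageHandler` variant; the broker, topic, and starting offsets are placeholders, and note that this overload takes explicit `fromOffsets` rather than a set of topics:

{% highlight scala %}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: start from explicit offsets and let the messageHandler shape each record.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val fromOffsets = Map(TopicAndPartition("logs", 0) -> 0L)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, String)](
  streamingContext, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.message()))
{% endhighlight %}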
</div>
@@ -115,6 +116,7 @@ Next, we discuss how to use this approach in your streaming application.
[key class], [value class], [key decoder class], [value decoder class],
[map of Kafka parameters], [set of topics to consume]);

You can also pass a `messageHandler` to `createDirectStream` to access the `MessageAndMetadata`, which contains metadata about the current message, and transform it into any desired type.
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).

@@ -123,6 +125,7 @@ Next, we discuss how to use this approach in your streaming application.
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

You can also pass a `messageHandler` to `createDirectStream` to access the `KafkaMessageAndMetadata`, which contains metadata about the current message, and transform it into any desired type.
By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
</div>
165 changes: 165 additions & 0 deletions docs/streaming-programming-guide.md
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim

***

## Accumulators and Broadcast Variables

[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

object WordBlacklist {

@volatile private var instance: Broadcast[Seq[String]] = null

def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}

object DroppedWordsCounter {

@volatile private var instance: Accumulator[Long] = null

def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += count
false
} else {
true
}
}.collect()
val output = "Counts at time " + time + " " + counts
})

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

class JavaWordBlacklist {

private static volatile Broadcast<List<String>> instance = null;

public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordBlacklist.class) {
if (instance == null) {
List<String> wordBlacklist = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordBlacklist);
}
}
}
return instance;
}
}

class JavaDroppedWordsCounter {

private static volatile Accumulator<Integer> instance = null;

public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.accumulator(0, "WordsInBlacklistCounter");
}
}
}
return instance;
}
}

wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
// Get or register the blacklist Broadcast
final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
if (blacklist.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
return null;
}
});

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']

def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).

</div>
</div>

***

## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in a way that allows it to be restarted on driver failures, which is achieved by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
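The full example follows below; as a minimal sketch, the lazily instantiated SQLContext singleton looks roughly like this:

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/** Lazily instantiated singleton instance of SQLContext (sketch). */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
{% endhighlight %}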
