[SPARK-12429][Streaming][Doc] Add Accumulator and Broadcast example for Streaming #10385

Closed · wants to merge 4 commits
6 changes: 3 additions & 3 deletions docs/programming-guide.md
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may

What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.

- To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
+ To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
Member Author:
I also fixed the broken Accumulator link in programming-guide.md

Contributor:
nice!


In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident, and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed; the sketch below illustrates the problem.
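For concreteness, a minimal sketch of the failure mode this passage describes (an editor's illustration, not part of the diff; it assumes an existing `SparkContext` named `sc`):

{% highlight scala %}
// Broken in cluster mode: each executor increments its own deserialized
// copy of `counter`, so the driver-side variable is never updated.
var counter = 0
val rdd = sc.parallelize(1 to 100)
rdd.foreach(x => counter += x)
println("Counter value: " + counter)  // prints 0 when run on a cluster
{% endhighlight %}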

@@ -1091,7 +1091,7 @@ for details.
</tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
- <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+ <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
<br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures </a> for more details.</td>
</tr>
</table>
@@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
value of the broadcast variable (e.g. if the variable is shipped to a new node later).
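As a brief illustration of this pattern (an editor's sketch, not part of the diff; `sc` is assumed to be an existing `SparkContext`):

{% highlight scala %}
// Ship a read-only value to every node at most once; tasks read it via .value.
val broadcastVar = sc.broadcast(Array(1, 2, 3))
val rdd = sc.parallelize(Seq(0, 1, 2))
rdd.map(i => broadcastVar.value(i)).collect()  // Array(1, 2, 3)
{% endhighlight %}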

- ## Accumulators <a name="AccumLink"></a>
+ ## Accumulators

Accumulators are variables that are only "added" to through an associative operation and can
therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.
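A minimal sketch of the counter use case (an editor's illustration, using the same Spark 1.x `sc.accumulator` API as this PR's examples):

{% highlight scala %}
// Create a named accumulator and add to it from tasks running on executors.
val accum = sc.accumulator(0, "Example Counter")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
// Only the driver can read the aggregated value.
println(accum.value)  // 10
{% endhighlight %}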
165 changes: 165 additions & 0 deletions docs/streaming-programming-guide.md
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim

***

## Accumulators and Broadcast Variables

[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use accumulators or broadcast variables as well, you'll have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null
Contributor:
For Scala, can't this whole thing be replaced with a lazy val?

Member Author:
Yes. Good point.

Member Author:
Oh, no. We need sc here. So it cannot be a val.


  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts
})

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
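On the `lazy val` question from the review thread above: a `lazy val` takes no parameters, so it would have to capture a `SparkContext` at object-initialization time, and none is in scope there after the driver recovers from a checkpoint. An editor's hypothetical sketch of the constraint (not part of the diff):

{% highlight scala %}
object WordBlacklistAlternative {
  // Would not compile: no `sc` is available inside the object.
  // lazy val instance: Broadcast[Seq[String]] = sc.broadcast(Seq("a", "b", "c"))

  // Hence the getter above receives the context from each batch's RDD at call time:
  // WordBlacklist.getInstance(rdd.sparkContext)
}
{% endhighlight %}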
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile Accumulator<Integer> instance = null;

  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
    // Get or register the blacklist Broadcast
    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
        if (blacklist.value().contains(wordCount._1())) {
          droppedWordsCounter.add(wordCount._2());
          return false;
        } else {
          return true;
        }
      }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
    return null;
  }
});

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']

def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).

</div>
</div>

***

## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in such a way that it can be restarted on driver failures, which is done by creating a lazily instantiated singleton instance of SQLContext, as sketched below. The guide's example modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
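A minimal sketch of that singleton (an editor's illustration mirroring the accumulator and broadcast singletons above; the full word-count version appears in the guide but is truncated from this diff):

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/** Lazily instantiated singleton instance of SQLContext. */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
{% endhighlight %}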

examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,17 +21,22 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import com.google.common.io.Files;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -41,7 +46,48 @@
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

/**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register a Broadcast variable.
 */
class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

/**
 * Use this singleton to get or register an Accumulator.
 */
class JavaDroppedWordsCounter {

  private static volatile Accumulator<Integer> instance = null;

  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

/**
 * Counts words in text encoded with UTF8 received from the network every second. This example also
 * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
 * they can be registered on driver failures.
 *
 * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -111,10 +157,27 @@ public Integer call(Integer i1, Integer i2) {
    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-       String counts = "Counts at time " + time + " " + rdd.collect();
-       System.out.println(counts);
+       // Get or register the blacklist Broadcast
+       final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+       // Get or register the droppedWordsCounter Accumulator
+       final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+       // Use blacklist to drop words and use droppedWordsCounter to count them
+       String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+         @Override
+         public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+           if (blacklist.value().contains(wordCount._1())) {
+             droppedWordsCounter.add(wordCount._2());
+             return false;
+           } else {
+             return true;
+           }
+         }
+       }).collect().toString();
+       String output = "Counts at time " + time + " " + counts;
+       System.out.println(output);
+       System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) in total");
        System.out.println("Appending to " + outputFile.getAbsolutePath());
-       Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+       Files.append(output + "\n", outputFile, Charset.defaultCharset());
        return null;
      }
    });
examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,6 +44,20 @@
from pyspark.streaming import StreamingContext


# Get or register a Broadcast variable
def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']


# Get or register an Accumulator
def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']


def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
-       counts = "Counts at time %s %s" % (time, rdd.collect())
+       # Get or register the blacklist Broadcast
+       blacklist = getWordBlacklist(rdd.context)
+       # Get or register the droppedWordsCounter Accumulator
+       droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+       # Use blacklist to drop words and use droppedWordsCounter to count them
+       def filterFunc(wordCount):
+           if wordCount[0] in blacklist.value:
+               droppedWordsCounter.add(wordCount[1])
+               return False
+           else:
+               return True
+
+       counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
+       print("Dropped %d word(s) in total" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")