SPARK-1663. Corrections for several compile errors in streaming code examples, and updates to follow API changes #589

Closed
wants to merge 1 commit into from
62 changes: 36 additions & 26 deletions docs/streaming-programming-guide.md
@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed
in 1 second batches.

{% highlight scala %}
// Create a StreamingContext with a SparkConf configuration
val ssc = new StreamingContext(sparkConf, Seconds(1))
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._
// Create a StreamingContext with a local master
val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
{% endhighlight %}

Using this context, we then create a new DStream
by specifying the IP address and port of the data server.

{% highlight scala %}
// Create a DStream that will connect to serverIP:serverPort
val lines = ssc.socketTextStream(serverIP, serverPort)
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}

This `lines` DStream represents the stream of data that will be received from the data
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represent
`words` DStream. Next, we want to count these words.
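The split step itself is collapsed in this diff view; as a reminder, the elided step looks roughly like the following (a minimal sketch, assuming the usual split-on-spaces flow):

{% highlight scala %}
// Split each line into words (sketch of the step the diff collapses)
val words = lines.flatMap(_.split(" "))
{% endhighlight %}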

{% highlight scala %}
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would
in 1 second batches.

{% highlight java %}
// Create a StreamingContext with a SparkConf configuration
JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a StreamingContext with a local master
JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000))
{% endhighlight %}

Using this context, we then create a new DStream
by specifying the IP address and port of the data server.

{% highlight java %}
// Create a DStream that will connect to serverIP:serverPort
JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
{% endhighlight %}

This `lines` DStream represents the stream of data that will be received from the data
@@ -159,7 +167,7 @@ space into words.
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
return Lists.newArrayList(x.split(" "));
return Arrays.asList(x.split(" "));
}
});
{% endhighlight %}
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your
if running in distributed mode, as described in the
[Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
Additionally, the underlying SparkContext can be accessed as
`streamingContext.sparkContext`.
`ssc.sparkContext`.
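For instance, a minimal sketch (assuming an existing `ssc` and placeholder data) of reaching that SparkContext for ordinary batch work:

{% highlight scala %}
// Grab the underlying SparkContext from the StreamingContext
val sc = ssc.sparkContext
// Use it like any SparkContext, e.g. to build a small static RDD (placeholder data)
val staticRDD = sc.parallelize(Seq("a", "b", "c"))
{% endhighlight %}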

The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.

## Input Sources

We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick
We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
example](#a-quick-example) which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
methods for creating DStreams from files and Akka actors as input sources.
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
streamingContext.fileStream(dataDirectory)
ssc.fileStream(dataDirectory)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
javaStreamingContext.fileStream(dataDirectory);
jssc.fileStream(dataDirectory);
{% endhighlight %}
</div>
</div>
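For plain text files there is also the simpler `textFileStream` variant; a short sketch (the directory path below is a placeholder):

{% highlight scala %}
// Watch a directory for new text files and count the lines that arrive in each batch
val fileLines = ssc.textFileStream("/some/input/directory")
fileLines.count().print()
{% endhighlight %}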
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)
KafkaUtils.createStream(ssc, kafkaParams, ...)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
import org.apache.spark.streaming.kafka.*
KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
import org.apache.spark.streaming.kafka.*;
KafkaUtils.createStream(jssc, kafkaParams, ...);
{% endhighlight %}
</div>
</div>
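As a rough usage sketch in Scala (the ZooKeeper quorum, consumer group, and topic map below are placeholder values, not part of the guide):

{% highlight scala %}
import org.apache.spark.streaming.kafka._
// Placeholder connection details: ZooKeeper quorum, consumer group id,
// and a map of topic name to receiver thread count
val kafkaStream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("myTopic" -> 1))
{% endhighlight %}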
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
<div data-lang="java" markdown="1">

{% highlight java %}
import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = ... // add the new values with the previous running count to get the new count
return Optional.of(newSum)
return Optional.of(newSum);
}
}
};
{% endhighlight %}
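For comparison, the Scala update function that the diff collapses above could be sketched as follows (the body shown is one reasonable implementation, not the guide's literal text):

{% highlight scala %}
// Combine the new values seen in this batch with the previous running count for the key
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
{% endhighlight %}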

This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based o
<div data-lang="scala" markdown="1">

{% highlight scala %}
val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = inputDStream.transform(rdd => {
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
<div data-lang="java" markdown="1">

{% highlight java %}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.

{% highlight scala %}
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
{% endhighlight %}

Contributor comment: Needs a space after the `,` - not a big deal, I can fix on merge.

</div>
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
{% endhighlight %}

</div>
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This i
{% highlight java %}
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
JavaStreamingContextFactory create() {
@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...