Corrections for several compile errors in streaming code examples, and updates to follow API changes
srowen committed Apr 29, 2014
1 parent 8db0f7e commit 65a906b
Showing 1 changed file with 36 additions and 26 deletions.
docs/streaming-programming-guide.md
@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed
in 1 second batches.

{% highlight scala %}
-// Create a StreamingContext with a SparkConf configuration
-val ssc = new StreamingContext(sparkConf, Seconds(1))
+import org.apache.spark.api.java.function._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api._
+// Create a StreamingContext with a local master
+val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
{% endhighlight %}

Using this context, we then create a new DStream
by specifying the IP address and port of the data server.

{% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort
-val lines = ssc.socketTextStream(serverIP, serverPort)
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}

This `lines` DStream represents the stream of data that will be received from the data
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represent
`words` DStream. Next, we want to count these words.

{% highlight scala %}
+import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
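
For context, the corrected Scala snippets in this file assemble into a complete word-count program along the following lines (a minimal sketch for illustration, not part of this diff; the master, app name, and socket address mirror the guide's example):

{% highlight scala %}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// Create a local StreamingContext with a 1 second batch interval
val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))

// Create a DStream that connects to localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words, then count each word in each batch
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print a sample of the counts and start the computation
wordCounts.print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}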
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would
in 1 second batches.

{% highlight java %}
-// Create a StreamingContext with a SparkConf configuration
-JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import scala.Tuple2;
+// Create a StreamingContext with a local master
+JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000))
{% endhighlight %}

Using this context, we then create a new DStream
by specifying the IP address and port of the data server.

{% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
{% endhighlight %}

This `lines` DStream represents the stream of data that will be received from the data
@@ -159,7 +167,7 @@ space into words.
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
-return Lists.newArrayList(x.split(" "));
+return Arrays.asList(x.split(" "));
}
});
{% endhighlight %}
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your
if running in distributed mode, as described in the
[Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
Additionally, the underlying SparkContext can be accessed as
-`streamingContext.sparkContext`.
+`ssc.sparkContext`.

The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
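
As an aside on the `ssc.sparkContext` change above, the underlying SparkContext can be reused for ordinary batch work alongside the streaming job; a small sketch (not part of this diff):

{% highlight scala %}
// Reuse the StreamingContext's underlying SparkContext for a one-off batch computation
val sc = ssc.sparkContext
val staticRDD = sc.parallelize(1 to 100)   // an ordinary RDD, independent of any DStream
println(staticRDD.count())
{% endhighlight %}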
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.

## Input Sources

-We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
example](#a-quick-example) which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
methods for creating DStreams from files and Akka actors as input sources.
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
-streamingContext.fileStream(dataDirectory)
+ssc.fileStream(dataDirectory)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-javaStreamingContext.fileStream(dataDirectory);
+jssc.fileStream(dataDirectory);
{% endhighlight %}
</div>
</div>
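
Alongside the generic `fileStream` shown above, text data in a monitored directory can be read with the text-only convenience method; a sketch (the directory path is illustrative, not from the diff):

{% highlight scala %}
// Count the lines of any new files that appear in a monitored directory
val fileLines = ssc.textFileStream("/tmp/streaming-input")   // hypothetical directory
fileLines.count().print()
{% endhighlight %}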
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(streamingContext, kafkaParams, ...)
+KafkaUtils.createStream(ssc, kafkaParams, ...)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-import org.apache.spark.streaming.kafka.*
-KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
+import org.apache.spark.streaming.kafka.*;
+KafkaUtils.createStream(jssc, kafkaParams, ...);
{% endhighlight %}
</div>
</div>
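
The `kafkaParams, ...` arguments elided above typically name the ZooKeeper quorum, consumer group, and topic map; a Scala sketch using the simple overload (all connection values below are placeholders, not from the diff):

{% highlight scala %}
import org.apache.spark.streaming.kafka._

// Subscribe to one topic with one receiver thread;
// the ZooKeeper address and group id are hypothetical.
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "zk-host:2181",            // ZooKeeper quorum
  "my-consumer-group",       // consumer group id
  Map("my-topic" -> 1))      // topics and receiver threads per topic

// The stream carries (key, message) pairs; keep the message payloads
kafkaStream.map(_._2).print()
{% endhighlight %}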
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
<div data-lang="java" markdown="1">

{% highlight java %}
+import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = ... // add the new values with the previous running count to get the new count
-return Optional.of(newSum)
+return Optional.of(newSum);
}
-}
+};
{% endhighlight %}

This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
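
For reference, the Scala counterpart of this Java update function (the guide applies it as `pairs.updateStateByKey[Int](updateFunction _)`, as seen in the hunk header above) would look roughly like this sketch, which is not part of the diff:

{% highlight scala %}
// Add the new values to the previous running count to get the new count
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
{% endhighlight %}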
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based o
<div data-lang="scala" markdown="1">

{% highlight scala %}
-val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
+val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

-val cleanedDStream = inputDStream.transform(rdd => {
+val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
<div data-lang="java" markdown="1">

{% highlight java %}
+import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
-JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
+final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

-JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
-rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
+rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.

{% highlight scala %}
// Reduce last 30 seconds of data, every 10 seconds
-val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
+val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
{% endhighlight %}

</div>
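
A more efficient form of this windowed count also subtracts the values that slide out of the window; a sketch of that variant (it additionally requires checkpointing to be enabled, which is not shown in this diff):

{% highlight scala %}
// Incremental window count: add counts entering the window and
// "inverse reduce" the counts leaving it. Checkpointing must be enabled,
// e.g. ssc.checkpoint("/tmp/checkpoints") with a path of your choosing.
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // add new counts
  (a: Int, b: Int) => a - b,   // subtract counts that left the window
  Seconds(30), Seconds(10))
{% endhighlight %}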
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
};

// Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
{% endhighlight %}

</div>
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This i
{% highlight java %}
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
-JavaStreamingContextFactory create() {
+@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...
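
The Scala counterpart of this recover-or-create pattern goes through `StreamingContext.getOrCreate` rather than a factory object; a sketch under that assumption, with placeholder paths and addresses that are not taken from the diff:

{% highlight scala %}
import org.apache.spark.streaming._

val checkpointDirectory = "/tmp/streaming-checkpoints"   // hypothetical path

// Function to create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)   // create DStreams
  lines.count().print()
  ssc.checkpoint(checkpointDirectory)                   // set checkpoint directory
  ssc
}

// Recreate the context from checkpoint data, or build a new one if none exists
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()
{% endhighlight %}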