diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index f350376f394c5..e5d59dbf3a326 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -585,12 +585,14 @@ These operations are discussed in detail in later sections. *** -## Input DStreams -Input DStreams are DStreams representing the stream of raw data received from streaming sources. +## Input DStreams and Receivers +Input DStreams are DStreams representing the stream of input data received from streaming sources. Spark Streaming has two categories of streaming sources. - *Basic sources*: Sources directly available in the StreamingContext API. Example: file systems, socket connections, and Akka actors. - *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section. +In this section, we will first discuss these different sources and then elaborate on what +are Receivers and how they are run. Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. So every input DStream receives a single stream of data. Note that in a streaming application, you can create multiple input DStreams to receive multiple streams of data in parallel. This is discussed later in the [Performance Tuning](#level-of-parallelism-in-data-receiving) section. @@ -613,8 +615,8 @@ A receiver is run within a Spark worker/executor as a long-running task, hence i ### Basic Sources {:.no_toc} -We have already taken a look at the `ssc.socketTextStream(...)` in the [quick -example](#a-quick-example) which creates a DStream from text +We have already taken a look at the `ssc.socketTextStream(...)` in the [quick example](#a-quick-example) +which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files and Akka actors as input sources. @@ -657,7 +659,8 @@ methods for creating DStreams from files and Akka actors as input sources. For more details on streams from sockets, files, and actors, see the API documentations of the relevant functions in [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for -Scala and [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) for Java. +Scala, [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) +for Java, and [StreamingContext]. ### Advanced Sources {:.no_toc} @@ -698,11 +701,13 @@ and it in the classpath. Some of these advanced sources are as follows. - **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using - [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information - can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by - Twitter4J library. You can either get the public stream, or get the filtered stream based on a - keywords. 
See the API documentation ([Scala](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), [Java](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html)) and examples ([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala) and - [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)). + [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information + can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by + Twitter4J library. You can either get the public stream, or get the filtered stream based on a + keywords. See the API documentation ([Scala](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), + [Java](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html)) and examples + ([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala) + and [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)). - **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can received data from Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details. @@ -718,6 +723,8 @@ Some of these advanced sources are as follows. Input DStreams can also be created out of custom data sources. All you have to do is implement an user-defined **receiver** (see next section to understand what that is) that can receive data from the custom sources and push it into Spark. See the [Custom Receiver Guide](streaming-custom-receivers.html) for details. +### Receivers + *** ## Transformations on DStreams @@ -1075,8 +1082,8 @@ Currently, the following output operations are defined: Output OperationMeaning print() - Prints first ten elements of every batch of data in a DStream on the driver. - This is useful for development and debugging. + Prints first ten elements of every batch of data in a DStream on the driver node running + the streaming application. This is useful for development and debugging.
Note on Python API: This is called pprint() in the Python API.
@@ -1108,7 +1115,8 @@ Currently, the following output operations are defined:
 The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, like saving the RDD to files, or writing it over the network to a database. Note that the function func is executed
- at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
+ in the driver process running the streaming application, and will usually have RDD actions in it
+ that will force the computation of the streaming RDDs.
@@ -1122,9 +1130,9 @@ Some of the common mistakes to avoid are as follows.
 Often writing data to an external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at
-the Spark driver, but try to use it in a Spark worker to save records in the RDDs.
-For example, in Scala and Python (Java omitted for brevity), one can write a code like the
-following.
+the Spark driver (process running the streaming application), but try to use it in a Spark
+worker/executor to save records in the RDDs. For example, in Scala and Python (Java omitted for
+brevity), one can write code like the following.
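The snippet the guide refers to falls outside this hunk; as a hedged illustration, a minimal Scala sketch of the mistake and of the usual fix is shown below, where `dstream`, `createNewConnection()` and `send()` are hypothetical placeholders for the actual DStream and client library.

{% highlight scala %}
// WRONG: the connection is created once at the driver, but rdd.foreach runs on
// the workers, so Spark would have to serialize and ship the connection object.
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()   // executed at the driver
  rdd.foreach { record =>
    connection.send(record)                // executed at the worker
  }
}

// BETTER: create (and close) the connection on the worker, once per partition.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection() // executed at the worker
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
{% endhighlight %}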
@@ -1285,60 +1293,183 @@ Spark Streaming has to *checkpoints* enough information of the streaming applica tolerant storage system such that it can recover from failures if required. The information checkpointed are as follows:
-- **Metadata checkpointing**: Saving of the information defining the streaming computation to
+- *Metadata checkpointing* - Saving of the information defining the streaming computation to
 fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later). Some of the information considered as metadata are.
-  + *Configuration*: The configuration that were used to create the streaming application.
-  + *DStream operations*: The set of DStream operations that define the streaming application.
-  + *Incomplete batches*: Batches who jobs are queued but have not completed yet.
-- **Data checkpointing**: Saving of the generated RDDs to reliable storage. This is useful
-  to limit the length of the chain of dependencies between RDDs, and efficiently recover
-  from all kinds of failures.
-
-Data checkpointing is especially important in case of *statefule* transformations that combine data
-across multiple batches. All window-based operations and the `updateStateByKey` transformation.
-Since stateful transformations have a dependency on previous batches of data, the length of the
-dependency chain can keep increasing with time. To avoid such unbounded increase in recovery time
-(proportional to dependency chain), intermediate RDDs of stateful transformations are periodically
-*checkpointed* to a reliable storage (e.g. HDFS) to cut off the dependency chains.
-
-Note that checkpointing of RDDs incurs the cost of saving to HDFS. This may cause an increase in
-the processing time of those batches where RDDs get checkpointed. Hence, the interval of
-checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every
-batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently
-causes the lineage and task sizes to grow which may have detrimental effects. Typically, a
-checkpoint interval of 5 - 10 times of sliding interval of a DStream is good setting to try.
-
-To enable both types of checkpointing, the developer has to provide the HDFS directory to which
-the checkpoint information will be saved. This is done by using
-
-{% highlight scala %}
-ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or JavaStreamingContext
-{% endhighlight %}
-
-The interval of checkpointing of a DStream can be set by using
-
-{% highlight scala %}
-dstream.checkpoint(checkpointInterval)
-{% endhighlight %}
+  + *Configuration* - The configuration that was used to create the streaming application.
+  + *DStream operations* - The set of DStream operations that define the streaming application.
+  + *Incomplete batches* - Batches whose jobs are queued but have not completed yet.
+- *Data checkpointing* - Saving of the generated RDDs to reliable storage. This is necessary
+  in some *stateful* transformations that combine data across multiple batches. In such
+  transformations, the generated RDDs depend on RDDs of previous batches, which causes the length
+  of the dependency chain to keep increasing with time. To avoid such an unbounded increase in recovery
+  time (proportional to the length of the dependency chain), intermediate RDDs of stateful transformations
+  are periodically *checkpointed* to reliable storage (e.g. HDFS) to cut off the dependency chains
+  (a minimal sketch of such a stateful transformation follows below).
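For concreteness, a minimal Scala sketch of such a stateful transformation is shown below; it is not part of this patch, and the host, port and checkpoint directory are hypothetical.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("StatefulNetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Stateful transformations require a checkpoint directory to be set first (hypothetical path).
ssc.checkpoint("hdfs://namenode:8020/checkpoints/stateful-wordcount")

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// Each batch's state RDD depends on the previous batch's state RDD, so Spark
// periodically checkpoints the state to the directory above to cut the lineage.
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}

runningCounts.print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}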
+
+To summarize, metadata checkpointing is primarily needed for recovery from driver failures,
+whereas data or RDD checkpointing is necessary even for basic functioning if some stateful
+transformations are used.
+
+#### When to enable Checkpointing
+{:.no_toc}
 It is recommended that checkpointing be enabled by providing a checkpoint directory. In fact, it must be enabled if any of the following is required from the application.
-- Recovering from failures of the driver node running the application - Metadata checkpoints are used
+- Recovering from failures of the driver running the application - Metadata checkpoints are used
 to recover with progress information.
- Usage of stateful transformations - If either `updateStateByKey` or `reduceByKeyAndWindow` (with inverse function) is used in the application, then the checkpoint directory must be provided for
- allowing periodic , the checkpoint interval of the DStream is by default set to a multiple of the
- DStream's sliding interval such that its at least 10 seconds.
+ allowing periodic RDD checkpointing.
 Note that simple streaming applications without the aforementioned stateful transformations can be run without enabling checkpointing. The recovery from driver failures will also be partial in that case (some received but unprocessed data may be lost). This is often acceptable and many run
-Spark Streaming application in this way. Better support for non-Hadoop environments is expected
+Spark Streaming applications in this way. Support for non-Hadoop environments is expected
 to improve in the future.
+#### How to configure Checkpointing
+{:.no_toc}
+
+Checkpointing can be enabled by setting a directory in a fault-tolerant,
+reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.
+This is done by using `streamingContext.checkpoint(checkpointDirectory)`. This will allow you to
+use the aforementioned stateful transformations. In addition,
+if you want to make the application recover from driver failures, you should rewrite your
+streaming application to have the following behavior.
+
+  + When the program is being started for the first time, it will create a new StreamingContext,
+    set up all the streams and then call start().
+  + When the program is being restarted after failure, it will re-create a StreamingContext
+    from the checkpoint data in the checkpoint directory.
+
+
+
+ +This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows. + +{% highlight scala %} +// Function to create and setup a new StreamingContext +def functionToCreateContext(): StreamingContext = { + val ssc = new StreamingContext(...) // new context + val lines = ssc.socketTextStream(...) // create DStreams + ... + ssc.checkpoint(checkpointDirectory) // set checkpoint directory + ssc +} + +// Get StreamingContext from checkpoint data or create a new one +val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) + +// Do additional setup on context that needs to be done, +// irrespective of whether it is being started or restarted +context. ... + +// Start the context +context.start() +context.awaitTermination() +{% endhighlight %} + +If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. +If the directory does not exist (i.e., running for the first time), +then the function `functionToCreateContext` will be called to create a new +context and set up the DStreams. See the Scala example +[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). +This example appends the word counts of network data into a file. + +
+
+
+This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight java %}
+// Create a factory object that can create and set up a new JavaStreamingContext
+JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
+  @Override public JavaStreamingContext create() {
+    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
+    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
+    ...
+    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
+    return jssc;
+  }
+};
+
+// Get JavaStreamingContext from checkpoint data or create a new one
+JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
+
+// Do additional setup on context that needs to be done,
+// irrespective of whether it is being started or restarted
+context. ...
+
+// Start the context
+context.start();
+context.awaitTermination();
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the factory `contextFactory` will be called to create a new
+context and set up the DStreams. See the Java example
+[JavaRecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+This example appends the word counts of network data into a file.
+
+
+
+
+This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight python %}
+# Function to create and set up a new StreamingContext
+def functionToCreateContext():
+    sc = SparkContext(...)               # new context
+    ssc = StreamingContext(...)
+    lines = ssc.socketTextStream(...)    # create DStreams
+    ...
+    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
+    return ssc
+
+# Get StreamingContext from checkpoint data or create a new one
+context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
+
+# Do additional setup on context that needs to be done,
+# irrespective of whether it is being started or restarted
+context. ...
+
+# Start the context
+context.start()
+context.awaitTermination()
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `functionToCreateContext` will be called to create a new
+context and set up the DStreams. See the Python example
+[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+This example appends the word counts of network data into a file.
+
+You can also explicitly create a `StreamingContext` from the checkpoint data and start the
+ computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
+
+
+
+
+In addition to using `getOrCreate`, one also needs to make sure that the driver process gets
+restarted automatically on failure. This can only be done by the deployment infrastructure that is
+used to run the application. This is further discussed in the
+[Deployment](#deploying-applications) section.
+
+Another point to note is that checkpointing of RDDs incurs the cost of saving to reliable storage.
+This may cause an increase in the processing time of those batches where RDDs get checkpointed.
+Hence, the interval of
+checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every
+batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently
+causes the lineage and task sizes to grow, which may have detrimental effects. For stateful
+transformations that require RDD checkpointing, the default interval is a multiple of the
+batch interval that is at least 10 seconds. It can be set by using
+`dstream.checkpoint(checkpointInterval)`. Typically, a checkpoint interval of 5 - 10 times the
+sliding interval of a DStream is a good setting to try.
+
 ***

 ## Deploying Applications
@@ -1354,12 +1485,30 @@ dependencies in the application JAR.

 There are a few aspects that require further discussion.

-### Setting up for Driver High Availability Setup
+#### Setting up for Driver Fault Recovery
 {:.no_toc}
-
-### Upgrading Application Code
+To automatically recover from a driver failure, the deployment infrastructure that is
+used to run the streaming application must monitor the driver process and relaunch the driver
+if it fails. Usually, different [cluster managers](cluster-overview.html#cluster-manager-types)
+have different tools to achieve this. Here are some pointers on how to achieve this in each
+cluster manager.
+
+- *Spark Standalone* - A Spark application can be submitted to run within the Spark Standalone
+  cluster, that is, the application driver itself runs on one of the worker nodes. Furthermore, the
+  Standalone cluster manager can be instructed to *supervise* the driver,
+  and relaunch it if the driver fails either due to a non-zero exit code,
+  or due to failure of the node running the driver. See *cluster mode* and *supervise* in the
+  [Spark Standalone guide](spark-standalone.html) for more details.
+- *YARN* - YARN supports a similar mechanism for automatically restarting an application. Please
+  refer to the YARN documentation for more details.
+- *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this with
+  Mesos.
+
+Also, note that you have to write your streaming application such that it can be restarted. This
+has been explained in the [Checkpointing](#how-to-configure-checkpointing) section.
+
+#### Upgrading Application Code
 {:.no_toc}
-
 If a running Spark Streaming application needs to be upgraded with new application code, then
 there are two possible mechanisms.
@@ -1376,12 +1525,11 @@ processed before shutdown.
 Then the upgraded application can be started, which
 will start processing from the same point where the earlier application left off. Note that this
 can be done only with input sources that support source-side buffering (like Kafka and Flume)
 as data needs to be buffered while the previous application was down and
-the upgraded application is not yet up.
-
-Note that both these mechanisms will not be able to load earlier checkpoint
-information from the pre-upgrade code.
The checkpoint information essentially
-contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors.
-
+the upgraded application is not yet up. Also, restarting from earlier checkpoint
+information of the pre-upgrade code cannot be done. The checkpoint information essentially
+contains serialized Scala/Java/Python objects and trying to deserialize objects with new,
+modified classes may lead to errors. In this case, either start the upgraded application with a different
+checkpoint directory, or delete the previous checkpoint directory.

 ***

 ## Monitoring Applications
@@ -1578,34 +1726,112 @@ consistent batch processing times.
 ***************************************************************************************************
 ***************************************************************************************************

-# Fault-tolerance Properties
+# Fault-tolerance Semantics
 In this section, we are going to discuss the behavior of Spark Streaming applications in the event
-of a node failure. To understand this, let us remember the basic fault-tolerance properties of
+of a node failure. To understand this, let us remember the basic fault-tolerance semantics of
 Spark's RDDs.
-
- 1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
- remembers the lineage of deterministic operations that were used on a fault-tolerant input
- dataset to create it.
- 1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
- re-computed from the original fault-tolerant dataset using the lineage of operations.
-
-Since all data transformations in Spark Streaming are based on RDD operations, as long as the input
-dataset is present, all intermediate data can recomputed. Keeping these properties in mind, we are
-going to discuss the failure semantics in more detail.
-
-## Failure of a Worker Node
-There are two failure behaviors based on which input sources are used.
-
-1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can
-re-computed and therefore no data will be lost due to any failure.
-1. _Using any input source that receives data through a network_ - For network-based data sources
-like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster
-(default replication factor is 2). So if a worker node fails, then the system can recompute the
-lost from the the left over copy of the input data. However, if the worker node where a network
-receiver was running fails, then a tiny bit of data may be lost, that is, the data received by
-the system but not yet replicated to other node(s). The receiver will be started on a different
-node and it will continue to receive data.
+1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
+remembers the lineage of deterministic operations that were used on a fault-tolerant input
+dataset to create it.
+1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
+re-computed from the original fault-tolerant dataset using the lineage of operations.
+1. Assuming all the RDD transformations are deterministic, the data in the final transformed RDD
+will always be the same irrespective of failures in the Spark cluster.
+
+Spark operates on the data in fault-tolerant file systems like HDFS or S3. Hence,
+all RDDs generated from the fault-tolerant data are also fault-tolerant.
However, this is not
+the case for Spark Streaming as the data in most cases is received over the network (except when
+`fileStream` is used). To achieve the same fault-tolerance properties for all the generated RDDs,
+the received data is replicated among multiple Spark executors in worker nodes in the cluster
+(default replication factor is 2). However, this leads to two kinds of data in the
+system that need to be recovered in the event of a failure.
+
+1. *Data received and replicated* - This data survives the failure of a single worker node as a copy
+   of it exists on one of the other nodes.
+1. *Data received but buffered for replication* - Since this is not replicated,
+   the only way to recover that data is to get it again from the source.
+
+Furthermore, there are two kinds of failures that we should be concerned about.
+
+1. *Failure of a Worker Node* - Any of the workers in the cluster can fail,
+   and all in-memory data on that node will be lost. If any receivers were running on that
+   node, their buffered data will be lost.
+1. *Failure of the Driver Node* - If the driver node running the Spark Streaming application
+   fails, then obviously the SparkContext is lost, and all executors along with their in-memory
+   data are lost.
+
+With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
+
+## Semantics with files as input source
+{:.no_toc}
+In this case, since all the input data is already present in a fault-tolerant file system like
+HDFS, Spark Streaming can always recover from any failure and process all the data. This gives
+*exactly-once* semantics, meaning all the data will be processed exactly once no matter what fails.
+
+## Semantics with input sources based on receivers
+{:.no_toc}
+Here we will first discuss the semantics in the context of different types of failures. Also,
+we will consider two types of receivers -
+
+1. *Reliable Receiver* - For "reliable" sources that allow sent data to be acknowledged, a
+   "reliable" receiver correctly acknowledges to the source that the data has been received
+   and stored in Spark reliably (that is, replicated successfully). If the
+   receiver fails, the data received but not yet replicated at the time of failure is not
+   acknowledged to the source. If the receiver is restarted, the source would resend the data,
+   and so no data will be lost due to the failure.
+1. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even
+   for reliable sources, one may implement an unreliable receiver that does not go into the complexity
+   of acknowledging correctly. Such receivers can lose data when the receiver fails due to a worker
+   or driver failure.
+
+Depending on what type of receivers are used, we achieve the following semantics.
+If a worker node fails, then there is no data loss with reliable receivers. With unreliable
+receivers, data received but not replicated can get lost. If the driver node fails,
+then besides these losses, all the past data that was received and replicated in memory will be
+lost. This will affect the results of the stateful transformations.
+
+To avoid this loss of past received data, Spark 1.2 introduces an experimental feature of write
+ahead logs that save the received data to fault-tolerant storage. With the write ahead logs
+enabled (as discussed earlier) and reliable receivers, there is zero data loss and exactly-once
+semantics (a configuration sketch follows the summary table below).
+
+The following table summarizes the semantics under failures.
+
+<table class="table">
+  <tr>
+    <th>Deployment Scenario</th>
+    <th>Worker Failure</th>
+    <th>Driver Failure</th>
+  </tr>
+  <tr>
+    <td>
+      Spark 1.1, or<br/>
+      Spark 1.2 without write ahead log
+    </td>
+    <td>
+      Buffered data lost with unreliable receivers<br/>
+      Zero data loss with reliable receivers
+    </td>
+    <td>
+      Buffered data lost with unreliable receivers<br/>
+      Past data lost with all receivers
+    </td>
+  </tr>
+  <tr>
+    <td>Spark 1.2 with write ahead log</td>
+    <td>Zero data loss with reliable receivers</td>
+    <td>Zero data loss with reliable receivers</td>
+  </tr>
+</table>
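As a hedged sketch (not part of this patch; the configuration key is an assumption based on the Spark 1.2 write ahead log feature and should be checked against the configuration page of the Spark version in use), enabling the write ahead log together with a fault-tolerant checkpoint directory might look like this:

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed configuration key for the experimental write ahead log in Spark 1.2.
val conf = new SparkConf()
  .setAppName("ZeroDataLossApp")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))

// The write ahead log is stored under the checkpoint directory, so a
// fault-tolerant directory (hypothetical path below) must also be set.
ssc.checkpoint("hdfs://namenode:8020/checkpoints/zero-data-loss-app")
{% endhighlight %}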
+ +## Semantics of output operations +{:.no_toc} Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were @@ -1616,256 +1842,6 @@ Since all data is modeled as RDDs with their lineage of deterministic operations additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. -## Failure of the Driver Node -For a streaming application to operate 24/7, Spark Streaming allows a streaming computation -to be resumed even after the failure of the driver node. Spark Streaming periodically writes the -metadata information of the DStreams setup through the `StreamingContext` to a -HDFS directory (can be any Hadoop-compatible filesystem). This periodic -*checkpointing* can be enabled by setting the checkpoint -directory using `ssc.checkpoint()` as described -[earlier](#rdd-checkpointing). On failure of the driver node, -the lost `StreamingContext` can be recovered from this information, and restarted. - -To allow a Spark Streaming program to be recoverable, it must be written in a way such that -it has the following behavior: - -1. When the program is being started for the first time, it will create a new StreamingContext, - set up all the streams and then call start(). -1. When the program is being restarted after failure, it will re-create a StreamingContext - from the checkpoint data in the checkpoint directory. - -
-
- -This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows. - -{% highlight scala %} -// Function to create and setup a new StreamingContext -def functionToCreateContext(): StreamingContext = { - val ssc = new StreamingContext(...) // new context - val lines = ssc.socketTextStream(...) // create DStreams - ... - ssc.checkpoint(checkpointDirectory) // set checkpoint directory - ssc -} - -// Get StreamingContext from checkpoint data or create a new one -val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) - -// Do additional setup on context that needs to be done, -// irrespective of whether it is being started or restarted -context. ... - -// Start the context -context.start() -context.awaitTermination() -{% endhighlight %} - -If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. -If the directory does not exist (i.e., running for the first time), -then the function `functionToCreateContext` will be called to create a new -context and set up the DStreams. See the Scala example -[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). -This example appends the word counts of network data into a file. - -You can also explicitly create a `StreamingContext` from the checkpoint data and start the - computation by using `new StreamingContext(checkpointDirectory)`. - -
-
- -This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows. - -{% highlight java %} -// Create a factory object that can create a and setup a new JavaStreamingContext -JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { - @Override public JavaStreamingContext create() { - JavaStreamingContext jssc = new JavaStreamingContext(...); // new context - JavaDStream lines = jssc.socketTextStream(...); // create DStreams - ... - jssc.checkpoint(checkpointDirectory); // set checkpoint directory - return jssc; - } -}; - -// Get JavaStreamingContext from checkpoint data or create a new one -JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); - -// Do additional setup on context that needs to be done, -// irrespective of whether it is being started or restarted -context. ... - -// Start the context -context.start(); -context.awaitTermination(); -{% endhighlight %} - -If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. -If the directory does not exist (i.e., running for the first time), -then the function `contextFactory` will be called to create a new -context and set up the DStreams. - -You can also explicitly create a `JavaStreamingContext` from the checkpoint data and start -the computation by using `new JavaStreamingContext(checkpointDirectory)`. - -
-
- -This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows. - -{% highlight python %} -# Function to create and setup a new StreamingContext -def functionToCreateContext(): - sc = SparkContext(...) # new context - ssc = new StreamingContext(...) - lines = ssc.socketTextStream(...) # create DStreams - ... - ssc.checkpoint(checkpointDirectory) # set checkpoint directory - return ssc - -# Get StreamingContext from checkpoint data or create a new one -context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) - -# Do additional setup on context that needs to be done, -# irrespective of whether it is being started or restarted -context. ... - -# Start the context -context.start() -context.awaitTermination() -{% endhighlight %} - -If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data. -If the directory does not exist (i.e., running for the first time), -then the function `functionToCreateContext` will be called to create a new -context and set up the DStreams. See the Python example -[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py). -This example appends the word counts of network data into a file. - -You can also explicitly create a `StreamingContext` from the checkpoint data and start the - computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`. - -
- -
- -**Note**: If Spark Streaming and/or the Spark Streaming program is recompiled, -you *must* create a new `StreamingContext` or `JavaStreamingContext`, -not recreate from checkpoint data. This is because trying to load a -context from checkpoint data may fail if the data was generated before recompilation of the -classes. So, if you are using `getOrCreate`, then make sure that the checkpoint directory is -explicitly deleted every time recompiled code needs to be launched. - -This failure recovery can be done automatically using Spark's -[standalone cluster mode](spark-standalone.html), which allows the driver of any Spark application -to be launched within the cluster and be restarted on failure (see -[supervise mode](spark-standalone.html#launching-applications-inside-the-cluster)). This can be -tested locally by launching the above example using the supervise mode in a -local standalone cluster and killing the java process running the driver (will be shown as -*DriverWrapper* when `jps` is run to show all active Java processes). The driver should be -automatically restarted, and the word counts will cont - -For other deployment environments like Mesos and Yarn, you have to restart the driver through other -mechanisms. - -#### Recovery Semantics -{:.no_toc} - -There are two different failure behaviors based on which input sources are used. - -1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can -re-computed and therefore no data will be lost due to any failure. -1. _Using any input source that receives data through a network_ - The received input data is -replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost -when the Spark driver fails, the past input data will not be accessible and driver recovers. -Hence, if stateful and window-based operations are used -(like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state -will not be recovered completely. - -In future releases, we will support full recoverability for all input sources. Note that for -non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams, -the system, upon restarting, will continue to receive and process new data. - -To better understand the behavior of the system under driver failure with a HDFS source, let's -consider what will happen with a file input stream. Specifically, in the case of the file input -stream, it will correctly identify new files that were created while the driver was down and -process them in the same way as it would have if the driver had not failed. To explain further -in the case of file input stream, we shall use an example. Let's say, files are being generated -every second, and a Spark Streaming program reads every new file and output the number of lines -in the file. This is what the sequence of outputs would be with and without a driver failure. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
| Time | Number of lines in input file | Output without driver failure | Output with driver failure |
|------|-------------------------------|-------------------------------|----------------------------|
| 1    | 10                            | 10                            | 10                         |
| 2    | 20                            | 20                            | 20                         |
| 3    | 30                            | 30                            | 30                         |
| 4    | 40                            | 40                            | [DRIVER FAILS]<br/>no output |
| 5    | 50                            | 50                            | no output                  |
| 6    | 60                            | 60                            | no output                  |
| 7    | 70                            | 70                            | [DRIVER RECOVERS]<br/>40, 50, 60, 70 |
| 8    | 80                            | 80                            | 80                         |
| 9    | 90                            | 90                            | 90                         |
| 10   | 100                           | 100                           | 100                        |
- -If the driver had crashed in the middle of the processing of time 3, then it will process time 3 -and output 30 after recovery. *************************************************************************************************** ***************************************************************************************************