@@ -1285,60 +1293,183 @@ Spark Streaming has to *checkpoints* enough information of the streaming applica
tolerant storage system such that it can recover from failures if required. The information
checkpointed are as follows:
-- **Metadata checkpointing**: Saving of the information defining the streaming computation to
+- *Metadata checkpointing* - Saving of the information defining the streaming computation to
fault-tolerant storage like HDFS. This is used to recover from failure of the node running the
driver of the streaming application (discussed in detail later). Some of the information
considered as metadata are.
- + *Configuration*: The configuration that were used to create the streaming application.
- + *DStream operations*: The set of DStream operations that define the streaming application.
- + *Incomplete batches*: Batches who jobs are queued but have not completed yet.
-- **Data checkpointing**: Saving of the generated RDDs to reliable storage. This is useful
- to limit the length of the chain of dependencies between RDDs, and efficiently recover
- from all kinds of failures.
-
-Data checkpointing is especially important in case of *statefule* transformations that combine data
-across multiple batches. All window-based operations and the `updateStateByKey` transformation.
-Since stateful transformations have a dependency on previous batches of data, the length of the
-dependency chain can keep increasing with time. To avoid such unbounded increase in recovery time
-(proportional to dependency chain), intermediate RDDs of stateful transformations are periodically
-*checkpointed* to a reliable storage (e.g. HDFS) to cut off the dependency chains.
-
-Note that checkpointing of RDDs incurs the cost of saving to HDFS. This may cause an increase in
-the processing time of those batches where RDDs get checkpointed. Hence, the interval of
-checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every
-batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently
-causes the lineage and task sizes to grow which may have detrimental effects. Typically, a
-checkpoint interval of 5 - 10 times of sliding interval of a DStream is good setting to try.
-
-To enable both types of checkpointing, the developer has to provide the HDFS directory to which
-the checkpoint information will be saved. This is done by using
-
-{% highlight scala %}
-ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or JavaStreamingContext
-{% endhighlight %}
-
-The interval of checkpointing of a DStream can be set by using
-
-{% highlight scala %}
-dstream.checkpoint(checkpointInterval)
-{% endhighlight %}
+ + *Configuration* - The configuration that were used to create the streaming application.
+ + *DStream operations* - The set of DStream operations that define the streaming application.
+ + *Incomplete batches* - Batches who jobs are queued but have not completed yet.
+- *Data checkpointing* - Saving of the generated RDDs to reliable storage. This is necessary
+ in some *statefule* transformations that combine data across multiple batches. In such
+ transformations, the generated RDDs depends on RDDs of previous batches, which causes the length
+ of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery
+ time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically
+ *checkpointed* to a reliable storage (e.g. HDFS) to cut off the dependency chains.
+
+To summarize, metadata checkpointing is primarily needed for recovery from driver failures,
+where as data or RDD checkpointing is necessary even for basic functioning if some stateful
+transformations are used.
+
+#### When to enable Checkpointing
+{:.no_toc}
It is recommended that the checkpointing be enabled by providing a checkpoint directory.
In fact, it must be enabled if any of the following is required from the application.
-- Recovering from failures of the driver node running the application - Metadata checkpoints are used
+- Recovering from failures of the driver running the application - Metadata checkpoints are used
for to recover with progress information.
- Usage of stateful transformations - If either `updateStateByKey` or `reduceByKeyAndWindow` (with
inverse function) is used in the application, then the checkpoint directory must be provided for
- allowing periodic , the checkpoint interval of the DStream is by default set to a multiple of the
- DStream's sliding interval such that its at least 10 seconds.
+ allowing periodic RDD checkpointing.
Note that simple streaming applications without the aforementioned stateful transformations can be
run without enabling checkpointing. The recovery from driver failures will also be partial in
that case (some received but unprocessed data may be lost). This is often acceptable and many run
-Spark Streaming application in this way. Better support for non-Hadoop environments is expected
+Spark Streaming application in this way. Support for non-Hadoop environments is expected
to improve in future.
+#### How to configure Checkpointing
+{:.no_toc}
+
+Checkpointing can be enabled by setting a directory in a fault-tolerant,
+reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.
+This is done by using `streamingContext.checkpoint(checkpointDirectory)`. This will allow you to
+use the aforementioned stateful transformations. Along with it,
+if you want make the application recover from driver failures, you should rewrite your
+streaming application to have the following behavior.
+
+ + When the program is being started for the first time, it will create a new StreamingContext,
+ set up all the streams and then call start().
+ + When the program is being restarted after failure, it will re-create a StreamingContext
+ from the checkpoint data in the checkpoint directory.
+
+
+
+
+This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight scala %}
+// Function to create and setup a new StreamingContext
+def functionToCreateContext(): StreamingContext = {
+ val ssc = new StreamingContext(...) // new context
+ val lines = ssc.socketTextStream(...) // create DStreams
+ ...
+ ssc.checkpoint(checkpointDirectory) // set checkpoint directory
+ ssc
+}
+
+// Get StreamingContext from checkpoint data or create a new one
+val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
+
+// Do additional setup on context that needs to be done,
+// irrespective of whether it is being started or restarted
+context. ...
+
+// Start the context
+context.start()
+context.awaitTermination()
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `functionToCreateContext` will be called to create a new
+context and set up the DStreams. See the Scala example
+[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+This example appends the word counts of network data into a file.
+
+
+
+
+This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight java %}
+// Create a factory object that can create a and setup a new JavaStreamingContext
+JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
+ @Override public JavaStreamingContext create() {
+ JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
+ JavaDStream lines = jssc.socketTextStream(...); // create DStreams
+ ...
+ jssc.checkpoint(checkpointDirectory); // set checkpoint directory
+ return jssc;
+ }
+};
+
+// Get JavaStreamingContext from checkpoint data or create a new one
+JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
+
+// Do additional setup on context that needs to be done,
+// irrespective of whether it is being started or restarted
+context. ...
+
+// Start the context
+context.start();
+context.awaitTermination();
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `contextFactory` will be called to create a new
+context and set up the DStreams. See the Scala example
+[JavaRecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+This example appends the word counts of network data into a file.
+
+
+
+
+This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight python %}
+# Function to create and setup a new StreamingContext
+def functionToCreateContext():
+ sc = SparkContext(...) # new context
+ ssc = new StreamingContext(...)
+ lines = ssc.socketTextStream(...) # create DStreams
+ ...
+ ssc.checkpoint(checkpointDirectory) # set checkpoint directory
+ return ssc
+
+# Get StreamingContext from checkpoint data or create a new one
+context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
+
+# Do additional setup on context that needs to be done,
+# irrespective of whether it is being started or restarted
+context. ...
+
+# Start the context
+context.start()
+context.awaitTermination()
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `functionToCreateContext` will be called to create a new
+context and set up the DStreams. See the Python example
+[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+This example appends the word counts of network data into a file.
+
+You can also explicitly create a `StreamingContext` from the checkpoint data and start the
+ computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
+
+
+
+
+In addition to using `getOrCreate` one also needs to make sure that the driver process gets
+restarted automatically on failure. This can only be done by the deployment infrastructure that is
+used to run the application. This is further discussed in the [Deployment]
+(#deploying-applications) section.
+
+Another point to note is that checkpointing of RDDs incurs the cost of saving to reliable storage.
+This may cause an increase in the processing time of those batches where RDDs get checkpointed.
+Hence, the interval of
+checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every
+batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently
+causes the lineage and task sizes to grow which may have detrimental effects. For stateful
+transformations that require RDD checkpointing, the default interval is a multiple of the
+batch interval that is at least 10 seconds. It can be set by using
+`dstream.checkpoint(checkpointInterval)`. Typically, a checkpoint interval of 5 - 10 times of
+sliding interval of a DStream is good setting to try.
+
***
## Deploying Applications
@@ -1354,12 +1485,30 @@ dependencies in the application JAR.
There are a few aspects that require further discussions.
-### Setting up for Driver High Availability Setup
+#### Setting up for Driver Fault Recovery
{:.no_toc}
-
-### Upgrading Application Code
+To automatically recover from a driver failure, the deployment infrastructure that is
+used to run the streaming application must monitor the driver process and relaunch the driver
+if it fails. Usually different [cluster managers](cluster-overview.html#cluster-manager-types)
+have different tools to achieve this. Here are some pointers on how to achieve this in each
+cluster manager.
+
+- *Spark Standalone* - A Spark application can be submitted to run within the Spark Standalone
+ cluster, that is, the application driver itself runs on one of the worker nodes. Furthermore, the
+ Standalone cluster manager can be instructed to *supervise* the driver,
+ and relaunch it if the driver fails either due to non-zero exit code,
+ or due to failure of the node running the driver. See *cluster mode* and *supervise* in the
+ [Spark Standalone guide](spark-standalone.html) for more details.
+- *YARN* - Yarn supports a similar mechanism for automatically restarting an application. Please
+ refer to YARN documentation for more details.
+- *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this with
+ Mesos.
+
+Also, note that you have to write you streaming application such that it can be restarted. This
+has been explained in the [Checkpointing](#how-to-configure-checkpointing)) section.
+
+#### Upgrading Application Code
{:.no_toc}
-
If a running Spark Streaming application needs to be upgraded with new
application code, then there are two possible mechanism.
@@ -1376,12 +1525,11 @@ processed before shutdown. Then the
upgraded application can be started, which will start processing from the same point where the earlier
application left off. Note that this can be done only with input sources that support source-side buffering
(like Kafka, and Flume) as data needs to be buffered while the previous application down and
-the upgraded application is not yet up.
-
-Note that both these mechanisms will not be able to load earlier checkpoint
-information from the pre-upgrade code. The checkpoint information essentially
-contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors.
-
+the upgraded application is not yet up. And restarting from earlier checkpoint
+information of pre-upgrade code cannot be done. The checkpoint information essentially
+contains serialized Scala/Java/Python objects and trying to deserialize objects with new,
+modified classes may lead to errors. In this case, either start the upgraded app with a different
+checkpoint directory, or delete the previous checkpoint directory.
***
## Monitoring Applications
@@ -1578,34 +1726,112 @@ consistent batch processing times.
***************************************************************************************************
***************************************************************************************************
-# Fault-tolerance Properties
+# Fault-tolerance Semantics
In this section, we are going to discuss the behavior of Spark Streaming application in the event
-of a node failure. To understand this, let us remember the basic fault-tolerance properties of
+of a node failure. To understand this, let us remember the basic fault-tolerance semantics of
Spark's RDDs.
- 1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
- remembers the lineage of deterministic operations that were used on a fault-tolerant input
- dataset to create it.
- 1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
- re-computed from the original fault-tolerant dataset using the lineage of operations.
-
-Since all data transformations in Spark Streaming are based on RDD operations, as long as the input
-dataset is present, all intermediate data can recomputed. Keeping these properties in mind, we are
-going to discuss the failure semantics in more detail.
-
-## Failure of a Worker Node
-There are two failure behaviors based on which input sources are used.
-
-1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can
-re-computed and therefore no data will be lost due to any failure.
-1. _Using any input source that receives data through a network_ - For network-based data sources
-like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster
-(default replication factor is 2). So if a worker node fails, then the system can recompute the
-lost from the the left over copy of the input data. However, if the worker node where a network
-receiver was running fails, then a tiny bit of data may be lost, that is, the data received by
-the system but not yet replicated to other node(s). The receiver will be started on a different
-node and it will continue to receive data.
+1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
+remembers the lineage of deterministic operations that were used on a fault-tolerant input
+dataset to create it.
+1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
+re-computed from the original fault-tolerant dataset using the lineage of operations.
+1. Assuming all the RDD transformations are deterministic, the data in the final transformed RDD
+will always be the same irrespective of failures in Spark cluster.
+
+Spark operates on data on a fault-tolerant file systems like HDFS or S3. Hence,
+all RDDs generated from the fault-tolerant data is also fault-tolerant. However, this is not
+the case for Spark Streaming as the data in most cases is received over the network (except when
+`fileStream` is used). To achieve the same fault-tolerance properties for all the generated RDDs,
+the received data is replicated among multiple Spark executors in worker nodes in the cluster
+(default replication factor is 2). Though, this leads to two kinds of data in the
+system, that needs to recovered in the event of a failure.
+
+1. *Data received and replicated* - This data survives failure of a single worker node as a copy
+ of it exists on one of the nodes.
+1. *Data received but buffered for replication* - Since this is not replicated,
+ the only way to recover that data is to get it again from the source.
+
+Furthermore, there are two kinds of failures that we should be concerned about.
+
+1. *Failure of a Worker Node* - Any of the workers in the cluster can fail,
+ and all in-memory data on that node will be lost. If there are any receiver running on that
+ node, all buffered data will be lost.
+1. *Failure of the Driver Node* - If the driver node running the Spark Streaming application
+ fails, then obviously the SparkContext is lost, as well as all executors with their in-memory
+ data are lost.
+
+With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
+
+## Semantics with files as input source
+{:.no_toc}
+In this case, since all the input data is already present in a fault-tolerant files system like
+HDFS, Spark Streaming can always recover from any failure and process all the data. This gives
+*exactly-once* semantics, that all the data will be processed exactly once no matter what fails.
+
+## Semantics with input sources based on receivers
+{:.no_toc}
+Here we will first discuss the semantics in the context of different types of failures. Also,
+we will consider two types of receivers -
+
+1. *Reliable Receiver* - For "reliable" sources that allow sent data to be acknowledged, a
+ "reliable" receiver correctly acknowledges to the source that the data has been received
+ and stored in Spark reliably (that is, replicated successfully). If the
+ receiver fails, the data received but not yet replicated at the time of failure is not
+ acknowledged to the source. If the receiver is restarted, the source would resend the data,
+ and so no data will be lost due to the failure.
+1. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even
+ for reliable sources, one may implement an unreliable receiver that do not go into the complexity
+ of acknowledging correctly. Such receivers can lose data when the receiver fails due to a worker
+ or driver failure.
+
+Depending on what type of receivers are used we achieve the following semantics.
+If a worker node fails, then there is no data loss with reliable receivers. With unreliable
+receivers, data received but not replicated can get lost. If the driver node fails,
+then besides these losses, all the past data that were received and replicated in memory will be
+lost. This will affect the results of the stateful transformations.
+
+To avoid this loss of past received data, Spark 1.2 introduces an experimental feature of write
+ahead logs, that saves the received data to a fault-tolerant storage. With the write ahead logs
+enabled (as discussed earlier) and reliable receivers, there is zero data loss and exactly once
+semantics.
+
+The following table summarizes the semantics under failures.
+
+
+ Deployment Scenario |
+ Worker Failure |
+ Driver Failure |
+
+
+
+ Spark 1.1, or
+ Spark 1.2 without write ahead log
+ |
+
+ Buffered data lost with unreliable receivers
+ Zero data loss with reliable receivers
+ |
+
+ Buffered data lost with unreliable receivers
+ Past data lost with all receivers
+ |
+
+
+ Spark 1.2 with write ahead log |
+ Zero data loss with reliable receivers |
+ Zero data loss with reliable receivers |
+
+
+ |
+ |
+ |
+
+
+
+## Semantics of output operations
+{:.no_toc}
Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation
always leads to the same result. As a result, all DStream transformations are guaranteed to have
_exactly-once_ semantics. That is, the final transformed result will be same even if there were
@@ -1616,256 +1842,6 @@ Since all data is modeled as RDDs with their lineage of deterministic operations
additional transactions-like mechanisms may be necessary to achieve exactly-once semantics
for output operations.
-## Failure of the Driver Node
-For a streaming application to operate 24/7, Spark Streaming allows a streaming computation
-to be resumed even after the failure of the driver node. Spark Streaming periodically writes the
-metadata information of the DStreams setup through the `StreamingContext` to a
-HDFS directory (can be any Hadoop-compatible filesystem). This periodic
-*checkpointing* can be enabled by setting the checkpoint
-directory using `ssc.checkpoint(
)` as described
-[earlier](#rdd-checkpointing). On failure of the driver node,
-the lost `StreamingContext` can be recovered from this information, and restarted.
-
-To allow a Spark Streaming program to be recoverable, it must be written in a way such that
-it has the following behavior:
-
-1. When the program is being started for the first time, it will create a new StreamingContext,
- set up all the streams and then call start().
-1. When the program is being restarted after failure, it will re-create a StreamingContext
- from the checkpoint data in the checkpoint directory.
-
-
-
-
-This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
-
-{% highlight scala %}
-// Function to create and setup a new StreamingContext
-def functionToCreateContext(): StreamingContext = {
- val ssc = new StreamingContext(...) // new context
- val lines = ssc.socketTextStream(...) // create DStreams
- ...
- ssc.checkpoint(checkpointDirectory) // set checkpoint directory
- ssc
-}
-
-// Get StreamingContext from checkpoint data or create a new one
-val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
-
-// Do additional setup on context that needs to be done,
-// irrespective of whether it is being started or restarted
-context. ...
-
-// Start the context
-context.start()
-context.awaitTermination()
-{% endhighlight %}
-
-If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
-If the directory does not exist (i.e., running for the first time),
-then the function `functionToCreateContext` will be called to create a new
-context and set up the DStreams. See the Scala example
-[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
-This example appends the word counts of network data into a file.
-
-You can also explicitly create a `StreamingContext` from the checkpoint data and start the
- computation by using `new StreamingContext(checkpointDirectory)`.
-
-
-
-
-This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows.
-
-{% highlight java %}
-// Create a factory object that can create a and setup a new JavaStreamingContext
-JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
- @Override public JavaStreamingContext create() {
- JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
- JavaDStream lines = jssc.socketTextStream(...); // create DStreams
- ...
- jssc.checkpoint(checkpointDirectory); // set checkpoint directory
- return jssc;
- }
-};
-
-// Get JavaStreamingContext from checkpoint data or create a new one
-JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
-
-// Do additional setup on context that needs to be done,
-// irrespective of whether it is being started or restarted
-context. ...
-
-// Start the context
-context.start();
-context.awaitTermination();
-{% endhighlight %}
-
-If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
-If the directory does not exist (i.e., running for the first time),
-then the function `contextFactory` will be called to create a new
-context and set up the DStreams.
-
-You can also explicitly create a `JavaStreamingContext` from the checkpoint data and start
-the computation by using `new JavaStreamingContext(checkpointDirectory)`.
-
-
-
-
-This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
-
-{% highlight python %}
-# Function to create and setup a new StreamingContext
-def functionToCreateContext():
- sc = SparkContext(...) # new context
- ssc = new StreamingContext(...)
- lines = ssc.socketTextStream(...) # create DStreams
- ...
- ssc.checkpoint(checkpointDirectory) # set checkpoint directory
- return ssc
-
-# Get StreamingContext from checkpoint data or create a new one
-context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
-
-# Do additional setup on context that needs to be done,
-# irrespective of whether it is being started or restarted
-context. ...
-
-# Start the context
-context.start()
-context.awaitTermination()
-{% endhighlight %}
-
-If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
-If the directory does not exist (i.e., running for the first time),
-then the function `functionToCreateContext` will be called to create a new
-context and set up the DStreams. See the Python example
-[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
-This example appends the word counts of network data into a file.
-
-You can also explicitly create a `StreamingContext` from the checkpoint data and start the
- computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
-
-
-
-
-
-**Note**: If Spark Streaming and/or the Spark Streaming program is recompiled,
-you *must* create a new `StreamingContext` or `JavaStreamingContext`,
-not recreate from checkpoint data. This is because trying to load a
-context from checkpoint data may fail if the data was generated before recompilation of the
-classes. So, if you are using `getOrCreate`, then make sure that the checkpoint directory is
-explicitly deleted every time recompiled code needs to be launched.
-
-This failure recovery can be done automatically using Spark's
-[standalone cluster mode](spark-standalone.html), which allows the driver of any Spark application
-to be launched within the cluster and be restarted on failure (see
-[supervise mode](spark-standalone.html#launching-applications-inside-the-cluster)). This can be
-tested locally by launching the above example using the supervise mode in a
-local standalone cluster and killing the java process running the driver (will be shown as
-*DriverWrapper* when `jps` is run to show all active Java processes). The driver should be
-automatically restarted, and the word counts will cont
-
-For other deployment environments like Mesos and Yarn, you have to restart the driver through other
-mechanisms.
-
-#### Recovery Semantics
-{:.no_toc}
-
-There are two different failure behaviors based on which input sources are used.
-
-1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can
-re-computed and therefore no data will be lost due to any failure.
-1. _Using any input source that receives data through a network_ - The received input data is
-replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost
-when the Spark driver fails, the past input data will not be accessible and driver recovers.
-Hence, if stateful and window-based operations are used
-(like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state
-will not be recovered completely.
-
-In future releases, we will support full recoverability for all input sources. Note that for
-non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams,
-the system, upon restarting, will continue to receive and process new data.
-
-To better understand the behavior of the system under driver failure with a HDFS source, let's
-consider what will happen with a file input stream. Specifically, in the case of the file input
-stream, it will correctly identify new files that were created while the driver was down and
-process them in the same way as it would have if the driver had not failed. To explain further
-in the case of file input stream, we shall use an example. Let's say, files are being generated
-every second, and a Spark Streaming program reads every new file and output the number of lines
-in the file. This is what the sequence of outputs would be with and without a driver failure.
-
-
-
-
- Time |
- Number of lines in input file |
- Output without driver failure |
- Output with driver failure |
-
-
- 1 |
- 10 |
- 10 |
- 10 |
-
-
- 2 |
- 20 |
- 20 |
- 20 |
-
-
- 3 |
- 30 |
- 30 |
- 30 |
-
-
- 4 |
- 40 |
- 40 |
- [DRIVER FAILS] no output |
-
-
- 5 |
- 50 |
- 50 |
- no output |
-
-
- 6 |
- 60 |
- 60 |
- no output |
-
-
- 7 |
- 70 |
- 70 |
- [DRIVER RECOVERS] 40, 50, 60, 70 |
-
-
- 8 |
- 80 |
- 80 |
- 80 |
-
-
- 9 |
- 90 |
- 90 |
- 90 |
-
-
- 10 |
- 100 |
- 100 |
- 100 |
-
-
-
-If the driver had crashed in the middle of the processing of time 3, then it will process time 3
-and output 30 after recovery.
***************************************************************************************************
***************************************************************************************************