
[SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present #9988

Closed
wants to merge 10 commits

Conversation

@tdas (Contributor) commented Nov 26, 2015

The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs restored from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004).

While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is still a non-zero chance that saving or recovering the partitioner fails. To be resilient, this PR repartitions the previous state RDD if a partitioner is not detected.
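
For illustration, a minimal Scala sketch of that guard; the helper `ensurePartitioned` is hypothetical, not the actual Spark internals:

import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical helper: a state RDD recovered from an RDD checkpoint may
// have lost its partitioner, so restore the expected partitioning before
// the next TrackStateRDD builds on it.
def ensurePartitioned[K: ClassTag, S: ClassTag](
    prevStateRDD: RDD[(K, S)],
    expected: Partitioner): RDD[(K, S)] = {
  prevStateRDD.partitioner match {
    case Some(p) if p == expected => prevStateRDD // partitioner survived checkpointing
    case _ => prevStateRDD.partitionBy(expected)  // repartition to the expected partitioner
  }
}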

* DStream checkpointing. Note that implementations of this trait have to implement
* the `setupCheckpointOperation`
*/
trait DStreamCheckpointTester { self: SparkFunSuite =>
tdas (Contributor, Author) commented:

This is a refactoring that extracts testCheckpointedOperation so it can be reused in other unit tests.

@tdas (Contributor, Author) commented Nov 26, 2015

@zsxwing Please take a look.

@tdas changed the title from [SPARK-11932][STREAMING] Partition previous state RDD if partitioner not present to [SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present on Nov 26, 2015
@SparkQA commented Nov 26, 2015

Test build #46730 has finished for PR 9988 at commit 0c5fe55.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkPlanInfo
    • class SQLMetricInfo
    • case class SparkListenerSQLExecutionStart
    • case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)

case None =>
  TrackStateRDD.createFromPairRDD[K, V, S, E](
    spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
    partitioner, validTime
Member commented:

nit: validTime should be in a new line.

tdas (Contributor, Author) commented:

Done.

@SparkQA commented Dec 1, 2015

Test build #2133 has finished for PR 9988 at commit 0c5fe55.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 1, 2015

Test build #46930 has finished for PR 9988 at commit a1d3f75.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Dec 1, 2015

retest this please

@@ -56,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase {

  override def afterFunction() {
    super.afterFunction()
-   if (ssc != null) ssc.stop()
+   StreamingContext.getActive().foreach { _.stop() }
Member commented:

If the SparkContext is created in the StreamingContext's constructor but StreamingContext.stop is never called, this line cannot stop the SparkContext.
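
A hedged sketch of a teardown that also covers the SparkContext; stop(stopSparkContext = true) is a real StreamingContext flag, while the suite wiring around it is illustrative:

import org.apache.spark.streaming.StreamingContext

// Illustrative teardown (assumed suite structure): stopping the active
// StreamingContext with stopSparkContext = true also shuts down the
// SparkContext it owns. The caveat above still applies: a StreamingContext
// that was constructed but never started is not "active", so a SparkContext
// created in its constructor must be tracked and stopped by the test itself.
override def afterFunction() {
  super.afterFunction()
  StreamingContext.getActive().foreach { _.stop(stopSparkContext = true) }
}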

@yhuai (Contributor) commented Dec 1, 2015

test this please

@SparkQA commented Dec 1, 2015

Test build #46992 has finished for PR 9988 at commit a1d3f75.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 1, 2015

Test build #47005 has finished for PR 9988 at commit f4fb557.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
  val serializableConf = new SerializableJobConf(conf)
  val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
-   rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+   rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
tdas (Contributor, Author) commented:

This is the same change as made in #10088.

@SparkQA commented Dec 2, 2015

Test build #47033 has finished for PR 9988 at commit 42b35b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 2, 2015

Test build #2148 has finished for PR 9988 at commit 42b35b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Dec 2, 2015
…HadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched.
* The JobConf is serialized as part of the DStream checkpoints.

These concurrent accesses (updating in one thread while another thread is serializing it) can lead to a ConcurrentModificationException in the underlying Java HashMap used by the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, which is updated by `RDD.saveAsHadoopFile()`, while the checkpointing serializes the original JobConf.

Tests to be added in #9988 will fail reliably without this patch. Keeping this patch really small to make sure that it can be added to previous branches.

Author: Tathagata Das <[email protected]>

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a30)
Signed-off-by: Shixiong Zhu <[email protected]>
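
For reference, a minimal sketch of that fix in the shape of the diff shown earlier in this conversation. The surrounding names (conf, prefix, suffix, keyClass, valueClass, outputFormatClass, rddToFileName) belong to PairDStreamFunctions.saveAsHadoopFiles and are assumed here, so this is a fragment rather than a standalone program:

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.util.SerializableJobConf

// Wrap the original JobConf once; checkpoint serialization reads this
// stable copy.
val serializableConf = new SerializableJobConf(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
  val file = rddToFileName(prefix, suffix, time)
  // Hand each batch a fresh JobConf copy, so the mutation done during
  // job setup never races with the checkpoint serializer.
  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
    new JobConf(serializableConf.value))
}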
@zsxwing (Member) commented Dec 2, 2015

retest this please

@SparkQA commented Dec 2, 2015

Test build #47037 has finished for PR 9988 at commit 42b35b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 2, 2015

Test build #47085 has finished for PR 9988 at commit 34a52f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 2, 2015

Test build #2153 has finished for PR 9988 at commit 34a52f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 2, 2015

Test build #2152 has finished for PR 9988 at commit 34a52f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #2155 has finished for PR 9988 at commit 96e95ab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #2154 has finished for PR 9988 at commit 96e95ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #47109 has finished for PR 9988 at commit 96e95ab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #2159 has finished for PR 9988 at commit 53846f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #47133 has finished for PR 9988 at commit 53846f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #2158 has finished for PR 9988 at commit 53846f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 3, 2015

Test build #2160 has finished for PR 9988 at commit 53846f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -277,7 +277,7 @@ class CheckpointWriter(
  val bytes = Checkpoint.serialize(checkpoint, conf)
  executor.execute(new CheckpointWriteHandler(
    checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
  logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
tdas (Contributor, Author) commented:

note to self: revert this

@tdas (Contributor, Author) commented Dec 4, 2015

@zsxwing, can you take a look at this once again?

@SparkQA commented Dec 4, 2015

Test build #47194 has finished for PR 9988 at commit 3136f27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkPlanInfo
    • class SQLMetricInfo
    • case class SparkListenerSQLExecutionStart
    • case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)

@SparkQA commented Dec 4, 2015

Test build #2167 has finished for PR 9988 at commit 3136f27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 4, 2015

Test build #2168 has finished for PR 9988 at commit 3136f27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkPlanInfo
    • class SQLMetricInfo
    • case class SparkListenerSQLExecutionStart
    • case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)

@@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.appender.org.apache.spark.streaming=DEBUG
Member commented:

nit: should revert this

@zsxwing (Member) commented Dec 7, 2015

LGTM except a nit

@SparkQA commented Dec 7, 2015

Test build #47262 has finished for PR 9988 at commit fd6b83e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Dec 7, 2015

Thanks @zsxwing. Merging this to master and 1.6.

@asfgit closed this in 5d80d8c on Dec 7, 2015
asfgit pushed a commit that referenced this pull request Dec 7, 2015
…ner not present

The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs restored from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004).

While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is still a non-zero chance that saving or recovering the partitioner fails. To be resilient, this PR repartitions the previous state RDD if a partitioner is not detected.

Author: Tathagata Das <[email protected]>

Closes #9988 from tdas/SPARK-11932.

(cherry picked from commit 5d80d8c)
Signed-off-by: Tathagata Das <[email protected]>