[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched.
* The JobConf is serialized as part of the DStream checkpoints.

These concurrent accesses (updating in one thread while another thread serializes it) can lead to a ConcurrentModificationException in the underlying Java HashMap used by the internal Hadoop Configuration object.
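
As a standalone illustration (a minimal sketch of the failure mode, not Spark code): one thread mutating a plain Java `HashMap` while another iterates it, much as `RDD.saveAsHadoopFile()` mutates the shared JobConf while checkpoint serialization walks its entries, typically triggers the same exception.

```scala
import java.util.{HashMap => JHashMap}

// Minimal sketch (not Spark code): one thread mutates a plain Java HashMap
// while another thread iterates it, analogous to saveAsHadoopFile mutating
// the shared JobConf while checkpointing serializes it. The fail-fast
// iterator typically throws ConcurrentModificationException.
object ConcurrentMutationSketch {
  def main(args: Array[String]): Unit = {
    val map = new JHashMap[String, String]()
    (1 to 10000).foreach(i => map.put("key" + i, "value"))

    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        var i = 0
        while (true) { map.put("extra" + i, "value"); i += 1 }
      }
    })
    writer.setDaemon(true)
    writer.start()

    // "Serialization" pass: walk every entry, as writing a checkpoint does.
    val it = map.entrySet().iterator()
    while (it.hasNext) it.next() // usually fails with ConcurrentModificationException
  }
}
```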

The solution is to create a new JobConf for every batch: the copy is updated by `RDD.saveAsHadoopFile()`, while checkpointing serializes the original JobConf.
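
A self-contained sketch of why the per-batch copy is safe (assumes Hadoop's `JobConf` on the classpath): `JobConf`'s copy constructor clones the underlying Configuration, so batch-local mutations never touch the instance that checkpointing serializes.

```scala
import org.apache.hadoop.mapred.JobConf

// Sketch only: demonstrates the copy semantics the fix relies on.
object JobConfCopySketch {
  def main(args: Array[String]): Unit = {
    val original = new JobConf()
    original.set("mapreduce.job.name", "checkpointed")

    val perBatch = new JobConf(original)        // defensive copy, as in the fix
    perBatch.set("mapreduce.job.name", "batch") // batch-local mutation

    // The original (the one checkpointing would serialize) is unchanged.
    assert(original.get("mapreduce.job.name") == "checkpointed")
  }
}
```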

Tests to be added in #9988 will fail reliably without this patch. This patch is kept deliberately small so that it can be backported to previous branches.

Author: Tathagata Das <[email protected]>

Closes #10088 from tdas/SPARK-12087.
tdas authored and zsxwing committed Dec 2, 2015
1 parent 96691fe commit 8a75a30
Showing 1 changed file with 2 additions and 1 deletion.
```diff
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     val serializableConf = new SerializableJobConf(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+        new JobConf(serializableConf.value))
     }
     self.foreachRDD(saveFunc)
   }
```
