Skip to content

Commit

Permalink
add wrapper for foreachRDD()
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Oct 10, 2014
1 parent bebeb4a commit 02d0575
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ def foreachRDD(self, func):
"""
Apply a function to each RDD in this DStream.
"""
if func.func_code.co_argcount == 1:
old_func = func
func = lambda t, rdd: old_func(rdd)
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
api = self._ssc._jvm.PythonDStream
api.callForeachRDD(self._jdstream, jfunc)
Expand Down

0 comments on commit 02d0575

Please sign in to comment.