From f67cf57d0edc28b4bafbee9fb5547213dd79b93c Mon Sep 17 00:00:00 2001 From: giwa Date: Mon, 11 Aug 2014 16:34:12 -0700 Subject: [PATCH] added mapValues and flatMapValues WIP for glom and mapPartitions test --- python/pyspark/streaming/dstream.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index c1ace350b1bf5..b669d0839f3bf 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -46,6 +46,12 @@ def context(self): """ return self._ssc + def context(self): + """ + Return the StreamingContext associated with this DStream + """ + return self._ssc + def count(self): """ Return a new DStream which contains the number of elements in this DStream. @@ -56,7 +62,7 @@ def _sum(self): """ Add up the elements in this DStream. """ - return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add) + return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) def print_(self, label=None): """ @@ -75,7 +81,7 @@ def filter(self, f): Return a new DStream containing only the elements that satisfy predicate. """ def func(iterator): return ifilter(f, iterator) - return self._mapPartitions(func) + return self.mapPartitions(func) def flatMap(self, f, preservesPartitioning=False): """ @@ -86,7 +92,7 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator)) return self._mapPartitionsWithIndex(func, preservesPartitioning) - def map(self, f): + def map(self, f, preservesPartitioning=False): """ Return a new DStream by applying a function to each element of DStream. 
""" @@ -146,7 +152,7 @@ def combineLocally(iterator): else: combiners[k] = mergeValue(combiners[k], v) return combiners.iteritems() - locally_combined = self._mapPartitions(combineLocally) + locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) def _mergeCombiners(iterator): @@ -474,4 +480,4 @@ def _jdstream(self): return self._jdstream_val def _is_pipelinable(self): - return not (self.is_cached) + return not self.is_cached