Skip to content

Commit

Permalink
Added mapValues and flatMapValues; WIP for glom and mapPartitions tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
giwa committed Sep 20, 2014
1 parent 953deb0 commit f67cf57
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def context(self):
"""
return self._ssc

def context(self):
"""
Return the StreamingContext associated with this DStream
"""
return self._ssc

def count(self):
"""
Return a new DStream which contains the number of elements in this DStream.
Expand All @@ -56,7 +62,7 @@ def _sum(self):
"""
Add up the elements in this DStream.
"""
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

def print_(self, label=None):
"""
Expand All @@ -75,7 +81,7 @@ def filter(self, f):
Return a new DStream containing only the elements that satisfy predicate.
"""
def func(iterator): return ifilter(f, iterator)
return self._mapPartitions(func)
return self.mapPartitions(func)

def flatMap(self, f, preservesPartitioning=False):
"""
Expand All @@ -86,7 +92,7 @@ def func(s, iterator):
return chain.from_iterable(imap(f, iterator))
return self._mapPartitionsWithIndex(func, preservesPartitioning)

def map(self, f):
def map(self, f, preservesPartitioning=False):
"""
Return a new DStream by applying a function to each element of DStream.
"""
Expand Down Expand Up @@ -146,7 +152,7 @@ def combineLocally(iterator):
else:
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
locally_combined = self._mapPartitions(combineLocally)
locally_combined = self.mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)

def _mergeCombiners(iterator):
Expand Down Expand Up @@ -474,4 +480,4 @@ def _jdstream(self):
return self._jdstream_val

def _is_pipelinable(self):
return not (self.is_cached)
return not self.is_cached

0 comments on commit f67cf57

Please sign in to comment.