diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index 77fca7ff7657d..a1458e06f13d2 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -19,10 +19,12 @@
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
     reduced_lines = mapped_lines.reduce(add)
+    counted_lines = reduced_lines.count()
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
     mapped_lines.pyprint()
     reduced_lines.pyprint()
+    counted_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 3f23e65712368..caa62d44a9069 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -20,21 +20,14 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
         self.ctx = ssc._sc
         self._jrdd_deserializer = jrdd_deserializer
 
-    def generatedRDDs(self):
-        """
-        // RDDs generated, marked as private[streaming] so that testsuites can access it
-        @transient
-        """
-        pass
-
     def count(self):
         """
 
         """
         #TODO make sure count implementation, thiis different from what pyspark does
-        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum().map(lambda x: x[1])
+        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
 
-    def sum(self):
+    def _sum(self):
         """
         """
         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
@@ -65,8 +58,9 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
     def map(self, f, preservesPartitioning=False):
         """
         """
-        def func(split, iterator): return imap(f, iterator)
-        return PipelinedDStream(self, func, preservesPartitioning)
+        def func(iterator): return imap(f, iterator)
+        return self.mapPartitions(func)
+        #return PipelinedDStream(self, func, preservesPartitioning)
 
     def mapPartitions(self, f):
         """
@@ -74,6 +68,12 @@ def mapPartitions(self, f):
         def func(s, iterator): return f(iterator)
         return self.mapPartitionsWithIndex(func)
 
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+
+        """
+        return PipelinedDStream(self, f, preservesPartitioning)
+
     def reduce(self, func, numPartitions=None):
         """
@@ -92,8 +92,8 @@ def combineLocally(iterator):
 
                 #TODO for count operation make sure count implementation
                 # This is different from what pyspark does
-                if isinstance(x, int):
-                    x = ("", x)
+                #if isinstance(x, int):
+                #    x = ("", x)
 
                 (k, v) = x
                 if k not in combiners:
@@ -166,12 +166,6 @@ def _defaultReducePartitions(self):
         return self._jdstream.partitions().size()
 
-    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
-        """
-
-        """
-        return PipelinedDStream(self, f, preservesPartitioning)
-
     def _defaultReducePartitions(self):
         """