diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0049c1e7a0d5c..e96fac007fa50 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -20,9 +20,7 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
 
     def count(self):
         """
-
         """
-        # TODO: make sure count implementation, this different from what pyspark does
         return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
 
     def _sum(self):
@@ -79,7 +77,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
 
     def reduce(self, func):
         """
-
         """
         return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
 
@@ -107,12 +104,6 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
-
-                #TODO for count operation make sure count implementation
-                # This is different from what pyspark does
-                #if isinstance(x, int):
-                #    x = ("", x)
-
                 (k, v) = x
                 if k not in combiners:
                     combiners[k] = createCombiner(v)
@@ -142,6 +133,7 @@ def partitionBy(self, numPartitions, partitionFunc=None):
 
         if partitionFunc is None:
            partitionFunc = lambda x: 0 if x is None else hash(x)
+
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
@@ -228,7 +220,6 @@ def takeAndPrint(rdd, time):
 
         self.foreachRDD(takeAndPrint)
 
-
     #def transform(self, func):
     #    from utils import RDDFunction
     #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
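
Note (not part of the diff above): the count() implementation kept by the first hunk follows the usual per-partition counting pattern, where each partition is mapped to a one-element list holding its size and the partial counts are then summed by _sum(). The sketch below illustrates that pattern in plain Python; lists stand in for partitions and count_elements is a hypothetical helper, not part of the DStream API.

# Illustrative sketch only; assumes plain Python lists in place of partitions.

def count_elements(partitions):
    # Mirror of the mapPartitions step: reduce each partition to its size.
    # sum(1 for _ in part) counts lazily, so it also works for generators.
    partial_counts = [sum(1 for _ in part) for part in partitions]
    # Mirror of the _sum() step: add up the per-partition counts.
    return sum(partial_counts)

if __name__ == "__main__":
    partitions = [[1, 2, 3], [4, 5], []]
    print(count_elements(partitions))  # prints 5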