diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 00a1ec6f31fec..7879d1b7679d9 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -96,6 +96,9 @@ def _initialize_context(self, sc, duration):
         return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))
 
     def _jduration(self, seconds):
+        """
+        Create a Duration object given a number of seconds.
+        """
         return self._jvm.Duration(int(seconds * 1000))
 
     @property
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index ffcf70cc854ab..acd9f27c46cbe 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -17,6 +17,7 @@
 
 from itertools import chain, ifilter, imap
 import operator
+import time
 from datetime import datetime
 
 from pyspark import RDD
@@ -163,6 +164,29 @@ def takeAndPrint(rdd, time):
 
         self.foreachRDD(takeAndPrint)
 
+    def first(self):
+        """
+        Return the first RDD in the stream.
+        """
+        return self.take(1)[0]
+
+    def take(self, n):
+        """
+        Return the first `n` RDDs in the stream (this will start and stop the StreamingContext).
+        """
+        rdds = []
+
+        def take(rdd, _):
+            if rdd:
+                rdds.append(rdd)
+            if len(rdds) == n:
+                # FIXME: NPE in JVM
+                self._ssc.stop(False)
+
+        self.foreachRDD(take)
+        self._ssc.start()
+        self._ssc.awaitTermination()
+        return rdds
+
     def collect(self):
         """
         Collect each RDDs into the returned list.
@@ -289,12 +313,24 @@ def saveAsPickleFile(rdd, time):
         return self.foreachRDD(saveAsPickleFile)
 
     def transform(self, func):
+        """
+        Return a new DStream in which each RDD is generated by applying a function
+        on each RDD of 'this' DStream.
+        """
         return TransformedDStream(self, lambda a, t: func(a), True)
 
     def transformWithTime(self, func):
+        """
+        Return a new DStream in which each RDD is generated by applying a function
+        on each RDD of 'this' DStream; the function is also passed the batch time.
+        """
         return TransformedDStream(self, func, False)
 
     def transformWith(self, func, other, keepSerializer=False):
+        """
+        Return a new DStream in which each RDD is generated by applying a function
+        on each RDD of 'this' DStream and 'other' DStream.
+        """
         jfunc = RDDFunction(self.ctx, lambda a, b, t: func(a, b), self._jrdd_deserializer)
         dstream = self.ctx._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
                                                           other._jdstream.dstream(), jfunc)
@@ -302,28 +338,114 @@ def transformWith(self, func, other, keepSerializer=False):
         return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)
 
     def repartitions(self, numPartitions):
+        """
+        Return a new DStream with an increased or decreased level of parallelism.
+        Each RDD in the returned DStream has exactly numPartitions partitions.
+        """
         return self.transform(lambda rdd: rdd.repartition(numPartitions))
 
+    @property
+    def _slideDuration(self):
+        """
+        Return the slideDuration of this DStream, in seconds.
+        """
+        return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0
+
     def union(self, other):
+        """
+        Return a new DStream by unifying data of another DStream with this DStream.
+        @param other  Another DStream having the same interval (i.e., slideDuration) as this DStream.
+        """
+        if self._slideDuration != other._slideDuration:
+            raise ValueError("the two DStreams should have the same slide duration")
         return self.transformWith(lambda a, b: a.union(b), other, True)
 
-    def cogroup(self, other):
-        return self.transformWith(lambda a, b: a.cogroup(b), other)
+    def cogroup(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'cogroup' between RDDs of `this`
+        DStream and `other` DStream.
+
+        Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+        """
+        return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other)
+
+    def join(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'join' between RDDs of `this` DStream and
+        `other` DStream.
+
+        Hash partitioning is used to generate the RDDs with `numPartitions`
+        partitions.
+        """
+        return self.transformWith(lambda a, b: a.join(b, numPartitions), other)
+
+    def leftOuterJoin(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+        `other` DStream.
 
-    def leftOuterJoin(self, other):
-        return self.transformWith(lambda a, b: a.leftOuterJion(b), other)
+        Hash partitioning is used to generate the RDDs with `numPartitions`
+        partitions.
+        """
+        return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other)
+
+    def rightOuterJoin(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+        `other` DStream.
+
+        Hash partitioning is used to generate the RDDs with `numPartitions`
+        partitions.
+        """
+        return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other)
+
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+        `other` DStream.
 
-    def rightOuterJoin(self, other):
-        return self.transformWith(lambda a, b: a.rightOuterJoin(b), other)
+        Hash partitioning is used to generate the RDDs with `numPartitions`
+        partitions.
+        """
+        return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)
 
-    def _jtime(self, milliseconds):
-        return self.ctx._jvm.Time(milliseconds)
+    def _jtime(self, timestamp):
+        """ Convert a datetime or unix timestamp into a JVM Time object.
+        """
+        if isinstance(timestamp, datetime):
+            timestamp = time.mktime(timestamp.timetuple())
+        return self.ctx._jvm.Time(long(timestamp * 1000))
 
     def slice(self, begin, end):
+        """
+        Return all the RDDs between 'begin' and 'end' (both inclusive).
+
+        `begin` and `end` can be datetime.datetime objects or unix timestamps.
+        """
         jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
         return [RDD(jrdd, self.ctx, self._jrdd_deserializer) for jrdd in jrdds]
 
+    def _check_window(self, window, slide):
+        duration = self._jdstream.dstream().slideDuration().milliseconds()
+        if int(window * 1000) % duration != 0:
+            raise ValueError("windowDuration must be multiple of the slide duration (%d ms)"
+                             % duration)
+        if slide and int(slide * 1000) % duration != 0:
+            raise ValueError("slideDuration must be multiple of the slide duration (%d ms)"
+                             % duration)
+
     def window(self, windowDuration, slideDuration=None):
+        """
+        Return a new DStream in which each RDD contains all the elements seen in a
+        sliding window of time over this DStream.
+
+        @param windowDuration  width of the window; must be a multiple of this DStream's
+                               batching interval
+        @param slideDuration   sliding interval of the window (i.e., the interval after which
+                               the new DStream will generate RDDs); must be a multiple of this
+                               DStream's batching interval
+        """
+        self._check_window(windowDuration, slideDuration)
         d = self._ssc._jduration(windowDuration)
         if slideDuration is None:
             return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
@@ -331,43 +453,108 @@ def window(self, windowDuration, slideDuration=None):
         return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
 
     def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
+        """
+        Return a new DStream in which each RDD has a single element generated by reducing all
+        elements in a sliding window over this DStream.
+
+        If `invReduceFunc` is not None, the reduction is done incrementally
+        using the old window's reduced value:
+         1. reduce the new values that entered the window (e.g., adding new counts)
+         2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+        This is more efficient than the full recomputation done when `invReduceFunc` is None.
+
+        @param reduceFunc      associative reduce function
+        @param invReduceFunc   inverse reduce function of `reduceFunc`
+        @param windowDuration  width of the window; must be a multiple of this DStream's
+                               batching interval
+        @param slideDuration   sliding interval of the window (i.e., the interval after which
+                               the new DStream will generate RDDs); must be a multiple of this
+                               DStream's batching interval
+        """
         keyed = self.map(lambda x: (1, x))
         reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                              windowDuration, slideDuration, 1)
         return reduced.map(lambda (k, v): v)
 
     def countByWindow(self, windowDuration, slideDuration):
+        """
+        Return a new DStream in which each RDD has a single element generated
+        by counting the number of elements in a window over this DStream.
+        windowDuration and slideDuration are as defined in the window() operation.
+
+        This is equivalent to window(windowDuration, slideDuration).count(),
+        but will be more efficient if the window is large.
+        """
         return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
                                                     windowDuration, slideDuration)
 
     def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
+        """
+        Return a new DStream in which each RDD contains the count of distinct elements in
+        RDDs in a sliding window over this DStream.
+
+        @param windowDuration  width of the window; must be a multiple of this DStream's
+                               batching interval
+        @param slideDuration   sliding interval of the window (i.e., the interval after which
+                               the new DStream will generate RDDs); must be a multiple of this
+                               DStream's batching interval
+        @param numPartitions   number of partitions of each RDD in the new DStream.
+        """
         keyed = self.map(lambda x: (x, 1))
         counted = keyed.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b,
                                              windowDuration, slideDuration, numPartitions)
         return counted.filter(lambda (k, v): v > 0).count()
 
     def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
+        """
+        Return a new DStream by applying `groupByKey` over a sliding window.
+        Similar to `DStream.groupByKey()`, but applied over a sliding window.
+
+        @param windowDuration  width of the window; must be a multiple of this DStream's
+                               batching interval
+        @param slideDuration   sliding interval of the window (i.e., the interval after which
+                               the new DStream will generate RDDs); must be a multiple of this
+                               DStream's batching interval
+        @param numPartitions   number of partitions of each RDD in the new DStream.
+        """
         ls = self.mapValues(lambda x: [x])
         grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):],
                                           windowDuration, slideDuration, numPartitions)
         return grouped.mapValues(ResultIterable)
 
-    def reduceByKeyAndWindow(self, func, invFunc,
-                             windowDuration, slideDuration, numPartitions=None):
+    def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None,
+                             numPartitions=None, filterFunc=None):
+        """
+        Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+
+        The reduced value of a new window is calculated using the old window's reduced value:
+         1. reduce the new values that entered the window (e.g., adding new counts)
+         2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 
-        duration = self._jdstream.dstream().slideDuration().milliseconds()
-        if int(windowDuration * 1000) % duration != 0:
-            raise ValueError("windowDuration must be multiple of the slide duration (%d ms)"
-                             % duration)
-        if int(slideDuration * 1000) % duration != 0:
-            raise ValueError("slideDuration must be multiple of the slide duration (%d ms)"
-                             % duration)
+        `invFunc` can be None; in that case all the RDDs in the window are re-reduced from
+        scratch, which can be slower than providing `invFunc`.
+        @param func            associative reduce function
+        @param invFunc         inverse function of `func`
+        @param windowDuration  width of the window; must be a multiple of this DStream's
+                               batching interval
+        @param slideDuration   sliding interval of the window (i.e., the interval after which
+                               the new DStream will generate RDDs); must be a multiple of this
+                               DStream's batching interval
+        @param numPartitions   number of partitions of each RDD in the new DStream.
+        @param filterFunc      function to filter expired key-value pairs;
+                               only pairs that satisfy the function are retained;
+                               set this to None if you do not want to filter
+        """
+        self._check_window(windowDuration, slideDuration)
         reduced = self.reduceByKey(func)
 
         def reduceFunc(a, b, t):
             b = b.reduceByKey(func, numPartitions)
-            return a.union(b).reduceByKey(func, numPartitions) if a else b
+            r = a.union(b).reduceByKey(func, numPartitions) if a else b
+            if filterFunc:
+                r = r.filter(filterFunc)
+            return r
 
         def invReduceFunc(a, b, t):
             b = b.reduceByKey(func, numPartitions)
@@ -375,7 +562,12 @@ def invReduceFunc(a, b, t):
             return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)
 
         jreduceFunc = RDDFunction(self.ctx, reduceFunc, reduced._jrdd_deserializer)
-        jinvReduceFunc = RDDFunction(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
+        if invFunc:
+            jinvReduceFunc = RDDFunction(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
+        else:
+            jinvReduceFunc = None
+        if slideDuration is None:
+            slideDuration = self._slideDuration
         dstream = self.ctx._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
                                                              jreduceFunc, jinvReduceFunc,
                                                              self._ssc._jduration(windowDuration),
@@ -384,15 +576,20 @@ def invReduceFunc(a, b, t):
 
     def updateStateByKey(self, updateFunc, numPartitions=None):
         """
-        :param updateFunc: [(k, vs, s)] -> [(k, s)]
+        Return a new "state" DStream where the state for each key is updated by applying
+        the given function on the previous state of the key and the new values of the key.
+
+        @param updateFunc  State update function ([(k, vs, s)] -> [(k, s)]).
+                           If the new state `s` is None, the key `k` will be eliminated.
         """
         def reduceFunc(a, b, t):
             if a is None:
                 g = b.groupByKey(numPartitions).map(lambda (k, vs): (k, list(vs), None))
             else:
-                g = a.cogroup(b).map(lambda (k, (va, vb)):
-                                     (k, list(vb), list(va)[0] if len(va) else None))
-                return g.mapPartitions(lambda x: updateFunc(x) or [])
+                g = a.cogroup(b, numPartitions)
+                g = g.map(lambda (k, (va, vb)): (k, list(vb), list(va)[0] if len(va) else None))
+            state = g.mapPartitions(lambda x: updateFunc(x))
+            return state.filter(lambda (k, v): v is not None)
 
         jreduceFunc = RDDFunction(self.ctx, reduceFunc, self.ctx.serializer,
                                   self._jrdd_deserializer)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 843d6ee04ca33..0ef205754bb58 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -89,6 +89,21 @@ def _sort_result_based_on_key(self, outputs):
 
 
 class TestBasicOperations(PySparkStreamingTestCase):
+
+    def test_take(self):
+        input = [range(i) for i in range(3)]
+        dstream = self.ssc.queueStream(input)
+        rdds = dstream.take(3)
+        self.assertEqual(3, len(rdds))
+        for d, rdd in zip(input, rdds):
+            self.assertEqual(d, rdd.collect())
+
+    def test_first(self):
+        input = [range(10)]
+        dstream = self.ssc.queueStream(input)
+        rdd = dstream.first()
+        self.assertEqual(range(10), rdd.collect())
+
     def test_map(self):
         """Basic operation test for DStream.map."""
         input = [range(1, 5), range(5, 9), range(9, 13)]
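
For context, a minimal usage sketch of the incremental windowed reduce added to dstream.py above; it is illustrative only and not part of the patch. The StreamingContext `ssc`, the input stream `lines`, and the word-count scenario are assumptions: the example counts words over a 30-second window that slides every 10 seconds, passing an inverse function so each window is updated incrementally instead of being recomputed.

    # illustrative sketch, not part of the patch; `ssc` and `lines` are assumed to exist
    pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda w: (w, 1))
    counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b,          # reduce values entering the window
                                        lambda a, b: a - b,          # "inverse reduce" values leaving it
                                        30, 10,                      # windowDuration, slideDuration (seconds)
                                        filterFunc=lambda (k, v): v > 0)  # drop keys whose count falls to 0
    counts.foreachRDD(lambda rdd, t: rdd.collect())  # attach an output operation, then start ssc
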
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 47c3974b61699..16ac1b93b5f22 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -207,8 +207,10 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
     // Get the RDD of the reduced value of the previous window
     val previousWindowRDD = getOrCompute(previousWindow.endTime)
 
-    // for small window, reduce once will be better than twice
-    if (windowDuration > slideDuration * 5 && previousWindowRDD.isDefined) {
+    if (pinvReduceFunc != null && previousWindowRDD.isDefined
+        // for a small window, reducing once is better than reducing twice
+        && windowDuration > slideDuration * 5) {
+
+      // subtract the values from the old RDDs that left the window
       val oldRDDs =
         parent.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
 
@@ -238,4 +240,4 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
       }
     }
   }
-}
\ No newline at end of file
+}
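
As a schematic illustration of the fast path guarded above (not Spark code, and not part of the patch): when an inverse reduce function is available, the previous window's reduced value is "inverse reduced" with the batches that slid out of the window and then reduced with the batches that slid in, instead of re-reducing the whole window. A sketch with plain dicts standing in for reduced (key, value) RDDs:

    # schematic only; names and data are hypothetical
    def update_window(prev_window, old_batches, new_batches, func, inv_func):
        result = dict(prev_window)
        for batch in old_batches:            # batches that left the window
            for k, v in batch.items():
                result[k] = inv_func(result[k], v)
        for batch in new_batches:            # batches that entered the window
            for k, v in batch.items():
                result[k] = func(result[k], v) if k in result else v
        return result

    prev = {"a": 3, "b": 1}                  # reduced value of the previous window
    new_window = update_window(prev, [{"a": 1}], [{"b": 2, "c": 1}],
                               func=lambda x, y: x + y, inv_func=lambda x, y: x - y)
    assert new_window == {"a": 2, "b": 3, "c": 1}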