add more docs, add first(), take()
davies committed Sep 28, 2014
1 parent e059ca2 commit 847f9b9
Showing 4 changed files with 243 additions and 26 deletions.
3 changes: 3 additions & 0 deletions python/pyspark/streaming/context.py
@@ -96,6 +96,9 @@ def _initialize_context(self, sc, duration):
        return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))

    def _jduration(self, seconds):
        """
        Create a Duration object given a number of seconds
        """
        return self._jvm.Duration(int(seconds * 1000))
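
`_jduration` truncates fractional seconds to whole milliseconds. A hypothetical usage sketch (assumes a live StreamingContext `ssc`):

d = ssc._jduration(2.5)  # same as ssc._jvm.Duration(2500)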

    @property
243 changes: 220 additions & 23 deletions python/pyspark/streaming/dstream.py
@@ -17,6 +17,7 @@

from itertools import chain, ifilter, imap
import operator
import time
from datetime import datetime

from pyspark import RDD
@@ -163,6 +164,29 @@ def takeAndPrint(rdd, time):

        self.foreachRDD(takeAndPrint)

    def first(self):
        """
        Return the first RDD in the stream.
        """
        return self.take(1)[0]

    def take(self, n):
        """
        Return the first `n` RDDs in the stream (will start and stop).
        """
        rdds = []

        def take(rdd, _):
            if rdd:
                rdds.append(rdd)
            if len(rdds) == n:
                # FIXME: NPE in JVM
                self._ssc.stop(False)

        # collect RDDs until `n` have arrived, then stop the context
        self.foreachRDD(take)
        self._ssc.start()
        self._ssc.awaitTermination()
        return rdds
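
A usage sketch for the new `first()`/`take()` APIs (hypothetical; `queueStream` feeds one queued list per batch, as in the tests below). Note that `take()` starts and stops the context, so it can only be called once:

input = [range(3), range(3, 6)]
rdds = ssc.queueStream(input).take(2)
# take() starts the context, collects 2 RDDs, then stops it, so
# rdds[0].collect() == [0, 1, 2] and rdds[1].collect() == [3, 4, 5]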

    def collect(self):
        """
        Collect each RDD's elements into the returned list.
@@ -289,93 +313,261 @@ def saveAsPickleFile(rdd, time):
        return self.foreachRDD(saveAsPickleFile)

    def transform(self, func):
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of 'this' DStream.
        """
        return TransformedDStream(self, lambda a, t: func(a), True)
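
`transform` is the escape hatch to arbitrary RDD-to-RDD operations. A sketch (assuming any DStream `dstream`):

# drop duplicates within each batch using a plain RDD method
uniques = dstream.transform(lambda rdd: rdd.distinct())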

    def transformWithTime(self, func):
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of 'this' DStream, with the batch time of that RDD passed as
        the second argument of `func`.
        """
        return TransformedDStream(self, func, False)

    def transformWith(self, func, other, keepSerializer=False):
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of 'this' DStream and 'other' DStream.
        """
        jfunc = RDDFunction(self.ctx, lambda a, b, t: func(a, b), self._jrdd_deserializer)
        dstream = self.ctx._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
                                                          other._jdstream.dstream(), jfunc)
        jrdd_serializer = self._jrdd_deserializer if keepSerializer else self.ctx.serializer
        return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)

    def repartitions(self, numPartitions):
        """
        Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
        returned DStream has exactly numPartitions partitions.
        """
        return self.transform(lambda rdd: rdd.repartition(numPartitions))

    @property
    def _slideDuration(self):
        """
        Return the slideDuration in seconds of this DStream
        """
        return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0

    def union(self, other):
        """
        Return a new DStream by unifying data of another DStream with this DStream.
        @param other Another DStream having the same interval (i.e., slideDuration) as this DStream.
        """
        if self._slideDuration != other._slideDuration:
            raise ValueError("the two DStreams should have the same slide duration")
        return self.transformWith(lambda a, b: a.union(b), other, True)
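
For example (a sketch; both streams come from the same context, so their slide durations match):

s1 = ssc.queueStream([range(3)])
s2 = ssc.queueStream([range(3, 6)])
both = s1.union(s2)  # each batch holds the elements of both streams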

    def cogroup(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'cogroup' between RDDs of `this`
        DStream and `other` DStream.
        Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
        """
        return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other)

    def join(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'join' between RDDs of `this` DStream and
        `other` DStream.
        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        return self.transformWith(lambda a, b: a.join(b, numPartitions), other)
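
A sketch of `join` on two hypothetical pair DStreams:

a = ssc.queueStream([[("k", 1)]])
b = ssc.queueStream([[("k", 2)]])
joined = a.join(b, 2)  # batches of [("k", (1, 2))], 2 partitions per RDD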

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
        `other` DStream.
        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
        `other` DStream.
        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other)

    def fullOuterJoin(self, other, numPartitions=None):
        """
        Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
        `other` DStream.
        Hash partitioning is used to generate the RDDs with `numPartitions`
        partitions.
        """
        return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)

    def _jtime(self, timestamp):
        """ Convert datetime or unix_timestamp into Time
        """
        if isinstance(timestamp, datetime):
            timestamp = time.mktime(timestamp.timetuple())
        return self.ctx._jvm.Time(long(timestamp * 1000))

    def slice(self, begin, end):
        """
        Return all the RDDs between 'begin' and 'end' (both included)
        `begin`, `end` could be datetime.datetime() or unix_timestamp
        """
        jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
        return [RDD(jrdd, self.ctx, self._jrdd_deserializer) for jrdd in jrdds]
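
For instance (hypothetical timestamps; only RDDs the stream still remembers can be sliced):

begin = datetime(2014, 9, 28, 12, 0, 0)  # or a unix timestamp such as 1411905600
rdds = dstream.slice(begin, datetime(2014, 9, 28, 12, 0, 30))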

    def _check_window(self, window, slide):
        duration = self._jdstream.dstream().slideDuration().milliseconds()
        if int(window * 1000) % duration != 0:
            raise ValueError("windowDuration must be a multiple of the slide duration (%d ms)"
                             % duration)
        if slide and int(slide * 1000) % duration != 0:
            raise ValueError("slideDuration must be a multiple of the slide duration (%d ms)"
                             % duration)

    def window(self, windowDuration, slideDuration=None):
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.
        @param windowDuration width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        self._check_window(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
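
For example, with a 1-second batch interval (hypothetical):

# each RDD covers the last 3 seconds of data, emitted every 2 seconds
windowed = dstream.window(3, 2)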

    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated by reducing all
        elements in a sliding window over this DStream.
        If `invReduceFunc` is not None, the reduction is done incrementally
        using the old window's reduced value:
        1. reduce the new values that entered the window (e.g., adding new counts)
        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
        This is more efficient than when `invReduceFunc` is None.
        @param reduceFunc     associative reduce function
        @param invReduceFunc  inverse reduce function of `reduceFunc`
        @param windowDuration width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        keyed = self.map(lambda x: (1, x))
        reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                             windowDuration, slideDuration, 1)
        return reduced.map(lambda (k, v): v)
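
A sketch of the incremental form, using the `operator` module imported above:

# running sum over the last 30 seconds, updated every 10 seconds;
# operator.sub "inverse reduces" the batches that fell out of the window
total = dstream.reduceByWindow(operator.add, operator.sub, 30, 10)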

    def countByWindow(self, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated
        by counting the number of elements in a window over this DStream.
        windowDuration and slideDuration are as defined in the window() operation.
        This is equivalent to window(windowDuration, slideDuration).count(),
        but will be more efficient if window is large.
        """
        return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
                                                    windowDuration, slideDuration)

    def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.
        @param windowDuration width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions  number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda (k, v): v > 0).count()
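
For example (hypothetical):

# number of distinct elements seen in the last 30 seconds, every 10 seconds
distinct_counts = dstream.countByValueAndWindow(30, 10)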

    def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Similar to `DStream.groupByKey()`, but applies it over a sliding window.
        @param windowDuration width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions  Number of partitions of each RDD in the new DStream.
        """
        ls = self.mapValues(lambda x: [x])
        # group by concatenating lists; the inverse drops the leading elements
        # contributed by the batch that just left the window
        grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):],
                                          windowDuration, slideDuration, numPartitions)
        return grouped.mapValues(ResultIterable)

    def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None,
                             numPartitions=None, filterFunc=None):
        """
        Return a new DStream by applying incremental `reduceByKey` over a sliding window.
        The reduced value over a new window is calculated using the old window's reduced value:
        1. reduce the new values that entered the window (e.g., adding new counts)
        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
        `invFunc` can be None, in which case all the RDDs in the window are reduced
        from scratch, which can be slower than providing `invFunc`.
        @param func           associative reduce function
        @param invFunc        inverse function of `func`
        @param windowDuration width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions  number of partitions of each RDD in the new DStream.
        @param filterFunc     function to filter expired key-value pairs;
                              only pairs that satisfy the function are retained;
                              set this to None if you do not want to filter
        """
        self._check_window(windowDuration, slideDuration)
        reduced = self.reduceByKey(func)

        def reduceFunc(a, b, t):
            b = b.reduceByKey(func, numPartitions)
            r = a.union(b).reduceByKey(func, numPartitions) if a else b
            if filterFunc:
                r = r.filter(filterFunc)
            return r

        def invReduceFunc(a, b, t):
            b = b.reduceByKey(func, numPartitions)
            joined = a.leftOuterJoin(b, numPartitions)
            return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)

        jreduceFunc = RDDFunction(self.ctx, reduceFunc, reduced._jrdd_deserializer)
        if invReduceFunc:
            jinvReduceFunc = RDDFunction(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
        else:
            jinvReduceFunc = None
        if slideDuration is None:
            slideDuration = self._slideDuration
        dstream = self.ctx._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
                                                             jreduceFunc, jinvReduceFunc,
                                                             self._ssc._jduration(windowDuration),
@@ -384,15 +576,20 @@ def invReduceFunc(a, b, t):
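
A sketch of `reduceByKeyAndWindow` above in its incremental form (the classic windowed word count; the `words` stream is hypothetical):

pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b,  # add counts entering the window
                                    lambda a, b: a - b,  # subtract counts leaving it
                                    30, 10)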

    def updateStateByKey(self, updateFunc, numPartitions=None):
        """
        Return a new "state" DStream where the state for each key is updated by applying
        the given function on the previous state of the key and the new values of the key.
        @param updateFunc State update function ([(k, vs, s)] -> [(k, s)]).
                          If `s` is None, then `k` will be eliminated.
        """
        def reduceFunc(a, b, t):
            if a is None:
                g = b.groupByKey(numPartitions).map(lambda (k, vs): (k, list(vs), None))
            else:
                g = a.cogroup(b, numPartitions)
                g = g.map(lambda (k, (va, vb)): (k, list(vb), list(va)[0] if len(va) else None))
            state = g.mapPartitions(lambda x: updateFunc(x))
            return state.filter(lambda (k, v): v is not None)

        jreduceFunc = RDDFunction(self.ctx, reduceFunc,
                                  self.ctx.serializer, self._jrdd_deserializer)
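
A sketch of an `updateFunc` for `updateStateByKey` above (a hypothetical running count; the function receives an iterator of (key, new_values, old_state) triples and yields (key, new_state) pairs):

def updateFunc(pairs):
    for k, vs, s in pairs:
        yield (k, sum(vs) + (s or 0))

counts = dstream.map(lambda w: (w, 1)).updateStateByKey(updateFunc)
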
15 changes: 15 additions & 0 deletions python/pyspark/streaming/tests.py
@@ -89,6 +89,21 @@ def _sort_result_based_on_key(self, outputs):


class TestBasicOperations(PySparkStreamingTestCase):

    def test_take(self):
        input = [range(i) for i in range(3)]
        dstream = self.ssc.queueStream(input)
        rdds = dstream.take(3)
        self.assertEqual(3, len(rdds))
        for d, rdd in zip(input, rdds):
            self.assertEqual(d, rdd.collect())

    def test_first(self):
        input = [range(10)]
        dstream = self.ssc.queueStream(input)
        rdd = dstream.first()
        self.assertEqual(range(10), rdd.collect())

    def test_map(self):
        """Basic operation test for DStream.map."""
        input = [range(1, 5), range(5, 9), range(9, 13)]
@@ -207,8 +207,10 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
    // Get the RDD of the reduced value of the previous window
    val previousWindowRDD = getOrCompute(previousWindow.endTime)

    if (pinvReduceFunc != null && previousWindowRDD.isDefined
        // for small window, reduce once will be better than twice
        && windowDuration > slideDuration * 5) {

      // subtract the values from old RDDs
      val oldRDDs =
        parent.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
@@ -238,4 +240,4 @@ class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
}
}
}
}
