Add aggregate to python rdd
holdenk committed Mar 14, 2014
1 parent ca4bf8c commit 96b047b
Showing 1 changed file with 26 additions and 2 deletions.

python/pyspark/rdd.py
@@ -524,7 +524,7 @@ def _collect_iterator_through_file(self, iterator):
     def reduce(self, f):
         """
         Reduces the elements of this RDD using the specified commutative and
-        associative binary operator.
+        associative binary operator. Currently reduces partitions locally.
 
         >>> from operator import add
         >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
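The new sentence in the docstring points at how reduce() computes its result: each partition is reduced locally, and the per-partition partial results are then combined. That is why the operator must be commutative and associative; otherwise the answer would depend on how the data happened to be partitioned. A minimal plain-Python sketch of the requirement (simulate_reduce and the two example splits are illustrative, not part of this commit):

    from functools import reduce
    from operator import add, sub

    def simulate_reduce(partitions, op):
        # Mirror RDD.reduce(): fold each partition locally, then
        # combine the per-partition partial results.
        partials = [reduce(op, part) for part in partitions]
        return reduce(op, partials)

    split_a = [[1, 2], [3, 4, 5]]    # one way to partition [1, 2, 3, 4, 5]
    split_b = [[1], [2, 3], [4, 5]]  # another way

    print(simulate_reduce(split_a, add))  # 15
    print(simulate_reduce(split_b, add))  # 15 (addition is commutative and associative)
    print(simulate_reduce(split_a, sub))  # 5
    print(simulate_reduce(split_b, sub))  # 3 (subtraction is not, so the answer depends on the split)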
@@ -566,8 +566,32 @@ def func(iterator):
         vals = self.mapPartitions(func).collect()
         return reduce(op, vals, zeroValue)
 
-    # TODO: aggregate
+    def aggregate(self, zeroValue, seqOp, combOp):
+        """
+        Aggregate the elements of each partition, and then the results for all
+        the partitions, using given combine functions and a neutral "zero
+        value."
+
+        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
+        as its result value to avoid object allocation; however, it should not
+        modify C{t2}.
+
+        The first function (seqOp) can return a different result type, U, than
+        the type of this RDD. Thus, we need one operation for merging a T into
+        a U and one operation for merging two U's.
+
+        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
+        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
+        (10, 4)
+        """
+        def func(iterator):
+            acc = zeroValue
+            for obj in iterator:
+                acc = seqOp(acc, obj)
+            if acc is not None:
+                yield acc
+        return self.mapPartitions(func).reduce(combOp)
 
     def sum(self):
         """
         Add up the elements in this RDD.
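Read bottom-up, the new method folds each partition with seqOp starting from zeroValue, skips any partition whose accumulator ends up None, and merges the surviving per-partition accumulators with reduce(combOp). A short usage sketch, assuming a live SparkContext bound to sc as in the doctests; the mean computation and the mutable-accumulator variant below are illustrations, not part of the commit:

    rdd = sc.parallelize([1, 2, 3, 4])

    # Track (running_sum, running_count) in one pass, then derive the mean.
    total, count = rdd.aggregate(
        (0, 0),
        lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp: merge a T into a U
        lambda a, b: (a[0] + b[0], a[1] + b[1]))   # combOp: merge two U's
    print(total / float(count))  # 2.5

    # The docstring permits seqOp/combOp to mutate and return their first
    # argument; a mutable accumulator then avoids allocating a tuple per
    # element. Each task works on its own deserialized copy of zeroValue,
    # so in-place mutation does not leak between partitions.
    def seq_op(acc, x):
        acc[0] += x
        acc[1] += 1
        return acc

    def comb_op(a, b):
        a[0] += b[0]
        a[1] += b[1]
        return a

    print(rdd.aggregate([0, 0], seq_op, comb_op))  # [10, 4]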
